|
|
@@ -96,7 +96,6 @@ func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer
|
|
|
for _, pod := range pods.Items {
|
|
|
request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow})
|
|
|
service := pod.Labels[compose.ServiceTag]
|
|
|
-
|
|
|
w := utils.GetWriter(pod.Name, service, string(pod.UID), func(event compose.ContainerEvent) {
|
|
|
consumer.Log(event.Name, event.Service, event.Source, event.Line)
|
|
|
})
|
|
|
@@ -115,56 +114,28 @@ func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer
|
|
|
return eg.Wait()
|
|
|
}
|
|
|
|
|
|
-// WaitForRunningPodState blocks until pods are in running state
|
|
|
+// WaitForPodState blocks until pods reach desired state
|
|
|
func (kc KubeClient) WaitForPodState(ctx context.Context, opts WaitForStatusOptions) error {
|
|
|
- var timeout time.Duration = time.Duration(60) * time.Second
|
|
|
+ var timeout time.Duration = time.Minute
|
|
|
if opts.Timeout != nil {
|
|
|
timeout = *opts.Timeout
|
|
|
}
|
|
|
|
|
|
- selector := fmt.Sprintf("%s=%s", compose.ProjectTag, opts.ProjectName)
|
|
|
- waitingForPhase := corev1.PodRunning
|
|
|
-
|
|
|
- switch opts.Status {
|
|
|
- case compose.STARTING:
|
|
|
- waitingForPhase = corev1.PodPending
|
|
|
- case compose.UNKNOWN:
|
|
|
- waitingForPhase = corev1.PodUnknown
|
|
|
- }
|
|
|
-
|
|
|
errch := make(chan error, 1)
|
|
|
done := make(chan bool)
|
|
|
- status := opts.Status
|
|
|
go func() {
|
|
|
for {
|
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
|
|
pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
|
|
|
- LabelSelector: selector,
|
|
|
+ LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, opts.ProjectName),
|
|
|
})
|
|
|
if err != nil {
|
|
|
errch <- err
|
|
|
}
|
|
|
-
|
|
|
- servicePods := map[string]string{}
|
|
|
- stateReached := true
|
|
|
- for _, pod := range pods.Items {
|
|
|
- service := pod.Labels[compose.ServiceTag]
|
|
|
- if opts.Services == nil || utils.StringContains(opts.Services, service) {
|
|
|
- servicePods[service] = pod.Status.Message
|
|
|
- }
|
|
|
- if status == compose.REMOVING {
|
|
|
- continue
|
|
|
- }
|
|
|
-
|
|
|
- if pod.Status.Phase != waitingForPhase {
|
|
|
- stateReached = false
|
|
|
- }
|
|
|
- }
|
|
|
- if status == compose.REMOVING {
|
|
|
- if len(servicePods) > 0 {
|
|
|
- stateReached = false
|
|
|
- }
|
|
|
+ stateReached, servicePods, err := checkPodsState(opts.Services, pods.Items, opts.Status)
|
|
|
+ if err != nil {
|
|
|
+ errch <- err
|
|
|
}
|
|
|
if opts.Log != nil {
|
|
|
for p, m := range servicePods {
|