Prechádzať zdrojové kódy

Wait for pods to be running/terminated on compose up/down

Signed-off-by: aiordache <[email protected]>
aiordache 4 rokov pred
rodič
commit
48928811df
3 zmenil súbory, kde vykonal 73 pridanie a 29 odobranie
  1. 20 18
      kube/client/client.go
  2. 9 7
      kube/client/utils.go
  3. 44 4
      kube/compose.go

+ 20 - 18
kube/client/client.go

@@ -116,17 +116,16 @@ func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer
 }
 
 // WaitForRunningPodState blocks until pods are in running state
-func (kc KubeClient) WaitForPodState(ctx context.Context, projectName string, services []string, status string, timeout int) error {
-	var t time.Duration = 60
-
-	if timeout > 0 {
-		t = time.Duration(timeout) * time.Second
+func (kc KubeClient) WaitForPodState(ctx context.Context, opts WaitForStatusOptions) error {
+	var timeout time.Duration = time.Duration(60) * time.Second
+	if opts.Timeout > 0 {
+		timeout = time.Duration(opts.Timeout) * time.Second
 	}
 
-	selector := fmt.Sprintf("%s=%s", compose.ProjectTag, projectName)
+	selector := fmt.Sprintf("%s=%s", compose.ProjectTag, opts.ProjectName)
 	waitingForPhase := corev1.PodRunning
 
-	switch status {
+	switch opts.Status {
 	case compose.STARTING:
 		waitingForPhase = corev1.PodPending
 	case compose.UNKNOWN:
@@ -135,7 +134,7 @@ func (kc KubeClient) WaitForPodState(ctx context.Context, projectName string, se
 
 	errch := make(chan error, 1)
 	done := make(chan bool)
-
+	status := opts.Status
 	go func() {
 		for {
 			time.Sleep(500 * time.Millisecond)
@@ -147,28 +146,31 @@ func (kc KubeClient) WaitForPodState(ctx context.Context, projectName string, se
 				errch <- err
 			}
 
-			servicePods := 0
+			servicePods := map[string]string{}
 			stateReached := true
 			for _, pod := range pods.Items {
-
+				service := pod.Labels[compose.ServiceTag]
+				if opts.Services == nil || utils.StringContains(opts.Services, service) {
+					servicePods[service] = pod.Status.Message
+				}
 				if status == compose.REMOVING {
-					if contains(services, pod.Labels[compose.ServiceTag]) {
-						servicePods = servicePods + 1
-					}
 					continue
 				}
 
 				if pod.Status.Phase != waitingForPhase {
 					stateReached = false
-
 				}
 			}
-
 			if status == compose.REMOVING {
-				if servicePods > 0 {
+				if len(servicePods) > 0 {
 					stateReached = false
 				}
 			}
+			if opts.Log != nil {
+				for p, m := range servicePods {
+					opts.Log(p, stateReached, m)
+				}
+			}
 
 			if stateReached {
 				done <- true
@@ -177,8 +179,8 @@ func (kc KubeClient) WaitForPodState(ctx context.Context, projectName string, se
 	}()
 
 	select {
-	case <-time.After(t):
-		return fmt.Errorf("timeout: pods did not reach expected state.")
+	case <-time.After(timeout):
+		return fmt.Errorf("timeout: pods did not reach expected state")
 	case err := <-errch:
 		if err != nil {
 			return err

+ 9 - 7
kube/client/utils.go

@@ -33,11 +33,13 @@ func podToContainerSummary(pod corev1.Pod) compose.ContainerSummary {
 	}
 }
 
-func contains(slice []string, item string) bool {
-	for _, v := range slice {
-		if v == item {
-			return true
-		}
-	}
-	return false
+type LogFunc func(pod string, stateReached bool, message string)
+
+// ServiceStatus hold status about a service
+type WaitForStatusOptions struct {
+	ProjectName string
+	Services    []string
+	Status      string
+	Timeout     int
+	Log         LogFunc
 }

+ 44 - 4
kube/compose.go

@@ -92,9 +92,21 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
 
 	w.Event(progress.NewEvent(eventName, progress.Done, ""))
 
-	eventName = "Wait for pods to be running"
+	logF := func(pod string, stateReached bool, message string) {
+		state := progress.Done
+		if !stateReached {
+			state = progress.Working
+		}
+		w.Event(progress.NewEvent(pod, state, message))
+	}
 
-	return s.client.WaitForPodState(ctx, project.Name, project.ServiceNames(), compose.RUNNING, 10)
+	return s.client.WaitForPodState(ctx, client.WaitForStatusOptions{
+		ProjectName: project.Name,
+		Services:    project.ServiceNames(),
+		Status:      compose.RUNNING,
+		Timeout:     60,
+		Log:         logF,
+	})
 }
 
 // Down executes the equivalent to a `compose down`
@@ -116,9 +128,37 @@ func (s *composeService) Down(ctx context.Context, projectName string, options c
 		w.Event(progress.NewEvent(eventName, progress.Working, message))
 	}
 	err := s.sdk.Uninstall(projectName, logger)
-	w.Event(progress.NewEvent(eventName, progress.Done, ""))
+	if err != nil {
+		return err
+	}
 
-	return err
+	events := []string{}
+	logF := func(pod string, stateReached bool, message string) {
+		state := progress.Done
+		if !stateReached {
+			state = progress.Working
+		}
+		w.Event(progress.NewEvent(pod, state, message))
+		if !utils.StringContains(events, pod) {
+			events = append(events, pod)
+		}
+	}
+
+	err = s.client.WaitForPodState(ctx, client.WaitForStatusOptions{
+		ProjectName: projectName,
+		Services:    nil,
+		Status:      compose.REMOVING,
+		Timeout:     60,
+		Log:         logF,
+	})
+	if err != nil {
+		return err
+	}
+	for _, e := range events {
+		w.Event(progress.NewEvent(e, progress.Done, ""))
+	}
+	w.Event(progress.NewEvent(eventName, progress.Done, ""))
+	return nil
 }
 
 // List executes the equivalent to a `docker stack ls`