streams.go 910 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. package proxy
  2. import (
  3. "github.com/containerd/containerd/log"
  4. "github.com/google/uuid"
  5. "google.golang.org/grpc/metadata"
  6. streamsv1 "github.com/docker/api/protos/streams/v1"
  7. "github.com/docker/api/server/proxy/streams"
  8. )
  9. func (p *proxy) NewStream(stream streamsv1.Streaming_NewStreamServer) error {
  10. var (
  11. ctx = stream.Context()
  12. id = uuid.New().String()
  13. )
  14. md := metadata.New(map[string]string{
  15. "id": id,
  16. })
  17. // return the id of the stream to the client
  18. if err := stream.SendHeader(md); err != nil {
  19. return err
  20. }
  21. errc := make(chan error)
  22. p.mu.Lock()
  23. p.streams[id] = &streams.Stream{
  24. Streaming_NewStreamServer: stream,
  25. ErrChan: errc,
  26. }
  27. p.mu.Unlock()
  28. defer func() {
  29. p.mu.Lock()
  30. delete(p.streams, id)
  31. p.mu.Unlock()
  32. }()
  33. select {
  34. case err := <-errc:
  35. return err
  36. case <-ctx.Done():
  37. log.G(ctx).Debug("client context canceled")
  38. return ctx.Err()
  39. }
  40. }