streams.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package proxy
  2. import (
  3. "sync"
  4. "github.com/containerd/containerd/log"
  5. "github.com/golang/protobuf/ptypes"
  6. "github.com/google/uuid"
  7. "google.golang.org/grpc"
  8. "google.golang.org/grpc/metadata"
  9. streamsv1 "github.com/docker/api/protos/streams/v1"
  10. )
  11. // Stream is a bidirectional stream for container IO
  12. type Stream struct {
  13. streamsv1.Streaming_NewStreamServer
  14. errm sync.Mutex
  15. errChan chan<- error
  16. }
  17. // CloseWithError sends the result of an action to the errChan or nil
  18. // if no erros
  19. func (s *Stream) CloseWithError(err error) error {
  20. s.errm.Lock()
  21. defer s.errm.Unlock()
  22. if s.errChan != nil {
  23. if err != nil {
  24. s.errChan <- err
  25. }
  26. close(s.errChan)
  27. s.errChan = nil
  28. }
  29. return nil
  30. }
  31. func (p *proxy) NewStream(stream streamsv1.Streaming_NewStreamServer) error {
  32. var (
  33. ctx = stream.Context()
  34. id = uuid.New().String()
  35. )
  36. md := metadata.New(map[string]string{
  37. "id": id,
  38. })
  39. // return the id of the stream to the client
  40. if err := stream.SendHeader(md); err != nil {
  41. return err
  42. }
  43. errc := make(chan error)
  44. p.mu.Lock()
  45. p.streams[id] = &Stream{
  46. Streaming_NewStreamServer: stream,
  47. errChan: errc,
  48. }
  49. p.mu.Unlock()
  50. defer func() {
  51. p.mu.Lock()
  52. delete(p.streams, id)
  53. p.mu.Unlock()
  54. }()
  55. select {
  56. case err := <-errc:
  57. return err
  58. case <-ctx.Done():
  59. log.G(ctx).Debug("client context canceled")
  60. return ctx.Err()
  61. }
  62. }
  63. // io.Reader that forwards everything to the stream
  64. type reader struct {
  65. stream *Stream
  66. }
  67. func (r reader) Read(p []byte) (int, error) {
  68. a, err := r.stream.Recv()
  69. if err != nil {
  70. return 0, err
  71. }
  72. var m streamsv1.BytesMessage
  73. err = ptypes.UnmarshalAny(a, &m)
  74. if err != nil {
  75. return 0, err
  76. }
  77. return copy(p, m.Value), nil
  78. }
  79. // io.Writer that writes
  80. type writer struct {
  81. stream grpc.ServerStream
  82. }
  83. func (w *writer) Write(p []byte) (n int, err error) {
  84. if len(p) == 0 {
  85. return 0, nil
  86. }
  87. message := streamsv1.BytesMessage{
  88. Type: streamsv1.IOStream_STDOUT,
  89. Value: p,
  90. }
  91. m, err := ptypes.MarshalAny(&message)
  92. if err != nil {
  93. return 0, err
  94. }
  95. return len(message.Value), w.stream.SendMsg(m)
  96. }