aci.go 6.9 KB


  1. package azure
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "time"
  8. "github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
  9. "github.com/Azure/go-autorest/autorest"
  10. "github.com/Azure/go-autorest/autorest/to"
  11. tm "github.com/buger/goterm"
  12. "github.com/gobwas/ws"
  13. "github.com/gobwas/ws/wsutil"
  14. "github.com/pkg/errors"
  15. "github.com/docker/api/azure/login"
  16. "github.com/docker/api/context/store"
  17. "github.com/docker/api/progress"
  18. )
  // aciDockerUserAgent is the User-Agent value assigned to every ACI client
  // configured by setupClient.
  const (
  	aciDockerUserAgent = "docker-cli"
  )
  20. func createACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  21. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  22. if err != nil {
  23. return errors.Wrapf(err, "cannot get container group client")
  24. }
  25. // Check if the container group already exists
  26. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  27. if err != nil {
  28. if err, ok := err.(autorest.DetailedError); ok {
  29. if err.StatusCode != http.StatusNotFound {
  30. return err
  31. }
  32. } else {
  33. return err
  34. }
  35. } else {
  36. return fmt.Errorf("container group %q already exists", *groupDefinition.Name)
  37. }
  38. return createOrUpdateACIContainers(ctx, aciContext, groupDefinition)
  39. }
  40. func createOrUpdateACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  41. w := progress.ContextWriter(ctx)
  42. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  43. if err != nil {
  44. return errors.Wrapf(err, "cannot get container group client")
  45. }
  46. w.Event(progress.Event{
  47. ID: *groupDefinition.Name,
  48. Status: progress.Working,
  49. StatusText: "Waiting",
  50. })
  51. future, err := containerGroupsClient.CreateOrUpdate(
  52. ctx,
  53. aciContext.ResourceGroup,
  54. *groupDefinition.Name,
  55. groupDefinition,
  56. )
  57. if err != nil {
  58. return err
  59. }
  60. w.Event(progress.Event{
  61. ID: *groupDefinition.Name,
  62. Status: progress.Done,
  63. StatusText: "Created",
  64. })
  65. for _, c := range *groupDefinition.Containers {
  66. w.Event(progress.Event{
  67. ID: *c.Name,
  68. Status: progress.Working,
  69. StatusText: "Waiting",
  70. })
  71. }
  72. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  73. if err != nil {
  74. return err
  75. }
  76. for _, c := range *groupDefinition.Containers {
  77. w.Event(progress.Event{
  78. ID: *c.Name,
  79. Status: progress.Done,
  80. StatusText: "Done",
  81. })
  82. }
  83. return err
  84. }
  85. func getACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  86. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  87. if err != nil {
  88. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  89. }
  90. return containerGroupsClient.Get(ctx, aciContext.ResourceGroup, containerGroupName)
  91. }
  92. func deleteACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  93. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  94. if err != nil {
  95. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  96. }
  97. return containerGroupsClient.Delete(ctx, aciContext.ResourceGroup, containerGroupName)
  98. }
  99. func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
  100. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  101. if err != nil {
  102. return c, errors.Wrapf(err, "cannot get container client")
  103. }
  104. rows, cols := getTermSize()
  105. containerExecRequest := containerinstance.ContainerExecRequest{
  106. Command: to.StringPtr(command),
  107. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  108. Rows: rows,
  109. Cols: cols,
  110. },
  111. }
  112. return containerClient.ExecuteCommand(
  113. ctx,
  114. aciContext.ResourceGroup,
  115. containerGroup,
  116. containerName,
  117. containerExecRequest)
  118. }
  119. func getTermSize() (*int32, *int32) {
  120. rows := tm.Height()
  121. cols := tm.Width()
  122. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  123. }
  124. func exec(ctx context.Context, address string, password string, reader io.Reader, writer io.Writer) error {
  125. conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
  126. if err != nil {
  127. return err
  128. }
  129. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
  130. if err != nil {
  131. return err
  132. }
  133. downstreamChannel := make(chan error, 10)
  134. upstreamChannel := make(chan error, 10)
  135. go func() {
  136. for {
  137. msg, _, err := wsutil.ReadServerData(conn)
  138. if err != nil {
  139. if err == io.EOF {
  140. downstreamChannel <- nil
  141. return
  142. }
  143. downstreamChannel <- err
  144. return
  145. }
  146. fmt.Fprint(writer, string(msg))
  147. }
  148. }()
  149. go func() {
  150. for {
  151. // We send each byte, byte-per-byte over the
  152. // websocket because the console is in raw mode
  153. buffer := make([]byte, 1)
  154. n, err := reader.Read(buffer)
  155. if err != nil {
  156. if err == io.EOF {
  157. upstreamChannel <- nil
  158. return
  159. }
  160. upstreamChannel <- err
  161. return
  162. }
  163. if n > 0 {
  164. err := wsutil.WriteClientMessage(conn, ws.OpText, buffer)
  165. if err != nil {
  166. upstreamChannel <- err
  167. return
  168. }
  169. }
  170. }
  171. }()
  172. for {
  173. select {
  174. case err := <-downstreamChannel:
  175. return errors.Wrap(err, "failed to read input from container")
  176. case err := <-upstreamChannel:
  177. return errors.Wrap(err, "failed to send input to container")
  178. }
  179. }
  180. }
  181. func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string) (string, error) {
  182. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  183. if err != nil {
  184. return "", errors.Wrapf(err, "cannot get container client")
  185. }
  186. logs, err := containerClient.ListLogs(ctx, aciContext.ResourceGroup, containerGroupName, containerName, nil)
  187. if err != nil {
  188. return "", fmt.Errorf("cannot get container logs: %v", err)
  189. }
  190. return *logs.Content, err
  191. }
  192. func getContainerGroupsClient(subscriptionID string) (containerinstance.ContainerGroupsClient, error) {
  193. containerGroupsClient := containerinstance.NewContainerGroupsClient(subscriptionID)
  194. err := setupClient(&containerGroupsClient.Client)
  195. if err != nil {
  196. return containerinstance.ContainerGroupsClient{}, err
  197. }
  198. containerGroupsClient.PollingDelay = 5 * time.Second
  199. containerGroupsClient.RetryAttempts = 30
  200. containerGroupsClient.RetryDuration = 1 * time.Second
  201. return containerGroupsClient, nil
  202. }
  203. func setupClient(aciClient *autorest.Client) error {
  204. aciClient.UserAgent = aciDockerUserAgent
  205. auth, err := login.NewAuthorizerFromLogin()
  206. if err != nil {
  207. return err
  208. }
  209. aciClient.Authorizer = auth
  210. return nil
  211. }
  212. func getContainerClient(subscriptionID string) (containerinstance.ContainerClient, error) {
  213. containerClient := containerinstance.NewContainerClient(subscriptionID)
  214. err := setupClient(&containerClient.Client)
  215. if err != nil {
  216. return containerinstance.ContainerClient{}, err
  217. }
  218. return containerClient, nil
  219. }