aci.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. /*
  2. Copyright 2020 Docker, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package azure
  14. import (
  15. "context"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "time"
  20. "github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
  21. "github.com/Azure/go-autorest/autorest"
  22. "github.com/Azure/go-autorest/autorest/to"
  23. tm "github.com/buger/goterm"
  24. "github.com/gobwas/ws"
  25. "github.com/gobwas/ws/wsutil"
  26. "github.com/pkg/errors"
  27. "github.com/docker/api/azure/login"
  28. "github.com/docker/api/context/store"
  29. "github.com/docker/api/progress"
  30. )
  // aciDockerUserAgent is the User-Agent value that setupClient assigns to
  // every Azure client created in this file.
  const (
  	aciDockerUserAgent = "docker-cli"
  )
  32. func createACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  33. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  34. if err != nil {
  35. return errors.Wrapf(err, "cannot get container group client")
  36. }
  37. // Check if the container group already exists
  38. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  39. if err != nil {
  40. if err, ok := err.(autorest.DetailedError); ok {
  41. if err.StatusCode != http.StatusNotFound {
  42. return err
  43. }
  44. } else {
  45. return err
  46. }
  47. } else {
  48. return fmt.Errorf("container group %q already exists", *groupDefinition.Name)
  49. }
  50. return createOrUpdateACIContainers(ctx, aciContext, groupDefinition)
  51. }
  52. func createOrUpdateACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  53. w := progress.ContextWriter(ctx)
  54. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  55. if err != nil {
  56. return errors.Wrapf(err, "cannot get container group client")
  57. }
  58. w.Event(progress.Event{
  59. ID: *groupDefinition.Name,
  60. Status: progress.Working,
  61. StatusText: "Waiting",
  62. })
  63. future, err := containerGroupsClient.CreateOrUpdate(
  64. ctx,
  65. aciContext.ResourceGroup,
  66. *groupDefinition.Name,
  67. groupDefinition,
  68. )
  69. if err != nil {
  70. return err
  71. }
  72. w.Event(progress.Event{
  73. ID: *groupDefinition.Name,
  74. Status: progress.Done,
  75. StatusText: "Created",
  76. })
  77. for _, c := range *groupDefinition.Containers {
  78. w.Event(progress.Event{
  79. ID: *c.Name,
  80. Status: progress.Working,
  81. StatusText: "Waiting",
  82. })
  83. }
  84. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  85. if err != nil {
  86. return err
  87. }
  88. for _, c := range *groupDefinition.Containers {
  89. w.Event(progress.Event{
  90. ID: *c.Name,
  91. Status: progress.Done,
  92. StatusText: "Done",
  93. })
  94. }
  95. return err
  96. }
  97. func getACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  98. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  99. if err != nil {
  100. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  101. }
  102. return containerGroupsClient.Get(ctx, aciContext.ResourceGroup, containerGroupName)
  103. }
  104. func deleteACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  105. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  106. if err != nil {
  107. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  108. }
  109. return containerGroupsClient.Delete(ctx, aciContext.ResourceGroup, containerGroupName)
  110. }
  111. func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
  112. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  113. if err != nil {
  114. return c, errors.Wrapf(err, "cannot get container client")
  115. }
  116. rows, cols := getTermSize()
  117. containerExecRequest := containerinstance.ContainerExecRequest{
  118. Command: to.StringPtr(command),
  119. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  120. Rows: rows,
  121. Cols: cols,
  122. },
  123. }
  124. return containerClient.ExecuteCommand(
  125. ctx,
  126. aciContext.ResourceGroup,
  127. containerGroup,
  128. containerName,
  129. containerExecRequest)
  130. }
  131. func getTermSize() (*int32, *int32) {
  132. rows := tm.Height()
  133. cols := tm.Width()
  134. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  135. }
  136. func exec(ctx context.Context, address string, password string, reader io.Reader, writer io.Writer) error {
  137. conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
  138. if err != nil {
  139. return err
  140. }
  141. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
  142. if err != nil {
  143. return err
  144. }
  145. downstreamChannel := make(chan error, 10)
  146. upstreamChannel := make(chan error, 10)
  147. go func() {
  148. for {
  149. msg, _, err := wsutil.ReadServerData(conn)
  150. if err != nil {
  151. if err == io.EOF {
  152. downstreamChannel <- nil
  153. return
  154. }
  155. downstreamChannel <- err
  156. return
  157. }
  158. fmt.Fprint(writer, string(msg))
  159. }
  160. }()
  161. go func() {
  162. for {
  163. // We send each byte, byte-per-byte over the
  164. // websocket because the console is in raw mode
  165. buffer := make([]byte, 1)
  166. n, err := reader.Read(buffer)
  167. if err != nil {
  168. if err == io.EOF {
  169. upstreamChannel <- nil
  170. return
  171. }
  172. upstreamChannel <- err
  173. return
  174. }
  175. if n > 0 {
  176. err := wsutil.WriteClientMessage(conn, ws.OpText, buffer)
  177. if err != nil {
  178. upstreamChannel <- err
  179. return
  180. }
  181. }
  182. }
  183. }()
  184. for {
  185. select {
  186. case err := <-downstreamChannel:
  187. return errors.Wrap(err, "failed to read input from container")
  188. case err := <-upstreamChannel:
  189. return errors.Wrap(err, "failed to send input to container")
  190. }
  191. }
  192. }
  193. func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, tail *int32) (string, error) {
  194. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  195. if err != nil {
  196. return "", errors.Wrapf(err, "cannot get container client")
  197. }
  198. logs, err := containerClient.ListLogs(ctx, aciContext.ResourceGroup, containerGroupName, containerName, tail)
  199. if err != nil {
  200. return "", fmt.Errorf("cannot get container logs: %v", err)
  201. }
  202. return *logs.Content, err
  203. }
  204. func getContainerGroupsClient(subscriptionID string) (containerinstance.ContainerGroupsClient, error) {
  205. containerGroupsClient := containerinstance.NewContainerGroupsClient(subscriptionID)
  206. err := setupClient(&containerGroupsClient.Client)
  207. if err != nil {
  208. return containerinstance.ContainerGroupsClient{}, err
  209. }
  210. containerGroupsClient.PollingDelay = 5 * time.Second
  211. containerGroupsClient.RetryAttempts = 30
  212. containerGroupsClient.RetryDuration = 1 * time.Second
  213. return containerGroupsClient, nil
  214. }
  215. func setupClient(aciClient *autorest.Client) error {
  216. aciClient.UserAgent = aciDockerUserAgent
  217. auth, err := login.NewAuthorizerFromLogin()
  218. if err != nil {
  219. return err
  220. }
  221. aciClient.Authorizer = auth
  222. return nil
  223. }
  224. func getContainerClient(subscriptionID string) (containerinstance.ContainerClient, error) {
  225. containerClient := containerinstance.NewContainerClient(subscriptionID)
  226. err := setupClient(&containerClient.Client)
  227. if err != nil {
  228. return containerinstance.ContainerClient{}, err
  229. }
  230. return containerClient, nil
  231. }