aci.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. /*
  2. Copyright 2020 Docker, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package azure
  14. import (
  15. "context"
  16. "fmt"
  17. "io"
  18. "io/ioutil"
  19. "net/http"
  20. "strings"
  21. "time"
  22. "github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
  23. "github.com/Azure/go-autorest/autorest"
  24. "github.com/Azure/go-autorest/autorest/to"
  25. tm "github.com/buger/goterm"
  26. "github.com/gobwas/ws"
  27. "github.com/gobwas/ws/wsutil"
  28. "github.com/pkg/errors"
  29. "github.com/docker/api/azure/login"
  30. "github.com/docker/api/context/store"
  31. "github.com/docker/api/progress"
  32. )
// aciDockerUserAgent is the User-Agent value that setupClient assigns to every
// Azure client built in this file.
const aciDockerUserAgent = "docker-cli"
  34. func createACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  35. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  36. if err != nil {
  37. return errors.Wrapf(err, "cannot get container group client")
  38. }
  39. // Check if the container group already exists
  40. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  41. if err != nil {
  42. if err, ok := err.(autorest.DetailedError); ok {
  43. if err.StatusCode != http.StatusNotFound {
  44. return err
  45. }
  46. } else {
  47. return err
  48. }
  49. } else {
  50. return fmt.Errorf("container group %q already exists", *groupDefinition.Name)
  51. }
  52. return createOrUpdateACIContainers(ctx, aciContext, groupDefinition)
  53. }
  54. func createOrUpdateACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  55. w := progress.ContextWriter(ctx)
  56. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  57. if err != nil {
  58. return errors.Wrapf(err, "cannot get container group client")
  59. }
  60. w.Event(progress.Event{
  61. ID: *groupDefinition.Name,
  62. Status: progress.Working,
  63. StatusText: "Waiting",
  64. })
  65. future, err := containerGroupsClient.CreateOrUpdate(
  66. ctx,
  67. aciContext.ResourceGroup,
  68. *groupDefinition.Name,
  69. groupDefinition,
  70. )
  71. if err != nil {
  72. return err
  73. }
  74. w.Event(progress.Event{
  75. ID: *groupDefinition.Name,
  76. Status: progress.Done,
  77. StatusText: "Created",
  78. })
  79. for _, c := range *groupDefinition.Containers {
  80. w.Event(progress.Event{
  81. ID: *c.Name,
  82. Status: progress.Working,
  83. StatusText: "Waiting",
  84. })
  85. }
  86. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  87. if err != nil {
  88. return err
  89. }
  90. containerGroup, err := future.Result(containerGroupsClient)
  91. if err != nil {
  92. return err
  93. }
  94. for _, c := range *groupDefinition.Containers {
  95. w.Event(progress.Event{
  96. ID: *c.Name,
  97. Status: progress.Done,
  98. StatusText: "Done",
  99. })
  100. }
  101. if len(*containerGroup.Containers) > 1 {
  102. var commands []string
  103. for _, container := range *containerGroup.Containers {
  104. commands = append(commands, fmt.Sprintf("echo 127.0.0.1 %s >> /etc/hosts", *container.Name))
  105. }
  106. commands = append(commands, "exit")
  107. containers := *containerGroup.Containers
  108. container := containers[0]
  109. response, err := execACIContainer(ctx, aciContext, "/bin/sh", *containerGroup.Name, *container.Name)
  110. if err != nil {
  111. return err
  112. }
  113. if err = execCommands(
  114. ctx,
  115. *response.WebSocketURI,
  116. *response.Password,
  117. commands,
  118. ); err != nil {
  119. return err
  120. }
  121. }
  122. return err
  123. }
  124. func getACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  125. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  126. if err != nil {
  127. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  128. }
  129. return containerGroupsClient.Get(ctx, aciContext.ResourceGroup, containerGroupName)
  130. }
  131. func deleteACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  132. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  133. if err != nil {
  134. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  135. }
  136. return containerGroupsClient.Delete(ctx, aciContext.ResourceGroup, containerGroupName)
  137. }
  138. func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
  139. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  140. if err != nil {
  141. return c, errors.Wrapf(err, "cannot get container client")
  142. }
  143. rows, cols := getTermSize()
  144. containerExecRequest := containerinstance.ContainerExecRequest{
  145. Command: to.StringPtr(command),
  146. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  147. Rows: rows,
  148. Cols: cols,
  149. },
  150. }
  151. return containerClient.ExecuteCommand(
  152. ctx,
  153. aciContext.ResourceGroup,
  154. containerGroup,
  155. containerName,
  156. containerExecRequest)
  157. }
  158. func getTermSize() (*int32, *int32) {
  159. rows := tm.Height()
  160. cols := tm.Width()
  161. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  162. }
  163. type commandSender struct {
  164. commands string
  165. }
  166. func (cs *commandSender) Read(p []byte) (int, error) {
  167. if len(cs.commands) == 0 {
  168. return 0, io.EOF
  169. }
  170. var command string
  171. if len(p) >= len(cs.commands) {
  172. command = cs.commands
  173. cs.commands = ""
  174. } else {
  175. command = cs.commands[:len(p)]
  176. cs.commands = cs.commands[len(p):]
  177. }
  178. copy(p, command)
  179. return len(command), nil
  180. }
  181. func execCommands(ctx context.Context, address string, password string, commands []string) error {
  182. writer := ioutil.Discard
  183. reader := &commandSender{
  184. commands: strings.Join(commands, "\n"),
  185. }
  186. return exec(ctx, address, password, reader, writer)
  187. }
  188. func exec(ctx context.Context, address string, password string, reader io.Reader, writer io.Writer) error {
  189. conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
  190. if err != nil {
  191. return err
  192. }
  193. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
  194. if err != nil {
  195. return err
  196. }
  197. downstreamChannel := make(chan error, 10)
  198. upstreamChannel := make(chan error, 10)
  199. go func() {
  200. for {
  201. msg, _, err := wsutil.ReadServerData(conn)
  202. if err != nil {
  203. if err == io.EOF {
  204. downstreamChannel <- nil
  205. return
  206. }
  207. downstreamChannel <- err
  208. return
  209. }
  210. fmt.Fprint(writer, string(msg))
  211. }
  212. }()
  213. go func() {
  214. for {
  215. // We send each byte, byte-per-byte over the
  216. // websocket because the console is in raw mode
  217. buffer := make([]byte, 1)
  218. n, err := reader.Read(buffer)
  219. if err != nil {
  220. if err == io.EOF {
  221. upstreamChannel <- nil
  222. return
  223. }
  224. upstreamChannel <- err
  225. return
  226. }
  227. if n > 0 {
  228. err := wsutil.WriteClientMessage(conn, ws.OpText, buffer)
  229. if err != nil {
  230. upstreamChannel <- err
  231. return
  232. }
  233. }
  234. }
  235. }()
  236. for {
  237. select {
  238. case err := <-downstreamChannel:
  239. return errors.Wrap(err, "failed to read input from container")
  240. case err := <-upstreamChannel:
  241. return errors.Wrap(err, "failed to send input to container")
  242. }
  243. }
  244. }
  245. func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string) (string, error) {
  246. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  247. if err != nil {
  248. return "", errors.Wrapf(err, "cannot get container client")
  249. }
  250. logs, err := containerClient.ListLogs(ctx, aciContext.ResourceGroup, containerGroupName, containerName, nil)
  251. if err != nil {
  252. return "", fmt.Errorf("cannot get container logs: %v", err)
  253. }
  254. return *logs.Content, err
  255. }
  256. func getContainerGroupsClient(subscriptionID string) (containerinstance.ContainerGroupsClient, error) {
  257. containerGroupsClient := containerinstance.NewContainerGroupsClient(subscriptionID)
  258. err := setupClient(&containerGroupsClient.Client)
  259. if err != nil {
  260. return containerinstance.ContainerGroupsClient{}, err
  261. }
  262. containerGroupsClient.PollingDelay = 5 * time.Second
  263. containerGroupsClient.RetryAttempts = 30
  264. containerGroupsClient.RetryDuration = 1 * time.Second
  265. return containerGroupsClient, nil
  266. }
  267. func setupClient(aciClient *autorest.Client) error {
  268. aciClient.UserAgent = aciDockerUserAgent
  269. auth, err := login.NewAuthorizerFromLogin()
  270. if err != nil {
  271. return err
  272. }
  273. aciClient.Authorizer = auth
  274. return nil
  275. }
  276. func getContainerClient(subscriptionID string) (containerinstance.ContainerClient, error) {
  277. containerClient := containerinstance.NewContainerClient(subscriptionID)
  278. err := setupClient(&containerClient.Client)
  279. if err != nil {
  280. return containerinstance.ContainerClient{}, err
  281. }
  282. return containerClient, nil
  283. }