aci.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. /*
  2. Copyright 2020 Docker, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package azure
  14. import (
  15. "context"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "strings"
  20. "time"
  21. "github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
  22. "github.com/Azure/go-autorest/autorest"
  23. "github.com/Azure/go-autorest/autorest/to"
  24. tm "github.com/buger/goterm"
  25. "github.com/gobwas/ws"
  26. "github.com/gobwas/ws/wsutil"
  27. "github.com/morikuni/aec"
  28. "github.com/pkg/errors"
  29. "github.com/docker/api/azure/login"
  30. "github.com/docker/api/context/store"
  31. "github.com/docker/api/progress"
  32. )
// aciDockerUserAgent is the user agent string that setupClient assigns to
// every Azure client created in this file.
const aciDockerUserAgent = "docker-cli"
  34. func createACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  35. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  36. if err != nil {
  37. return errors.Wrapf(err, "cannot get container group client")
  38. }
  39. // Check if the container group already exists
  40. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  41. if err != nil {
  42. if err, ok := err.(autorest.DetailedError); ok {
  43. if err.StatusCode != http.StatusNotFound {
  44. return err
  45. }
  46. } else {
  47. return err
  48. }
  49. } else {
  50. return fmt.Errorf("container group %q already exists", *groupDefinition.Name)
  51. }
  52. return createOrUpdateACIContainers(ctx, aciContext, groupDefinition)
  53. }
  54. func createOrUpdateACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  55. w := progress.ContextWriter(ctx)
  56. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  57. if err != nil {
  58. return errors.Wrapf(err, "cannot get container group client")
  59. }
  60. w.Event(progress.Event{
  61. ID: *groupDefinition.Name,
  62. Status: progress.Working,
  63. StatusText: "Waiting",
  64. })
  65. future, err := containerGroupsClient.CreateOrUpdate(
  66. ctx,
  67. aciContext.ResourceGroup,
  68. *groupDefinition.Name,
  69. groupDefinition,
  70. )
  71. if err != nil {
  72. return err
  73. }
  74. w.Event(progress.Event{
  75. ID: *groupDefinition.Name,
  76. Status: progress.Done,
  77. StatusText: "Created",
  78. })
  79. for _, c := range *groupDefinition.Containers {
  80. w.Event(progress.Event{
  81. ID: *c.Name,
  82. Status: progress.Working,
  83. StatusText: "Waiting",
  84. })
  85. }
  86. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  87. if err != nil {
  88. return err
  89. }
  90. for _, c := range *groupDefinition.Containers {
  91. w.Event(progress.Event{
  92. ID: *c.Name,
  93. Status: progress.Done,
  94. StatusText: "Done",
  95. })
  96. }
  97. return err
  98. }
  99. func getACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  100. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  101. if err != nil {
  102. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  103. }
  104. return containerGroupsClient.Get(ctx, aciContext.ResourceGroup, containerGroupName)
  105. }
  106. func deleteACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  107. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  108. if err != nil {
  109. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  110. }
  111. return containerGroupsClient.Delete(ctx, aciContext.ResourceGroup, containerGroupName)
  112. }
  113. func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
  114. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  115. if err != nil {
  116. return c, errors.Wrapf(err, "cannot get container client")
  117. }
  118. rows, cols := getTermSize()
  119. containerExecRequest := containerinstance.ContainerExecRequest{
  120. Command: to.StringPtr(command),
  121. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  122. Rows: rows,
  123. Cols: cols,
  124. },
  125. }
  126. return containerClient.ExecuteCommand(
  127. ctx,
  128. aciContext.ResourceGroup,
  129. containerGroup,
  130. containerName,
  131. containerExecRequest)
  132. }
  133. func getTermSize() (*int32, *int32) {
  134. rows := tm.Height()
  135. cols := tm.Width()
  136. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  137. }
  138. func exec(ctx context.Context, address string, password string, reader io.Reader, writer io.Writer) error {
  139. conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
  140. if err != nil {
  141. return err
  142. }
  143. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
  144. if err != nil {
  145. return err
  146. }
  147. downstreamChannel := make(chan error, 10)
  148. upstreamChannel := make(chan error, 10)
  149. go func() {
  150. for {
  151. msg, _, err := wsutil.ReadServerData(conn)
  152. if err != nil {
  153. if err == io.EOF {
  154. downstreamChannel <- nil
  155. return
  156. }
  157. downstreamChannel <- err
  158. return
  159. }
  160. fmt.Fprint(writer, string(msg))
  161. }
  162. }()
  163. go func() {
  164. for {
  165. // We send each byte, byte-per-byte over the
  166. // websocket because the console is in raw mode
  167. buffer := make([]byte, 1)
  168. n, err := reader.Read(buffer)
  169. if err != nil {
  170. if err == io.EOF {
  171. upstreamChannel <- nil
  172. return
  173. }
  174. upstreamChannel <- err
  175. return
  176. }
  177. if n > 0 {
  178. err := wsutil.WriteClientMessage(conn, ws.OpText, buffer)
  179. if err != nil {
  180. upstreamChannel <- err
  181. return
  182. }
  183. }
  184. }
  185. }()
  186. for {
  187. select {
  188. case err := <-downstreamChannel:
  189. return errors.Wrap(err, "failed to read input from container")
  190. case err := <-upstreamChannel:
  191. return errors.Wrap(err, "failed to send input to container")
  192. }
  193. }
  194. }
  195. func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, tail *int32) (string, error) {
  196. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  197. if err != nil {
  198. return "", errors.Wrapf(err, "cannot get container client")
  199. }
  200. logs, err := containerClient.ListLogs(ctx, aciContext.ResourceGroup, containerGroupName, containerName, tail)
  201. if err != nil {
  202. return "", fmt.Errorf("cannot get container logs: %v", err)
  203. }
  204. return *logs.Content, err
  205. }
  206. func streamLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, out io.Writer) error {
  207. lastOutput := 0
  208. for {
  209. select {
  210. case <-ctx.Done():
  211. return nil
  212. default:
  213. logs, err := getACIContainerLogs(ctx, aciContext, containerGroupName, containerName, nil)
  214. if err != nil {
  215. return err
  216. }
  217. logLines := strings.Split(logs, "\n")
  218. currentOutput := len(logLines)
  219. // Note: a backend should not do this normally, this breaks the log
  220. // streaming over gRPC but this is the only thing we can do with
  221. // the kind of logs ACI is giving us. Hopefully Azue will give us
  222. // a real logs streaming api soon.
  223. b := aec.EmptyBuilder
  224. b = b.Up(uint(lastOutput))
  225. fmt.Fprint(out, b.Column(0).ANSI)
  226. for i := 0; i < currentOutput-1; i++ {
  227. fmt.Fprintln(out, logLines[i])
  228. }
  229. lastOutput = currentOutput - 1
  230. time.Sleep(2 * time.Second)
  231. }
  232. }
  233. }
  234. func getContainerGroupsClient(subscriptionID string) (containerinstance.ContainerGroupsClient, error) {
  235. containerGroupsClient := containerinstance.NewContainerGroupsClient(subscriptionID)
  236. err := setupClient(&containerGroupsClient.Client)
  237. if err != nil {
  238. return containerinstance.ContainerGroupsClient{}, err
  239. }
  240. containerGroupsClient.PollingDelay = 5 * time.Second
  241. containerGroupsClient.RetryAttempts = 30
  242. containerGroupsClient.RetryDuration = 1 * time.Second
  243. return containerGroupsClient, nil
  244. }
  245. func setupClient(aciClient *autorest.Client) error {
  246. aciClient.UserAgent = aciDockerUserAgent
  247. auth, err := login.NewAuthorizerFromLogin()
  248. if err != nil {
  249. return err
  250. }
  251. aciClient.Authorizer = auth
  252. return nil
  253. }
  254. func getContainerClient(subscriptionID string) (containerinstance.ContainerClient, error) {
  255. containerClient := containerinstance.NewContainerClient(subscriptionID)
  256. err := setupClient(&containerClient.Client)
  257. if err != nil {
  258. return containerinstance.ContainerClient{}, err
  259. }
  260. return containerClient, nil
  261. }