aci.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. /*
  2. Copyright 2020 Docker, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package aci
  14. import (
  15. "context"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "strings"
  20. "time"
  21. "github.com/docker/api/errdefs"
  22. "github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
  23. "github.com/Azure/go-autorest/autorest"
  24. "github.com/Azure/go-autorest/autorest/to"
  25. tm "github.com/buger/goterm"
  26. "github.com/gobwas/ws"
  27. "github.com/gobwas/ws/wsutil"
  28. "github.com/morikuni/aec"
  29. "github.com/pkg/errors"
  30. "github.com/docker/api/aci/convert"
  31. "github.com/docker/api/aci/login"
  32. "github.com/docker/api/containers"
  33. "github.com/docker/api/context/store"
  34. "github.com/docker/api/progress"
  35. )
  36. const aciDockerUserAgent = "docker-cli"
  37. func createACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  38. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  39. if err != nil {
  40. return errors.Wrapf(err, "cannot get container group client")
  41. }
  42. // Check if the container group already exists
  43. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  44. if err != nil {
  45. if err, ok := err.(autorest.DetailedError); ok {
  46. if err.StatusCode != http.StatusNotFound {
  47. return err
  48. }
  49. } else {
  50. return err
  51. }
  52. } else {
  53. return fmt.Errorf("container group %q already exists", *groupDefinition.Name)
  54. }
  55. return createOrUpdateACIContainers(ctx, aciContext, groupDefinition)
  56. }
  57. func createOrUpdateACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  58. w := progress.ContextWriter(ctx)
  59. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  60. if err != nil {
  61. return errors.Wrapf(err, "cannot get container group client")
  62. }
  63. groupDisplay := "Group " + *groupDefinition.Name
  64. w.Event(progress.Event{
  65. ID: groupDisplay,
  66. Status: progress.Working,
  67. StatusText: "Waiting",
  68. })
  69. future, err := containerGroupsClient.CreateOrUpdate(
  70. ctx,
  71. aciContext.ResourceGroup,
  72. *groupDefinition.Name,
  73. groupDefinition,
  74. )
  75. if err != nil {
  76. return err
  77. }
  78. w.Event(progress.Event{
  79. ID: groupDisplay,
  80. Status: progress.Done,
  81. StatusText: "Created",
  82. })
  83. for _, c := range *groupDefinition.Containers {
  84. if c.Name != nil && *c.Name != convert.ComposeDNSSidecarName {
  85. w.Event(progress.Event{
  86. ID: *c.Name,
  87. Status: progress.Working,
  88. StatusText: "Waiting",
  89. })
  90. }
  91. }
  92. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  93. if err != nil {
  94. return err
  95. }
  96. for _, c := range *groupDefinition.Containers {
  97. if c.Name != nil && *c.Name != convert.ComposeDNSSidecarName {
  98. w.Event(progress.Event{
  99. ID: *c.Name,
  100. Status: progress.Done,
  101. StatusText: "Done",
  102. })
  103. }
  104. }
  105. return err
  106. }
  107. func getACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  108. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  109. if err != nil {
  110. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  111. }
  112. return containerGroupsClient.Get(ctx, aciContext.ResourceGroup, containerGroupName)
  113. }
  114. func deleteACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  115. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  116. if err != nil {
  117. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  118. }
  119. return containerGroupsClient.Delete(ctx, aciContext.ResourceGroup, containerGroupName)
  120. }
  121. func stopACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) error {
  122. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  123. if err != nil {
  124. return fmt.Errorf("cannot get container group client: %v", err)
  125. }
  126. result, err := containerGroupsClient.Stop(ctx, aciContext.ResourceGroup, containerGroupName)
  127. if result.StatusCode == http.StatusNotFound {
  128. return errdefs.ErrNotFound
  129. }
  130. return err
  131. }
  132. func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
  133. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  134. if err != nil {
  135. return c, errors.Wrapf(err, "cannot get container client")
  136. }
  137. rows, cols := getTermSize()
  138. containerExecRequest := containerinstance.ContainerExecRequest{
  139. Command: to.StringPtr(command),
  140. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  141. Rows: rows,
  142. Cols: cols,
  143. },
  144. }
  145. return containerClient.ExecuteCommand(
  146. ctx,
  147. aciContext.ResourceGroup,
  148. containerGroup,
  149. containerName,
  150. containerExecRequest)
  151. }
  152. func getTermSize() (*int32, *int32) {
  153. rows := tm.Height()
  154. cols := tm.Width()
  155. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  156. }
  157. func exec(ctx context.Context, address string, password string, request containers.ExecRequest) error {
  158. conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
  159. if err != nil {
  160. return err
  161. }
  162. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
  163. if err != nil {
  164. return err
  165. }
  166. downstreamChannel := make(chan error, 10)
  167. upstreamChannel := make(chan error, 10)
  168. go func() {
  169. for {
  170. msg, _, err := wsutil.ReadServerData(conn)
  171. if err != nil {
  172. if err == io.EOF {
  173. downstreamChannel <- nil
  174. return
  175. }
  176. downstreamChannel <- err
  177. return
  178. }
  179. fmt.Fprint(request.Stdout, string(msg))
  180. }
  181. }()
  182. if request.Interactive {
  183. go func() {
  184. for {
  185. // We send each byte, byte-per-byte over the
  186. // websocket because the console is in raw mode
  187. buffer := make([]byte, 1)
  188. n, err := request.Stdin.Read(buffer)
  189. if err != nil {
  190. if err == io.EOF {
  191. upstreamChannel <- nil
  192. return
  193. }
  194. upstreamChannel <- err
  195. return
  196. }
  197. if n > 0 {
  198. err := wsutil.WriteClientMessage(conn, ws.OpText, buffer)
  199. if err != nil {
  200. upstreamChannel <- err
  201. return
  202. }
  203. }
  204. }
  205. }()
  206. }
  207. for {
  208. select {
  209. case err := <-downstreamChannel:
  210. return errors.Wrap(err, "failed to read input from container")
  211. case err := <-upstreamChannel:
  212. return errors.Wrap(err, "failed to send input to container")
  213. }
  214. }
  215. }
  216. func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, tail *int32) (string, error) {
  217. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  218. if err != nil {
  219. return "", errors.Wrapf(err, "cannot get container client")
  220. }
  221. logs, err := containerClient.ListLogs(ctx, aciContext.ResourceGroup, containerGroupName, containerName, tail)
  222. if err != nil {
  223. return "", fmt.Errorf("cannot get container logs: %v", err)
  224. }
  225. return *logs.Content, err
  226. }
  227. func streamLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, req containers.LogsRequest) error {
  228. numLines := 0
  229. for {
  230. select {
  231. case <-ctx.Done():
  232. return nil
  233. default:
  234. logs, err := getACIContainerLogs(ctx, aciContext, containerGroupName, containerName, nil)
  235. if err != nil {
  236. return err
  237. }
  238. logLines := strings.Split(logs, "\n")
  239. currentOutput := len(logLines)
  240. // Note: a backend should not do this normally, this breaks the log
  241. // streaming over gRPC but this is the only thing we can do with
  242. // the kind of logs ACI is giving us. Hopefully Azue will give us
  243. // a real logs streaming api soon.
  244. b := aec.EmptyBuilder
  245. b = b.Up(uint(numLines))
  246. fmt.Fprint(req.Writer, b.Column(0).ANSI)
  247. numLines = getBacktrackLines(logLines, req.Width)
  248. for i := 0; i < currentOutput-1; i++ {
  249. fmt.Fprintln(req.Writer, logLines[i])
  250. }
  251. select {
  252. case <-ctx.Done():
  253. return nil
  254. case <-time.After(2 * time.Second):
  255. }
  256. }
  257. }
  258. }
  259. func getBacktrackLines(lines []string, terminalWidth int) int {
  260. if terminalWidth == 0 { // no terminal width has been set, do not divide by zero
  261. return len(lines)
  262. }
  263. numLines := 0
  264. for i := 0; i < len(lines)-1; i++ {
  265. numLines++
  266. if len(lines[i]) > terminalWidth {
  267. numLines += len(lines[i]) / terminalWidth
  268. }
  269. }
  270. return numLines
  271. }
  272. func getContainerGroupsClient(subscriptionID string) (containerinstance.ContainerGroupsClient, error) {
  273. containerGroupsClient := containerinstance.NewContainerGroupsClient(subscriptionID)
  274. err := setupClient(&containerGroupsClient.Client)
  275. if err != nil {
  276. return containerinstance.ContainerGroupsClient{}, err
  277. }
  278. containerGroupsClient.PollingDelay = 5 * time.Second
  279. containerGroupsClient.RetryAttempts = 30
  280. containerGroupsClient.RetryDuration = 1 * time.Second
  281. return containerGroupsClient, nil
  282. }
  283. func setupClient(aciClient *autorest.Client) error {
  284. aciClient.UserAgent = aciDockerUserAgent
  285. auth, err := login.NewAuthorizerFromLogin()
  286. if err != nil {
  287. return err
  288. }
  289. aciClient.Authorizer = auth
  290. return nil
  291. }
  292. func getContainerClient(subscriptionID string) (containerinstance.ContainerClient, error) {
  293. containerClient := containerinstance.NewContainerClient(subscriptionID)
  294. err := setupClient(&containerClient.Client)
  295. if err != nil {
  296. return containerinstance.ContainerClient{}, err
  297. }
  298. return containerClient, nil
  299. }