aci.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package aci
  14. import (
  15. "context"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "strings"
  20. "time"
  21. "github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
  22. "github.com/Azure/go-autorest/autorest"
  23. "github.com/Azure/go-autorest/autorest/to"
  24. tm "github.com/buger/goterm"
  25. "github.com/gobwas/ws"
  26. "github.com/gobwas/ws/wsutil"
  27. "github.com/morikuni/aec"
  28. "github.com/pkg/errors"
  29. "github.com/docker/compose-cli/aci/convert"
  30. "github.com/docker/compose-cli/aci/login"
  31. "github.com/docker/compose-cli/api/containers"
  32. "github.com/docker/compose-cli/context/store"
  33. "github.com/docker/compose-cli/errdefs"
  34. "github.com/docker/compose-cli/progress"
  35. )
  36. func createACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  37. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  38. if err != nil {
  39. return errors.Wrapf(err, "cannot get container group client")
  40. }
  41. // Check if the container group already exists
  42. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  43. if err != nil {
  44. if err, ok := err.(autorest.DetailedError); ok {
  45. if err.StatusCode != http.StatusNotFound {
  46. return err
  47. }
  48. } else {
  49. return err
  50. }
  51. } else {
  52. return fmt.Errorf("container group %q already exists", *groupDefinition.Name)
  53. }
  54. return createOrUpdateACIContainers(ctx, aciContext, groupDefinition)
  55. }
  56. func createOrUpdateACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  57. w := progress.ContextWriter(ctx)
  58. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  59. if err != nil {
  60. return errors.Wrapf(err, "cannot get container group client")
  61. }
  62. groupDisplay := "Group " + *groupDefinition.Name
  63. w.Event(progress.Event{
  64. ID: groupDisplay,
  65. Status: progress.Working,
  66. StatusText: "Waiting",
  67. })
  68. future, err := containerGroupsClient.CreateOrUpdate(
  69. ctx,
  70. aciContext.ResourceGroup,
  71. *groupDefinition.Name,
  72. groupDefinition,
  73. )
  74. if err != nil {
  75. return err
  76. }
  77. w.Event(progress.Event{
  78. ID: groupDisplay,
  79. Status: progress.Done,
  80. StatusText: "Created",
  81. })
  82. for _, c := range *groupDefinition.Containers {
  83. if c.Name != nil && *c.Name != convert.ComposeDNSSidecarName {
  84. w.Event(progress.Event{
  85. ID: *c.Name,
  86. Status: progress.Working,
  87. StatusText: "Waiting",
  88. })
  89. }
  90. }
  91. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  92. if err != nil {
  93. return err
  94. }
  95. for _, c := range *groupDefinition.Containers {
  96. if c.Name != nil && *c.Name != convert.ComposeDNSSidecarName {
  97. w.Event(progress.Event{
  98. ID: *c.Name,
  99. Status: progress.Done,
  100. StatusText: "Done",
  101. })
  102. }
  103. }
  104. return err
  105. }
  106. func getACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  107. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  108. if err != nil {
  109. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  110. }
  111. return containerGroupsClient.Get(ctx, aciContext.ResourceGroup, containerGroupName)
  112. }
  113. func getACIContainerGroups(ctx context.Context, subscriptionID string, resourceGroup string) ([]containerinstance.ContainerGroup, error) {
  114. groupsClient, err := login.NewContainerGroupsClient(subscriptionID)
  115. if err != nil {
  116. return nil, err
  117. }
  118. var containerGroups []containerinstance.ContainerGroup
  119. result, err := groupsClient.ListByResourceGroup(ctx, resourceGroup)
  120. if err != nil {
  121. return nil, err
  122. }
  123. for result.NotDone() {
  124. containerGroups = append(containerGroups, result.Values()...)
  125. if err := result.NextWithContext(ctx); err != nil {
  126. return nil, err
  127. }
  128. }
  129. var groups []containerinstance.ContainerGroup
  130. for _, group := range containerGroups {
  131. group, err := groupsClient.Get(ctx, resourceGroup, *group.Name)
  132. if err != nil {
  133. return nil, err
  134. }
  135. groups = append(groups, group)
  136. }
  137. return groups, nil
  138. }
  139. func deleteACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  140. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  141. if err != nil {
  142. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  143. }
  144. return containerGroupsClient.Delete(ctx, aciContext.ResourceGroup, containerGroupName)
  145. }
  146. func stopACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) error {
  147. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  148. if err != nil {
  149. return fmt.Errorf("cannot get container group client: %v", err)
  150. }
  151. result, err := containerGroupsClient.Stop(ctx, aciContext.ResourceGroup, containerGroupName)
  152. if result.IsHTTPStatus(http.StatusNotFound) {
  153. return errdefs.ErrNotFound
  154. }
  155. return err
  156. }
  157. func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
  158. containerClient, err := login.NewContainerClient(aciContext.SubscriptionID)
  159. if err != nil {
  160. return c, errors.Wrapf(err, "cannot get container client")
  161. }
  162. rows, cols := getTermSize()
  163. containerExecRequest := containerinstance.ContainerExecRequest{
  164. Command: to.StringPtr(command),
  165. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  166. Rows: rows,
  167. Cols: cols,
  168. },
  169. }
  170. return containerClient.ExecuteCommand(
  171. ctx,
  172. aciContext.ResourceGroup,
  173. containerGroup,
  174. containerName,
  175. containerExecRequest)
  176. }
  177. func getTermSize() (*int32, *int32) {
  178. rows := tm.Height()
  179. cols := tm.Width()
  180. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  181. }
  182. func exec(ctx context.Context, address string, password string, request containers.ExecRequest) error {
  183. conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
  184. if err != nil {
  185. return err
  186. }
  187. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
  188. if err != nil {
  189. return err
  190. }
  191. downstreamChannel := make(chan error, 10)
  192. upstreamChannel := make(chan error, 10)
  193. go func() {
  194. for {
  195. msg, _, err := wsutil.ReadServerData(conn)
  196. if err != nil {
  197. if err == io.EOF {
  198. downstreamChannel <- nil
  199. return
  200. }
  201. downstreamChannel <- err
  202. return
  203. }
  204. fmt.Fprint(request.Stdout, string(msg))
  205. }
  206. }()
  207. if request.Interactive {
  208. go func() {
  209. for {
  210. // We send each byte, byte-per-byte over the
  211. // websocket because the console is in raw mode
  212. buffer := make([]byte, 1)
  213. n, err := request.Stdin.Read(buffer)
  214. if err != nil {
  215. if err == io.EOF {
  216. upstreamChannel <- nil
  217. return
  218. }
  219. upstreamChannel <- err
  220. return
  221. }
  222. if n > 0 {
  223. err := wsutil.WriteClientMessage(conn, ws.OpText, buffer)
  224. if err != nil {
  225. upstreamChannel <- err
  226. return
  227. }
  228. }
  229. }
  230. }()
  231. }
  232. for {
  233. select {
  234. case err := <-downstreamChannel:
  235. return errors.Wrap(err, "failed to read input from container")
  236. case err := <-upstreamChannel:
  237. return errors.Wrap(err, "failed to send input to container")
  238. }
  239. }
  240. }
  241. func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, tail *int32) (string, error) {
  242. containerClient, err := login.NewContainerClient(aciContext.SubscriptionID)
  243. if err != nil {
  244. return "", errors.Wrapf(err, "cannot get container client")
  245. }
  246. logs, err := containerClient.ListLogs(ctx, aciContext.ResourceGroup, containerGroupName, containerName, tail)
  247. if err != nil {
  248. return "", fmt.Errorf("cannot get container logs: %v", err)
  249. }
  250. if logs.Content == nil {
  251. return "", nil
  252. }
  253. return *logs.Content, err
  254. }
  255. func streamLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, req containers.LogsRequest) error {
  256. numLines := 0
  257. previousLogLines := ""
  258. firstDisplay := true // optimization to exit sooner in cases like docker run hello-world, do not wait another 2 secs.
  259. for {
  260. select {
  261. case <-ctx.Done():
  262. return nil
  263. default:
  264. logs, err := getACIContainerLogs(ctx, aciContext, containerGroupName, containerName, nil)
  265. if err != nil {
  266. return err
  267. }
  268. logLines := strings.Split(logs, "\n")
  269. currentOutput := len(logLines)
  270. // Note: a backend should not do this normally, this breaks the log
  271. // streaming over gRPC but this is the only thing we can do with
  272. // the kind of logs ACI is giving us. Hopefully Azue will give us
  273. // a real logs streaming api soon.
  274. b := aec.EmptyBuilder
  275. b = b.Up(uint(numLines))
  276. fmt.Fprint(req.Writer, b.Column(0).ANSI)
  277. numLines = getBacktrackLines(logLines, req.Width)
  278. for i := 0; i < currentOutput-1; i++ {
  279. fmt.Fprintln(req.Writer, logLines[i])
  280. }
  281. if (firstDisplay || previousLogLines == logs) && !isContainerRunning(ctx, aciContext, containerGroupName, containerName) {
  282. return nil
  283. }
  284. firstDisplay = false
  285. previousLogLines = logs
  286. select {
  287. case <-ctx.Done():
  288. return nil
  289. case <-time.After(2 * time.Second):
  290. }
  291. }
  292. }
  293. }
  294. func isContainerRunning(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string) bool {
  295. group, err := getACIContainerGroup(ctx, aciContext, containerGroupName)
  296. if err != nil {
  297. return false // group has disappeared
  298. }
  299. for _, container := range *group.Containers {
  300. if *container.Name == containerName {
  301. if convert.GetStatus(container, group) == convert.StatusRunning {
  302. return true
  303. }
  304. }
  305. }
  306. return false
  307. }
  308. func getBacktrackLines(lines []string, terminalWidth int) int {
  309. if terminalWidth == 0 { // no terminal width has been set, do not divide by zero
  310. return len(lines)
  311. }
  312. numLines := 0
  313. for i := 0; i < len(lines)-1; i++ {
  314. numLines++
  315. if len(lines[i]) > terminalWidth {
  316. numLines += len(lines[i]) / terminalWidth
  317. }
  318. }
  319. return numLines
  320. }