aci.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package aci
  14. import (
  15. "context"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "strings"
  20. "time"
  21. "github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2019-12-01/containerinstance"
  22. "github.com/Azure/go-autorest/autorest"
  23. "github.com/Azure/go-autorest/autorest/to"
  24. tm "github.com/buger/goterm"
  25. "github.com/compose-spec/compose-go/types"
  26. "github.com/gobwas/ws"
  27. "github.com/gobwas/ws/wsutil"
  28. "github.com/morikuni/aec"
  29. "github.com/pkg/errors"
  30. "github.com/docker/compose-cli/aci/convert"
  31. "github.com/docker/compose-cli/aci/login"
  32. "github.com/docker/compose-cli/api/client"
  33. "github.com/docker/compose-cli/api/containers"
  34. "github.com/docker/compose-cli/context/store"
  35. "github.com/docker/compose-cli/errdefs"
  36. "github.com/docker/compose-cli/progress"
  37. )
  38. func createACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  39. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  40. if err != nil {
  41. return errors.Wrapf(err, "cannot get container group client")
  42. }
  43. // Check if the container group already exists
  44. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  45. if err != nil {
  46. if err, ok := err.(autorest.DetailedError); ok {
  47. if err.StatusCode != http.StatusNotFound {
  48. return err
  49. }
  50. } else {
  51. return err
  52. }
  53. } else {
  54. return fmt.Errorf("container group %q already exists", *groupDefinition.Name)
  55. }
  56. return createOrUpdateACIContainers(ctx, aciContext, groupDefinition)
  57. }
  58. func autocreateFileshares(ctx context.Context, project *types.Project) error {
  59. clt, err := client.New(ctx)
  60. if err != nil {
  61. return err
  62. }
  63. for _, v := range project.Volumes {
  64. if v.Driver != convert.AzureFileDriverName {
  65. return fmt.Errorf("cannot use ACI volume, required driver is %q, found %q", convert.AzureFileDriverName, v.Driver)
  66. }
  67. shareName, ok := v.DriverOpts[convert.VolumeDriveroptsShareNameKey]
  68. if !ok {
  69. return fmt.Errorf("cannot retrieve fileshare name for Azure file share")
  70. }
  71. accountName, ok := v.DriverOpts[convert.VolumeDriveroptsAccountNameKey]
  72. if !ok {
  73. return fmt.Errorf("cannot retrieve account name for Azure file share")
  74. }
  75. _, err = clt.VolumeService().Inspect(ctx, fmt.Sprintf("%s/%s", accountName, shareName))
  76. if err != nil { // Not found, autocreate fileshare
  77. if !errdefs.IsNotFoundError(err) {
  78. return err
  79. }
  80. aciVolumeOpts := &VolumeCreateOptions{
  81. Account: accountName,
  82. }
  83. if _, err = clt.VolumeService().Create(ctx, shareName, aciVolumeOpts); err != nil {
  84. return err
  85. }
  86. }
  87. }
  88. return nil
  89. }
  90. func createOrUpdateACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  91. w := progress.ContextWriter(ctx)
  92. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  93. if err != nil {
  94. return errors.Wrapf(err, "cannot get container group client")
  95. }
  96. groupDisplay := "Group " + *groupDefinition.Name
  97. w.Event(progress.Event{
  98. ID: groupDisplay,
  99. Status: progress.Working,
  100. StatusText: "Waiting",
  101. })
  102. future, err := containerGroupsClient.CreateOrUpdate(
  103. ctx,
  104. aciContext.ResourceGroup,
  105. *groupDefinition.Name,
  106. groupDefinition,
  107. )
  108. if err != nil {
  109. return err
  110. }
  111. w.Event(progress.Event{
  112. ID: groupDisplay,
  113. Status: progress.Done,
  114. StatusText: "Created",
  115. })
  116. for _, c := range *groupDefinition.Containers {
  117. if c.Name != nil && *c.Name != convert.ComposeDNSSidecarName {
  118. w.Event(progress.Event{
  119. ID: *c.Name,
  120. Status: progress.Working,
  121. StatusText: "Waiting",
  122. })
  123. }
  124. }
  125. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  126. if err != nil {
  127. return err
  128. }
  129. for _, c := range *groupDefinition.Containers {
  130. if c.Name != nil && *c.Name != convert.ComposeDNSSidecarName {
  131. w.Event(progress.Event{
  132. ID: *c.Name,
  133. Status: progress.Done,
  134. StatusText: "Done",
  135. })
  136. }
  137. }
  138. return err
  139. }
  140. func getACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  141. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  142. if err != nil {
  143. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  144. }
  145. return containerGroupsClient.Get(ctx, aciContext.ResourceGroup, containerGroupName)
  146. }
  147. func getACIContainerGroups(ctx context.Context, subscriptionID string, resourceGroup string) ([]containerinstance.ContainerGroup, error) {
  148. groupsClient, err := login.NewContainerGroupsClient(subscriptionID)
  149. if err != nil {
  150. return nil, err
  151. }
  152. var containerGroups []containerinstance.ContainerGroup
  153. result, err := groupsClient.ListByResourceGroup(ctx, resourceGroup)
  154. if err != nil {
  155. return nil, err
  156. }
  157. for result.NotDone() {
  158. containerGroups = append(containerGroups, result.Values()...)
  159. if err := result.NextWithContext(ctx); err != nil {
  160. return nil, err
  161. }
  162. }
  163. var groups []containerinstance.ContainerGroup
  164. for _, group := range containerGroups {
  165. group, err := groupsClient.Get(ctx, resourceGroup, *group.Name)
  166. if err != nil {
  167. return nil, err
  168. }
  169. groups = append(groups, group)
  170. }
  171. return groups, nil
  172. }
  173. func deleteACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  174. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  175. if err != nil {
  176. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  177. }
  178. result, err := containerGroupsClient.Delete(ctx, aciContext.ResourceGroup, containerGroupName)
  179. if err != nil {
  180. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot delete container group: %v", err)
  181. }
  182. return result.Result(containerGroupsClient)
  183. }
  184. func stopACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) error {
  185. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  186. if err != nil {
  187. return fmt.Errorf("cannot get container group client: %v", err)
  188. }
  189. result, err := containerGroupsClient.Stop(ctx, aciContext.ResourceGroup, containerGroupName)
  190. if result.IsHTTPStatus(http.StatusNotFound) {
  191. return errdefs.ErrNotFound
  192. }
  193. return err
  194. }
  195. func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
  196. containerClient, err := login.NewContainerClient(aciContext.SubscriptionID)
  197. if err != nil {
  198. return c, errors.Wrapf(err, "cannot get container client")
  199. }
  200. rows, cols := getTermSize()
  201. containerExecRequest := containerinstance.ContainerExecRequest{
  202. Command: to.StringPtr(command),
  203. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  204. Rows: rows,
  205. Cols: cols,
  206. },
  207. }
  208. return containerClient.ExecuteCommand(
  209. ctx,
  210. aciContext.ResourceGroup,
  211. containerGroup,
  212. containerName,
  213. containerExecRequest)
  214. }
  215. func getTermSize() (*int32, *int32) {
  216. rows := tm.Height()
  217. cols := tm.Width()
  218. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  219. }
  220. func exec(ctx context.Context, address string, password string, request containers.ExecRequest) error {
  221. conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
  222. if err != nil {
  223. return err
  224. }
  225. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
  226. if err != nil {
  227. return err
  228. }
  229. downstreamChannel := make(chan error, 10)
  230. upstreamChannel := make(chan error, 10)
  231. go func() {
  232. for {
  233. msg, _, err := wsutil.ReadServerData(conn)
  234. if err != nil {
  235. if err == io.EOF {
  236. downstreamChannel <- nil
  237. return
  238. }
  239. downstreamChannel <- err
  240. return
  241. }
  242. fmt.Fprint(request.Stdout, string(msg))
  243. }
  244. }()
  245. if request.Interactive {
  246. go func() {
  247. for {
  248. // We send each byte, byte-per-byte over the
  249. // websocket because the console is in raw mode
  250. buffer := make([]byte, 1)
  251. n, err := request.Stdin.Read(buffer)
  252. if err != nil {
  253. if err == io.EOF {
  254. upstreamChannel <- nil
  255. return
  256. }
  257. upstreamChannel <- err
  258. return
  259. }
  260. if n > 0 {
  261. err := wsutil.WriteClientMessage(conn, ws.OpText, buffer)
  262. if err != nil {
  263. upstreamChannel <- err
  264. return
  265. }
  266. }
  267. }
  268. }()
  269. }
  270. for {
  271. select {
  272. case err := <-downstreamChannel:
  273. return errors.Wrap(err, "failed to read input from container")
  274. case err := <-upstreamChannel:
  275. return errors.Wrap(err, "failed to send input to container")
  276. }
  277. }
  278. }
  279. func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, tail *int32) (string, error) {
  280. containerClient, err := login.NewContainerClient(aciContext.SubscriptionID)
  281. if err != nil {
  282. return "", errors.Wrapf(err, "cannot get container client")
  283. }
  284. logs, err := containerClient.ListLogs(ctx, aciContext.ResourceGroup, containerGroupName, containerName, tail)
  285. if err != nil {
  286. return "", fmt.Errorf("cannot get container logs: %v", err)
  287. }
  288. if logs.Content == nil {
  289. return "", nil
  290. }
  291. return *logs.Content, err
  292. }
  293. func streamLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, req containers.LogsRequest) error {
  294. numLines := 0
  295. previousLogLines := ""
  296. firstDisplay := true // optimization to exit sooner in cases like docker run hello-world, do not wait another 2 secs.
  297. for {
  298. select {
  299. case <-ctx.Done():
  300. return nil
  301. default:
  302. logs, err := getACIContainerLogs(ctx, aciContext, containerGroupName, containerName, nil)
  303. if err != nil {
  304. return err
  305. }
  306. logLines := strings.Split(logs, "\n")
  307. currentOutput := len(logLines)
  308. // Note: a backend should not do this normally, this breaks the log
  309. // streaming over gRPC but this is the only thing we can do with
  310. // the kind of logs ACI is giving us. Hopefully Azue will give us
  311. // a real logs streaming api soon.
  312. b := aec.EmptyBuilder
  313. b = b.Up(uint(numLines))
  314. fmt.Fprint(req.Writer, b.Column(0).ANSI)
  315. numLines = getBacktrackLines(logLines, req.Width)
  316. for i := 0; i < currentOutput-1; i++ {
  317. fmt.Fprintln(req.Writer, logLines[i])
  318. }
  319. if (firstDisplay || previousLogLines == logs) && !isContainerRunning(ctx, aciContext, containerGroupName, containerName) {
  320. return nil
  321. }
  322. firstDisplay = false
  323. previousLogLines = logs
  324. select {
  325. case <-ctx.Done():
  326. return nil
  327. case <-time.After(2 * time.Second):
  328. }
  329. }
  330. }
  331. }
  332. func isContainerRunning(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string) bool {
  333. group, err := getACIContainerGroup(ctx, aciContext, containerGroupName)
  334. if err != nil {
  335. return false // group has disappeared
  336. }
  337. for _, container := range *group.Containers {
  338. if *container.Name == containerName {
  339. if convert.GetStatus(container, group) == convert.StatusRunning {
  340. return true
  341. }
  342. }
  343. }
  344. return false
  345. }
  // getBacktrackLines returns how many terminal rows the cursor has to move up
  // to get back to the first row of a previously printed set of lines.
  //
  // The last element of lines is not counted (the loop stops at len(lines)-1).
  // Every other element counts for one row plus one extra row for each time it
  // wraps, i.e. ceil(len/terminalWidth) rows for a non-empty line. Lengths are
  // measured in bytes.
  //
  // When terminalWidth is zero or negative no wrapping can be computed and
  // len(lines) is returned.
  // NOTE(review): that fallback counts the last element while the wrapping
  // path does not; kept as is for compatibility, confirm whether intended.
  func getBacktrackLines(lines []string, terminalWidth int) int {
  	if terminalWidth <= 0 { // no usable terminal width, do not divide by zero
  		return len(lines)
  	}

  	rows := 0
  	for i := 0; i < len(lines)-1; i++ {
  		rows++
  		if n := len(lines[i]); n > terminalWidth {
  			// (n-1)/width extra rows: a line that exactly fills k rows must
  			// not be charged for a (k+1)-th one.
  			rows += (n - 1) / terminalWidth
  		}
  	}
  	return rows
  }