aci.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package aci
  14. import (
  15. "context"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "strings"
  20. "time"
  21. "github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2019-12-01/containerinstance"
  22. "github.com/Azure/go-autorest/autorest"
  23. "github.com/Azure/go-autorest/autorest/to"
  24. tm "github.com/buger/goterm"
  25. "github.com/compose-spec/compose-go/types"
  26. "github.com/gobwas/ws"
  27. "github.com/gobwas/ws/wsutil"
  28. "github.com/morikuni/aec"
  29. "github.com/pkg/errors"
  30. "github.com/docker/compose-cli/aci/convert"
  31. "github.com/docker/compose-cli/aci/login"
  32. "github.com/docker/compose-cli/api/client"
  33. "github.com/docker/compose-cli/api/containers"
  34. "github.com/docker/compose-cli/api/context/store"
  35. "github.com/docker/compose-cli/api/errdefs"
  36. "github.com/docker/compose-cli/api/progress"
  37. )
  38. func createACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  39. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  40. if err != nil {
  41. return errors.Wrapf(err, "cannot get container group client")
  42. }
  43. // Check if the container group already exists
  44. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  45. if err != nil {
  46. if err, ok := err.(autorest.DetailedError); ok {
  47. if err.StatusCode != http.StatusNotFound {
  48. return err
  49. }
  50. } else {
  51. return err
  52. }
  53. } else {
  54. return fmt.Errorf("container group %q already exists", *groupDefinition.Name)
  55. }
  56. return createOrUpdateACIContainers(ctx, aciContext, groupDefinition)
  57. }
  58. func autocreateFileshares(ctx context.Context, project *types.Project) error {
  59. clt, err := client.New(ctx)
  60. if err != nil {
  61. return err
  62. }
  63. for _, v := range project.Volumes {
  64. if v.Driver != convert.AzureFileDriverName {
  65. return fmt.Errorf("cannot use ACI volume, required driver is %q, found %q", convert.AzureFileDriverName, v.Driver)
  66. }
  67. shareName, ok := v.DriverOpts[convert.VolumeDriveroptsShareNameKey]
  68. if !ok {
  69. return fmt.Errorf("cannot retrieve fileshare name for Azure file share")
  70. }
  71. accountName, ok := v.DriverOpts[convert.VolumeDriveroptsAccountNameKey]
  72. if !ok {
  73. return fmt.Errorf("cannot retrieve account name for Azure file share")
  74. }
  75. _, err = clt.VolumeService().Inspect(ctx, fmt.Sprintf("%s/%s", accountName, shareName))
  76. if err != nil { // Not found, autocreate fileshare
  77. if !errdefs.IsNotFoundError(err) {
  78. return err
  79. }
  80. aciVolumeOpts := &VolumeCreateOptions{
  81. Account: accountName,
  82. }
  83. if _, err = clt.VolumeService().Create(ctx, shareName, aciVolumeOpts); err != nil {
  84. return err
  85. }
  86. }
  87. }
  88. return nil
  89. }
  90. func createOrUpdateACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  91. w := progress.ContextWriter(ctx)
  92. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  93. if err != nil {
  94. return errors.Wrapf(err, "cannot get container group client")
  95. }
  96. groupDisplay := "Group " + *groupDefinition.Name
  97. w.Event(progress.CreatingEvent(groupDisplay))
  98. future, err := containerGroupsClient.CreateOrUpdate(
  99. ctx,
  100. aciContext.ResourceGroup,
  101. *groupDefinition.Name,
  102. groupDefinition,
  103. )
  104. if err != nil {
  105. w.Event(progress.ErrorEvent(groupDisplay))
  106. return err
  107. }
  108. w.Event(progress.CreatedEvent(groupDisplay))
  109. for _, c := range *groupDefinition.Containers {
  110. if c.Name != nil && *c.Name != convert.ComposeDNSSidecarName {
  111. w.Event(progress.CreatingEvent(*c.Name))
  112. }
  113. }
  114. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  115. if err != nil {
  116. return err
  117. }
  118. for _, c := range *groupDefinition.Containers {
  119. if c.Name != nil && *c.Name != convert.ComposeDNSSidecarName {
  120. w.Event(progress.CreatedEvent(*c.Name))
  121. }
  122. }
  123. return err
  124. }
  125. func getACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  126. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  127. if err != nil {
  128. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  129. }
  130. return containerGroupsClient.Get(ctx, aciContext.ResourceGroup, containerGroupName)
  131. }
  132. func getACIContainerGroups(ctx context.Context, subscriptionID string, resourceGroup string) ([]containerinstance.ContainerGroup, error) {
  133. groupsClient, err := login.NewContainerGroupsClient(subscriptionID)
  134. if err != nil {
  135. return nil, err
  136. }
  137. var containerGroups []containerinstance.ContainerGroup
  138. result, err := groupsClient.ListByResourceGroup(ctx, resourceGroup)
  139. if err != nil {
  140. return nil, err
  141. }
  142. for result.NotDone() {
  143. containerGroups = append(containerGroups, result.Values()...)
  144. if err := result.NextWithContext(ctx); err != nil {
  145. return nil, err
  146. }
  147. }
  148. var groups []containerinstance.ContainerGroup
  149. for _, group := range containerGroups {
  150. group, err := groupsClient.Get(ctx, resourceGroup, *group.Name)
  151. if err != nil {
  152. return nil, err
  153. }
  154. groups = append(groups, group)
  155. }
  156. return groups, nil
  157. }
  158. func deleteACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  159. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  160. if err != nil {
  161. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  162. }
  163. result, err := containerGroupsClient.Delete(ctx, aciContext.ResourceGroup, containerGroupName)
  164. if err != nil {
  165. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot delete container group: %v", err)
  166. }
  167. return result.Result(containerGroupsClient)
  168. }
  169. func stopACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) error {
  170. containerGroupsClient, err := login.NewContainerGroupsClient(aciContext.SubscriptionID)
  171. if err != nil {
  172. return fmt.Errorf("cannot get container group client: %v", err)
  173. }
  174. result, err := containerGroupsClient.Stop(ctx, aciContext.ResourceGroup, containerGroupName)
  175. if result.IsHTTPStatus(http.StatusNotFound) {
  176. return errdefs.ErrNotFound
  177. }
  178. return err
  179. }
  180. func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
  181. containerClient, err := login.NewContainerClient(aciContext.SubscriptionID)
  182. if err != nil {
  183. return c, errors.Wrapf(err, "cannot get container client")
  184. }
  185. rows, cols := getTermSize()
  186. containerExecRequest := containerinstance.ContainerExecRequest{
  187. Command: to.StringPtr(command),
  188. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  189. Rows: rows,
  190. Cols: cols,
  191. },
  192. }
  193. return containerClient.ExecuteCommand(
  194. ctx,
  195. aciContext.ResourceGroup,
  196. containerGroup,
  197. containerName,
  198. containerExecRequest)
  199. }
  200. func getTermSize() (*int32, *int32) {
  201. rows := tm.Height()
  202. cols := tm.Width()
  203. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  204. }
  205. func exec(ctx context.Context, address string, password string, request containers.ExecRequest) error {
  206. conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
  207. if err != nil {
  208. return err
  209. }
  210. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
  211. if err != nil {
  212. return err
  213. }
  214. downstreamChannel := make(chan error, 10)
  215. upstreamChannel := make(chan error, 10)
  216. go func() {
  217. for {
  218. msg, _, err := wsutil.ReadServerData(conn)
  219. if err != nil {
  220. if err == io.EOF {
  221. downstreamChannel <- nil
  222. return
  223. }
  224. downstreamChannel <- err
  225. return
  226. }
  227. fmt.Fprint(request.Stdout, string(msg))
  228. }
  229. }()
  230. if request.Interactive {
  231. go func() {
  232. for {
  233. // We send each byte, byte-per-byte over the
  234. // websocket because the console is in raw mode
  235. buffer := make([]byte, 1)
  236. n, err := request.Stdin.Read(buffer)
  237. if err != nil {
  238. if err == io.EOF {
  239. upstreamChannel <- nil
  240. return
  241. }
  242. upstreamChannel <- err
  243. return
  244. }
  245. if n > 0 {
  246. err := wsutil.WriteClientMessage(conn, ws.OpText, buffer)
  247. if err != nil {
  248. upstreamChannel <- err
  249. return
  250. }
  251. }
  252. }
  253. }()
  254. }
  255. for {
  256. select {
  257. case err := <-downstreamChannel:
  258. return errors.Wrap(err, "failed to read input from container")
  259. case err := <-upstreamChannel:
  260. return errors.Wrap(err, "failed to send input to container")
  261. }
  262. }
  263. }
  264. func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, tail *int32) (string, error) {
  265. containerClient, err := login.NewContainerClient(aciContext.SubscriptionID)
  266. if err != nil {
  267. return "", errors.Wrapf(err, "cannot get container client")
  268. }
  269. logs, err := containerClient.ListLogs(ctx, aciContext.ResourceGroup, containerGroupName, containerName, tail)
  270. if err != nil {
  271. return "", fmt.Errorf("cannot get container logs: %v", err)
  272. }
  273. if logs.Content == nil {
  274. return "", nil
  275. }
  276. return *logs.Content, err
  277. }
  278. func streamLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, req containers.LogsRequest) error {
  279. numLines := 0
  280. previousLogLines := ""
  281. firstDisplay := true // optimization to exit sooner in cases like docker run hello-world, do not wait another 2 secs.
  282. for {
  283. select {
  284. case <-ctx.Done():
  285. return nil
  286. default:
  287. logs, err := getACIContainerLogs(ctx, aciContext, containerGroupName, containerName, nil)
  288. if err != nil {
  289. return err
  290. }
  291. logLines := strings.Split(logs, "\n")
  292. currentOutput := len(logLines)
  293. // Note: a backend should not do this normally, this breaks the log
  294. // streaming over gRPC but this is the only thing we can do with
  295. // the kind of logs ACI is giving us. Hopefully Azue will give us
  296. // a real logs streaming api soon.
  297. b := aec.EmptyBuilder
  298. b = b.Up(uint(numLines))
  299. fmt.Fprint(req.Writer, b.Column(0).ANSI)
  300. numLines = getBacktrackLines(logLines, req.Width)
  301. for i := 0; i < currentOutput-1; i++ {
  302. fmt.Fprintln(req.Writer, logLines[i])
  303. }
  304. if (firstDisplay || previousLogLines == logs) && !isContainerRunning(ctx, aciContext, containerGroupName, containerName) {
  305. return nil
  306. }
  307. firstDisplay = false
  308. previousLogLines = logs
  309. select {
  310. case <-ctx.Done():
  311. return nil
  312. case <-time.After(2 * time.Second):
  313. }
  314. }
  315. }
  316. }
  317. func isContainerRunning(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string) bool {
  318. group, err := getACIContainerGroup(ctx, aciContext, containerGroupName)
  319. if err != nil {
  320. return false // group has disappeared
  321. }
  322. for _, container := range *group.Containers {
  323. if *container.Name == containerName {
  324. if convert.GetStatus(container, group) == convert.StatusRunning {
  325. return true
  326. }
  327. }
  328. }
  329. return false
  330. }
  331. func getBacktrackLines(lines []string, terminalWidth int) int {
  332. if terminalWidth == 0 { // no terminal width has been set, do not divide by zero
  333. return len(lines)
  334. }
  335. numLines := 0
  336. for i := 0; i < len(lines)-1; i++ {
  337. numLines++
  338. if len(lines[i]) > terminalWidth {
  339. numLines += len(lines[i]) / terminalWidth
  340. }
  341. }
  342. return numLines
  343. }