aci.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. /*
  2. Copyright 2020 Docker, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package azure
  14. import (
  15. "context"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "strings"
  20. "time"
  21. "github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
  22. "github.com/Azure/go-autorest/autorest"
  23. "github.com/Azure/go-autorest/autorest/to"
  24. tm "github.com/buger/goterm"
  25. "github.com/gobwas/ws"
  26. "github.com/gobwas/ws/wsutil"
  27. "github.com/morikuni/aec"
  28. "github.com/pkg/errors"
  29. "github.com/docker/api/azure/convert"
  30. "github.com/docker/api/azure/login"
  31. "github.com/docker/api/containers"
  32. "github.com/docker/api/context/store"
  33. "github.com/docker/api/progress"
  34. )
  // aciDockerUserAgent is the User-Agent value that setupClient assigns to
  // every ACI client built in this file.
  35. const aciDockerUserAgent = "docker-cli"
  36. func createACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  37. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  38. if err != nil {
  39. return errors.Wrapf(err, "cannot get container group client")
  40. }
  41. // Check if the container group already exists
  42. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  43. if err != nil {
  44. if err, ok := err.(autorest.DetailedError); ok {
  45. if err.StatusCode != http.StatusNotFound {
  46. return err
  47. }
  48. } else {
  49. return err
  50. }
  51. } else {
  52. return fmt.Errorf("container group %q already exists", *groupDefinition.Name)
  53. }
  54. return createOrUpdateACIContainers(ctx, aciContext, groupDefinition)
  55. }
  56. func createOrUpdateACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  57. w := progress.ContextWriter(ctx)
  58. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  59. if err != nil {
  60. return errors.Wrapf(err, "cannot get container group client")
  61. }
  62. w.Event(progress.Event{
  63. ID: *groupDefinition.Name,
  64. Status: progress.Working,
  65. StatusText: "Waiting",
  66. })
  67. future, err := containerGroupsClient.CreateOrUpdate(
  68. ctx,
  69. aciContext.ResourceGroup,
  70. *groupDefinition.Name,
  71. groupDefinition,
  72. )
  73. if err != nil {
  74. return err
  75. }
  76. w.Event(progress.Event{
  77. ID: *groupDefinition.Name,
  78. Status: progress.Done,
  79. StatusText: "Created",
  80. })
  81. for _, c := range *groupDefinition.Containers {
  82. if c.Name != nil && *c.Name != convert.ComposeDNSSidecarName {
  83. w.Event(progress.Event{
  84. ID: *c.Name,
  85. Status: progress.Working,
  86. StatusText: "Waiting",
  87. })
  88. }
  89. }
  90. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  91. if err != nil {
  92. return err
  93. }
  94. for _, c := range *groupDefinition.Containers {
  95. if c.Name != nil && *c.Name != convert.ComposeDNSSidecarName {
  96. w.Event(progress.Event{
  97. ID: *c.Name,
  98. Status: progress.Done,
  99. StatusText: "Done",
  100. })
  101. }
  102. }
  103. return err
  104. }
  105. func getACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  106. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  107. if err != nil {
  108. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  109. }
  110. return containerGroupsClient.Get(ctx, aciContext.ResourceGroup, containerGroupName)
  111. }
  112. func deleteACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  113. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  114. if err != nil {
  115. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  116. }
  117. return containerGroupsClient.Delete(ctx, aciContext.ResourceGroup, containerGroupName)
  118. }
  119. func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
  120. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  121. if err != nil {
  122. return c, errors.Wrapf(err, "cannot get container client")
  123. }
  124. rows, cols := getTermSize()
  125. containerExecRequest := containerinstance.ContainerExecRequest{
  126. Command: to.StringPtr(command),
  127. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  128. Rows: rows,
  129. Cols: cols,
  130. },
  131. }
  132. return containerClient.ExecuteCommand(
  133. ctx,
  134. aciContext.ResourceGroup,
  135. containerGroup,
  136. containerName,
  137. containerExecRequest)
  138. }
  139. func getTermSize() (*int32, *int32) {
  140. rows := tm.Height()
  141. cols := tm.Width()
  142. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  143. }
  144. func exec(ctx context.Context, address string, password string, request containers.ExecRequest) error {
  145. conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
  146. if err != nil {
  147. return err
  148. }
  149. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
  150. if err != nil {
  151. return err
  152. }
  153. downstreamChannel := make(chan error, 10)
  154. upstreamChannel := make(chan error, 10)
  155. go func() {
  156. for {
  157. msg, _, err := wsutil.ReadServerData(conn)
  158. if err != nil {
  159. if err == io.EOF {
  160. downstreamChannel <- nil
  161. return
  162. }
  163. downstreamChannel <- err
  164. return
  165. }
  166. fmt.Fprint(request.Stdout, string(msg))
  167. }
  168. }()
  169. if request.Interactive {
  170. go func() {
  171. for {
  172. // We send each byte, byte-per-byte over the
  173. // websocket because the console is in raw mode
  174. buffer := make([]byte, 1)
  175. n, err := request.Stdin.Read(buffer)
  176. if err != nil {
  177. if err == io.EOF {
  178. upstreamChannel <- nil
  179. return
  180. }
  181. upstreamChannel <- err
  182. return
  183. }
  184. if n > 0 {
  185. err := wsutil.WriteClientMessage(conn, ws.OpText, buffer)
  186. if err != nil {
  187. upstreamChannel <- err
  188. return
  189. }
  190. }
  191. }
  192. }()
  193. }
  194. for {
  195. select {
  196. case err := <-downstreamChannel:
  197. return errors.Wrap(err, "failed to read input from container")
  198. case err := <-upstreamChannel:
  199. return errors.Wrap(err, "failed to send input to container")
  200. }
  201. }
  202. }
  203. func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, tail *int32) (string, error) {
  204. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  205. if err != nil {
  206. return "", errors.Wrapf(err, "cannot get container client")
  207. }
  208. logs, err := containerClient.ListLogs(ctx, aciContext.ResourceGroup, containerGroupName, containerName, tail)
  209. if err != nil {
  210. return "", fmt.Errorf("cannot get container logs: %v", err)
  211. }
  212. return *logs.Content, err
  213. }
  214. func streamLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, req containers.LogsRequest) error {
  215. numLines := 0
  216. for {
  217. select {
  218. case <-ctx.Done():
  219. return nil
  220. default:
  221. logs, err := getACIContainerLogs(ctx, aciContext, containerGroupName, containerName, nil)
  222. if err != nil {
  223. return err
  224. }
  225. logLines := strings.Split(logs, "\n")
  226. currentOutput := len(logLines)
  227. // Note: a backend should not do this normally, this breaks the log
  228. // streaming over gRPC but this is the only thing we can do with
  229. // the kind of logs ACI is giving us. Hopefully Azue will give us
  230. // a real logs streaming api soon.
  231. b := aec.EmptyBuilder
  232. b = b.Up(uint(numLines))
  233. fmt.Fprint(req.Writer, b.Column(0).ANSI)
  234. numLines = getBacktrackLines(logLines, req.Width)
  235. for i := 0; i < currentOutput-1; i++ {
  236. fmt.Fprintln(req.Writer, logLines[i])
  237. }
  238. select {
  239. case <-ctx.Done():
  240. return nil
  241. case <-time.After(2 * time.Second):
  242. }
  243. }
  244. }
  245. }
  246. func getBacktrackLines(lines []string, terminalWidth int) int {
  247. if terminalWidth == 0 { // no terminal width has been set, do not divide by zero
  248. return len(lines)
  249. }
  250. numLines := 0
  251. for i := 0; i < len(lines)-1; i++ {
  252. numLines++
  253. if len(lines[i]) > terminalWidth {
  254. numLines += len(lines[i]) / terminalWidth
  255. }
  256. }
  257. return numLines
  258. }
  259. func getContainerGroupsClient(subscriptionID string) (containerinstance.ContainerGroupsClient, error) {
  260. containerGroupsClient := containerinstance.NewContainerGroupsClient(subscriptionID)
  261. err := setupClient(&containerGroupsClient.Client)
  262. if err != nil {
  263. return containerinstance.ContainerGroupsClient{}, err
  264. }
  265. containerGroupsClient.PollingDelay = 5 * time.Second
  266. containerGroupsClient.RetryAttempts = 30
  267. containerGroupsClient.RetryDuration = 1 * time.Second
  268. return containerGroupsClient, nil
  269. }
  270. func setupClient(aciClient *autorest.Client) error {
  271. aciClient.UserAgent = aciDockerUserAgent
  272. auth, err := login.NewAuthorizerFromLogin()
  273. if err != nil {
  274. return err
  275. }
  276. aciClient.Authorizer = auth
  277. return nil
  278. }
  279. func getContainerClient(subscriptionID string) (containerinstance.ContainerClient, error) {
  280. containerClient := containerinstance.NewContainerClient(subscriptionID)
  281. err := setupClient(&containerClient.Client)
  282. if err != nil {
  283. return containerinstance.ContainerClient{}, err
  284. }
  285. return containerClient, nil
  286. }