aci.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. /*
  2. Copyright 2020 Docker, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package azure
  14. import (
  15. "context"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "strings"
  20. "time"
  21. "github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
  22. "github.com/Azure/go-autorest/autorest"
  23. "github.com/Azure/go-autorest/autorest/to"
  24. "github.com/buger/goterm"
  25. tm "github.com/buger/goterm"
  26. "github.com/gobwas/ws"
  27. "github.com/gobwas/ws/wsutil"
  28. "github.com/morikuni/aec"
  29. "github.com/pkg/errors"
  30. "github.com/docker/api/azure/convert"
  31. "github.com/docker/api/azure/login"
  32. "github.com/docker/api/context/store"
  33. "github.com/docker/api/progress"
  34. )
  35. const aciDockerUserAgent = "docker-cli"
  36. func createACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  37. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  38. if err != nil {
  39. return errors.Wrapf(err, "cannot get container group client")
  40. }
  41. // Check if the container group already exists
  42. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  43. if err != nil {
  44. if err, ok := err.(autorest.DetailedError); ok {
  45. if err.StatusCode != http.StatusNotFound {
  46. return err
  47. }
  48. } else {
  49. return err
  50. }
  51. } else {
  52. return fmt.Errorf("container group %q already exists", *groupDefinition.Name)
  53. }
  54. return createOrUpdateACIContainers(ctx, aciContext, groupDefinition)
  55. }
  56. func createOrUpdateACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  57. w := progress.ContextWriter(ctx)
  58. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  59. if err != nil {
  60. return errors.Wrapf(err, "cannot get container group client")
  61. }
  62. w.Event(progress.Event{
  63. ID: *groupDefinition.Name,
  64. Status: progress.Working,
  65. StatusText: "Waiting",
  66. })
  67. future, err := containerGroupsClient.CreateOrUpdate(
  68. ctx,
  69. aciContext.ResourceGroup,
  70. *groupDefinition.Name,
  71. groupDefinition,
  72. )
  73. if err != nil {
  74. return err
  75. }
  76. w.Event(progress.Event{
  77. ID: *groupDefinition.Name,
  78. Status: progress.Done,
  79. StatusText: "Created",
  80. })
  81. for _, c := range *groupDefinition.Containers {
  82. if c.Name != nil && *c.Name != convert.ComposeDNSSidecarName {
  83. w.Event(progress.Event{
  84. ID: *c.Name,
  85. Status: progress.Working,
  86. StatusText: "Waiting",
  87. })
  88. }
  89. }
  90. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  91. if err != nil {
  92. return err
  93. }
  94. for _, c := range *groupDefinition.Containers {
  95. if c.Name != nil && *c.Name != convert.ComposeDNSSidecarName {
  96. w.Event(progress.Event{
  97. ID: *c.Name,
  98. Status: progress.Done,
  99. StatusText: "Done",
  100. })
  101. }
  102. }
  103. return err
  104. }
  105. func getACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  106. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  107. if err != nil {
  108. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  109. }
  110. return containerGroupsClient.Get(ctx, aciContext.ResourceGroup, containerGroupName)
  111. }
  112. func deleteACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  113. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  114. if err != nil {
  115. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  116. }
  117. return containerGroupsClient.Delete(ctx, aciContext.ResourceGroup, containerGroupName)
  118. }
  119. func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
  120. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  121. if err != nil {
  122. return c, errors.Wrapf(err, "cannot get container client")
  123. }
  124. rows, cols := getTermSize()
  125. containerExecRequest := containerinstance.ContainerExecRequest{
  126. Command: to.StringPtr(command),
  127. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  128. Rows: rows,
  129. Cols: cols,
  130. },
  131. }
  132. return containerClient.ExecuteCommand(
  133. ctx,
  134. aciContext.ResourceGroup,
  135. containerGroup,
  136. containerName,
  137. containerExecRequest)
  138. }
  139. func getTermSize() (*int32, *int32) {
  140. rows := tm.Height()
  141. cols := tm.Width()
  142. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  143. }
  144. func exec(ctx context.Context, address string, password string, reader io.Reader, writer io.Writer) error {
  145. conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
  146. if err != nil {
  147. return err
  148. }
  149. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
  150. if err != nil {
  151. return err
  152. }
  153. downstreamChannel := make(chan error, 10)
  154. upstreamChannel := make(chan error, 10)
  155. go func() {
  156. for {
  157. msg, _, err := wsutil.ReadServerData(conn)
  158. if err != nil {
  159. if err == io.EOF {
  160. downstreamChannel <- nil
  161. return
  162. }
  163. downstreamChannel <- err
  164. return
  165. }
  166. fmt.Fprint(writer, string(msg))
  167. }
  168. }()
  169. go func() {
  170. for {
  171. // We send each byte, byte-per-byte over the
  172. // websocket because the console is in raw mode
  173. buffer := make([]byte, 1)
  174. n, err := reader.Read(buffer)
  175. if err != nil {
  176. if err == io.EOF {
  177. upstreamChannel <- nil
  178. return
  179. }
  180. upstreamChannel <- err
  181. return
  182. }
  183. if n > 0 {
  184. err := wsutil.WriteClientMessage(conn, ws.OpText, buffer)
  185. if err != nil {
  186. upstreamChannel <- err
  187. return
  188. }
  189. }
  190. }
  191. }()
  192. for {
  193. select {
  194. case err := <-downstreamChannel:
  195. return errors.Wrap(err, "failed to read input from container")
  196. case err := <-upstreamChannel:
  197. return errors.Wrap(err, "failed to send input to container")
  198. }
  199. }
  200. }
  201. func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, tail *int32) (string, error) {
  202. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  203. if err != nil {
  204. return "", errors.Wrapf(err, "cannot get container client")
  205. }
  206. logs, err := containerClient.ListLogs(ctx, aciContext.ResourceGroup, containerGroupName, containerName, tail)
  207. if err != nil {
  208. return "", fmt.Errorf("cannot get container logs: %v", err)
  209. }
  210. return *logs.Content, err
  211. }
  212. func streamLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, out io.Writer) error {
  213. terminalWidth := goterm.Width()
  214. numLines := 0
  215. for {
  216. select {
  217. case <-ctx.Done():
  218. return nil
  219. default:
  220. logs, err := getACIContainerLogs(ctx, aciContext, containerGroupName, containerName, nil)
  221. if err != nil {
  222. return err
  223. }
  224. logLines := strings.Split(logs, "\n")
  225. currentOutput := len(logLines)
  226. // Note: a backend should not do this normally, this breaks the log
  227. // streaming over gRPC but this is the only thing we can do with
  228. // the kind of logs ACI is giving us. Hopefully Azue will give us
  229. // a real logs streaming api soon.
  230. b := aec.EmptyBuilder
  231. b = b.Up(uint(numLines))
  232. fmt.Fprint(out, b.Column(0).ANSI)
  233. for i := 0; i < currentOutput-1; i++ {
  234. fmt.Fprintln(out, logLines[i])
  235. }
  236. numLines = 0
  237. for i := 0; i < currentOutput-1; i++ {
  238. numLines++
  239. if len(logLines[i]) > terminalWidth {
  240. numLines += len(logLines[i]) / terminalWidth
  241. }
  242. }
  243. time.Sleep(2 * time.Second)
  244. }
  245. }
  246. }
  247. func getContainerGroupsClient(subscriptionID string) (containerinstance.ContainerGroupsClient, error) {
  248. containerGroupsClient := containerinstance.NewContainerGroupsClient(subscriptionID)
  249. err := setupClient(&containerGroupsClient.Client)
  250. if err != nil {
  251. return containerinstance.ContainerGroupsClient{}, err
  252. }
  253. containerGroupsClient.PollingDelay = 5 * time.Second
  254. containerGroupsClient.RetryAttempts = 30
  255. containerGroupsClient.RetryDuration = 1 * time.Second
  256. return containerGroupsClient, nil
  257. }
  258. func setupClient(aciClient *autorest.Client) error {
  259. aciClient.UserAgent = aciDockerUserAgent
  260. auth, err := login.NewAuthorizerFromLogin()
  261. if err != nil {
  262. return err
  263. }
  264. aciClient.Authorizer = auth
  265. return nil
  266. }
  267. func getContainerClient(subscriptionID string) (containerinstance.ContainerClient, error) {
  268. containerClient := containerinstance.NewContainerClient(subscriptionID)
  269. err := setupClient(&containerClient.Client)
  270. if err != nil {
  271. return containerinstance.ContainerClient{}, err
  272. }
  273. return containerClient, nil
  274. }