aci.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package azure
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "strings"
  9. "time"
  10. "github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
  11. "github.com/Azure/go-autorest/autorest"
  12. "github.com/Azure/go-autorest/autorest/to"
  13. tm "github.com/buger/goterm"
  14. "github.com/gobwas/ws"
  15. "github.com/gobwas/ws/wsutil"
  16. "github.com/pkg/errors"
  17. "github.com/docker/api/azure/login"
  18. "github.com/docker/api/context/store"
  19. "github.com/docker/api/progress"
  20. )
  21. const aciDockerUserAgent = "docker-cli"
  22. func createACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  23. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  24. if err != nil {
  25. return errors.Wrapf(err, "cannot get container group client")
  26. }
  27. // Check if the container group already exists
  28. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  29. if err != nil {
  30. if err, ok := err.(autorest.DetailedError); ok {
  31. if err.StatusCode != http.StatusNotFound {
  32. return err
  33. }
  34. } else {
  35. return err
  36. }
  37. } else {
  38. return fmt.Errorf("container group %q already exists", *groupDefinition.Name)
  39. }
  40. return createOrUpdateACIContainers(ctx, aciContext, groupDefinition)
  41. }
  42. func createOrUpdateACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  43. w := progress.ContextWriter(ctx)
  44. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  45. if err != nil {
  46. return errors.Wrapf(err, "cannot get container group client")
  47. }
  48. w.Event(progress.Event{
  49. ID: *groupDefinition.Name,
  50. Status: progress.Working,
  51. StatusText: "Waiting",
  52. })
  53. future, err := containerGroupsClient.CreateOrUpdate(
  54. ctx,
  55. aciContext.ResourceGroup,
  56. *groupDefinition.Name,
  57. groupDefinition,
  58. )
  59. if err != nil {
  60. return err
  61. }
  62. w.Event(progress.Event{
  63. ID: *groupDefinition.Name,
  64. Status: progress.Done,
  65. StatusText: "Created",
  66. })
  67. for _, c := range *groupDefinition.Containers {
  68. w.Event(progress.Event{
  69. ID: *c.Name,
  70. Status: progress.Working,
  71. StatusText: "Waiting",
  72. })
  73. }
  74. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  75. if err != nil {
  76. return err
  77. }
  78. for _, c := range *groupDefinition.Containers {
  79. w.Event(progress.Event{
  80. ID: *c.Name,
  81. Status: progress.Done,
  82. StatusText: "Done",
  83. })
  84. }
  85. return err
  86. }
  87. func getACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  88. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  89. if err != nil {
  90. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  91. }
  92. return containerGroupsClient.Get(ctx, aciContext.ResourceGroup, containerGroupName)
  93. }
  94. func deleteACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (containerinstance.ContainerGroup, error) {
  95. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  96. if err != nil {
  97. return containerinstance.ContainerGroup{}, fmt.Errorf("cannot get container group client: %v", err)
  98. }
  99. return containerGroupsClient.Delete(ctx, aciContext.ResourceGroup, containerGroupName)
  100. }
  101. func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
  102. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  103. if err != nil {
  104. return c, errors.Wrapf(err, "cannot get container client")
  105. }
  106. rows, cols := getTermSize()
  107. containerExecRequest := containerinstance.ContainerExecRequest{
  108. Command: to.StringPtr(command),
  109. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  110. Rows: rows,
  111. Cols: cols,
  112. },
  113. }
  114. return containerClient.ExecuteCommand(
  115. ctx,
  116. aciContext.ResourceGroup,
  117. containerGroup,
  118. containerName,
  119. containerExecRequest)
  120. }
  121. func getTermSize() (*int32, *int32) {
  122. rows := tm.Height()
  123. cols := tm.Width()
  124. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  125. }
  126. type commandSender struct {
  127. commands string
  128. }
  129. func (cs *commandSender) Read(p []byte) (int, error) {
  130. if len(cs.commands) == 0 {
  131. return 0, io.EOF
  132. }
  133. var command string
  134. if len(p) >= len(cs.commands) {
  135. command = cs.commands
  136. cs.commands = ""
  137. } else {
  138. command = cs.commands[:len(p)]
  139. cs.commands = cs.commands[len(p):]
  140. }
  141. copy(p, command)
  142. return len(command), nil
  143. }
  144. func execCommands(ctx context.Context, address string, password string, commands []string) error {
  145. writer := ioutil.Discard
  146. reader := &commandSender{
  147. commands: strings.Join(commands, "\n"),
  148. }
  149. return exec(ctx, address, password, reader, writer)
  150. }
  151. func exec(ctx context.Context, address string, password string, reader io.Reader, writer io.Writer) error {
  152. conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
  153. if err != nil {
  154. return err
  155. }
  156. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
  157. if err != nil {
  158. return err
  159. }
  160. downstreamChannel := make(chan error, 10)
  161. upstreamChannel := make(chan error, 10)
  162. go func() {
  163. for {
  164. msg, _, err := wsutil.ReadServerData(conn)
  165. if err != nil {
  166. if err == io.EOF {
  167. downstreamChannel <- nil
  168. return
  169. }
  170. downstreamChannel <- err
  171. return
  172. }
  173. fmt.Fprint(writer, string(msg))
  174. }
  175. }()
  176. go func() {
  177. for {
  178. // We send each byte, byte-per-byte over the
  179. // websocket because the console is in raw mode
  180. buffer := make([]byte, 1)
  181. n, err := reader.Read(buffer)
  182. if err != nil {
  183. if err == io.EOF {
  184. upstreamChannel <- nil
  185. return
  186. }
  187. upstreamChannel <- err
  188. return
  189. }
  190. if n > 0 {
  191. err := wsutil.WriteClientMessage(conn, ws.OpText, buffer)
  192. if err != nil {
  193. upstreamChannel <- err
  194. return
  195. }
  196. }
  197. }
  198. }()
  199. for {
  200. select {
  201. case err := <-downstreamChannel:
  202. return errors.Wrap(err, "failed to read input from container")
  203. case err := <-upstreamChannel:
  204. return errors.Wrap(err, "failed to send input to container")
  205. }
  206. }
  207. }
  208. func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string) (string, error) {
  209. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  210. if err != nil {
  211. return "", errors.Wrapf(err, "cannot get container client")
  212. }
  213. logs, err := containerClient.ListLogs(ctx, aciContext.ResourceGroup, containerGroupName, containerName, nil)
  214. if err != nil {
  215. return "", fmt.Errorf("cannot get container logs: %v", err)
  216. }
  217. return *logs.Content, err
  218. }
  219. func getContainerGroupsClient(subscriptionID string) (containerinstance.ContainerGroupsClient, error) {
  220. containerGroupsClient := containerinstance.NewContainerGroupsClient(subscriptionID)
  221. err := setupClient(&containerGroupsClient.Client)
  222. if err != nil {
  223. return containerinstance.ContainerGroupsClient{}, err
  224. }
  225. containerGroupsClient.PollingDelay = 5 * time.Second
  226. containerGroupsClient.RetryAttempts = 30
  227. containerGroupsClient.RetryDuration = 1 * time.Second
  228. return containerGroupsClient, nil
  229. }
  230. func setupClient(aciClient *autorest.Client) error {
  231. aciClient.UserAgent = aciDockerUserAgent
  232. auth, err := login.NewAuthorizerFromLogin()
  233. if err != nil {
  234. return err
  235. }
  236. aciClient.Authorizer = auth
  237. return nil
  238. }
  239. func getContainerClient(subscriptionID string) (containerinstance.ContainerClient, error) {
  240. containerClient := containerinstance.NewContainerClient(subscriptionID)
  241. err := setupClient(&containerClient.Client)
  242. if err != nil {
  243. return containerinstance.ContainerClient{}, err
  244. }
  245. return containerClient, nil
  246. }