aci.go 7.9 KB


  1. package azure
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "os"
  9. "strings"
  10. "time"
  11. "github.com/Azure/azure-sdk-for-go/profiles/2019-03-01/resources/mgmt/resources"
  12. "github.com/Azure/azure-sdk-for-go/profiles/preview/preview/subscription/mgmt/subscription"
  13. "github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
  14. "github.com/Azure/azure-sdk-for-go/services/keyvault/auth"
  15. "github.com/Azure/go-autorest/autorest"
  16. "github.com/Azure/go-autorest/autorest/to"
  17. tm "github.com/buger/goterm"
  18. "github.com/gobwas/ws"
  19. "github.com/gobwas/ws/wsutil"
  20. "github.com/pkg/errors"
  21. "github.com/docker/api/context/store"
  22. )
  23. func init() {
  24. // required to get auth.NewAuthorizerFromCLI() to work, otherwise getting "The access token has been obtained for wrong audience or resource 'https://vault.azure.net'."
  25. err := os.Setenv("AZURE_KEYVAULT_RESOURCE", "https://management.azure.com")
  26. if err != nil {
  27. panic("unable to set environment variable AZURE_KEYVAULT_RESOURCE")
  28. }
  29. }
  30. func createACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) error {
  31. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  32. if err != nil {
  33. return errors.Wrapf(err, "cannot get container group client")
  34. }
  35. // Check if the container group already exists
  36. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  37. if err != nil {
  38. if err, ok := err.(autorest.DetailedError); ok {
  39. if err.StatusCode != http.StatusNotFound {
  40. return err
  41. }
  42. } else {
  43. return err
  44. }
  45. } else {
  46. return fmt.Errorf("container group %q already exists", *groupDefinition.Name)
  47. }
  48. future, err := containerGroupsClient.CreateOrUpdate(
  49. ctx,
  50. aciContext.ResourceGroup,
  51. *groupDefinition.Name,
  52. groupDefinition,
  53. )
  54. if err != nil {
  55. return err
  56. }
  57. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  58. if err != nil {
  59. return err
  60. }
  61. containerGroup, err := future.Result(containerGroupsClient)
  62. if err != nil {
  63. return err
  64. }
  65. if len(*containerGroup.Containers) > 1 {
  66. var commands []string
  67. for _, container := range *containerGroup.Containers {
  68. commands = append(commands, fmt.Sprintf("echo 127.0.0.1 %s >> /etc/hosts", *container.Name))
  69. }
  70. commands = append(commands, "exit")
  71. containers := *containerGroup.Containers
  72. container := containers[0]
  73. response, err := execACIContainer(ctx, aciContext, "/bin/sh", *containerGroup.Name, *container.Name)
  74. if err != nil {
  75. return err
  76. }
  77. if err = execCommands(
  78. ctx,
  79. *response.WebSocketURI,
  80. *response.Password,
  81. commands,
  82. ); err != nil {
  83. return err
  84. }
  85. }
  86. return err
  87. }
  88. func deleteACIContainerGroup(ctx context.Context, aciContext store.AciContext, containerGroupName string) (c containerinstance.ContainerGroup, err error) {
  89. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  90. if err != nil {
  91. return c, fmt.Errorf("cannot get container group client: %v", err)
  92. }
  93. return containerGroupsClient.Delete(ctx, aciContext.ResourceGroup, containerGroupName)
  94. }
  95. func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
  96. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  97. if err != nil {
  98. return c, errors.Wrapf(err, "cannot get container client")
  99. }
  100. rows, cols := getTermSize()
  101. containerExecRequest := containerinstance.ContainerExecRequest{
  102. Command: to.StringPtr(command),
  103. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  104. Rows: rows,
  105. Cols: cols,
  106. },
  107. }
  108. return containerClient.ExecuteCommand(
  109. ctx,
  110. aciContext.ResourceGroup,
  111. containerGroup,
  112. containerName,
  113. containerExecRequest)
  114. }
  115. func getTermSize() (*int32, *int32) {
  116. rows := tm.Height()
  117. cols := tm.Width()
  118. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  119. }
  120. type commandSender struct {
  121. commands string
  122. }
  123. func (cs *commandSender) Read(p []byte) (int, error) {
  124. if len(cs.commands) == 0 {
  125. return 0, io.EOF
  126. }
  127. var command string
  128. if len(p) >= len(cs.commands) {
  129. command = cs.commands
  130. cs.commands = ""
  131. } else {
  132. command = cs.commands[:len(p)]
  133. cs.commands = cs.commands[len(p):]
  134. }
  135. copy(p, command)
  136. return len(command), nil
  137. }
  138. func execCommands(ctx context.Context, address string, password string, commands []string) error {
  139. writer := ioutil.Discard
  140. reader := &commandSender{
  141. commands: strings.Join(commands, "\n"),
  142. }
  143. return exec(ctx, address, password, reader, writer)
  144. }
  145. func exec(ctx context.Context, address string, password string, reader io.Reader, writer io.Writer) error {
  146. conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
  147. if err != nil {
  148. return err
  149. }
  150. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
  151. if err != nil {
  152. return err
  153. }
  154. downstreamChannel := make(chan error, 10)
  155. upstreamChannel := make(chan error, 10)
  156. go func() {
  157. for {
  158. msg, _, err := wsutil.ReadServerData(conn)
  159. if err != nil {
  160. if err == io.EOF {
  161. downstreamChannel <- nil
  162. return
  163. }
  164. downstreamChannel <- err
  165. return
  166. }
  167. fmt.Fprint(writer, string(msg))
  168. }
  169. }()
  170. go func() {
  171. for {
  172. // We send each byte, byte-per-byte over the
  173. // websocket because the console is in raw mode
  174. buffer := make([]byte, 1)
  175. n, err := reader.Read(buffer)
  176. if err != nil {
  177. if err == io.EOF {
  178. upstreamChannel <- nil
  179. return
  180. }
  181. upstreamChannel <- err
  182. return
  183. }
  184. if n > 0 {
  185. err := wsutil.WriteClientMessage(conn, ws.OpText, buffer)
  186. if err != nil {
  187. upstreamChannel <- err
  188. return
  189. }
  190. }
  191. }
  192. }()
  193. for {
  194. select {
  195. case err := <-downstreamChannel:
  196. return errors.Wrap(err, "failed to read input from container")
  197. case err := <-upstreamChannel:
  198. return errors.Wrap(err, "failed to send input to container")
  199. }
  200. }
  201. }
  202. func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string) (string, error) {
  203. containerClient, err := getContainerClient(aciContext.SubscriptionID)
  204. if err != nil {
  205. return "", errors.Wrapf(err, "cannot get container client")
  206. }
  207. logs, err := containerClient.ListLogs(ctx, aciContext.ResourceGroup, containerGroupName, containerName, nil)
  208. if err != nil {
  209. return "", fmt.Errorf("cannot get container logs: %v", err)
  210. }
  211. return *logs.Content, err
  212. }
  213. func getContainerGroupsClient(subscriptionID string) (containerinstance.ContainerGroupsClient, error) {
  214. auth, err := auth.NewAuthorizerFromCLI()
  215. if err != nil {
  216. return containerinstance.ContainerGroupsClient{}, err
  217. }
  218. containerGroupsClient := containerinstance.NewContainerGroupsClient(subscriptionID)
  219. containerGroupsClient.Authorizer = auth
  220. containerGroupsClient.PollingDelay = 5 * time.Second
  221. containerGroupsClient.RetryAttempts = 30
  222. containerGroupsClient.RetryDuration = 1 * time.Second
  223. return containerGroupsClient, nil
  224. }
  225. func getContainerClient(subscriptionID string) (containerinstance.ContainerClient, error) {
  226. auth, err := auth.NewAuthorizerFromCLI()
  227. if err != nil {
  228. return containerinstance.ContainerClient{}, err
  229. }
  230. containerClient := containerinstance.NewContainerClient(subscriptionID)
  231. containerClient.Authorizer = auth
  232. return containerClient, nil
  233. }
  234. func getSubscriptionsClient() subscription.SubscriptionsClient {
  235. subc := subscription.NewSubscriptionsClient()
  236. authorizer, _ := auth.NewAuthorizerFromCLI()
  237. subc.Authorizer = authorizer
  238. return subc
  239. }
  240. //GetGroupsClient ...
  241. func GetGroupsClient(subscriptionID string) resources.GroupsClient {
  242. groupsClient := resources.NewGroupsClient(subscriptionID)
  243. authorizer, _ := auth.NewAuthorizerFromCLI()
  244. groupsClient.Authorizer = authorizer
  245. return groupsClient
  246. }
  247. //GetSubscriptionID ...
  248. func GetSubscriptionID(ctx context.Context) (string, error) {
  249. c := getSubscriptionsClient()
  250. res, err := c.List(ctx)
  251. if err != nil {
  252. return "", err
  253. }
  254. subs := res.Values()
  255. if len(subs) == 0 {
  256. return "", errors.New("no subscriptions found")
  257. }
  258. sub := subs[0]
  259. return *sub.SubscriptionID, nil
  260. }