aci.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package azure
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "os"
  9. "github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
  10. "github.com/Azure/azure-sdk-for-go/services/keyvault/auth"
  11. "github.com/Azure/go-autorest/autorest"
  12. "github.com/Azure/go-autorest/autorest/to"
  13. "github.com/gobwas/ws"
  14. "github.com/gobwas/ws/wsutil"
  15. "github.com/pkg/errors"
  16. "github.com/docker/api/context/store"
  17. tm "github.com/buger/goterm"
  18. )
  19. func init() {
  20. // required to get auth.NewAuthorizerFromCLI() to work, otherwise getting "The access token has been obtained for wrong audience or resource 'https://vault.azure.net'."
  21. err := os.Setenv("AZURE_KEYVAULT_RESOURCE", "https://management.azure.com")
  22. if err != nil {
  23. panic("unable to set environment variable AZURE_KEYVAULT_RESOURCE")
  24. }
  25. }
  26. func createACIContainers(ctx context.Context, aciContext store.AciContext, groupDefinition containerinstance.ContainerGroup) (c containerinstance.ContainerGroup, err error) {
  27. containerGroupsClient := getContainerGroupsClient(aciContext.SubscriptionID)
  28. // Check if the container group already exists
  29. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  30. if err != nil {
  31. if err, ok := err.(autorest.DetailedError); ok {
  32. if err.StatusCode != http.StatusNotFound {
  33. return c, err
  34. }
  35. } else {
  36. return c, err
  37. }
  38. } else {
  39. return c, fmt.Errorf("container group %q already exists", *groupDefinition.Name)
  40. }
  41. future, err := containerGroupsClient.CreateOrUpdate(
  42. ctx,
  43. aciContext.ResourceGroup,
  44. *groupDefinition.Name,
  45. groupDefinition,
  46. )
  47. if err != nil {
  48. return c, err
  49. }
  50. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  51. if err != nil {
  52. return c, err
  53. }
  54. containerGroup, err := future.Result(containerGroupsClient)
  55. if err != nil {
  56. return c, err
  57. }
  58. if len(*containerGroup.Containers) > 1 {
  59. var commands []string
  60. for _, container := range *containerGroup.Containers {
  61. commands = append(commands, fmt.Sprintf("echo 127.0.0.1 %s >> /etc/hosts", *container.Name))
  62. }
  63. commands = append(commands, "exit")
  64. containers := *containerGroup.Containers
  65. container := containers[0]
  66. response, err := execACIContainer(ctx, aciContext, "/bin/sh", *containerGroup.Name, *container.Name)
  67. if err != nil {
  68. return c, err
  69. }
  70. if err = execCommands(
  71. ctx,
  72. *response.WebSocketURI,
  73. *response.Password,
  74. commands,
  75. ); err != nil {
  76. return containerinstance.ContainerGroup{}, err
  77. }
  78. }
  79. return containerGroup, err
  80. }
  81. func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
  82. containerClient := getContainerClient(aciContext.SubscriptionID)
  83. rows, cols := getTermSize()
  84. containerExecRequest := containerinstance.ContainerExecRequest{
  85. Command: to.StringPtr(command),
  86. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  87. Rows: rows,
  88. Cols: cols,
  89. },
  90. }
  91. return containerClient.ExecuteCommand(
  92. ctx,
  93. aciContext.ResourceGroup,
  94. containerGroup,
  95. containerName,
  96. containerExecRequest)
  97. }
  98. func getTermSize() (*int32, *int32) {
  99. rows := tm.Height()
  100. cols := tm.Width()
  101. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  102. }
  103. type commandSender struct {
  104. commands []string
  105. }
  106. func (cs commandSender) Read(p []byte) (int, error) {
  107. if len(cs.commands) == 0 {
  108. return 0, io.EOF
  109. }
  110. command := cs.commands[0]
  111. cs.commands = cs.commands[1:]
  112. copy(p, command)
  113. return len(command), nil
  114. }
  115. func execCommands(ctx context.Context, address string, password string, commands []string) error {
  116. writer := ioutil.Discard
  117. reader := commandSender{
  118. commands: commands,
  119. }
  120. return exec(ctx, address, password, reader, writer)
  121. }
  122. func exec(ctx context.Context, address string, password string, reader io.Reader, writer io.Writer) error {
  123. conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
  124. if err != nil {
  125. return err
  126. }
  127. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
  128. if err != nil {
  129. return err
  130. }
  131. downstreamChannel := make(chan error, 10)
  132. upstreamChannel := make(chan error, 10)
  133. go func() {
  134. for {
  135. msg, _, err := wsutil.ReadServerData(conn)
  136. if err != nil {
  137. if err == io.EOF {
  138. downstreamChannel <- nil
  139. return
  140. }
  141. downstreamChannel <- err
  142. return
  143. }
  144. fmt.Fprint(writer, string(msg))
  145. }
  146. }()
  147. go func() {
  148. for {
  149. // We send each byte, byte-per-byte over the
  150. // websocket because the console is in raw mode
  151. buffer := make([]byte, 1)
  152. n, err := reader.Read(buffer)
  153. if err != nil {
  154. upstreamChannel <- err
  155. return
  156. }
  157. if n > 0 {
  158. err := wsutil.WriteClientMessage(conn, ws.OpText, buffer)
  159. if err != nil {
  160. upstreamChannel <- err
  161. }
  162. }
  163. }
  164. }()
  165. for {
  166. select {
  167. case err := <-downstreamChannel:
  168. return errors.Wrap(err, "failed to read input from container")
  169. case err := <-upstreamChannel:
  170. return errors.Wrap(err, "failed to send input to container")
  171. }
  172. }
  173. }
  174. func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string) (string, error) {
  175. containerClient := getContainerClient(aciContext.SubscriptionID)
  176. logs, err := containerClient.ListLogs(ctx, aciContext.ResourceGroup, containerGroupName, containerName, nil)
  177. if err != nil {
  178. return "", fmt.Errorf("cannot get container logs: %v", err)
  179. }
  180. return *logs.Content, err
  181. }
  182. func getContainerGroupsClient(subscriptionID string) containerinstance.ContainerGroupsClient {
  183. auth, _ := auth.NewAuthorizerFromCLI()
  184. containerGroupsClient := containerinstance.NewContainerGroupsClient(subscriptionID)
  185. containerGroupsClient.Authorizer = auth
  186. return containerGroupsClient
  187. }
  188. func getContainerClient(subscriptionID string) containerinstance.ContainerClient {
  189. auth, _ := auth.NewAuthorizerFromCLI()
  190. containerClient := containerinstance.NewContainerClient(subscriptionID)
  191. containerClient.Authorizer = auth
  192. return containerClient
  193. }