aci.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. package azure
import (
	"bufio"
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"os/signal"
	"runtime"
	"strings"
	"sync/atomic"

	"github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
	"github.com/Azure/azure-sdk-for-go/services/keyvault/auth"
	"github.com/Azure/go-autorest/autorest"
	"github.com/Azure/go-autorest/autorest/to"
	tm "github.com/buger/goterm"
	"github.com/compose-spec/compose-go/types"
	"github.com/docker/api/compose"
	"github.com/docker/api/context/store"
	"github.com/gobwas/ws"
	"github.com/gobwas/ws/wsutil"
	"github.com/sirupsen/logrus"
)
  27. const (
  28. AzureFileDriverName = "azure_file"
  29. VolumeDriveroptsShareNameKey = "share_name"
  30. VolumeDriveroptsAccountNameKey = "storage_account_name"
  31. VolumeDriveroptsAccountKeyKey = "storage_account_key"
  32. )
  33. const singleContainerName = "single--container--aci"
  34. func CreateACIContainers(ctx context.Context, project compose.Project, aciContext store.AciContext) (c containerinstance.ContainerGroup, err error) {
  35. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  36. if err != nil {
  37. return c, fmt.Errorf("cannot get container group client: %v", err)
  38. }
  39. groupDefinition, err := convert(project, aciContext)
  40. if err != nil {
  41. return c, err
  42. }
  43. // Check if the container group already exists
  44. _, err = containerGroupsClient.Get(ctx, aciContext.ResourceGroup, *groupDefinition.Name)
  45. if err != nil {
  46. if err, ok := err.(autorest.DetailedError); ok {
  47. if err.StatusCode != http.StatusNotFound {
  48. return c, err
  49. }
  50. } else {
  51. return c, err
  52. }
  53. } else {
  54. return c, fmt.Errorf("Container group %q already exists", *groupDefinition.Name)
  55. }
  56. future, err := containerGroupsClient.CreateOrUpdate(
  57. ctx,
  58. aciContext.ResourceGroup,
  59. *groupDefinition.Name,
  60. groupDefinition,
  61. )
  62. if err != nil {
  63. return c, err
  64. }
  65. err = future.WaitForCompletionRef(ctx, containerGroupsClient.Client)
  66. if err != nil {
  67. return c, err
  68. }
  69. containerGroup, err := future.Result(containerGroupsClient)
  70. if err != nil {
  71. return c, err
  72. }
  73. if len(project.Services) > 1 {
  74. var commands []string
  75. for _, service := range project.Services {
  76. commands = append(commands, fmt.Sprintf("echo 127.0.0.1 %s >> /etc/hosts", service.Name))
  77. }
  78. commands = append(commands, "exit")
  79. response, err := ExecACIContainer(ctx, "/bin/sh", project.Name, project.Services[0].Name, aciContext)
  80. if err != nil {
  81. return c, err
  82. }
  83. err = ExecWebSocketLoopWithCmd(
  84. ctx,
  85. *response.WebSocketURI,
  86. *response.Password,
  87. commands,
  88. false)
  89. if err != nil {
  90. return containerinstance.ContainerGroup{}, err
  91. }
  92. }
  93. return containerGroup, err
  94. }
  95. type ProjectAciHelper compose.Project
  96. func (p ProjectAciHelper) getAciSecretVolumes() ([]containerinstance.Volume, error) {
  97. var secretVolumes []containerinstance.Volume
  98. for secretName, filepathToRead := range p.Secrets {
  99. var data []byte
  100. if strings.HasPrefix(filepathToRead.File, compose.SecretInlineMark) {
  101. data = []byte(filepathToRead.File[len(compose.SecretInlineMark):])
  102. } else {
  103. var err error
  104. data, err = ioutil.ReadFile(filepathToRead.File)
  105. if err != nil {
  106. return secretVolumes, err
  107. }
  108. }
  109. if len(data) == 0 {
  110. continue
  111. }
  112. dataStr := base64.StdEncoding.EncodeToString(data)
  113. secretVolumes = append(secretVolumes, containerinstance.Volume{
  114. Name: to.StringPtr(secretName),
  115. Secret: map[string]*string{
  116. secretName: &dataStr,
  117. },
  118. })
  119. }
  120. return secretVolumes, nil
  121. }
  122. func (p ProjectAciHelper) getAciFileVolumes() (map[string]bool, []containerinstance.Volume, error) {
  123. azureFileVolumesMap := make(map[string]bool, len(p.Volumes))
  124. var azureFileVolumesSlice []containerinstance.Volume
  125. for name, v := range p.Volumes {
  126. if v.Driver == AzureFileDriverName {
  127. shareName, ok := v.DriverOpts[VolumeDriveroptsShareNameKey]
  128. if !ok {
  129. return nil, nil, fmt.Errorf("cannot retrieve share name for Azurefile")
  130. }
  131. accountName, ok := v.DriverOpts[VolumeDriveroptsAccountNameKey]
  132. if !ok {
  133. return nil, nil, fmt.Errorf("cannot retrieve account name for Azurefile")
  134. }
  135. accountKey, ok := v.DriverOpts[VolumeDriveroptsAccountKeyKey]
  136. if !ok {
  137. return nil, nil, fmt.Errorf("cannot retrieve account key for Azurefile")
  138. }
  139. aciVolume := containerinstance.Volume{
  140. Name: to.StringPtr(name),
  141. AzureFile: &containerinstance.AzureFileVolume{
  142. ShareName: to.StringPtr(shareName),
  143. StorageAccountName: to.StringPtr(accountName),
  144. StorageAccountKey: to.StringPtr(accountKey),
  145. },
  146. }
  147. azureFileVolumesMap[name] = true
  148. azureFileVolumesSlice = append(azureFileVolumesSlice, aciVolume)
  149. }
  150. }
  151. return azureFileVolumesMap, azureFileVolumesSlice, nil
  152. }
  153. type ServiceConfigAciHelper types.ServiceConfig
  154. func (s ServiceConfigAciHelper) getAciFileVolumeMounts(volumesCache map[string]bool) ([]containerinstance.VolumeMount, error) {
  155. var aciServiceVolumes []containerinstance.VolumeMount
  156. for _, sv := range s.Volumes {
  157. if !volumesCache[sv.Source] {
  158. return []containerinstance.VolumeMount{}, fmt.Errorf("could not find volume source %q", sv.Source)
  159. }
  160. aciServiceVolumes = append(aciServiceVolumes, containerinstance.VolumeMount{
  161. Name: to.StringPtr(sv.Source),
  162. MountPath: to.StringPtr(sv.Target),
  163. })
  164. }
  165. return aciServiceVolumes, nil
  166. }
  167. func (s ServiceConfigAciHelper) getAciSecretsVolumeMounts() []containerinstance.VolumeMount {
  168. var secretVolumeMounts []containerinstance.VolumeMount
  169. for _, secret := range s.Secrets {
  170. secretsMountPath := "/run/secrets"
  171. if secret.Target == "" {
  172. secret.Target = secret.Source
  173. }
  174. // Specifically use "/" here and not filepath.Join() to avoid windows path being sent and used inside containers
  175. secretsMountPath = secretsMountPath + "/" + secret.Target
  176. vmName := strings.Split(secret.Source, "=")[0]
  177. vm := containerinstance.VolumeMount{
  178. Name: to.StringPtr(vmName),
  179. MountPath: to.StringPtr(secretsMountPath),
  180. ReadOnly: to.BoolPtr(true), // TODO Confirm if the secrets are read only
  181. }
  182. secretVolumeMounts = append(secretVolumeMounts, vm)
  183. }
  184. return secretVolumeMounts
  185. }
  186. func (s ServiceConfigAciHelper) getAciContainer(volumesCache map[string]bool) (containerinstance.Container, error) {
  187. secretVolumeMounts := s.getAciSecretsVolumeMounts()
  188. aciServiceVolumes, err := s.getAciFileVolumeMounts(volumesCache)
  189. if err != nil {
  190. return containerinstance.Container{}, err
  191. }
  192. allVolumes := append(aciServiceVolumes, secretVolumeMounts...)
  193. var volumes *[]containerinstance.VolumeMount
  194. if len(allVolumes) == 0 {
  195. volumes = nil
  196. } else {
  197. volumes = &allVolumes
  198. }
  199. return containerinstance.Container{
  200. Name: to.StringPtr(s.Name),
  201. ContainerProperties: &containerinstance.ContainerProperties{
  202. Image: to.StringPtr(s.Image),
  203. Resources: &containerinstance.ResourceRequirements{
  204. Limits: &containerinstance.ResourceLimits{
  205. MemoryInGB: to.Float64Ptr(1),
  206. CPU: to.Float64Ptr(1),
  207. },
  208. Requests: &containerinstance.ResourceRequests{
  209. MemoryInGB: to.Float64Ptr(1),
  210. CPU: to.Float64Ptr(1),
  211. },
  212. },
  213. VolumeMounts: volumes,
  214. },
  215. }, nil
  216. }
  217. // ListACIContainers List available containers
  218. func ListACIContainers(aciContext store.AciContext) (c []containerinstance.ContainerGroup, err error) {
  219. ctx := context.TODO()
  220. containerGroupsClient, err := getContainerGroupsClient(aciContext.SubscriptionID)
  221. if err != nil {
  222. return c, fmt.Errorf("cannot get container group client: %v", err)
  223. }
  224. var containers []containerinstance.ContainerGroup
  225. result, err := containerGroupsClient.ListByResourceGroup(ctx, aciContext.ResourceGroup)
  226. if err != nil {
  227. return []containerinstance.ContainerGroup{}, err
  228. }
  229. for result.NotDone() {
  230. containers = append(containers, result.Values()...)
  231. if err := result.NextWithContext(ctx); err != nil {
  232. return []containerinstance.ContainerGroup{}, err
  233. }
  234. }
  235. return containers, err
  236. }
  237. func ExecACIContainer(ctx context.Context, command, containerGroup string, containerName string, aciContext store.AciContext) (c containerinstance.ContainerExecResponse, err error) {
  238. containerClient := getContainerClient(aciContext.SubscriptionID)
  239. rows, cols := getTermSize()
  240. containerExecRequest := containerinstance.ContainerExecRequest{
  241. Command: to.StringPtr(command),
  242. TerminalSize: &containerinstance.ContainerExecRequestTerminalSize{
  243. Rows: rows,
  244. Cols: cols,
  245. },
  246. }
  247. return containerClient.ExecuteCommand(
  248. ctx,
  249. aciContext.ResourceGroup,
  250. containerGroup,
  251. containerName,
  252. containerExecRequest)
  253. }
  254. func getTermSize() (*int32, *int32) {
  255. rows := tm.Height()
  256. cols := tm.Width()
  257. return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
  258. }
  259. func ExecWebSocketLoop(ctx context.Context, wsURL, passwd string) error {
  260. return ExecWebSocketLoopWithCmd(ctx, wsURL, passwd, []string{}, true)
  261. }
  262. func ExecWebSocketLoopWithCmd(ctx context.Context, wsURL, passwd string, commands []string, outputEnabled bool) error {
  263. ctx, cancel := context.WithCancel(ctx)
  264. conn, _, _, err := ws.DefaultDialer.Dial(ctx, wsURL)
  265. if err != nil {
  266. cancel()
  267. return err
  268. }
  269. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(passwd))
  270. if err != nil {
  271. cancel()
  272. return err
  273. }
  274. lastCommandLen := 0
  275. done := make(chan struct{})
  276. go func() {
  277. defer close(done)
  278. for {
  279. msg, _, err := wsutil.ReadServerData(conn)
  280. if err != nil {
  281. if err != io.EOF {
  282. fmt.Printf("read error: %s\n", err)
  283. }
  284. return
  285. }
  286. lines := strings.Split(string(msg), "\n")
  287. lastCommandLen = len(lines[len(lines)-1])
  288. if outputEnabled {
  289. fmt.Printf("%s", msg)
  290. }
  291. }
  292. }()
  293. interrupt := make(chan os.Signal, 1)
  294. signal.Notify(interrupt, os.Interrupt)
  295. scanner := bufio.NewScanner(os.Stdin)
  296. rc := make(chan string, 10)
  297. if len(commands) > 0 {
  298. for _, command := range commands {
  299. rc <- command
  300. }
  301. }
  302. go func() {
  303. for {
  304. if !scanner.Scan() {
  305. close(done)
  306. cancel()
  307. fmt.Println("exiting...")
  308. break
  309. }
  310. t := scanner.Text()
  311. rc <- t
  312. cleanLastCommand(lastCommandLen)
  313. }
  314. }()
  315. for {
  316. select {
  317. case <-done:
  318. return nil
  319. case line := <-rc:
  320. err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(line+"\n"))
  321. if err != nil {
  322. fmt.Println("write: ", err)
  323. return nil
  324. }
  325. case <-interrupt:
  326. fmt.Println("interrupted...")
  327. close(done)
  328. cancel()
  329. return nil
  330. }
  331. }
  332. }
  333. func convert(p compose.Project, aciContext store.AciContext) (containerinstance.ContainerGroup, error) {
  334. project := ProjectAciHelper(p)
  335. containerGroupName := strings.ToLower(project.Name)
  336. volumesCache, volumesSlice, err := project.getAciFileVolumes()
  337. if err != nil {
  338. return containerinstance.ContainerGroup{}, err
  339. }
  340. secretVolumes, err := project.getAciSecretVolumes()
  341. if err != nil {
  342. return containerinstance.ContainerGroup{}, err
  343. }
  344. allVolumes := append(volumesSlice, secretVolumes...)
  345. var volumes *[]containerinstance.Volume
  346. if len(allVolumes) == 0 {
  347. volumes = nil
  348. } else {
  349. volumes = &allVolumes
  350. }
  351. var containers []containerinstance.Container
  352. groupDefinition := containerinstance.ContainerGroup{
  353. Name: &containerGroupName,
  354. Location: &aciContext.Location,
  355. ContainerGroupProperties: &containerinstance.ContainerGroupProperties{
  356. OsType: containerinstance.Linux,
  357. Containers: &containers,
  358. Volumes: volumes,
  359. },
  360. }
  361. for _, s := range project.Services {
  362. service := ServiceConfigAciHelper(s)
  363. if s.Name != singleContainerName {
  364. logrus.Debugf("Adding %q\n", service.Name)
  365. }
  366. containerDefinition, err := service.getAciContainer(volumesCache)
  367. if err != nil {
  368. return containerinstance.ContainerGroup{}, err
  369. }
  370. if service.Ports != nil {
  371. var containerPorts []containerinstance.ContainerPort
  372. var groupPorts []containerinstance.Port
  373. for _, portConfig := range service.Ports {
  374. if portConfig.Published != 0 && portConfig.Published != portConfig.Target {
  375. msg := fmt.Sprintf("Port mapping is not supported with ACI, cannot map port %d to %d for container %s",
  376. portConfig.Published, portConfig.Target, service.Name)
  377. return groupDefinition, errors.New(msg)
  378. }
  379. portNumber := int32(portConfig.Target)
  380. containerPorts = append(containerPorts, containerinstance.ContainerPort{
  381. Port: to.Int32Ptr(portNumber),
  382. })
  383. groupPorts = append(groupPorts, containerinstance.Port{
  384. Port: to.Int32Ptr(portNumber),
  385. Protocol: containerinstance.TCP,
  386. })
  387. }
  388. containerDefinition.ContainerProperties.Ports = &containerPorts
  389. groupDefinition.ContainerGroupProperties.IPAddress = &containerinstance.IPAddress{
  390. Type: containerinstance.Public,
  391. Ports: &groupPorts,
  392. }
  393. }
  394. containers = append(containers, containerDefinition)
  395. }
  396. groupDefinition.ContainerGroupProperties.Containers = &containers
  397. return groupDefinition, nil
  398. }
  399. func cleanLastCommand(lastCommandLen int) {
  400. tm.MoveCursorUp(1)
  401. tm.MoveCursorForward(lastCommandLen)
  402. if runtime.GOOS != "windows" {
  403. for i := 0; i < tm.Width(); i++ {
  404. _, _ = tm.Print(" ")
  405. }
  406. tm.MoveCursorUp(1)
  407. }
  408. tm.Flush()
  409. }
  410. func getContainerGroupsClient(subscriptionID string) (containerinstance.ContainerGroupsClient, error) {
  411. auth, _ := auth.NewAuthorizerFromCLI()
  412. containerGroupsClient := containerinstance.NewContainerGroupsClient(subscriptionID)
  413. containerGroupsClient.Authorizer = auth
  414. return containerGroupsClient, nil
  415. }
  416. func getContainerClient(subscriptionID string) containerinstance.ContainerClient {
  417. auth, _ := auth.NewAuthorizerFromCLI()
  418. containerClient := containerinstance.NewContainerClient(subscriptionID)
  419. containerClient.Authorizer = auth
  420. return containerClient
  421. }