sdk.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package ecs
  14. import (
  15. "context"
  16. "encoding/json"
  17. "fmt"
  18. "strings"
  19. "time"
  20. "github.com/aws/aws-sdk-go/service/ssm"
  21. "github.com/aws/aws-sdk-go/service/ssm/ssmiface"
  22. "github.com/docker/compose-cli/api/compose"
  23. "github.com/docker/compose-cli/api/secrets"
  24. "github.com/aws/aws-sdk-go/aws"
  25. "github.com/aws/aws-sdk-go/aws/request"
  26. "github.com/aws/aws-sdk-go/aws/session"
  27. "github.com/aws/aws-sdk-go/service/autoscaling"
  28. "github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
  29. "github.com/aws/aws-sdk-go/service/cloudformation"
  30. "github.com/aws/aws-sdk-go/service/cloudformation/cloudformationiface"
  31. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  32. "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
  33. "github.com/aws/aws-sdk-go/service/ec2"
  34. "github.com/aws/aws-sdk-go/service/ec2/ec2iface"
  35. "github.com/aws/aws-sdk-go/service/ecs"
  36. "github.com/aws/aws-sdk-go/service/ecs/ecsiface"
  37. "github.com/aws/aws-sdk-go/service/efs"
  38. "github.com/aws/aws-sdk-go/service/efs/efsiface"
  39. "github.com/aws/aws-sdk-go/service/elbv2"
  40. "github.com/aws/aws-sdk-go/service/elbv2/elbv2iface"
  41. "github.com/aws/aws-sdk-go/service/iam"
  42. "github.com/aws/aws-sdk-go/service/iam/iamiface"
  43. "github.com/aws/aws-sdk-go/service/secretsmanager"
  44. "github.com/aws/aws-sdk-go/service/secretsmanager/secretsmanageriface"
  45. "github.com/hashicorp/go-multierror"
  46. "github.com/sirupsen/logrus"
  47. )
  48. type sdk struct {
  49. ECS ecsiface.ECSAPI
  50. EC2 ec2iface.EC2API
  51. EFS efsiface.EFSAPI
  52. ELB elbv2iface.ELBV2API
  53. CW cloudwatchlogsiface.CloudWatchLogsAPI
  54. IAM iamiface.IAMAPI
  55. CF cloudformationiface.CloudFormationAPI
  56. SM secretsmanageriface.SecretsManagerAPI
  57. SSM ssmiface.SSMAPI
  58. AG autoscalingiface.AutoScalingAPI
  59. }
  60. func newSDK(sess *session.Session) sdk {
  61. sess.Handlers.Build.PushBack(func(r *request.Request) {
  62. request.AddToUserAgent(r, "Docker CLI")
  63. })
  64. return sdk{
  65. ECS: ecs.New(sess),
  66. EC2: ec2.New(sess),
  67. EFS: efs.New(sess),
  68. ELB: elbv2.New(sess),
  69. CW: cloudwatchlogs.New(sess),
  70. IAM: iam.New(sess),
  71. CF: cloudformation.New(sess),
  72. SM: secretsmanager.New(sess),
  73. SSM: ssm.New(sess),
  74. AG: autoscaling.New(sess),
  75. }
  76. }
  77. func (s sdk) CheckRequirements(ctx context.Context, region string) error {
  78. settings, err := s.ECS.ListAccountSettingsWithContext(ctx, &ecs.ListAccountSettingsInput{
  79. EffectiveSettings: aws.Bool(true),
  80. Name: aws.String("serviceLongArnFormat"),
  81. })
  82. if err != nil {
  83. return err
  84. }
  85. serviceLongArnFormat := settings.Settings[0].Value
  86. if *serviceLongArnFormat != "enabled" {
  87. return fmt.Errorf("this tool requires the \"new ARN resource ID format\".\n"+
  88. "Check https://%s.console.aws.amazon.com/ecs/home#/settings\n"+
  89. "Learn more: https://aws.amazon.com/blogs/compute/migrating-your-amazon-ecs-deployment-to-the-new-arn-and-resource-id-format-2", region)
  90. }
  91. return nil
  92. }
  93. func (s sdk) ClusterExists(ctx context.Context, name string) (bool, error) {
  94. logrus.Debug("CheckRequirements if cluster was already created: ", name)
  95. clusters, err := s.ECS.DescribeClustersWithContext(ctx, &ecs.DescribeClustersInput{
  96. Clusters: []*string{aws.String(name)},
  97. })
  98. if err != nil {
  99. return false, err
  100. }
  101. return len(clusters.Clusters) > 0, nil
  102. }
  103. func (s sdk) CreateCluster(ctx context.Context, name string) (string, error) {
  104. logrus.Debug("Create cluster ", name)
  105. response, err := s.ECS.CreateClusterWithContext(ctx, &ecs.CreateClusterInput{ClusterName: aws.String(name)})
  106. if err != nil {
  107. return "", err
  108. }
  109. return *response.Cluster.Status, nil
  110. }
  111. func (s sdk) CheckVPC(ctx context.Context, vpcID string) error {
  112. logrus.Debug("CheckRequirements on VPC : ", vpcID)
  113. output, err := s.EC2.DescribeVpcAttributeWithContext(ctx, &ec2.DescribeVpcAttributeInput{
  114. VpcId: aws.String(vpcID),
  115. Attribute: aws.String("enableDnsSupport"),
  116. })
  117. if err != nil {
  118. return err
  119. }
  120. if !*output.EnableDnsSupport.Value {
  121. return fmt.Errorf("VPC %q doesn't have DNS resolution enabled", vpcID)
  122. }
  123. return err
  124. }
  125. func (s sdk) GetDefaultVPC(ctx context.Context) (string, error) {
  126. logrus.Debug("Retrieve default VPC")
  127. vpcs, err := s.EC2.DescribeVpcsWithContext(ctx, &ec2.DescribeVpcsInput{
  128. Filters: []*ec2.Filter{
  129. {
  130. Name: aws.String("isDefault"),
  131. Values: []*string{aws.String("true")},
  132. },
  133. },
  134. })
  135. if err != nil {
  136. return "", err
  137. }
  138. if len(vpcs.Vpcs) == 0 {
  139. return "", fmt.Errorf("account has not default VPC")
  140. }
  141. return *vpcs.Vpcs[0].VpcId, nil
  142. }
  143. func (s sdk) GetSubNets(ctx context.Context, vpcID string) ([]string, error) {
  144. logrus.Debug("Retrieve SubNets")
  145. subnets, err := s.EC2.DescribeSubnetsWithContext(ctx, &ec2.DescribeSubnetsInput{
  146. DryRun: nil,
  147. Filters: []*ec2.Filter{
  148. {
  149. Name: aws.String("vpc-id"),
  150. Values: []*string{aws.String(vpcID)},
  151. },
  152. },
  153. })
  154. if err != nil {
  155. return nil, err
  156. }
  157. ids := []string{}
  158. for _, subnet := range subnets.Subnets {
  159. ids = append(ids, *subnet.SubnetId)
  160. }
  161. return ids, nil
  162. }
  163. func (s sdk) GetRoleArn(ctx context.Context, name string) (string, error) {
  164. role, err := s.IAM.GetRoleWithContext(ctx, &iam.GetRoleInput{
  165. RoleName: aws.String(name),
  166. })
  167. if err != nil {
  168. return "", err
  169. }
  170. return *role.Role.Arn, nil
  171. }
  172. func (s sdk) StackExists(ctx context.Context, name string) (bool, error) {
  173. stacks, err := s.CF.DescribeStacksWithContext(ctx, &cloudformation.DescribeStacksInput{
  174. StackName: aws.String(name),
  175. })
  176. if err != nil {
  177. if strings.HasPrefix(err.Error(), fmt.Sprintf("ValidationError: Stack with ID %s does not exist", name)) {
  178. return false, nil
  179. }
  180. return false, nil
  181. }
  182. return len(stacks.Stacks) > 0, nil
  183. }
  184. func (s sdk) CreateStack(ctx context.Context, name string, template []byte) error {
  185. logrus.Debug("Create CloudFormation stack")
  186. _, err := s.CF.CreateStackWithContext(ctx, &cloudformation.CreateStackInput{
  187. OnFailure: aws.String("DELETE"),
  188. StackName: aws.String(name),
  189. TemplateBody: aws.String(string(template)),
  190. TimeoutInMinutes: nil,
  191. Capabilities: []*string{
  192. aws.String(cloudformation.CapabilityCapabilityIam),
  193. },
  194. Tags: []*cloudformation.Tag{
  195. {
  196. Key: aws.String(compose.ProjectTag),
  197. Value: aws.String(name),
  198. },
  199. },
  200. })
  201. return err
  202. }
  203. func (s sdk) CreateChangeSet(ctx context.Context, name string, template []byte) (string, error) {
  204. logrus.Debug("Create CloudFormation Changeset")
  205. update := fmt.Sprintf("Update%s", time.Now().Format("2006-01-02-15-04-05"))
  206. changeset, err := s.CF.CreateChangeSetWithContext(ctx, &cloudformation.CreateChangeSetInput{
  207. ChangeSetName: aws.String(update),
  208. ChangeSetType: aws.String(cloudformation.ChangeSetTypeUpdate),
  209. StackName: aws.String(name),
  210. TemplateBody: aws.String(string(template)),
  211. Capabilities: []*string{
  212. aws.String(cloudformation.CapabilityCapabilityIam),
  213. },
  214. })
  215. if err != nil {
  216. return "", err
  217. }
  218. err = s.CF.WaitUntilChangeSetCreateCompleteWithContext(ctx, &cloudformation.DescribeChangeSetInput{
  219. ChangeSetName: changeset.Id,
  220. })
  221. return *changeset.Id, err
  222. }
  223. func (s sdk) UpdateStack(ctx context.Context, changeset string) error {
  224. desc, err := s.CF.DescribeChangeSetWithContext(ctx, &cloudformation.DescribeChangeSetInput{
  225. ChangeSetName: aws.String(changeset),
  226. })
  227. if err != nil {
  228. return err
  229. }
  230. if strings.HasPrefix(aws.StringValue(desc.StatusReason), "The submitted information didn't contain changes.") {
  231. return nil
  232. }
  233. _, err = s.CF.ExecuteChangeSet(&cloudformation.ExecuteChangeSetInput{
  234. ChangeSetName: aws.String(changeset),
  235. })
  236. return err
  237. }
  238. const (
  239. stackCreate = iota
  240. stackUpdate
  241. stackDelete
  242. )
  243. func (s sdk) WaitStackComplete(ctx context.Context, name string, operation int) error {
  244. input := &cloudformation.DescribeStacksInput{
  245. StackName: aws.String(name),
  246. }
  247. switch operation {
  248. case stackCreate:
  249. return s.CF.WaitUntilStackCreateCompleteWithContext(ctx, input)
  250. case stackDelete:
  251. return s.CF.WaitUntilStackDeleteCompleteWithContext(ctx, input)
  252. default:
  253. return fmt.Errorf("internal error: unexpected stack operation %d", operation)
  254. }
  255. }
  256. func (s sdk) GetStackID(ctx context.Context, name string) (string, error) {
  257. stacks, err := s.CF.DescribeStacksWithContext(ctx, &cloudformation.DescribeStacksInput{
  258. StackName: aws.String(name),
  259. })
  260. if err != nil {
  261. return "", err
  262. }
  263. return *stacks.Stacks[0].StackId, nil
  264. }
  265. func (s sdk) ListStacks(ctx context.Context, name string) ([]compose.Stack, error) {
  266. params := cloudformation.DescribeStacksInput{}
  267. if name != "" {
  268. params.StackName = &name
  269. }
  270. cfStacks, err := s.CF.DescribeStacksWithContext(ctx, &params)
  271. if err != nil {
  272. return nil, err
  273. }
  274. stacks := []compose.Stack{}
  275. for _, stack := range cfStacks.Stacks {
  276. for _, t := range stack.Tags {
  277. if *t.Key == compose.ProjectTag {
  278. status := compose.RUNNING
  279. switch aws.StringValue(stack.StackStatus) {
  280. case "CREATE_IN_PROGRESS":
  281. status = compose.STARTING
  282. case "DELETE_IN_PROGRESS":
  283. status = compose.REMOVING
  284. case "UPDATE_IN_PROGRESS":
  285. status = compose.UPDATING
  286. default:
  287. }
  288. stacks = append(stacks, compose.Stack{
  289. ID: aws.StringValue(stack.StackId),
  290. Name: aws.StringValue(stack.StackName),
  291. Status: status,
  292. })
  293. break
  294. }
  295. }
  296. }
  297. return stacks, nil
  298. }
  299. func (s sdk) GetStackClusterID(ctx context.Context, stack string) (string, error) {
  300. resources, err := s.CF.ListStackResourcesWithContext(ctx, &cloudformation.ListStackResourcesInput{
  301. StackName: aws.String(stack),
  302. })
  303. if err != nil {
  304. return "", err
  305. }
  306. for _, r := range resources.StackResourceSummaries {
  307. if aws.StringValue(r.ResourceType) == "AWS::ECS::Cluster" {
  308. return aws.StringValue(r.PhysicalResourceId), nil
  309. }
  310. }
  311. return "", nil
  312. }
  313. func (s sdk) GetServiceTaskDefinition(ctx context.Context, cluster string, serviceArns []string) (map[string]string, error) {
  314. defs := map[string]string{}
  315. svc := []*string{}
  316. for _, s := range serviceArns {
  317. svc = append(svc, aws.String(s))
  318. }
  319. services, err := s.ECS.DescribeServicesWithContext(ctx, &ecs.DescribeServicesInput{
  320. Cluster: aws.String(cluster),
  321. Services: svc,
  322. })
  323. if err != nil {
  324. return nil, err
  325. }
  326. for _, s := range services.Services {
  327. defs[aws.StringValue(s.ServiceArn)] = aws.StringValue(s.TaskDefinition)
  328. }
  329. return defs, nil
  330. }
  331. func (s sdk) ListStackServices(ctx context.Context, stack string) ([]string, error) {
  332. arns := []string{}
  333. var nextToken *string
  334. for {
  335. response, err := s.CF.ListStackResourcesWithContext(ctx, &cloudformation.ListStackResourcesInput{
  336. StackName: aws.String(stack),
  337. NextToken: nextToken,
  338. })
  339. if err != nil {
  340. return nil, err
  341. }
  342. for _, r := range response.StackResourceSummaries {
  343. if aws.StringValue(r.ResourceType) == "AWS::ECS::Service" {
  344. if r.PhysicalResourceId != nil {
  345. arns = append(arns, aws.StringValue(r.PhysicalResourceId))
  346. }
  347. }
  348. }
  349. nextToken = response.NextToken
  350. if nextToken == nil {
  351. break
  352. }
  353. }
  354. return arns, nil
  355. }
  356. func (s sdk) GetServiceTasks(ctx context.Context, cluster string, service string, stopped bool) ([]*ecs.Task, error) {
  357. state := "RUNNING"
  358. if stopped {
  359. state = "STOPPED"
  360. }
  361. tasks, err := s.ECS.ListTasksWithContext(ctx, &ecs.ListTasksInput{
  362. Cluster: aws.String(cluster),
  363. ServiceName: aws.String(service),
  364. DesiredStatus: aws.String(state),
  365. })
  366. if err != nil {
  367. return nil, err
  368. }
  369. if len(tasks.TaskArns) > 0 {
  370. taskDescriptions, err := s.ECS.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{
  371. Cluster: aws.String(cluster),
  372. Tasks: tasks.TaskArns,
  373. })
  374. if err != nil {
  375. return nil, err
  376. }
  377. return taskDescriptions.Tasks, nil
  378. }
  379. return nil, nil
  380. }
  381. func (s sdk) GetTaskStoppedReason(ctx context.Context, cluster string, taskArn string) (string, error) {
  382. taskDescriptions, err := s.ECS.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{
  383. Cluster: aws.String(cluster),
  384. Tasks: []*string{aws.String(taskArn)},
  385. })
  386. if err != nil {
  387. return "", err
  388. }
  389. if len(taskDescriptions.Tasks) == 0 {
  390. return "", nil
  391. }
  392. task := taskDescriptions.Tasks[0]
  393. return fmt.Sprintf(
  394. "%s: %s",
  395. aws.StringValue(task.StopCode),
  396. aws.StringValue(task.StoppedReason)), nil
  397. }
  398. func (s sdk) DescribeStackEvents(ctx context.Context, stackID string) ([]*cloudformation.StackEvent, error) {
  399. // Fixme implement Paginator on Events and return as a chan(events)
  400. events := []*cloudformation.StackEvent{}
  401. var nextToken *string
  402. for {
  403. resp, err := s.CF.DescribeStackEventsWithContext(ctx, &cloudformation.DescribeStackEventsInput{
  404. StackName: aws.String(stackID),
  405. NextToken: nextToken,
  406. })
  407. if err != nil {
  408. return nil, err
  409. }
  410. events = append(events, resp.StackEvents...)
  411. if resp.NextToken == nil {
  412. return events, nil
  413. }
  414. nextToken = resp.NextToken
  415. }
  416. }
  417. func (s sdk) ListStackParameters(ctx context.Context, name string) (map[string]string, error) {
  418. st, err := s.CF.DescribeStacksWithContext(ctx, &cloudformation.DescribeStacksInput{
  419. NextToken: nil,
  420. StackName: aws.String(name),
  421. })
  422. if err != nil {
  423. return nil, err
  424. }
  425. parameters := map[string]string{}
  426. for _, parameter := range st.Stacks[0].Parameters {
  427. parameters[aws.StringValue(parameter.ParameterKey)] = aws.StringValue(parameter.ParameterValue)
  428. }
  429. return parameters, nil
  430. }
  431. type stackResource struct {
  432. LogicalID string
  433. Type string
  434. ARN string
  435. Status string
  436. }
  437. type stackResourceFn func(r stackResource) error
  438. type stackResources []stackResource
  439. func (resources stackResources) apply(awsType string, fn stackResourceFn) error {
  440. var errs *multierror.Error
  441. for _, r := range resources {
  442. if r.Type == awsType {
  443. err := fn(r)
  444. if err != nil {
  445. errs = multierror.Append(err)
  446. }
  447. }
  448. }
  449. return errs.ErrorOrNil()
  450. }
  451. func (s sdk) ListStackResources(ctx context.Context, name string) (stackResources, error) {
  452. // FIXME handle pagination
  453. res, err := s.CF.ListStackResourcesWithContext(ctx, &cloudformation.ListStackResourcesInput{
  454. StackName: aws.String(name),
  455. })
  456. if err != nil {
  457. return nil, err
  458. }
  459. resources := stackResources{}
  460. for _, r := range res.StackResourceSummaries {
  461. resources = append(resources, stackResource{
  462. LogicalID: aws.StringValue(r.LogicalResourceId),
  463. Type: aws.StringValue(r.ResourceType),
  464. ARN: aws.StringValue(r.PhysicalResourceId),
  465. Status: aws.StringValue(r.ResourceStatus),
  466. })
  467. }
  468. return resources, nil
  469. }
  470. func (s sdk) DeleteStack(ctx context.Context, name string) error {
  471. logrus.Debug("Delete CloudFormation stack")
  472. _, err := s.CF.DeleteStackWithContext(ctx, &cloudformation.DeleteStackInput{
  473. StackName: aws.String(name),
  474. })
  475. return err
  476. }
  477. func (s sdk) CreateSecret(ctx context.Context, secret secrets.Secret) (string, error) {
  478. logrus.Debug("Create secret " + secret.Name)
  479. secretStr, err := secret.GetCredString()
  480. if err != nil {
  481. return "", err
  482. }
  483. response, err := s.SM.CreateSecret(&secretsmanager.CreateSecretInput{
  484. Name: &secret.Name,
  485. SecretString: &secretStr,
  486. Description: &secret.Description,
  487. })
  488. if err != nil {
  489. return "", err
  490. }
  491. return aws.StringValue(response.ARN), nil
  492. }
  493. func (s sdk) InspectSecret(ctx context.Context, id string) (secrets.Secret, error) {
  494. logrus.Debug("Inspect secret " + id)
  495. response, err := s.SM.DescribeSecret(&secretsmanager.DescribeSecretInput{SecretId: &id})
  496. if err != nil {
  497. return secrets.Secret{}, err
  498. }
  499. labels := map[string]string{}
  500. for _, tag := range response.Tags {
  501. labels[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value)
  502. }
  503. secret := secrets.Secret{
  504. ID: aws.StringValue(response.ARN),
  505. Name: aws.StringValue(response.Name),
  506. Labels: labels,
  507. }
  508. if response.Description != nil {
  509. secret.Description = *response.Description
  510. }
  511. return secret, nil
  512. }
  513. func (s sdk) ListSecrets(ctx context.Context) ([]secrets.Secret, error) {
  514. logrus.Debug("List secrets ...")
  515. response, err := s.SM.ListSecrets(&secretsmanager.ListSecretsInput{})
  516. if err != nil {
  517. return nil, err
  518. }
  519. var ls []secrets.Secret
  520. for _, sec := range response.SecretList {
  521. labels := map[string]string{}
  522. for _, tag := range sec.Tags {
  523. labels[*tag.Key] = *tag.Value
  524. }
  525. description := ""
  526. if sec.Description != nil {
  527. description = *sec.Description
  528. }
  529. ls = append(ls, secrets.Secret{
  530. ID: *sec.ARN,
  531. Name: *sec.Name,
  532. Labels: labels,
  533. Description: description,
  534. })
  535. }
  536. return ls, nil
  537. }
  538. func (s sdk) DeleteSecret(ctx context.Context, id string, recover bool) error {
  539. logrus.Debug("List secrets ...")
  540. force := !recover
  541. _, err := s.SM.DeleteSecret(&secretsmanager.DeleteSecretInput{SecretId: &id, ForceDeleteWithoutRecovery: &force})
  542. return err
  543. }
  544. func (s sdk) GetLogs(ctx context.Context, name string, consumer func(service, container, message string)) error {
  545. logGroup := fmt.Sprintf("/docker-compose/%s", name)
  546. var startTime int64
  547. for {
  548. select {
  549. case <-ctx.Done():
  550. return nil
  551. default:
  552. var hasMore = true
  553. var token *string
  554. for hasMore {
  555. events, err := s.CW.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{
  556. LogGroupName: aws.String(logGroup),
  557. NextToken: token,
  558. StartTime: aws.Int64(startTime),
  559. })
  560. if err != nil {
  561. return err
  562. }
  563. if events.NextToken == nil {
  564. hasMore = false
  565. } else {
  566. token = events.NextToken
  567. }
  568. for _, event := range events.Events {
  569. p := strings.Split(aws.StringValue(event.LogStreamName), "/")
  570. consumer(p[1], p[2], aws.StringValue(event.Message))
  571. startTime = *event.IngestionTime
  572. }
  573. }
  574. }
  575. time.Sleep(500 * time.Millisecond)
  576. }
  577. }
  578. func (s sdk) DescribeService(ctx context.Context, cluster string, arn string) (compose.ServiceStatus, error) {
  579. services, err := s.ECS.DescribeServicesWithContext(ctx, &ecs.DescribeServicesInput{
  580. Cluster: aws.String(cluster),
  581. Services: []*string{aws.String(arn)},
  582. Include: aws.StringSlice([]string{"TAGS"}),
  583. })
  584. if err != nil {
  585. return compose.ServiceStatus{}, err
  586. }
  587. service := services.Services[0]
  588. var name string
  589. for _, t := range service.Tags {
  590. if *t.Key == compose.ServiceTag {
  591. name = aws.StringValue(t.Value)
  592. }
  593. }
  594. if name == "" {
  595. return compose.ServiceStatus{}, fmt.Errorf("service %s doesn't have a %s tag", *service.ServiceArn, compose.ServiceTag)
  596. }
  597. targetGroupArns := []string{}
  598. for _, lb := range service.LoadBalancers {
  599. targetGroupArns = append(targetGroupArns, *lb.TargetGroupArn)
  600. }
  601. // getURLwithPortMapping makes 2 queries
  602. // one to get the target groups and another for load balancers
  603. loadBalancers, err := s.getURLWithPortMapping(ctx, targetGroupArns)
  604. if err != nil {
  605. return compose.ServiceStatus{}, err
  606. }
  607. return compose.ServiceStatus{
  608. ID: aws.StringValue(service.ServiceName),
  609. Name: name,
  610. Replicas: int(aws.Int64Value(service.RunningCount)),
  611. Desired: int(aws.Int64Value(service.DesiredCount)),
  612. Publishers: loadBalancers,
  613. }, nil
  614. }
  615. func (s sdk) getURLWithPortMapping(ctx context.Context, targetGroupArns []string) ([]compose.PortPublisher, error) {
  616. if len(targetGroupArns) == 0 {
  617. return nil, nil
  618. }
  619. groups, err := s.ELB.DescribeTargetGroups(&elbv2.DescribeTargetGroupsInput{
  620. TargetGroupArns: aws.StringSlice(targetGroupArns),
  621. })
  622. if err != nil {
  623. return nil, err
  624. }
  625. lbarns := []*string{}
  626. for _, tg := range groups.TargetGroups {
  627. lbarns = append(lbarns, tg.LoadBalancerArns...)
  628. }
  629. lbs, err := s.ELB.DescribeLoadBalancersWithContext(ctx, &elbv2.DescribeLoadBalancersInput{
  630. LoadBalancerArns: lbarns,
  631. })
  632. if err != nil {
  633. return nil, err
  634. }
  635. filterLB := func(arn *string, lbs []*elbv2.LoadBalancer) *elbv2.LoadBalancer {
  636. if aws.StringValue(arn) == "" {
  637. // load balancer arn is nil/""
  638. return nil
  639. }
  640. for _, lb := range lbs {
  641. if aws.StringValue(lb.LoadBalancerArn) == aws.StringValue(arn) {
  642. return lb
  643. }
  644. }
  645. return nil
  646. }
  647. loadBalancers := []compose.PortPublisher{}
  648. for _, tg := range groups.TargetGroups {
  649. for _, lbarn := range tg.LoadBalancerArns {
  650. lb := filterLB(lbarn, lbs.LoadBalancers)
  651. if lb == nil {
  652. continue
  653. }
  654. loadBalancers = append(loadBalancers, compose.PortPublisher{
  655. URL: aws.StringValue(lb.DNSName),
  656. TargetPort: int(aws.Int64Value(tg.Port)),
  657. PublishedPort: int(aws.Int64Value(tg.Port)),
  658. Protocol: aws.StringValue(tg.Protocol),
  659. })
  660. }
  661. }
  662. return loadBalancers, nil
  663. }
  664. func (s sdk) ListTasks(ctx context.Context, cluster string, family string) ([]string, error) {
  665. tasks, err := s.ECS.ListTasksWithContext(ctx, &ecs.ListTasksInput{
  666. Cluster: aws.String(cluster),
  667. Family: aws.String(family),
  668. })
  669. if err != nil {
  670. return nil, err
  671. }
  672. arns := []string{}
  673. for _, arn := range tasks.TaskArns {
  674. arns = append(arns, *arn)
  675. }
  676. return arns, nil
  677. }
  678. func (s sdk) GetPublicIPs(ctx context.Context, interfaces ...string) (map[string]string, error) {
  679. desc, err := s.EC2.DescribeNetworkInterfaces(&ec2.DescribeNetworkInterfacesInput{
  680. NetworkInterfaceIds: aws.StringSlice(interfaces),
  681. })
  682. if err != nil {
  683. return nil, err
  684. }
  685. publicIPs := map[string]string{}
  686. for _, interf := range desc.NetworkInterfaces {
  687. if interf.Association != nil {
  688. publicIPs[aws.StringValue(interf.NetworkInterfaceId)] = aws.StringValue(interf.Association.PublicIp)
  689. }
  690. }
  691. return publicIPs, nil
  692. }
  693. func (s sdk) LoadBalancerType(ctx context.Context, arn string) (string, error) {
  694. logrus.Debug("Check if LoadBalancer exists: ", arn)
  695. lbs, err := s.ELB.DescribeLoadBalancersWithContext(ctx, &elbv2.DescribeLoadBalancersInput{
  696. LoadBalancerArns: []*string{aws.String(arn)},
  697. })
  698. if err != nil {
  699. return "", err
  700. }
  701. if len(lbs.LoadBalancers) == 0 {
  702. return "", fmt.Errorf("load balancer does not exist: %s", arn)
  703. }
  704. return aws.StringValue(lbs.LoadBalancers[0].Type), nil
  705. }
  706. func (s sdk) GetLoadBalancerURL(ctx context.Context, arn string) (string, error) {
  707. logrus.Debug("Retrieve load balancer URL: ", arn)
  708. lbs, err := s.ELB.DescribeLoadBalancersWithContext(ctx, &elbv2.DescribeLoadBalancersInput{
  709. LoadBalancerArns: []*string{aws.String(arn)},
  710. })
  711. if err != nil {
  712. return "", err
  713. }
  714. dnsName := aws.StringValue(lbs.LoadBalancers[0].DNSName)
  715. if dnsName == "" {
  716. return "", fmt.Errorf("Load balancer %s doesn't have a dns name", aws.StringValue(lbs.LoadBalancers[0].LoadBalancerArn))
  717. }
  718. return dnsName, nil
  719. }
  720. func (s sdk) WithVolumeSecurityGroups(ctx context.Context, id string, fn func(securityGroups []string) error) error {
  721. mounts, err := s.EFS.DescribeMountTargetsWithContext(ctx, &efs.DescribeMountTargetsInput{
  722. FileSystemId: aws.String(id),
  723. })
  724. if err != nil {
  725. return err
  726. }
  727. for _, mount := range mounts.MountTargets {
  728. groups, err := s.EFS.DescribeMountTargetSecurityGroupsWithContext(ctx, &efs.DescribeMountTargetSecurityGroupsInput{
  729. MountTargetId: mount.MountTargetId,
  730. })
  731. if err != nil {
  732. return err
  733. }
  734. err = fn(aws.StringValueSlice(groups.SecurityGroups))
  735. if err != nil {
  736. return err
  737. }
  738. }
  739. return nil
  740. }
  741. func (s sdk) GetParameter(ctx context.Context, name string) (string, error) {
  742. parameter, err := s.SSM.GetParameterWithContext(ctx, &ssm.GetParameterInput{
  743. Name: aws.String(name),
  744. })
  745. if err != nil {
  746. return "", err
  747. }
  748. value := *parameter.Parameter.Value
  749. var ami struct {
  750. SchemaVersion int `json:"schema_version"`
  751. ImageName string `json:"image_name"`
  752. ImageID string `json:"image_id"`
  753. OS string `json:"os"`
  754. ECSRuntimeVersion string `json:"ecs_runtime_verion"`
  755. ECSAgentVersion string `json:"ecs_agent_version"`
  756. }
  757. err = json.Unmarshal([]byte(value), &ami)
  758. if err != nil {
  759. return "", err
  760. }
  761. return ami.ImageID, nil
  762. }
  763. func (s sdk) SecurityGroupExists(ctx context.Context, sg string) (bool, error) {
  764. desc, err := s.EC2.DescribeSecurityGroupsWithContext(ctx, &ec2.DescribeSecurityGroupsInput{
  765. GroupIds: aws.StringSlice([]string{sg}),
  766. })
  767. if err != nil {
  768. return false, err
  769. }
  770. return len(desc.SecurityGroups) > 0, nil
  771. }
  772. func (s sdk) DeleteCapacityProvider(ctx context.Context, arn string) error {
  773. _, err := s.ECS.DeleteCapacityProvider(&ecs.DeleteCapacityProviderInput{
  774. CapacityProvider: aws.String(arn),
  775. })
  776. return err
  777. }
  778. func (s sdk) DeleteAutoscalingGroup(ctx context.Context, arn string) error {
  779. _, err := s.AG.DeleteAutoScalingGroup(&autoscaling.DeleteAutoScalingGroupInput{
  780. AutoScalingGroupName: aws.String(arn),
  781. ForceDelete: aws.Bool(true),
  782. })
  783. return err
  784. }