sdk.go 27 KB


  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package ecs
  14. import (
  15. "context"
  16. "encoding/json"
  17. "fmt"
  18. "strings"
  19. "time"
  20. "github.com/docker/compose-cli/api/compose"
  21. "github.com/docker/compose-cli/api/secrets"
  22. "github.com/docker/compose-cli/errdefs"
  23. "github.com/docker/compose-cli/internal"
  24. "github.com/aws/aws-sdk-go/aws"
  25. "github.com/aws/aws-sdk-go/aws/arn"
  26. "github.com/aws/aws-sdk-go/aws/request"
  27. "github.com/aws/aws-sdk-go/aws/session"
  28. "github.com/aws/aws-sdk-go/service/autoscaling"
  29. "github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
  30. "github.com/aws/aws-sdk-go/service/cloudformation"
  31. "github.com/aws/aws-sdk-go/service/cloudformation/cloudformationiface"
  32. "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
  33. "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
  34. "github.com/aws/aws-sdk-go/service/ec2"
  35. "github.com/aws/aws-sdk-go/service/ec2/ec2iface"
  36. "github.com/aws/aws-sdk-go/service/ecs"
  37. "github.com/aws/aws-sdk-go/service/ecs/ecsiface"
  38. "github.com/aws/aws-sdk-go/service/efs"
  39. "github.com/aws/aws-sdk-go/service/efs/efsiface"
  40. "github.com/aws/aws-sdk-go/service/elbv2"
  41. "github.com/aws/aws-sdk-go/service/elbv2/elbv2iface"
  42. "github.com/aws/aws-sdk-go/service/iam"
  43. "github.com/aws/aws-sdk-go/service/iam/iamiface"
  44. "github.com/aws/aws-sdk-go/service/secretsmanager"
  45. "github.com/aws/aws-sdk-go/service/secretsmanager/secretsmanageriface"
  46. "github.com/aws/aws-sdk-go/service/ssm"
  47. "github.com/aws/aws-sdk-go/service/ssm/ssmiface"
  48. "github.com/hashicorp/go-multierror"
  49. "github.com/pkg/errors"
  50. "github.com/sirupsen/logrus"
  51. )
  52. type sdk struct {
  53. ECS ecsiface.ECSAPI
  54. EC2 ec2iface.EC2API
  55. EFS efsiface.EFSAPI
  56. ELB elbv2iface.ELBV2API
  57. CW cloudwatchlogsiface.CloudWatchLogsAPI
  58. IAM iamiface.IAMAPI
  59. CF cloudformationiface.CloudFormationAPI
  60. SM secretsmanageriface.SecretsManagerAPI
  61. SSM ssmiface.SSMAPI
  62. AG autoscalingiface.AutoScalingAPI
  63. }
  64. // sdk implement API
  65. var _ API = sdk{}
  66. func newSDK(sess *session.Session) sdk {
  67. sess.Handlers.Build.PushBack(func(r *request.Request) {
  68. request.AddToUserAgent(r, internal.ECSUserAgentName+"/"+internal.Version)
  69. })
  70. return sdk{
  71. ECS: ecs.New(sess),
  72. EC2: ec2.New(sess),
  73. EFS: efs.New(sess),
  74. ELB: elbv2.New(sess),
  75. CW: cloudwatchlogs.New(sess),
  76. IAM: iam.New(sess),
  77. CF: cloudformation.New(sess),
  78. SM: secretsmanager.New(sess),
  79. SSM: ssm.New(sess),
  80. AG: autoscaling.New(sess),
  81. }
  82. }
  83. func (s sdk) CheckRequirements(ctx context.Context, region string) error {
  84. settings, err := s.ECS.ListAccountSettingsWithContext(ctx, &ecs.ListAccountSettingsInput{
  85. EffectiveSettings: aws.Bool(true),
  86. Name: aws.String("serviceLongArnFormat"),
  87. })
  88. if err != nil {
  89. return err
  90. }
  91. serviceLongArnFormat := settings.Settings[0].Value
  92. if *serviceLongArnFormat != "enabled" {
  93. return fmt.Errorf("this tool requires the \"new ARN resource ID format\".\n"+
  94. "Check https://%s.console.aws.amazon.com/ecs/home#/settings\n"+
  95. "Learn more: https://aws.amazon.com/blogs/compute/migrating-your-amazon-ecs-deployment-to-the-new-arn-and-resource-id-format-2", region)
  96. }
  97. return nil
  98. }
  99. func (s sdk) ResolveCluster(ctx context.Context, nameOrArn string) (awsResource, error) {
  100. logrus.Debug("CheckRequirements if cluster was already created: ", nameOrArn)
  101. clusters, err := s.ECS.DescribeClustersWithContext(ctx, &ecs.DescribeClustersInput{
  102. Clusters: []*string{aws.String(nameOrArn)},
  103. })
  104. if err != nil {
  105. return nil, err
  106. }
  107. if len(clusters.Clusters) == 0 {
  108. return nil, errors.Wrapf(errdefs.ErrNotFound, "cluster %q does not exist", nameOrArn)
  109. }
  110. it := clusters.Clusters[0]
  111. return existingAWSResource{
  112. arn: aws.StringValue(it.ClusterArn),
  113. id: aws.StringValue(it.ClusterName),
  114. }, nil
  115. }
  116. func (s sdk) CreateCluster(ctx context.Context, name string) (string, error) {
  117. logrus.Debug("Create cluster ", name)
  118. response, err := s.ECS.CreateClusterWithContext(ctx, &ecs.CreateClusterInput{ClusterName: aws.String(name)})
  119. if err != nil {
  120. return "", err
  121. }
  122. return *response.Cluster.Status, nil
  123. }
  124. func (s sdk) CheckVPC(ctx context.Context, vpcID string) error {
  125. logrus.Debug("CheckRequirements on VPC : ", vpcID)
  126. output, err := s.EC2.DescribeVpcAttributeWithContext(ctx, &ec2.DescribeVpcAttributeInput{
  127. VpcId: aws.String(vpcID),
  128. Attribute: aws.String("enableDnsSupport"),
  129. })
  130. if err != nil {
  131. return err
  132. }
  133. if !*output.EnableDnsSupport.Value {
  134. return fmt.Errorf("VPC %q doesn't have DNS resolution enabled", vpcID)
  135. }
  136. return nil
  137. }
  138. func (s sdk) GetDefaultVPC(ctx context.Context) (string, error) {
  139. logrus.Debug("Retrieve default VPC")
  140. vpcs, err := s.EC2.DescribeVpcsWithContext(ctx, &ec2.DescribeVpcsInput{
  141. Filters: []*ec2.Filter{
  142. {
  143. Name: aws.String("isDefault"),
  144. Values: []*string{aws.String("true")},
  145. },
  146. },
  147. })
  148. if err != nil {
  149. return "", err
  150. }
  151. if len(vpcs.Vpcs) == 0 {
  152. return "", fmt.Errorf("account has not default VPC")
  153. }
  154. return *vpcs.Vpcs[0].VpcId, nil
  155. }
  156. func (s sdk) GetSubNets(ctx context.Context, vpcID string) ([]awsResource, error) {
  157. logrus.Debug("Retrieve SubNets")
  158. subnets, err := s.EC2.DescribeSubnetsWithContext(ctx, &ec2.DescribeSubnetsInput{
  159. DryRun: nil,
  160. Filters: []*ec2.Filter{
  161. {
  162. Name: aws.String("vpc-id"),
  163. Values: []*string{aws.String(vpcID)},
  164. },
  165. },
  166. })
  167. if err != nil {
  168. return nil, err
  169. }
  170. ids := []awsResource{}
  171. for _, subnet := range subnets.Subnets {
  172. ids = append(ids, existingAWSResource{
  173. arn: aws.StringValue(subnet.SubnetArn),
  174. id: aws.StringValue(subnet.SubnetId),
  175. })
  176. }
  177. return ids, nil
  178. }
  179. func (s sdk) GetRoleArn(ctx context.Context, name string) (string, error) {
  180. role, err := s.IAM.GetRoleWithContext(ctx, &iam.GetRoleInput{
  181. RoleName: aws.String(name),
  182. })
  183. if err != nil {
  184. return "", err
  185. }
  186. return *role.Role.Arn, nil
  187. }
  188. func (s sdk) StackExists(ctx context.Context, name string) (bool, error) {
  189. stacks, err := s.CF.DescribeStacksWithContext(ctx, &cloudformation.DescribeStacksInput{
  190. StackName: aws.String(name),
  191. })
  192. if err != nil {
  193. if strings.HasPrefix(err.Error(), fmt.Sprintf("ValidationError: Stack with ID %s does not exist", name)) {
  194. return false, nil
  195. }
  196. return false, nil
  197. }
  198. return len(stacks.Stacks) > 0, nil
  199. }
  200. func (s sdk) CreateStack(ctx context.Context, name string, template []byte) error {
  201. logrus.Debug("Create CloudFormation stack")
  202. _, err := s.CF.CreateStackWithContext(ctx, &cloudformation.CreateStackInput{
  203. OnFailure: aws.String("DELETE"),
  204. StackName: aws.String(name),
  205. TemplateBody: aws.String(string(template)),
  206. TimeoutInMinutes: nil,
  207. Capabilities: []*string{
  208. aws.String(cloudformation.CapabilityCapabilityIam),
  209. },
  210. Tags: []*cloudformation.Tag{
  211. {
  212. Key: aws.String(compose.ProjectTag),
  213. Value: aws.String(name),
  214. },
  215. },
  216. })
  217. return err
  218. }
  219. func (s sdk) CreateChangeSet(ctx context.Context, name string, template []byte) (string, error) {
  220. logrus.Debug("Create CloudFormation Changeset")
  221. update := fmt.Sprintf("Update%s", time.Now().Format("2006-01-02-15-04-05"))
  222. changeset, err := s.CF.CreateChangeSetWithContext(ctx, &cloudformation.CreateChangeSetInput{
  223. ChangeSetName: aws.String(update),
  224. ChangeSetType: aws.String(cloudformation.ChangeSetTypeUpdate),
  225. StackName: aws.String(name),
  226. TemplateBody: aws.String(string(template)),
  227. Capabilities: []*string{
  228. aws.String(cloudformation.CapabilityCapabilityIam),
  229. },
  230. })
  231. if err != nil {
  232. return "", err
  233. }
  234. err = s.CF.WaitUntilChangeSetCreateCompleteWithContext(ctx, &cloudformation.DescribeChangeSetInput{
  235. ChangeSetName: changeset.Id,
  236. })
  237. return *changeset.Id, err
  238. }
  239. func (s sdk) UpdateStack(ctx context.Context, changeset string) error {
  240. desc, err := s.CF.DescribeChangeSetWithContext(ctx, &cloudformation.DescribeChangeSetInput{
  241. ChangeSetName: aws.String(changeset),
  242. })
  243. if err != nil {
  244. return err
  245. }
  246. if strings.HasPrefix(aws.StringValue(desc.StatusReason), "The submitted information didn't contain changes.") {
  247. return nil
  248. }
  249. _, err = s.CF.ExecuteChangeSet(&cloudformation.ExecuteChangeSetInput{
  250. ChangeSetName: aws.String(changeset),
  251. })
  252. return err
  253. }
  254. const (
  255. stackCreate = iota
  256. stackUpdate
  257. stackDelete
  258. )
  259. func (s sdk) WaitStackComplete(ctx context.Context, name string, operation int) error {
  260. input := &cloudformation.DescribeStacksInput{
  261. StackName: aws.String(name),
  262. }
  263. switch operation {
  264. case stackCreate:
  265. return s.CF.WaitUntilStackCreateCompleteWithContext(ctx, input)
  266. case stackDelete:
  267. return s.CF.WaitUntilStackDeleteCompleteWithContext(ctx, input)
  268. default:
  269. return fmt.Errorf("internal error: unexpected stack operation %d", operation)
  270. }
  271. }
  272. func (s sdk) GetStackID(ctx context.Context, name string) (string, error) {
  273. stacks, err := s.CF.DescribeStacksWithContext(ctx, &cloudformation.DescribeStacksInput{
  274. StackName: aws.String(name),
  275. })
  276. if err != nil {
  277. return "", err
  278. }
  279. return *stacks.Stacks[0].StackId, nil
  280. }
  281. func (s sdk) ListStacks(ctx context.Context, name string) ([]compose.Stack, error) {
  282. params := cloudformation.DescribeStacksInput{}
  283. if name != "" {
  284. params.StackName = &name
  285. }
  286. cfStacks, err := s.CF.DescribeStacksWithContext(ctx, &params)
  287. if err != nil {
  288. return nil, err
  289. }
  290. stacks := []compose.Stack{}
  291. for _, stack := range cfStacks.Stacks {
  292. for _, t := range stack.Tags {
  293. if *t.Key == compose.ProjectTag {
  294. status := compose.RUNNING
  295. switch aws.StringValue(stack.StackStatus) {
  296. case "CREATE_IN_PROGRESS":
  297. status = compose.STARTING
  298. case "DELETE_IN_PROGRESS":
  299. status = compose.REMOVING
  300. case "UPDATE_IN_PROGRESS":
  301. status = compose.UPDATING
  302. default:
  303. }
  304. stacks = append(stacks, compose.Stack{
  305. ID: aws.StringValue(stack.StackId),
  306. Name: aws.StringValue(stack.StackName),
  307. Status: status,
  308. })
  309. break
  310. }
  311. }
  312. }
  313. return stacks, nil
  314. }
  315. func (s sdk) GetStackClusterID(ctx context.Context, stack string) (string, error) {
  316. // Note: could use DescribeStackResource but we only can detect `does not exist` case by matching string error message
  317. resources, err := s.CF.ListStackResourcesWithContext(ctx, &cloudformation.ListStackResourcesInput{
  318. StackName: aws.String(stack),
  319. })
  320. if err != nil {
  321. return "", err
  322. }
  323. for _, r := range resources.StackResourceSummaries {
  324. if aws.StringValue(r.ResourceType) == "AWS::ECS::Cluster" {
  325. return aws.StringValue(r.PhysicalResourceId), nil
  326. }
  327. }
  328. // stack is using user-provided cluster
  329. res, err := s.CF.GetTemplateSummaryWithContext(ctx, &cloudformation.GetTemplateSummaryInput{
  330. StackName: aws.String(stack),
  331. })
  332. if err != nil {
  333. return "", err
  334. }
  335. c := aws.StringValue(res.Metadata)
  336. var m templateMetadata
  337. err = json.Unmarshal([]byte(c), &m)
  338. if err != nil {
  339. return "", err
  340. }
  341. if m.Cluster == "" {
  342. return "", errors.Wrap(errdefs.ErrNotFound, "CloudFormation is missing cluster metadata")
  343. }
  344. return m.Cluster, nil
  345. }
  346. type templateMetadata struct {
  347. Cluster string `json:",omitempty"`
  348. }
  349. func (s sdk) GetServiceTaskDefinition(ctx context.Context, cluster string, serviceArns []string) (map[string]string, error) {
  350. defs := map[string]string{}
  351. svc := []*string{}
  352. for _, s := range serviceArns {
  353. svc = append(svc, aws.String(s))
  354. }
  355. services, err := s.ECS.DescribeServicesWithContext(ctx, &ecs.DescribeServicesInput{
  356. Cluster: aws.String(cluster),
  357. Services: svc,
  358. })
  359. if err != nil {
  360. return nil, err
  361. }
  362. for _, s := range services.Services {
  363. defs[aws.StringValue(s.ServiceArn)] = aws.StringValue(s.TaskDefinition)
  364. }
  365. return defs, nil
  366. }
  367. func (s sdk) ListStackServices(ctx context.Context, stack string) ([]string, error) {
  368. arns := []string{}
  369. var nextToken *string
  370. for {
  371. response, err := s.CF.ListStackResourcesWithContext(ctx, &cloudformation.ListStackResourcesInput{
  372. StackName: aws.String(stack),
  373. NextToken: nextToken,
  374. })
  375. if err != nil {
  376. return nil, err
  377. }
  378. for _, r := range response.StackResourceSummaries {
  379. if aws.StringValue(r.ResourceType) == "AWS::ECS::Service" {
  380. if r.PhysicalResourceId != nil {
  381. arns = append(arns, aws.StringValue(r.PhysicalResourceId))
  382. }
  383. }
  384. }
  385. nextToken = response.NextToken
  386. if nextToken == nil {
  387. break
  388. }
  389. }
  390. return arns, nil
  391. }
  392. func (s sdk) GetServiceTasks(ctx context.Context, cluster string, service string, stopped bool) ([]*ecs.Task, error) {
  393. state := "RUNNING"
  394. if stopped {
  395. state = "STOPPED"
  396. }
  397. tasks, err := s.ECS.ListTasksWithContext(ctx, &ecs.ListTasksInput{
  398. Cluster: aws.String(cluster),
  399. ServiceName: aws.String(service),
  400. DesiredStatus: aws.String(state),
  401. })
  402. if err != nil {
  403. return nil, err
  404. }
  405. if len(tasks.TaskArns) > 0 {
  406. taskDescriptions, err := s.ECS.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{
  407. Cluster: aws.String(cluster),
  408. Tasks: tasks.TaskArns,
  409. })
  410. if err != nil {
  411. return nil, err
  412. }
  413. return taskDescriptions.Tasks, nil
  414. }
  415. return nil, nil
  416. }
  417. func (s sdk) GetTaskStoppedReason(ctx context.Context, cluster string, taskArn string) (string, error) {
  418. taskDescriptions, err := s.ECS.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{
  419. Cluster: aws.String(cluster),
  420. Tasks: []*string{aws.String(taskArn)},
  421. })
  422. if err != nil {
  423. return "", err
  424. }
  425. if len(taskDescriptions.Tasks) == 0 {
  426. return "", nil
  427. }
  428. task := taskDescriptions.Tasks[0]
  429. return fmt.Sprintf(
  430. "%s: %s",
  431. aws.StringValue(task.StopCode),
  432. aws.StringValue(task.StoppedReason)), nil
  433. }
  434. func (s sdk) DescribeStackEvents(ctx context.Context, stackID string) ([]*cloudformation.StackEvent, error) {
  435. // Fixme implement Paginator on Events and return as a chan(events)
  436. events := []*cloudformation.StackEvent{}
  437. var nextToken *string
  438. for {
  439. resp, err := s.CF.DescribeStackEventsWithContext(ctx, &cloudformation.DescribeStackEventsInput{
  440. StackName: aws.String(stackID),
  441. NextToken: nextToken,
  442. })
  443. if err != nil {
  444. return nil, err
  445. }
  446. events = append(events, resp.StackEvents...)
  447. if resp.NextToken == nil {
  448. return events, nil
  449. }
  450. nextToken = resp.NextToken
  451. }
  452. }
  453. func (s sdk) ListStackParameters(ctx context.Context, name string) (map[string]string, error) {
  454. st, err := s.CF.DescribeStacksWithContext(ctx, &cloudformation.DescribeStacksInput{
  455. NextToken: nil,
  456. StackName: aws.String(name),
  457. })
  458. if err != nil {
  459. return nil, err
  460. }
  461. parameters := map[string]string{}
  462. for _, parameter := range st.Stacks[0].Parameters {
  463. parameters[aws.StringValue(parameter.ParameterKey)] = aws.StringValue(parameter.ParameterValue)
  464. }
  465. return parameters, nil
  466. }
  467. type stackResource struct {
  468. LogicalID string
  469. Type string
  470. ARN string
  471. Status string
  472. }
  473. type stackResourceFn func(r stackResource) error
  474. type stackResources []stackResource
  475. func (resources stackResources) apply(awsType string, fn stackResourceFn) error {
  476. var errs *multierror.Error
  477. for _, r := range resources {
  478. if r.Type == awsType {
  479. err := fn(r)
  480. if err != nil {
  481. errs = multierror.Append(err)
  482. }
  483. }
  484. }
  485. return errs.ErrorOrNil()
  486. }
  487. func (s sdk) ListStackResources(ctx context.Context, name string) (stackResources, error) {
  488. // FIXME handle pagination
  489. res, err := s.CF.ListStackResourcesWithContext(ctx, &cloudformation.ListStackResourcesInput{
  490. StackName: aws.String(name),
  491. })
  492. if err != nil {
  493. return nil, err
  494. }
  495. resources := stackResources{}
  496. for _, r := range res.StackResourceSummaries {
  497. resources = append(resources, stackResource{
  498. LogicalID: aws.StringValue(r.LogicalResourceId),
  499. Type: aws.StringValue(r.ResourceType),
  500. ARN: aws.StringValue(r.PhysicalResourceId),
  501. Status: aws.StringValue(r.ResourceStatus),
  502. })
  503. }
  504. return resources, nil
  505. }
  506. func (s sdk) DeleteStack(ctx context.Context, name string) error {
  507. logrus.Debug("Delete CloudFormation stack")
  508. _, err := s.CF.DeleteStackWithContext(ctx, &cloudformation.DeleteStackInput{
  509. StackName: aws.String(name),
  510. })
  511. return err
  512. }
  513. func (s sdk) CreateSecret(ctx context.Context, secret secrets.Secret) (string, error) {
  514. logrus.Debug("Create secret " + secret.Name)
  515. secretStr, err := secret.GetCredString()
  516. if err != nil {
  517. return "", err
  518. }
  519. response, err := s.SM.CreateSecret(&secretsmanager.CreateSecretInput{
  520. Name: &secret.Name,
  521. SecretString: &secretStr,
  522. Description: &secret.Description,
  523. })
  524. if err != nil {
  525. return "", err
  526. }
  527. return aws.StringValue(response.ARN), nil
  528. }
  529. func (s sdk) InspectSecret(ctx context.Context, id string) (secrets.Secret, error) {
  530. logrus.Debug("Inspect secret " + id)
  531. response, err := s.SM.DescribeSecret(&secretsmanager.DescribeSecretInput{SecretId: &id})
  532. if err != nil {
  533. return secrets.Secret{}, err
  534. }
  535. labels := map[string]string{}
  536. for _, tag := range response.Tags {
  537. labels[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value)
  538. }
  539. secret := secrets.Secret{
  540. ID: aws.StringValue(response.ARN),
  541. Name: aws.StringValue(response.Name),
  542. Labels: labels,
  543. }
  544. if response.Description != nil {
  545. secret.Description = *response.Description
  546. }
  547. return secret, nil
  548. }
  549. func (s sdk) ListSecrets(ctx context.Context) ([]secrets.Secret, error) {
  550. logrus.Debug("List secrets ...")
  551. response, err := s.SM.ListSecrets(&secretsmanager.ListSecretsInput{})
  552. if err != nil {
  553. return nil, err
  554. }
  555. var ls []secrets.Secret
  556. for _, sec := range response.SecretList {
  557. labels := map[string]string{}
  558. for _, tag := range sec.Tags {
  559. labels[*tag.Key] = *tag.Value
  560. }
  561. description := ""
  562. if sec.Description != nil {
  563. description = *sec.Description
  564. }
  565. ls = append(ls, secrets.Secret{
  566. ID: *sec.ARN,
  567. Name: *sec.Name,
  568. Labels: labels,
  569. Description: description,
  570. })
  571. }
  572. return ls, nil
  573. }
  574. func (s sdk) DeleteSecret(ctx context.Context, id string, recover bool) error {
  575. logrus.Debug("List secrets ...")
  576. force := !recover
  577. _, err := s.SM.DeleteSecret(&secretsmanager.DeleteSecretInput{SecretId: &id, ForceDeleteWithoutRecovery: &force})
  578. return err
  579. }
  580. func (s sdk) GetLogs(ctx context.Context, name string, consumer func(service, container, message string)) error {
  581. logGroup := fmt.Sprintf("/docker-compose/%s", name)
  582. var startTime int64
  583. for {
  584. select {
  585. case <-ctx.Done():
  586. return nil
  587. default:
  588. var hasMore = true
  589. var token *string
  590. for hasMore {
  591. events, err := s.CW.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{
  592. LogGroupName: aws.String(logGroup),
  593. NextToken: token,
  594. StartTime: aws.Int64(startTime),
  595. })
  596. if err != nil {
  597. return err
  598. }
  599. if events.NextToken == nil {
  600. hasMore = false
  601. } else {
  602. token = events.NextToken
  603. }
  604. for _, event := range events.Events {
  605. p := strings.Split(aws.StringValue(event.LogStreamName), "/")
  606. consumer(p[1], p[2], aws.StringValue(event.Message))
  607. startTime = *event.IngestionTime
  608. }
  609. }
  610. }
  611. time.Sleep(500 * time.Millisecond)
  612. }
  613. }
  614. func (s sdk) DescribeService(ctx context.Context, cluster string, arn string) (compose.ServiceStatus, error) {
  615. services, err := s.ECS.DescribeServicesWithContext(ctx, &ecs.DescribeServicesInput{
  616. Cluster: aws.String(cluster),
  617. Services: []*string{aws.String(arn)},
  618. Include: aws.StringSlice([]string{"TAGS"}),
  619. })
  620. if err != nil {
  621. return compose.ServiceStatus{}, err
  622. }
  623. for _, f := range services.Failures {
  624. return compose.ServiceStatus{}, errors.Wrapf(errdefs.ErrNotFound, "can't get service status %s: %s", aws.StringValue(f.Detail), aws.StringValue(f.Reason))
  625. }
  626. service := services.Services[0]
  627. var name string
  628. for _, t := range service.Tags {
  629. if *t.Key == compose.ServiceTag {
  630. name = aws.StringValue(t.Value)
  631. }
  632. }
  633. if name == "" {
  634. return compose.ServiceStatus{}, fmt.Errorf("service %s doesn't have a %s tag", *service.ServiceArn, compose.ServiceTag)
  635. }
  636. targetGroupArns := []string{}
  637. for _, lb := range service.LoadBalancers {
  638. targetGroupArns = append(targetGroupArns, *lb.TargetGroupArn)
  639. }
  640. // getURLwithPortMapping makes 2 queries
  641. // one to get the target groups and another for load balancers
  642. loadBalancers, err := s.getURLWithPortMapping(ctx, targetGroupArns)
  643. if err != nil {
  644. return compose.ServiceStatus{}, err
  645. }
  646. return compose.ServiceStatus{
  647. ID: aws.StringValue(service.ServiceName),
  648. Name: name,
  649. Replicas: int(aws.Int64Value(service.RunningCount)),
  650. Desired: int(aws.Int64Value(service.DesiredCount)),
  651. Publishers: loadBalancers,
  652. }, nil
  653. }
  654. func (s sdk) getURLWithPortMapping(ctx context.Context, targetGroupArns []string) ([]compose.PortPublisher, error) {
  655. if len(targetGroupArns) == 0 {
  656. return nil, nil
  657. }
  658. groups, err := s.ELB.DescribeTargetGroups(&elbv2.DescribeTargetGroupsInput{
  659. TargetGroupArns: aws.StringSlice(targetGroupArns),
  660. })
  661. if err != nil {
  662. return nil, err
  663. }
  664. lbarns := []*string{}
  665. for _, tg := range groups.TargetGroups {
  666. lbarns = append(lbarns, tg.LoadBalancerArns...)
  667. }
  668. lbs, err := s.ELB.DescribeLoadBalancersWithContext(ctx, &elbv2.DescribeLoadBalancersInput{
  669. LoadBalancerArns: lbarns,
  670. })
  671. if err != nil {
  672. return nil, err
  673. }
  674. filterLB := func(arn *string, lbs []*elbv2.LoadBalancer) *elbv2.LoadBalancer {
  675. if aws.StringValue(arn) == "" {
  676. // load balancer arn is nil/""
  677. return nil
  678. }
  679. for _, lb := range lbs {
  680. if aws.StringValue(lb.LoadBalancerArn) == aws.StringValue(arn) {
  681. return lb
  682. }
  683. }
  684. return nil
  685. }
  686. loadBalancers := []compose.PortPublisher{}
  687. for _, tg := range groups.TargetGroups {
  688. for _, lbarn := range tg.LoadBalancerArns {
  689. lb := filterLB(lbarn, lbs.LoadBalancers)
  690. if lb == nil {
  691. continue
  692. }
  693. loadBalancers = append(loadBalancers, compose.PortPublisher{
  694. URL: aws.StringValue(lb.DNSName),
  695. TargetPort: int(aws.Int64Value(tg.Port)),
  696. PublishedPort: int(aws.Int64Value(tg.Port)),
  697. Protocol: aws.StringValue(tg.Protocol),
  698. })
  699. }
  700. }
  701. return loadBalancers, nil
  702. }
  703. func (s sdk) ListTasks(ctx context.Context, cluster string, family string) ([]string, error) {
  704. tasks, err := s.ECS.ListTasksWithContext(ctx, &ecs.ListTasksInput{
  705. Cluster: aws.String(cluster),
  706. Family: aws.String(family),
  707. })
  708. if err != nil {
  709. return nil, err
  710. }
  711. arns := []string{}
  712. for _, arn := range tasks.TaskArns {
  713. arns = append(arns, *arn)
  714. }
  715. return arns, nil
  716. }
  717. func (s sdk) GetPublicIPs(ctx context.Context, interfaces ...string) (map[string]string, error) {
  718. desc, err := s.EC2.DescribeNetworkInterfaces(&ec2.DescribeNetworkInterfacesInput{
  719. NetworkInterfaceIds: aws.StringSlice(interfaces),
  720. })
  721. if err != nil {
  722. return nil, err
  723. }
  724. publicIPs := map[string]string{}
  725. for _, interf := range desc.NetworkInterfaces {
  726. if interf.Association != nil {
  727. publicIPs[aws.StringValue(interf.NetworkInterfaceId)] = aws.StringValue(interf.Association.PublicIp)
  728. }
  729. }
  730. return publicIPs, nil
  731. }
  732. func (s sdk) ResolveLoadBalancer(ctx context.Context, nameOrarn string) (awsResource, string, error) {
  733. logrus.Debug("Check if LoadBalancer exists: ", nameOrarn)
  734. var arns []*string
  735. var names []*string
  736. if arn.IsARN(nameOrarn) {
  737. arns = append(arns, aws.String(nameOrarn))
  738. } else {
  739. names = append(names, aws.String(nameOrarn))
  740. }
  741. lbs, err := s.ELB.DescribeLoadBalancersWithContext(ctx, &elbv2.DescribeLoadBalancersInput{
  742. LoadBalancerArns: arns,
  743. Names: names,
  744. })
  745. if err != nil {
  746. return nil, "", err
  747. }
  748. if len(lbs.LoadBalancers) == 0 {
  749. return nil, "", errors.Wrapf(errdefs.ErrNotFound, "load balancer %q does not exist", nameOrarn)
  750. }
  751. it := lbs.LoadBalancers[0]
  752. return existingAWSResource{
  753. arn: aws.StringValue(it.LoadBalancerArn),
  754. id: aws.StringValue(it.LoadBalancerName),
  755. }, aws.StringValue(it.Type), nil
  756. }
  757. func (s sdk) GetLoadBalancerURL(ctx context.Context, arn string) (string, error) {
  758. logrus.Debug("Retrieve load balancer URL: ", arn)
  759. lbs, err := s.ELB.DescribeLoadBalancersWithContext(ctx, &elbv2.DescribeLoadBalancersInput{
  760. LoadBalancerArns: []*string{aws.String(arn)},
  761. })
  762. if err != nil {
  763. return "", err
  764. }
  765. dnsName := aws.StringValue(lbs.LoadBalancers[0].DNSName)
  766. if dnsName == "" {
  767. return "", fmt.Errorf("Load balancer %s doesn't have a dns name", aws.StringValue(lbs.LoadBalancers[0].LoadBalancerArn))
  768. }
  769. return dnsName, nil
  770. }
  771. func (s sdk) GetParameter(ctx context.Context, name string) (string, error) {
  772. parameter, err := s.SSM.GetParameterWithContext(ctx, &ssm.GetParameterInput{
  773. Name: aws.String(name),
  774. })
  775. if err != nil {
  776. return "", err
  777. }
  778. value := *parameter.Parameter.Value
  779. var ami struct {
  780. SchemaVersion int `json:"schema_version"`
  781. ImageName string `json:"image_name"`
  782. ImageID string `json:"image_id"`
  783. OS string `json:"os"`
  784. ECSRuntimeVersion string `json:"ecs_runtime_verion"`
  785. ECSAgentVersion string `json:"ecs_agent_version"`
  786. }
  787. err = json.Unmarshal([]byte(value), &ami)
  788. if err != nil {
  789. return "", err
  790. }
  791. return ami.ImageID, nil
  792. }
  793. func (s sdk) SecurityGroupExists(ctx context.Context, sg string) (bool, error) {
  794. desc, err := s.EC2.DescribeSecurityGroupsWithContext(ctx, &ec2.DescribeSecurityGroupsInput{
  795. GroupIds: aws.StringSlice([]string{sg}),
  796. })
  797. if err != nil {
  798. return false, err
  799. }
  800. return len(desc.SecurityGroups) > 0, nil
  801. }
  802. func (s sdk) DeleteCapacityProvider(ctx context.Context, arn string) error {
  803. _, err := s.ECS.DeleteCapacityProvider(&ecs.DeleteCapacityProviderInput{
  804. CapacityProvider: aws.String(arn),
  805. })
  806. return err
  807. }
  808. func (s sdk) DeleteAutoscalingGroup(ctx context.Context, arn string) error {
  809. _, err := s.AG.DeleteAutoScalingGroup(&autoscaling.DeleteAutoScalingGroupInput{
  810. AutoScalingGroupName: aws.String(arn),
  811. ForceDelete: aws.Bool(true),
  812. })
  813. return err
  814. }
  815. func (s sdk) ResolveFileSystem(ctx context.Context, id string) (awsResource, error) {
  816. desc, err := s.EFS.DescribeFileSystemsWithContext(ctx, &efs.DescribeFileSystemsInput{
  817. FileSystemId: aws.String(id),
  818. })
  819. if err != nil {
  820. return nil, err
  821. }
  822. if len(desc.FileSystems) == 0 {
  823. return nil, errors.Wrapf(errdefs.ErrNotFound, "EFS file system %q doesn't exist", id)
  824. }
  825. it := desc.FileSystems[0]
  826. return existingAWSResource{
  827. arn: aws.StringValue(it.FileSystemArn),
  828. id: aws.StringValue(it.FileSystemId),
  829. }, nil
  830. }
  831. func (s sdk) FindFileSystem(ctx context.Context, tags map[string]string) (awsResource, error) {
  832. var token *string
  833. for {
  834. desc, err := s.EFS.DescribeFileSystemsWithContext(ctx, &efs.DescribeFileSystemsInput{
  835. Marker: token,
  836. })
  837. if err != nil {
  838. return nil, err
  839. }
  840. for _, filesystem := range desc.FileSystems {
  841. if containsAll(filesystem.Tags, tags) {
  842. return existingAWSResource{
  843. arn: aws.StringValue(filesystem.FileSystemArn),
  844. id: aws.StringValue(filesystem.FileSystemId),
  845. }, nil
  846. }
  847. }
  848. if desc.NextMarker == token {
  849. return nil, nil
  850. }
  851. token = desc.NextMarker
  852. }
  853. }
  854. func containsAll(tags []*efs.Tag, required map[string]string) bool {
  855. TAGS:
  856. for key, value := range required {
  857. for _, t := range tags {
  858. if aws.StringValue(t.Key) == key && aws.StringValue(t.Value) == value {
  859. continue TAGS
  860. }
  861. }
  862. return false
  863. }
  864. return true
  865. }
  866. func (s sdk) CreateFileSystem(ctx context.Context, tags map[string]string) (string, error) {
  867. var efsTags []*efs.Tag
  868. for k, v := range tags {
  869. efsTags = append(efsTags, &efs.Tag{
  870. Key: aws.String(k),
  871. Value: aws.String(v),
  872. })
  873. }
  874. res, err := s.EFS.CreateFileSystemWithContext(ctx, &efs.CreateFileSystemInput{
  875. Encrypted: aws.Bool(true),
  876. Tags: efsTags,
  877. })
  878. if err != nil {
  879. return "", err
  880. }
  881. id := aws.StringValue(res.FileSystemId)
  882. logrus.Debugf("Created file system %q", id)
  883. return id, nil
  884. }
  885. func (s sdk) DeleteFileSystem(ctx context.Context, id string) error {
  886. _, err := s.EFS.DeleteFileSystemWithContext(ctx, &efs.DeleteFileSystemInput{
  887. FileSystemId: aws.String(id),
  888. })
  889. return err
  890. }