| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500 | 
							- """
 
- Types for objects parsed from the configuration.
 
- """
 
- import json
 
- import ntpath
 
- import os
 
- import re
 
- from collections import namedtuple
 
- from docker.utils.ports import build_port_bindings
 
- from ..const import COMPOSEFILE_V1 as V1
 
- from ..utils import unquote_path
 
- from .errors import ConfigurationError
 
- from compose.const import IS_WINDOWS_PLATFORM
 
- from compose.utils import splitdrive
 
# Matches strings that begin with a drive letter, a colon and a backslash
# (e.g. ``C:\data``), i.e. drive-rooted Windows paths.
win32_root_path_pattern = re.compile(
    r'^[A-Za-z]\:\\.*'
)
 
class VolumeFromSpec(namedtuple('_VolumeFromSpec', 'source mode type')):
    """A parsed ``volumes_from`` entry.

    Fields:
        source: name of the service or container providing the volumes.
        mode: access mode string; ``'rw'`` when the entry does not give one.
        type: ``'service'`` or ``'container'``.
    """

    # TODO: drop service_names arg when v1 is removed
    @classmethod
    def parse(cls, volume_from_config, service_names, version):
        """Parse an entry using the v1 rules when ``version`` is V1, else v2."""
        if version == V1:
            return cls.parse_v1(service_names, volume_from_config)
        return cls.parse_v2(service_names, volume_from_config)

    @classmethod
    def parse_v1(cls, service_names, volume_from_config):
        """Parse the v1 form ``service[:mode]``.

        The source is classified as a service when it appears in
        ``service_names`` and as a container otherwise.

        Raises:
            ConfigurationError: if the entry has more than two ':' parts.
        """
        pieces = volume_from_config.split(':')
        if len(pieces) > 2:
            raise ConfigurationError(
                "volume_from {} has incorrect format, should be "
                "service[:mode]".format(volume_from_config))

        source = pieces[0]
        mode = pieces[1] if len(pieces) == 2 else 'rw'
        kind = 'service' if source in service_names else 'container'
        return cls(source, mode, kind)

    @classmethod
    def parse_v2(cls, service_names, volume_from_config):
        """Parse the v2 forms ``<service>[:<mode>]`` and
        ``container:<name>[:<mode>]`` (a three-part entry may also spell
        out ``service:<name>:<mode>``).

        ``service_names`` is accepted for signature parity with
        ``parse_v1`` but is not consulted.

        Raises:
            ConfigurationError: if the entry has more than three ':' parts,
                or a three-part entry names an unknown type.
        """
        pieces = volume_from_config.split(':')
        count = len(pieces)
        if count > 3:
            raise ConfigurationError(
                "volume_from {} has incorrect format, should be one of "
                "'<service name>[:<mode>]' or "
                "'container:<container name>[:<mode>]'".format(volume_from_config))

        if count == 1:
            return cls(pieces[0], 'rw', 'service')

        if count == 2:
            first, second = pieces
            # "container:<name>" is a container reference with default mode;
            # anything else is "<service>:<mode>".
            if first == 'container':
                return cls(second, 'rw', first)
            return cls(first, second, 'service')

        kind, source, mode = pieces
        if kind not in ('service', 'container'):
            raise ConfigurationError(
                "Unknown volumes_from type '{}' in '{}'".format(
                    kind,
                    volume_from_config))
        return cls(source, mode, kind)

    def repr(self):
        """Return the canonical ``type:source:mode`` string."""
        return '{}:{}:{}'.format(self.type, self.source, self.mode)
 
def parse_restart_spec(restart_config):
    """Parse a ``mode[:max_retry]`` restart policy string.

    Returns:
        None when ``restart_config`` is empty or None; otherwise a dict with
        the keys ``Name`` (the mode) and ``MaximumRetryCount`` (an int,
        0 when no retry count is given).

    Raises:
        ConfigurationError: if the value has more than two ':' parts or the
            retry count is not an integer.
    """
    if not restart_config:
        return None

    parts = restart_config.split(':')
    if len(parts) > 2:
        raise ConfigurationError(
            "Restart %s has incorrect format, should be "
            "mode[:max_retry]" % restart_config)

    name = parts[0]
    max_retry_count = parts[1] if len(parts) == 2 else 0

    # Report a malformed retry count as a configuration problem, like the
    # format check above, instead of leaking a bare ValueError.
    try:
        max_retry_count = int(max_retry_count)
    except ValueError as exc:
        raise ConfigurationError(
            "Restart %s has an invalid max_retry value, should be "
            "an integer" % restart_config) from exc

    return {'Name': name, 'MaximumRetryCount': max_retry_count}
 
def serialize_restart_spec(restart_spec):
    """Turn a restart policy dict back into its ``mode[:max_retry]`` string.

    Returns an empty string for an empty/None spec. The retry count is
    appended only when ``MaximumRetryCount`` is truthy (non-zero).
    """
    if not restart_spec:
        return ''

    name = restart_spec['Name']
    retries = restart_spec['MaximumRetryCount']
    pieces = [name, str(retries)] if retries else [name]
    return ':'.join(pieces)
 
def parse_extra_hosts(extra_hosts_config):
    """Normalize an ``extra_hosts`` setting into a ``{host: ip}`` dict.

    Accepts either a mapping (returned as a shallow copy) or a list of
    ``"host:ip"`` strings. Each list entry is split on its first ':' so
    the address part may itself contain colons; both sides are stripped
    of surrounding whitespace.

    Returns:
        A dict for mapping/list input, ``{}`` for empty or None input, and
        None for any other type.

    Raises:
        ConfigurationError: if a list entry contains no ':' separator.
    """
    if not extra_hosts_config:
        return {}

    if isinstance(extra_hosts_config, dict):
        return dict(extra_hosts_config)

    if isinstance(extra_hosts_config, list):
        extra_hosts_dict = {}
        for extra_hosts_line in extra_hosts_config:
            host, separator, ip = extra_hosts_line.partition(':')
            if not separator:
                # Without this check a bare "host" entry fails tuple
                # unpacking with an unhelpful ValueError.
                raise ConfigurationError(
                    "extra_hosts entry '{}' has incorrect format, should be "
                    "host:ip".format(extra_hosts_line))
            extra_hosts_dict[host.strip()] = ip.strip()
        return extra_hosts_dict

    # Other input types are not handled here.
    return None
 
def normalize_path_for_engine(path):
    """Convert a Windows-style path into the form the Engine expects.

    A drive prefix reported by ``splitdrive`` is lower-cased, stripped of
    its trailing colon and turned into a leading ``/<drive>`` segment; all
    backslashes are then replaced by forward slashes. For example a path
    on drive ``c:`` ends up looking like ``/c/my/path/shiny``.
    """
    drive, remainder = splitdrive(path)
    if not drive:
        return path.replace('\\', '/')

    engine_path = '/' + drive.lower().rstrip(':') + remainder
    return engine_path.replace('\\', '/')
 
def normpath(path, win_host=False):
    """Normalize ``path``, coping with Compose-specific mixed-platform cases.

    ``ntpath.normpath`` is used when ``win_host`` is true, otherwise the
    running platform's ``os.path.normpath``.
    """
    # A path that starts with '/' while running on Windows is most likely a
    # UNIX absolute path; remember that now so the backslashes introduced by
    # normalization can be turned back into forward slashes afterwards.
    flip_slashes = path.startswith('/') and IS_WINDOWS_PLATFORM

    if win_host:
        normalized = ntpath.normpath(path)
    else:
        normalized = os.path.normpath(path)

    return normalized.replace('\\', '/') if flip_slashes else normalized
 
class MountSpec:
    """A mount described in dictionary form (type, source, target, ...).

    Type-specific options are passed under a key equal to the mount type
    (for example ``{'type': 'tmpfs', 'tmpfs': {...}}``) and are kept in
    ``self.options``.
    """

    # Per mount type, a mapping between option names. It is not consulted
    # by any method of this class.
    options_map = {
        'volume': {
            'nocopy': 'no_copy'
        },
        'bind': {
            'propagation': 'propagation'
        },
        'tmpfs': {
            'size': 'tmpfs_size'
        }
    }
    # Attributes included (when truthy) in the dict built by repr().
    _fields = ['type', 'source', 'target', 'read_only', 'consistency']

    @classmethod
    def parse(cls, mount_dict, normalize=False, win_host=False):
        """Build a MountSpec from a mount dictionary.

        A non-empty ``source`` is normalized with ``normpath`` and, when
        ``normalize`` is true, additionally converted with
        ``normalize_path_for_engine``. The caller's dictionary is left
        untouched.

        Raises:
            ConfigurationError: if a tmpfs mount specifies a source.
        """
        # Work on a shallow copy so normalizing the source does not rewrite
        # the caller's dictionary as a side effect.
        mount_dict = dict(mount_dict)

        source = mount_dict.get('source')
        if source:
            if mount_dict['type'] == 'tmpfs':
                raise ConfigurationError('tmpfs mounts can not specify a source')

            source = normpath(source, win_host)
            if normalize:
                source = normalize_path_for_engine(source)
            mount_dict['source'] = source

        return cls(**mount_dict)

    def __init__(self, type, source=None, target=None, read_only=None, consistency=None, **kwargs):
        self.type = type
        self.source = source
        self.target = target
        self.read_only = read_only
        self.consistency = consistency
        # Only the extra keyword named after the mount type is retained;
        # any other extra keywords are ignored.
        self.options = None
        if self.type in kwargs:
            self.options = kwargs[self.type]

    def as_volume_spec(self):
        """Return the equivalent VolumeSpec ('ro' when read_only, else 'rw')."""
        mode = 'ro' if self.read_only else 'rw'
        return VolumeSpec(external=self.source, internal=self.target, mode=mode)

    def legacy_repr(self):
        """Return the representation of the equivalent VolumeSpec."""
        return self.as_volume_spec().repr()

    def repr(self):
        """Return a dict of the truthy fields plus the type-specific options."""
        res = {}
        for field in self._fields:
            if getattr(self, field, None):
                res[field] = getattr(self, field)
        if self.options:
            res[self.type] = self.options
        return res

    @property
    def is_named_volume(self):
        # Truthy only for 'volume' mounts with a source; for those the
        # value returned is the source itself rather than a strict bool.
        return self.type == 'volume' and self.source

    @property
    def is_tmpfs(self):
        return self.type == 'tmpfs'

    @property
    def external(self):
        # Alias for source, mirroring VolumeSpec's field of the same name.
        return self.source
 
class VolumeSpec(namedtuple('_VolumeSpec', 'external internal mode')):
    """A volume given in the short ``[external:]internal[:mode]`` syntax.

    ``win32`` is False on the class and set to True on instances produced
    by the Windows parser; it only influences ``is_named_volume``.
    """

    win32 = False

    @classmethod
    def _parse_unix(cls, volume_config):
        """Split on ':' and normalize both paths with ``os.path.normpath``.

        Raises:
            ConfigurationError: if there are more than three ':' parts.
        """
        pieces = volume_config.split(':')
        count = len(pieces)
        if count > 3:
            raise ConfigurationError(
                "Volume %s has incorrect format, should be "
                "external:internal[:mode]" % volume_config)

        if count == 1:
            # Only a container path was given.
            return cls(None, os.path.normpath(pieces[0]), 'rw')

        external = os.path.normpath(pieces[0])
        internal = os.path.normpath(pieces[1])
        mode = pieces[2] if count == 3 else 'rw'
        return cls(external, internal, mode)

    @classmethod
    def _parse_win32(cls, volume_config, normalize):
        """Parse a spec whose paths may carry a drive prefix such as ``C:``.

        Raises:
            ConfigurationError: if anything containing ':' follows the
                internal path where only a mode is expected.
        """
        def split_off_section(text):
            # relative paths in windows expand to include the drive, eg C:\
            # so the drive is glued back onto the first section to keep its
            # colon from being treated as a separator
            drive, rest = splitdrive(text)
            sections = rest.split(':', 1)
            if drive:
                sections[0] = drive + sections[0]
            return sections

        mode = 'rw'
        head = split_off_section(volume_config)

        if len(head) == 1:
            external = None
            internal = head[0]
        else:
            external = head[0]
            tail = split_off_section(head[1])
            external = normpath(external, True)
            internal = tail[0]
            if len(tail) > 1:
                if ':' in tail[1]:
                    raise ConfigurationError(
                        "Volume %s has incorrect format, should be "
                        "external:internal[:mode]" % volume_config
                    )
                mode = tail[1]

        if normalize:
            external = normalize_path_for_engine(external) if external else None

        spec = cls(external, internal, mode)
        spec.win32 = True
        return spec

    @classmethod
    def parse(cls, volume_config, normalize=False, win_host=False):
        """Parse a volume_config path and split it into external:internal[:mode]
        parts to be returned as a valid VolumeSpec.

        The Windows parser is used when running on Windows or when
        ``win_host`` is true; ``normalize`` only applies to that parser.
        """
        if not (IS_WINDOWS_PLATFORM or win_host):
            return cls._parse_unix(volume_config)
        return cls._parse_win32(volume_config, normalize)

    def repr(self):
        """Return ``external:internal:mode``, or just ``internal`` when
        there is no external part (the mode is omitted in that case)."""
        if not self.external:
            return '{}'.format(self.internal)
        return '{}{}{}'.format(self.external + ':', self.internal, ':' + self.mode)

    @property
    def is_named_volume(self):
        # A named volume has an external part that does not look like a
        # path: no leading '.', '/' or '~', and for win32 specs also no
        # leading backslash or drive root.
        looks_named = self.external and not self.external.startswith(('.', '/', '~'))
        if self.win32:
            return (
                looks_named and not self.external.startswith('\\') and
                not win32_root_path_pattern.match(self.external)
            )
        return looks_named
 
class ServiceLink(namedtuple('_ServiceLink', 'target alias')):
    """A link to another service, written as ``target[:alias]``."""

    @classmethod
    def parse(cls, link_spec):
        """Split on the first ':'; the alias falls back to the target."""
        target, _, alias = link_spec.partition(':')
        return cls(target, alias or target)

    def repr(self):
        """Return ``target:alias``, or just ``target`` when they are equal."""
        if self.target != self.alias:
            return '{}:{}'.format(self.target, self.alias)
        return self.target

    @property
    def merge_field(self):
        return self.alias
 
class ServiceConfigBase(namedtuple('_ServiceConfigBase', 'source target uid gid mode name')):
    """Common tuple shape for a service's reference to a named resource."""

    @classmethod
    def parse(cls, spec):
        """Build an instance from a bare source string or a mapping.

        A string fills ``source`` and leaves every other field None; for a
        mapping each field is looked up by name, missing keys becoming None.
        """
        if isinstance(spec, str):
            return cls(spec, None, None, None, None, None)
        return cls(*[spec.get(field) for field in cls._fields])

    @property
    def merge_field(self):
        return self.source

    def repr(self):
        """Return a dict of the fields whose value is not None."""
        result = {}
        for field, value in zip(self._fields, self):
            if value is not None:
                result[field] = value
        return result
 
class ServiceSecret(ServiceConfigBase):
    """ServiceConfigBase subclass that adds no behavior of its own."""
    pass
 
class ServiceConfig(ServiceConfigBase):
    """ServiceConfigBase subclass that adds no behavior of its own."""
    pass
 
class ServicePort(namedtuple('_ServicePort', 'target published protocol mode external_ip')):
    """A single port mapping of a service.

    ``target`` and ``published`` are coerced to int on construction, except
    that a published range given as an ``"x-y"`` string is kept as a string.
    """

    def __new__(cls, target, published, *args, **kwargs):
        if target:
            try:
                target = int(target)
            except ValueError:
                raise ConfigurationError('Invalid target port: {}'.format(target))

        if published:
            is_range = isinstance(published, str) and '-' in published  # "x-y:z" format
            if is_range:
                low, high = published.split('-', 1)
                if not (low.isdigit() and high.isdigit()):
                    raise ConfigurationError('Invalid published port: {}'.format(published))
            else:
                try:
                    published = int(published)
                except ValueError:
                    raise ConfigurationError('Invalid published port: {}'.format(published))

        return super().__new__(
            cls, target, published, *args, **kwargs
        )

    @classmethod
    def parse(cls, spec):
        """Return a list of ServicePort objects described by ``spec``.

        ``spec`` may be an already-parsed ServicePort, a mapping with
        ``target``/``published``/``protocol``/``mode`` keys, or any other
        value, which is handed to ``build_port_bindings`` for expansion.

        Raises:
            ConfigurationError: if expanding ``spec`` raises ValueError, or
                a port value is invalid.
        """
        if isinstance(spec, cls):
            # When extending a service with ports, the port definitions have already been parsed
            return [spec]

        if isinstance(spec, dict):
            return [cls(
                spec.get('target'),
                spec.get('published'),
                spec.get('protocol'),
                spec.get('mode'),
                None
            )]

        ports = []
        try:
            for key, bindings in build_port_bindings([spec]).items():
                # Keys look like "target" or "target/protocol".
                target, slash, proto = key.partition('/')
                if not slash:
                    proto = None

                for pub in bindings:
                    # A tuple binding carries (external_ip, published);
                    # anything else (including None) is the published port.
                    if isinstance(pub, tuple):
                        ports.append(cls(target, pub[1], proto, None, pub[0]))
                    else:
                        ports.append(cls(target, pub, proto, None, None))
        except ValueError as e:
            raise ConfigurationError(str(e))

        return ports

    @property
    def merge_field(self):
        return (self.target, self.published, self.external_ip, self.protocol)

    def repr(self):
        """Return a dict of the fields whose value is not None."""
        result = {}
        for field, value in zip(self._fields, self):
            if value is not None:
                result[field] = value
        return result

    def legacy_repr(self):
        """Return the string built by ``normalize_port_dict`` from repr()."""
        return normalize_port_dict(self.repr())
 
class GenericResource(namedtuple('_GenericResource', 'kind value')):
    """A generic resource entry, stored as its ``kind`` and ``value``."""

    @classmethod
    def parse(cls, dct):
        """Build an instance from ``{'discrete_resource_spec': {...}}``.

        Raises:
            ConfigurationError: if the ``discrete_resource_spec`` key or
                its ``kind`` subkey is missing. ``value`` is optional.
        """
        if 'discrete_resource_spec' not in dct:
            raise ConfigurationError(
                'generic_resource entry must include a discrete_resource_spec key'
            )

        resource = dct['discrete_resource_spec']
        if 'kind' not in resource:
            raise ConfigurationError(
                'generic_resource entry must include a discrete_resource_spec.kind subkey'
            )

        return cls(resource['kind'], resource.get('value'))

    def repr(self):
        """Return the entry in the nested dict shape accepted by parse()."""
        inner = {
            'kind': self.kind,
            'value': self.value,
        }
        return {'discrete_resource_spec': inner}

    @property
    def merge_field(self):
        return self.kind
 
def normalize_port_dict(port):
    """Render a port mapping dict as ``[external_ip:][published:]target/protocol``.

    ``protocol`` defaults to ``tcp``. A key that is present with the value
    None is treated exactly like a missing key, so it never shows up as the
    literal text ``None`` in the result. When an external IP is given
    without a published port the published slot is left empty
    (``ip::target/protocol``).
    """
    def value_or(key, default):
        # dict.get(key, default) only falls back for *missing* keys;
        # this also falls back when the key maps to None.
        value = port.get(key)
        return default if value is None else value

    published = port.get('published')
    external_ip = value_or('external_ip', '')

    return '{external_ip}{has_ext_ip}{published}{is_pub}{target}/{protocol}'.format(
        published='' if published is None else published,
        is_pub=(':' if published is not None or external_ip else ''),
        target=port.get('target'),
        protocol=value_or('protocol', 'tcp'),
        external_ip=external_ip,
        has_ext_ip=(':' if external_ip else ''),
    )
 
class SecurityOpt(namedtuple('_SecurityOpt', 'value src_file')):
    """A ``security_opt`` entry.

    Fields:
        value: the option string; for a seccomp profile file this is
            ``seccomp=`` followed by the profile's JSON content.
        src_file: the profile path as written in the option, or None when
            the option does not reference a profile file.
    """

    @classmethod
    def parse(cls, value):
        """Parse one option, inlining a referenced seccomp profile file.

        Non-string values are returned unchanged. Options are split into a
        name and a value on the first '=' or, failing that, the first ':'.

        Raises:
            ConfigurationError: if the option has no separator (and is not
                ``no-new-privileges``), or the seccomp profile cannot be
                read or is not valid JSON.
        """
        if not isinstance(value, str):
            return value
        # based on https://github.com/docker/cli/blob/9de1b162f/cli/command/container/opts.go#L673-L697
        # Split into at most two pieces (maxsplit=1) so that a profile path
        # which itself contains '=' or ':' (e.g. a drive-letter path) is
        # kept whole instead of being cut at the second separator.
        con = value.split('=', 1)
        if len(con) == 1 and con[0] != 'no-new-privileges':
            if ':' not in value:
                raise ConfigurationError('Invalid security_opt: {}'.format(value))
            con = value.split(':', 1)

        if con[0] == 'seccomp' and con[1] != 'unconfined':
            try:
                with open(unquote_path(con[1])) as f:
                    seccomp_data = json.load(f)
            except (OSError, ValueError) as e:
                raise ConfigurationError('Error reading seccomp profile: {}'.format(e))
            return cls(
                'seccomp={}'.format(json.dumps(seccomp_data)), con[1]
            )
        return cls(value, None)

    def repr(self):
        """Return ``seccomp:<src_file>`` for file-backed profiles, else value."""
        if self.src_file is not None:
            return 'seccomp:{}'.format(self.src_file)
        return self.value

    @property
    def merge_field(self):
        return self.value
 
 
  |