"""Types for objects parsed from the configuration."""
from __future__ import absolute_import
from __future__ import unicode_literals

import os
import re
from collections import namedtuple

import six
from docker.utils.ports import build_port_bindings

from ..const import COMPOSEFILE_V1 as V1
from .errors import ConfigurationError
from compose.const import IS_WINDOWS_PLATFORM
from compose.utils import splitdrive

# Matches a string that begins with a drive letter, a colon and a backslash.
win32_root_path_pattern = re.compile(r'^[A-Za-z]\:\\.*')


class VolumeFromSpec(namedtuple('_VolumeFromSpec', 'source mode type')):
    """A parsed ``volumes_from`` entry.

    Fields:
        source: name of the service or container the volumes come from.
        mode: access mode string; ``'rw'`` when the entry does not give one.
        type: ``'service'`` or ``'container'``.
    """

    # TODO: drop service_names arg when v1 is removed
    @classmethod
    def parse(cls, volume_from_config, service_names, version):
        """Parse ``volume_from_config`` with the parser matching ``version``.

        ``parse_v1`` is used when ``version`` equals the V1 constant,
        ``parse_v2`` otherwise.
        """
        func = cls.parse_v1 if version == V1 else cls.parse_v2
        return func(service_names, volume_from_config)

    @classmethod
    def parse_v1(cls, service_names, volume_from_config):
        """Parse a ``source[:mode]`` string.

        The source is typed ``'service'`` when it appears in
        ``service_names`` and ``'container'`` otherwise.

        Raises:
            ConfigurationError: if the string has more than two
                colon-separated parts.
        """
        parts = volume_from_config.split(':')
        if len(parts) > 2:
            raise ConfigurationError(
                "volume_from {} has incorrect format, should be "
                "service[:mode]".format(volume_from_config))

        if len(parts) == 1:
            source = parts[0]
            mode = 'rw'
        else:
            source, mode = parts

        source_type = 'service' if source in service_names else 'container'
        return cls(source, mode, source_type)

    @classmethod
    def parse_v2(cls, service_names, volume_from_config):
        """Parse ``<service>[:<mode>]`` or ``container:<name>[:<mode>]``.

        ``service_names`` is accepted for signature parity with
        ``parse_v1`` but is not consulted here.

        Raises:
            ConfigurationError: if the string has more than three
                colon-separated parts, or if a three-part string names a
                type other than ``service`` or ``container``.
        """
        parts = volume_from_config.split(':')
        if len(parts) > 3:
            raise ConfigurationError(
                "volume_from {} has incorrect format, should be one of "
                "'<service name>[:<mode>]' or "
                "'container:<container name>[:<mode>]'".format(volume_from_config))

        if len(parts) == 1:
            source = parts[0]
            return cls(source, 'rw', 'service')

        if len(parts) == 2:
            # A leading literal 'container' marks the second part as a
            # container name; otherwise the second part is the mode.
            if parts[0] == 'container':
                source_type, source = parts
                return cls(source, 'rw', source_type)

            source, mode = parts
            return cls(source, mode, 'service')

        # str.split never returns an empty list, so exactly three parts
        # remain at this point.
        source_type, source, mode = parts
        if source_type not in ('service', 'container'):
            raise ConfigurationError(
                "Unknown volumes_from type '{}' in '{}'".format(
                    source_type,
                    volume_from_config))

        return cls(source, mode, source_type)

    def repr(self):
        """Return the spec as a ``type:source:mode`` string."""
        return '{v.type}:{v.source}:{v.mode}'.format(v=self)


def parse_restart_spec(restart_config):
    """Parse a ``mode[:max_retry]`` restart string.

    Returns:
        ``None`` when ``restart_config`` is falsy, otherwise a dict with
        the keys ``'Name'`` and ``'MaximumRetryCount'`` (an int, ``0`` when
        no retry count is given).

    Raises:
        ConfigurationError: if the string has more than two
            colon-separated parts, or the retry count is not an integer.
    """
    if not restart_config:
        return None

    parts = restart_config.split(':')
    if len(parts) > 2:
        raise ConfigurationError(
            "Restart %s has incorrect format, should be "
            "mode[:max_retry]" % restart_config)

    if len(parts) == 2:
        name, max_retry_count = parts
    else:
        name, = parts
        max_retry_count = 0

    # Report a malformed retry count as a configuration problem rather
    # than letting the bare ValueError from int() escape.
    try:
        max_retry_count = int(max_retry_count)
    except ValueError:
        raise ConfigurationError(
            "Restart %s has incorrect format, max_retry should be "
            "an integer" % restart_config)

    return {'Name': name, 'MaximumRetryCount': max_retry_count}


def serialize_restart_spec(restart_spec):
    """Turn a restart dict back into a ``mode[:max_retry]`` string.

    Returns an empty string for a falsy ``restart_spec``. The retry count
    is appended only when it is truthy (i.e. non-zero).
    """
    if not restart_spec:
        return ''

    parts = [restart_spec['Name']]
    if restart_spec['MaximumRetryCount']:
        parts.append(six.text_type(restart_spec['MaximumRetryCount']))
    return ':'.join(parts)


def parse_extra_hosts(extra_hosts_config):
    """Normalise an ``extra_hosts`` value into a ``{host: ip}`` dict.

    Accepts a dict (returned as a shallow copy) or a list of ``host:ip``
    strings. Each list entry is split on its first colon and both halves
    are stripped of surrounding whitespace.

    Returns:
        ``{}`` for a falsy value, a dict for dict or list input, and
        ``None`` for any other type.

    Raises:
        ConfigurationError: if a list entry contains no ``':'``.
    """
    if not extra_hosts_config:
        return {}

    if isinstance(extra_hosts_config, dict):
        return dict(extra_hosts_config)

    if isinstance(extra_hosts_config, list):
        extra_hosts_dict = {}
        for extra_hosts_line in extra_hosts_config:
            host, separator, ip = extra_hosts_line.partition(':')
            if not separator:
                raise ConfigurationError(
                    "extra_hosts entry '{}' has incorrect format, should be "
                    "host:ip".format(extra_hosts_line))
            extra_hosts_dict[host.strip()] = ip.strip()
        return extra_hosts_dict

    # Any other type is passed over without raising.
    return None


def normalize_path_for_engine(path):
    r"""Windows paths, c:\my\path\shiny, need to be changed to be compatible with
    the Engine. Volume paths are expected to be linux style /c/my/path/shiny/
    """
    drive, tail = splitdrive(path)

    if drive:
        # 'C:' + '\my\path' becomes '/c' + '\my\path'; the backslashes are
        # converted below.
        path = '/' + drive.lower().rstrip(':') + tail

    return path.replace('\\', '/')


class MountSpec(object):
    """A mount described by separate fields rather than a single string."""

    # Per mount type, maps an option name to another name for that option.
    # Nothing in this module reads it.
    options_map = {
        'volume': {
            'nocopy': 'no_copy'
        },
        'bind': {
            'propagation': 'propagation'
        }
    }
    # Attributes that repr() copies into its result dict.
    _fields = ['type', 'source', 'target', 'read_only', 'consistency']

    def __init__(self, type, source=None, target=None, read_only=None, consistency=None, **kwargs):
        """Store the mount fields.

        Type-specific options are taken from the keyword argument whose
        name equals ``type`` (for example ``volume={...}`` when ``type`` is
        ``'volume'``); other extra keyword arguments are ignored.
        """
        self.type = type
        self.source = source
        self.target = target
        self.read_only = read_only
        self.consistency = consistency
        self.options = None
        if self.type in kwargs:
            self.options = kwargs[self.type]

    def as_volume_spec(self):
        """Return an equivalent ``VolumeSpec`` (``'ro'`` if read-only, else ``'rw'``)."""
        mode = 'ro' if self.read_only else 'rw'
        return VolumeSpec(external=self.source, internal=self.target, mode=mode)

    def legacy_repr(self):
        """Return the string form produced by the equivalent ``VolumeSpec``."""
        return self.as_volume_spec().repr()

    def repr(self):
        """Return a dict of the truthy fields plus the type-specific options."""
        res = {}
        for field in self._fields:
            if getattr(self, field, None):
                res[field] = getattr(self, field)
        if self.options:
            res[self.type] = self.options
        return res

    @property
    def is_named_volume(self):
        """Truthy when the mount is of type ``'volume'`` and has a source."""
        return self.type == 'volume' and self.source


class VolumeSpec(namedtuple('_VolumeSpec', 'external internal mode')):
    """A parsed ``external:internal[:mode]`` volume string."""

    @classmethod
    def _parse_unix(cls, volume_config):
        """Parse a volume string by splitting on every ``':'``.

        A single part is treated as the internal path with no external
        part. Paths are passed through ``os.path.normpath``; mode defaults
        to ``'rw'``.

        Raises:
            ConfigurationError: if there are more than three parts.
        """
        parts = volume_config.split(':')

        if len(parts) > 3:
            raise ConfigurationError(
                "Volume %s has incorrect format, should be "
                "external:internal[:mode]" % volume_config)

        if len(parts) == 1:
            external = None
            internal = os.path.normpath(parts[0])
        else:
            external = os.path.normpath(parts[0])
            internal = os.path.normpath(parts[1])

        mode = 'rw'
        if len(parts) == 3:
            mode = parts[2]

        return cls(external, internal, mode)

    @classmethod
    def _parse_win32(cls, volume_config, normalize):
        """Parse a volume string whose paths may start with a drive prefix.

        When ``normalize`` is true, a non-empty external path is converted
        with ``normalize_path_for_engine``.

        Raises:
            ConfigurationError: if text after the internal path still
                contains a ``':'``.
        """
        # relative paths in windows expand to include the drive, eg C:\
        # so we join the first 2 parts back together to count as one
        mode = 'rw'

        def separate_next_section(volume_config):
            # Split off the drive first so its colon is not mistaken for a
            # section separator, then split the rest on the first ':'.
            drive, tail = splitdrive(volume_config)
            parts = tail.split(':', 1)
            if drive:
                parts[0] = drive + parts[0]
            return parts

        parts = separate_next_section(volume_config)
        if len(parts) == 1:
            internal = parts[0]
            external = None
        else:
            external = parts[0]
            parts = separate_next_section(parts[1])
            external = os.path.normpath(external)
            internal = parts[0]
            if len(parts) > 1:
                if ':' in parts[1]:
                    raise ConfigurationError(
                        "Volume %s has incorrect format, should be "
                        "external:internal[:mode]" % volume_config
                    )
                mode = parts[1]

        if normalize:
            external = normalize_path_for_engine(external) if external else None

        return cls(external, internal, mode)

    @classmethod
    def parse(cls, volume_config, normalize=False):
        """Parse a volume_config path and split it into external:internal[:mode]
        parts to be returned as a valid VolumeSpec.
        """
        if IS_WINDOWS_PLATFORM:
            return cls._parse_win32(volume_config, normalize)
        else:
            return cls._parse_unix(volume_config)

    def repr(self):
        """Return the string form; the mode is included only with an external part."""
        external = self.external + ':' if self.external else ''
        mode = ':' + self.mode if self.external else ''
        return '{ext}{v.internal}{mode}'.format(mode=mode, ext=external, v=self)

    @property
    def is_named_volume(self):
        """Truthy when the external part is set and does not look like a path."""
        # An external part starting with '.', '/' or '~' is not a volume name.
        res = self.external and not self.external.startswith(('.', '/', '~'))
        if not IS_WINDOWS_PLATFORM:
            return res

        # On Windows also rule out a leading backslash and drive-rooted paths.
        return (
            res and not self.external.startswith('\\') and
            not win32_root_path_pattern.match(self.external)
        )


class ServiceLink(namedtuple('_ServiceLink', 'target alias')):
    """A parsed ``target[:alias]`` link."""

    @classmethod
    def parse(cls, link_spec):
        """Split ``link_spec`` on its first ``':'``; the alias defaults to the target."""
        target, _, alias = link_spec.partition(':')
        if not alias:
            alias = target
        return cls(target, alias)

    def repr(self):
        """Return ``target`` alone when it equals the alias, else ``target:alias``."""
        if self.target == self.alias:
            return self.target
        return '{s.target}:{s.alias}'.format(s=self)

    @property
    def merge_field(self):
        """The alias."""
        return self.alias


class ServiceConfigBase(namedtuple('_ServiceConfigBase', 'source target uid gid mode')):
    """Shared shape of the ``ServiceSecret`` and ``ServiceConfig`` entries."""

    @classmethod
    def parse(cls, spec):
        """Build an instance from a string (source only) or a mapping.

        For a mapping, the ``source``, ``target``, ``uid``, ``gid`` and
        ``mode`` keys are read with ``.get``, so missing keys become
        ``None``.
        """
        if isinstance(spec, six.string_types):
            return cls(spec, None, None, None, None)
        return cls(
            spec.get('source'),
            spec.get('target'),
            spec.get('uid'),
            spec.get('gid'),
            spec.get('mode'),
        )

    @property
    def merge_field(self):
        """The source."""
        return self.source

    def repr(self):
        """Return a dict of the fields whose value is not ``None``."""
        return {k: v for k, v in zip(self._fields, self) if v is not None}


class ServiceSecret(ServiceConfigBase):
    pass


class ServiceConfig(ServiceConfigBase):
    pass


class ServicePort(namedtuple('_ServicePort', 'target published protocol mode external_ip')):
    """A single port entry."""

    def __new__(cls, target, published, *args, **kwargs):
        """Create the tuple, converting truthy ``target``/``published`` to int.

        Raises:
            ConfigurationError: if either value is truthy and ``int()``
                rejects it with a ``ValueError``.
        """
        try:
            if target:
                target = int(target)
        except ValueError:
            raise ConfigurationError('Invalid target port: {}'.format(target))

        try:
            if published:
                published = int(published)
        except ValueError:
            raise ConfigurationError('Invalid published port: {}'.format(published))

        return super(ServicePort, cls).__new__(
            cls, target, published, *args, **kwargs
        )

    @classmethod
    def parse(cls, spec):
        """Return a list of ``ServicePort`` objects for ``spec``.

        ``spec`` may be an existing ``ServicePort`` (returned wrapped in a
        list), a mapping with ``target``/``published``/``protocol``/``mode``
        keys, or any other value, which is handed to
        ``build_port_bindings``.

        Raises:
            ConfigurationError: if processing a non-mapping ``spec`` raises
                a ``ValueError``.
        """
        if isinstance(spec, cls):
            # When extending a service with ports, the port definitions have already been parsed
            return [spec]

        if not isinstance(spec, dict):
            result = []
            try:
                for k, v in build_port_bindings([spec]).items():
                    # Keys may carry a '/<protocol>' suffix.
                    if '/' in k:
                        target, proto = k.split('/', 1)
                    else:
                        target, proto = (k, None)
                    for pub in v:
                        if pub is None:
                            result.append(
                                cls(target, None, proto, None, None)
                            )
                        elif isinstance(pub, tuple):
                            # Tuple entries are read as (external_ip, published).
                            result.append(
                                cls(target, pub[1], proto, None, pub[0])
                            )
                        else:
                            result.append(
                                cls(target, pub, proto, None, None)
                            )
            except ValueError as e:
                raise ConfigurationError(str(e))

            return result

        return [cls(
            spec.get('target'),
            spec.get('published'),
            spec.get('protocol'),
            spec.get('mode'),
            None
        )]

    @property
    def merge_field(self):
        """The ``(target, published, external_ip, protocol)`` tuple."""
        return (self.target, self.published, self.external_ip, self.protocol)

    def repr(self):
        """Return a dict of the fields whose value is not ``None``."""
        return {k: v for k, v in zip(self._fields, self) if v is not None}

    def legacy_repr(self):
        """Return the string form built by ``normalize_port_dict``."""
        return normalize_port_dict(self.repr())


def normalize_port_dict(port):
    """Format a port dict as ``[external_ip:][published:]target/protocol``.

    The protocol defaults to ``'tcp'``. A colon follows the published slot
    whenever ``published`` is not ``None`` or an ``external_ip`` is set.
    """
    return '{external_ip}{has_ext_ip}{published}{is_pub}{target}/{protocol}'.format(
        published=port.get('published', ''),
        is_pub=(':' if port.get('published') is not None or port.get('external_ip') else ''),
        target=port.get('target'),
        protocol=port.get('protocol', 'tcp'),
        external_ip=port.get('external_ip', ''),
        has_ext_ip=(':' if port.get('external_ip') else ''),
    )