| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 | from __future__ import absolute_importfrom __future__ import unicode_literalsimport loggingfrom docker.errors import NotFoundfrom docker.types import IPAMConfigfrom docker.types import IPAMPoolfrom docker.utils import version_gtefrom docker.utils import version_ltfrom .config import ConfigurationErrorfrom .const import LABEL_NETWORKfrom .const import LABEL_PROJECTlog = logging.getLogger(__name__)OPTS_EXCEPTIONS = [    'com.docker.network.driver.overlay.vxlanid_list',]class Network(object):    def __init__(self, client, project, name, driver=None, driver_opts=None,                 ipam=None, external_name=None, internal=False, enable_ipv6=False,                 labels=None):        self.client = client        self.project = project        self.name = name        self.driver = driver        self.driver_opts = driver_opts        self.ipam = create_ipam_config_from_dict(ipam)        self.external_name = external_name        self.internal = internal        self.enable_ipv6 = enable_ipv6        self.labels = labels    def ensure(self):        if self.external_name:            try:                self.inspect()                log.debug(                    'Network {0} declared as external. No new '                    'network will be created.'.format(self.name)                )            except NotFound:                raise ConfigurationError(                    'Network {name} declared as external, but could'                    ' not be found. Please create the network manually'                    ' using `{command} {name}` and try again.'.format(                        name=self.external_name,                        command='docker network create'                    )                )            return        try:            data = self.inspect()            check_remote_network_config(data, self)        except NotFound:            driver_name = 'the default driver'            if self.driver:                driver_name = 'driver "{}"'.format(self.driver)            log.info(                'Creating network "{}" with {}'                .format(self.full_name, driver_name)            )            self.client.create_network(                name=self.full_name,                driver=self.driver,                options=self.driver_opts,                ipam=self.ipam,                internal=self.internal,                enable_ipv6=self.enable_ipv6,                labels=self._labels,                attachable=version_gte(self.client._version, '1.24') or None,            )    def remove(self):        if self.external_name:            log.info("Network %s is external, skipping", self.full_name)            return        log.info("Removing network {}".format(self.full_name))        self.client.remove_network(self.full_name)    def inspect(self):        return self.client.inspect_network(self.full_name)    @property    def full_name(self):        if self.external_name:            return self.external_name        return '{0}_{1}'.format(self.project, self.name)    @property    def _labels(self):        if version_lt(self.client._version, '1.23'):            return None        labels = self.labels.copy() if self.labels else {}        labels.update({            LABEL_PROJECT: self.project,            LABEL_NETWORK: self.name,        })        return labelsdef create_ipam_config_from_dict(ipam_dict):    if not ipam_dict:        return None    return IPAMConfig(        driver=ipam_dict.get('driver'),        pool_configs=[            IPAMPool(                subnet=config.get('subnet'),                iprange=config.get('ip_range'),                gateway=config.get('gateway'),                aux_addresses=config.get('aux_addresses'),            )            for config in ipam_dict.get('config', [])        ],        options=ipam_dict.get('options')    )class NetworkConfigChangedError(ConfigurationError):    def __init__(self, net_name, property_name):        super(NetworkConfigChangedError, self).__init__(            'Network "{}" needs to be recreated - {} has changed'.format(                net_name, property_name            )        )def check_remote_ipam_config(remote, local):    remote_ipam = remote.get('IPAM')    ipam_dict = create_ipam_config_from_dict(local.ipam)    if local.ipam.get('driver') and local.ipam.get('driver') != remote_ipam.get('Driver'):        raise NetworkConfigChangedError(local.full_name, 'IPAM driver')    if len(ipam_dict['Config']) != 0:        if len(ipam_dict['Config']) != len(remote_ipam['Config']):            raise NetworkConfigChangedError(local.full_name, 'IPAM configs')        remote_configs = sorted(remote_ipam['Config'], key='Subnet')        local_configs = sorted(ipam_dict['Config'], key='Subnet')        while local_configs:            lc = local_configs.pop()            rc = remote_configs.pop()            if lc.get('Subnet') != rc.get('Subnet'):                raise NetworkConfigChangedError(local.full_name, 'IPAM config subnet')            if lc.get('Gateway') is not None and lc.get('Gateway') != rc.get('Gateway'):                raise NetworkConfigChangedError(local.full_name, 'IPAM config gateway')            if lc.get('IPRange') != rc.get('IPRange'):                raise NetworkConfigChangedError(local.full_name, 'IPAM config ip_range')            if sorted(lc.get('AuxiliaryAddresses')) != sorted(rc.get('AuxiliaryAddresses')):                raise NetworkConfigChangedError(local.full_name, 'IPAM config aux_addresses')    remote_opts = remote_ipam.get('Options', {})    local_opts = local.ipam.get('options', {})    for k in set.union(set(remote_opts.keys()), set(local_opts.keys())):        if remote_opts.get(k) != local_opts.get(k):            raise NetworkConfigChangedError(local.full_name, 'IPAM option "{}"'.format(k))def check_remote_network_config(remote, local):    if local.driver and remote.get('Driver') != local.driver:        raise NetworkConfigChangedError(local.full_name, 'driver')    local_opts = local.driver_opts or {}    remote_opts = remote.get('Options') or {}    for k in set.union(set(remote_opts.keys()), set(local_opts.keys())):        if k in OPTS_EXCEPTIONS:            continue        if remote_opts.get(k) != local_opts.get(k):            raise NetworkConfigChangedError(local.full_name, 'option "{}"'.format(k))    if local.ipam is not None:        check_remote_ipam_config(remote, local)    if local.internal is not None and local.internal != remote.get('Internal', False):        raise NetworkConfigChangedError(local.full_name, 'internal')    if local.enable_ipv6 is not None and local.enable_ipv6 != remote.get('EnableIPv6', False):        raise NetworkConfigChangedError(local.full_name, 'enable_ipv6')    local_labels = local.labels or {}    remote_labels = remote.get('Labels', {})    for k in set.union(set(remote_labels.keys()), set(local_labels.keys())):        if k.startswith('com.docker.compose.'):  # We are only interested in user-specified labels            continue        if remote_labels.get(k) != local_labels.get(k):            raise NetworkConfigChangedError(local.full_name, 'label "{}"'.format(k))def build_networks(name, config_data, client):    network_config = config_data.networks or {}    networks = {        network_name: Network(            client=client, project=name, name=network_name,            driver=data.get('driver'),            driver_opts=data.get('driver_opts'),            ipam=data.get('ipam'),            external_name=data.get('external_name'),            internal=data.get('internal'),            enable_ipv6=data.get('enable_ipv6'),            labels=data.get('labels'),        )        for network_name, data in network_config.items()    }    if 'default' not in networks:        networks['default'] = Network(client, name, 'default')    return networksclass ProjectNetworks(object):    def __init__(self, networks, use_networking):        self.networks = networks or {}        self.use_networking = use_networking    @classmethod    def from_services(cls, services, networks, use_networking):        service_networks = {            network: networks.get(network)            for service in services            for network in get_network_names_for_service(service)        }        unused = set(networks) - set(service_networks) - {'default'}        if unused:            log.warn(                "Some networks were defined but are not used by any service: "                "{}".format(", ".join(unused)))        return cls(service_networks, use_networking)    def remove(self):        if not self.use_networking:            return        for network in self.networks.values():            try:                network.remove()            except NotFound:                log.warn("Network %s not found.", network.full_name)    def initialize(self):        if not self.use_networking:            return        for network in self.networks.values():            network.ensure()def get_network_defs_for_service(service_dict):    if 'network_mode' in service_dict:        return {}    networks = service_dict.get('networks', {'default': None})    return dict(        (net, (config or {}))        for net, config in networks.items()    )def get_network_names_for_service(service_dict):    return get_network_defs_for_service(service_dict).keys()def get_networks(service_dict, network_definitions):    networks = {}    for name, netdef in get_network_defs_for_service(service_dict).items():        network = network_definitions.get(name)        if network:            networks[network.full_name] = netdef        else:            raise ConfigurationError(                'Service "{}" uses an undefined network "{}"'                .format(service_dict['name'], name))    return networks
 |