| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 | 
							- import logging
 
- import re
 
- from collections import OrderedDict
 
- from docker.errors import NotFound
 
- from docker.types import IPAMConfig
 
- from docker.types import IPAMPool
 
- from docker.utils import version_gte
 
- from docker.utils import version_lt
 
- from . import __version__
 
- from .config import ConfigurationError
 
- from .const import LABEL_NETWORK
 
- from .const import LABEL_PROJECT
 
- from .const import LABEL_VERSION
 
# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)

# Driver option keys that are skipped when the locally configured driver
# options are compared with the options reported for an existing network
# (see check_remote_network_config).
OPTS_EXCEPTIONS = [
    'com.docker.network.driver.overlay.vxlanid_list',
    'com.docker.network.windowsshim.hnsid',
    'com.docker.network.windowsshim.networkname',
]
 
class Network(object):
    """A project network as declared in the Compose configuration.

    The object knows how to look the network up through ``client``, create
    it when it is missing, and remove it again.  Two naming schemes are
    supported: the current one (``<project>_<name>``) and a "legacy" one in
    which ``_`` and ``-`` are stripped from the project name.  ``legacy`` is
    ``None`` until the first lookup decides which scheme is in effect.
    """

    def __init__(self, client, project, name, driver=None, driver_opts=None,
                 ipam=None, external=False, internal=False, enable_ipv6=False,
                 labels=None, custom_name=False):
        # Identity
        self.client = client
        self.project = project
        self.name = name
        self.custom_name = custom_name
        self.external = external

        # Settings forwarded to ``client.create_network``
        self.driver = driver
        self.driver_opts = driver_opts
        self.ipam = create_ipam_config_from_dict(ipam)
        self.internal = internal
        self.enable_ipv6 = enable_ipv6
        self.labels = labels

        # Tri-state: None = not determined yet, see _set_legacy_flag().
        self.legacy = None

    def ensure(self):
        """Make sure the network exists, creating it when necessary.

        External networks are never created: a missing one raises
        ``ConfigurationError``.  For managed networks an existing network is
        validated with ``check_remote_network_config``; a missing one is
        created.
        """
        if self.external:
            self._ensure_external_exists()
            return

        self._set_legacy_flag()
        try:
            remote = self.inspect(legacy=self.legacy)
            check_remote_network_config(remote, self)
        except NotFound:
            self._create()

    def _ensure_external_exists(self):
        """Check that an external network is present (overlay ones are skipped)."""
        if self.driver == 'overlay':
            # Swarm nodes do not register overlay networks that were
            # created on a different node unless they're in use.
            # See docker/compose#4399
            return

        try:
            self.inspect()
        except NotFound:
            raise ConfigurationError(
                'Network {name} declared as external, but could'
                ' not be found. Please create the network manually'
                ' using `{command} {name}` and try again.'.format(
                    name=self.full_name,
                    command='docker network create'
                )
            )
        else:
            log.debug(
                'Network {0} declared as external. No new '
                'network will be created.'.format(self.name)
            )

    def _create(self):
        """Create the network through the client using the stored settings."""
        if self.driver:
            driver_name = 'driver "{}"'.format(self.driver)
        else:
            driver_name = 'the default driver'

        log.info(
            'Creating network "{}" with {}'.format(self.full_name, driver_name)
        )

        # ``attachable`` is passed as None (rather than False) when the
        # version_gte() check against API version 1.24 is falsy.
        attachable = version_gte(self.client._version, '1.24') or None
        self.client.create_network(
            name=self.full_name,
            driver=self.driver,
            options=self.driver_opts,
            ipam=self.ipam,
            internal=self.internal,
            enable_ipv6=self.enable_ipv6,
            labels=self._labels,
            attachable=attachable,
            check_duplicate=True,
        )

    def remove(self):
        """Remove the network unless it is declared external."""
        if self.external:
            log.info("Network %s is external, skipping", self.true_name)
            return

        log.info("Removing network {}".format(self.true_name))
        self.client.remove_network(self.true_name)

    def inspect(self, legacy=False):
        """Return the client's inspection data for this network.

        With ``legacy=True`` the legacy name is looked up instead of the
        current one.
        """
        target = self.legacy_full_name if legacy else self.full_name
        return self.client.inspect_network(target)

    @property
    def legacy_full_name(self):
        """Name under the legacy scheme (project name without ``_``/``-``)."""
        if self.custom_name:
            return self.name
        stripped_project = re.sub(r'[_-]', '', self.project)
        return '{0}_{1}'.format(stripped_project, self.name)

    @property
    def full_name(self):
        """Name under the current scheme: ``<project>_<name>`` or the custom name."""
        if self.custom_name:
            return self.name
        return '{0}_{1}'.format(self.project, self.name)

    @property
    def true_name(self):
        """Name actually in use, resolving the legacy flag on first access."""
        self._set_legacy_flag()
        return self.legacy_full_name if self.legacy else self.full_name

    @property
    def _labels(self):
        """Labels sent on creation: user labels plus the Compose bookkeeping labels.

        Returns ``None`` when ``version_lt`` reports the client API version
        as lower than 1.23.
        """
        if version_lt(self.client._version, '1.23'):
            return None

        merged = self.labels.copy() if self.labels else {}
        merged[LABEL_PROJECT] = self.project
        merged[LABEL_NETWORK] = self.name
        merged[LABEL_VERSION] = __version__
        return merged

    def _set_legacy_flag(self):
        """Decide once whether a network exists under the legacy name."""
        if self.legacy is None:
            try:
                self.legacy = self.inspect(legacy=True) is not None
            except NotFound:
                self.legacy = False
 
def create_ipam_config_from_dict(ipam_dict):
    """Convert an ``ipam`` mapping into an ``IPAMConfig`` object.

    The mapping is read with the lower-case keys ``driver``, ``config`` and
    ``options``; every entry of ``config`` is read with the keys ``subnet``,
    ``ip_range``, ``gateway`` and ``aux_addresses``.

    Returns ``None`` when ``ipam_dict`` is empty or ``None``.  A missing or
    empty driver is replaced by ``'default'``.
    """
    if not ipam_dict:
        return None

    pools = []
    for pool in ipam_dict.get('config', []):
        pools.append(IPAMPool(
            subnet=pool.get('subnet'),
            iprange=pool.get('ip_range'),
            gateway=pool.get('gateway'),
            aux_addresses=pool.get('aux_addresses'),
        ))

    driver = ipam_dict.get('driver') or 'default'
    return IPAMConfig(
        driver=driver,
        pool_configs=pools,
        options=ipam_dict.get('options')
    )
 
class NetworkConfigChangedError(ConfigurationError):
    """Error raised when an existing network no longer matches its configuration.

    The message names the network and the property that differs.
    """

    def __init__(self, net_name, property_name):
        message = 'Network "{}" needs to be recreated - {} has changed'.format(
            net_name, property_name
        )
        super(NetworkConfigChangedError, self).__init__(message)
 
def check_remote_ipam_config(remote, local):
    """Compare the IPAM settings of an existing network with the local ones.

    :param remote: inspection data of the existing network; its ``IPAM``
        entry is read with the keys ``Driver``, ``Config`` and ``Options``.
    :param local: object exposing ``ipam`` (must not be ``None``) and
        ``true_name``.  ``ipam`` may be either the already normalized
        configuration (keys ``Driver`` / ``Config`` / ``Options``, as stored
        by ``Network.__init__``) or a raw mapping with lower-case keys.
    :raises NetworkConfigChangedError: when the driver, the address pools or
        the options differ.
    """
    remote_ipam = remote.get('IPAM') or {}
    local_ipam = local.ipam

    if 'Driver' in local_ipam or 'Config' in local_ipam:
        # Already normalized.  Feeding it to create_ipam_config_from_dict()
        # again would look up lower-case keys and silently drop every pool.
        ipam_dict = local_ipam
        local_driver = local_ipam.get('Driver')
        if local_driver == 'default':
            # 'default' is the placeholder substituted when no driver was
            # configured, so it is not treated as an explicit choice.
            local_driver = None
    else:
        ipam_dict = create_ipam_config_from_dict(local_ipam) or {}
        local_driver = local_ipam.get('driver')

    if local_driver and local_driver != remote_ipam.get('Driver'):
        raise NetworkConfigChangedError(local.true_name, 'IPAM driver')

    local_configs = ipam_dict.get('Config') or []
    # Pools are only compared when some were configured locally.
    if local_configs:
        remote_configs = remote_ipam.get('Config') or []
        if len(local_configs) != len(remote_configs):
            raise NetworkConfigChangedError(local.true_name, 'IPAM configs')

        def by_subnet(pool):
            # ``or ''`` keeps the sort usable when a pool has no subnet.
            return pool.get('Subnet') or ''

        pairs = zip(
            sorted(local_configs, key=by_subnet),
            sorted(remote_configs, key=by_subnet),
        )
        for lc, rc in pairs:
            if lc.get('Subnet') != rc.get('Subnet'):
                raise NetworkConfigChangedError(local.true_name, 'IPAM config subnet')
            # An unset local gateway means "whatever was assigned".
            if lc.get('Gateway') is not None and lc.get('Gateway') != rc.get('Gateway'):
                raise NetworkConfigChangedError(local.true_name, 'IPAM config gateway')
            # Missing, None and '' all count as "no range".
            if (lc.get('IPRange') or None) != (rc.get('IPRange') or None):
                raise NetworkConfigChangedError(local.true_name, 'IPAM config ip_range')
            # Compare the whole mapping so changed addresses are detected,
            # and treat a missing mapping like an empty one.
            local_aux = lc.get('AuxiliaryAddresses') or {}
            remote_aux = rc.get('AuxiliaryAddresses') or {}
            if local_aux != remote_aux:
                raise NetworkConfigChangedError(local.true_name, 'IPAM config aux_addresses')

    remote_opts = remote_ipam.get('Options') or {}
    local_opts = ipam_dict.get('Options') or {}
    for k in set(remote_opts) | set(local_opts):
        if remote_opts.get(k) != local_opts.get(k):
            raise NetworkConfigChangedError(local.true_name, 'IPAM option "{}"'.format(k))
 
def check_remote_network_config(remote, local):
    """Validate an existing network against the local definition.

    ``remote`` is the inspection data of the existing network and ``local``
    the object describing the desired configuration.  A difference in the
    driver, the driver options (keys in ``OPTS_EXCEPTIONS`` excluded), the
    IPAM settings, ``internal`` or ``enable_ipv6`` raises
    ``NetworkConfigChangedError``.  Differing user labels only produce a
    warning.
    """
    if local.driver and local.driver != remote.get('Driver'):
        raise NetworkConfigChangedError(local.true_name, 'driver')

    wanted_opts = local.driver_opts or {}
    actual_opts = remote.get('Options') or {}
    for opt in set(actual_opts) | set(wanted_opts):
        if opt not in OPTS_EXCEPTIONS and actual_opts.get(opt) != wanted_opts.get(opt):
            raise NetworkConfigChangedError(local.true_name, 'option "{}"'.format(opt))

    if local.ipam is not None:
        check_remote_ipam_config(remote, local)

    # ``None`` means the setting was not specified locally, so it is not checked.
    if local.internal is not None:
        if local.internal != remote.get('Internal', False):
            raise NetworkConfigChangedError(local.true_name, 'internal')
    if local.enable_ipv6 is not None:
        if local.enable_ipv6 != remote.get('EnableIPv6', False):
            raise NetworkConfigChangedError(local.true_name, 'enable_ipv6')

    wanted_labels = local.labels or {}
    actual_labels = remote.get('Labels') or {}
    for label in set(actual_labels) | set(wanted_labels):
        # We are only interested in user-specified labels
        if label.startswith('com.docker.'):
            continue
        if actual_labels.get(label) != wanted_labels.get(label):
            log.warning(
                'Network {}: label "{}" has changed. It may need to be'
                ' recreated.'.format(local.true_name, label)
            )
 
def build_networks(name, config_data, client):
    """Create a ``Network`` object for every network in the configuration.

    ``name`` is used as the project of each network.  The result maps the
    configuration key of each network to its ``Network``; a ``'default'``
    entry is added when the configuration does not define one.
    """
    networks = {}
    for network_name, data in (config_data.networks or {}).items():
        networks[network_name] = Network(
            client=client,
            project=name,
            # An explicit ``name`` entry overrides the configuration key ...
            name=data.get('name', network_name),
            driver=data.get('driver'),
            driver_opts=data.get('driver_opts'),
            ipam=data.get('ipam'),
            external=bool(data.get('external', False)),
            internal=data.get('internal'),
            enable_ipv6=data.get('enable_ipv6'),
            labels=data.get('labels'),
            # ... and marks the network as custom-named.
            custom_name=data.get('name') is not None,
        )

    if 'default' not in networks:
        networks['default'] = Network(client, name, 'default')

    return networks
 
class ProjectNetworks(object):
    """The set of networks a project operates on.

    ``use_networking`` switches ``initialize`` and ``remove`` on or off.
    """

    def __init__(self, networks, use_networking):
        self.use_networking = use_networking
        self.networks = networks or {}

    @classmethod
    def from_services(cls, services, networks, use_networking):
        """Build an instance limited to the networks referenced by ``services``.

        A warning is logged for networks that are defined but not referenced
        by any service (``'default'`` is exempt).
        """
        service_networks = {}
        for service in services:
            for net_name in get_network_names_for_service(service):
                service_networks[net_name] = networks.get(net_name)

        unused = set(networks) - set(service_networks) - {'default'}
        if unused:
            log.warning(
                "Some networks were defined but are not used by any service: "
                "{}".format(", ".join(unused)))

        return cls(service_networks, use_networking)

    def remove(self):
        """Remove every network; one that is already gone only logs a warning."""
        if self.use_networking:
            for network in self.networks.values():
                try:
                    network.remove()
                except NotFound:
                    log.warning("Network %s not found.", network.true_name)

    def initialize(self):
        """Call ``ensure()`` on every network."""
        if self.use_networking:
            for network in self.networks.values():
                network.ensure()
 
def get_network_defs_for_service(service_dict):
    """Return the per-network settings of a service, keyed by network name.

    A service with a ``network_mode`` entry has no networks.  A service
    without a ``networks`` entry is attached to ``'default'``.  Empty or
    ``None`` settings are replaced by an empty dict.
    """
    if 'network_mode' in service_dict:
        return {}

    declared = service_dict.get('networks', {'default': None})
    return {net: (config or {}) for net, config in declared.items()}
 
def get_network_names_for_service(service_dict):
    """Return the names of the networks a service is attached to."""
    network_defs = get_network_defs_for_service(service_dict)
    return network_defs.keys()
 
def get_networks(service_dict, network_definitions):
    """Resolve the networks of a service into an ordered mapping.

    Keys are the ``true_name`` of each network, values the per-service
    settings.  When any network sets a ``priority`` the result is ordered by
    descending priority; otherwise it is ordered by name.

    Raises ``ConfigurationError`` when the service refers to a network that
    is missing from ``network_definitions``.
    """
    networks = {}
    for name, netdef in get_network_defs_for_service(service_dict).items():
        network = network_definitions.get(name)
        if not network:
            raise ConfigurationError(
                'Service "{}" uses an undefined network "{}"'
                .format(service_dict['name'], name))
        networks[network.true_name] = netdef

    has_priority = any(netdef.get('priority') for netdef in networks.values())
    if has_priority:
        ordered = sorted(
            networks.items(),
            key=lambda item: item[1].get('priority') or 0, reverse=True
        )
    else:
        # Ensure Compose will pick a consistent primary network if no
        # priority is set
        ordered = sorted(networks.items(), key=lambda item: item[0])

    return OrderedDict(ordered)
 
 
  |