123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- # Copyright 2013 dotCloud inc.
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- # http://www.apache.org/licenses/LICENSE-2.0
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import base64
- import fileinput
- import json
- import os
- from fig.packages import six
- from ..utils import utils
- INDEX_URL = 'https://index.docker.io/v1/'
- DOCKER_CONFIG_FILENAME = '.dockercfg'
- def swap_protocol(url):
- if url.startswith('http://'):
- return url.replace('http://', 'https://', 1)
- if url.startswith('https://'):
- return url.replace('https://', 'http://', 1)
- return url
- def expand_registry_url(hostname):
- if hostname.startswith('http:') or hostname.startswith('https:'):
- if '/' not in hostname[9:]:
- hostname = hostname + '/v1/'
- return hostname
- if utils.ping('https://' + hostname + '/v1/_ping'):
- return 'https://' + hostname + '/v1/'
- return 'http://' + hostname + '/v1/'
- def resolve_repository_name(repo_name):
- if '://' in repo_name:
- raise ValueError('Repository name cannot contain a '
- 'scheme ({0})'.format(repo_name))
- parts = repo_name.split('/', 1)
- if not '.' in parts[0] and not ':' in parts[0] and parts[0] != 'localhost':
- # This is a docker index repo (ex: foo/bar or ubuntu)
- return INDEX_URL, repo_name
- if len(parts) < 2:
- raise ValueError('Invalid repository name ({0})'.format(repo_name))
- if 'index.docker.io' in parts[0]:
- raise ValueError('Invalid repository name,'
- 'try "{0}" instead'.format(parts[1]))
- return expand_registry_url(parts[0]), parts[1]
- def resolve_authconfig(authconfig, registry=None):
- """Return the authentication data from the given auth configuration for a
- specific registry. We'll do our best to infer the correct URL for the
- registry, trying both http and https schemes. Returns an empty dictionnary
- if no data exists."""
- # Default to the public index server
- registry = registry or INDEX_URL
- # Ff its not the index server there are three cases:
- #
- # 1. this is a full config url -> it should be used as is
- # 2. it could be a full url, but with the wrong protocol
- # 3. it can be the hostname optionally with a port
- #
- # as there is only one auth entry which is fully qualified we need to start
- # parsing and matching
- if '/' not in registry:
- registry = registry + '/v1/'
- if not registry.startswith('http:') and not registry.startswith('https:'):
- registry = 'https://' + registry
- if registry in authconfig:
- return authconfig[registry]
- return authconfig.get(swap_protocol(registry), None)
- def decode_auth(auth):
- if isinstance(auth, six.string_types):
- auth = auth.encode('ascii')
- s = base64.b64decode(auth)
- login, pwd = s.split(b':')
- return login.decode('ascii'), pwd.decode('ascii')
- def encode_header(auth):
- auth_json = json.dumps(auth).encode('ascii')
- return base64.b64encode(auth_json)
- def load_config(root=None):
- """Loads authentication data from a Docker configuration file in the given
- root directory."""
- conf = {}
- data = None
- config_file = os.path.join(root or os.environ.get('HOME', '.'),
- DOCKER_CONFIG_FILENAME)
- # First try as JSON
- try:
- with open(config_file) as f:
- conf = {}
- for registry, entry in six.iteritems(json.load(f)):
- username, password = decode_auth(entry['auth'])
- conf[registry] = {
- 'username': username,
- 'password': password,
- 'email': entry['email'],
- 'serveraddress': registry,
- }
- return conf
- except:
- pass
- # If that fails, we assume the configuration file contains a single
- # authentication token for the public registry in the following format:
- #
- # auth = AUTH_TOKEN
- # email = [email protected]
- try:
- data = []
- for line in fileinput.input(config_file):
- data.append(line.strip().split(' = ')[1])
- if len(data) < 2:
- # Not enough data
- raise Exception('Invalid or empty configuration file!')
- username, password = decode_auth(data[0])
- conf[INDEX_URL] = {
- 'username': username,
- 'password': password,
- 'email': data[1],
- 'serveraddress': INDEX_URL,
- }
- return conf
- except:
- pass
- # If all fails, return an empty config
- return {}
|