123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764 |
- # Copyright 2013 dotCloud inc.
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- # http://www.apache.org/licenses/LICENSE-2.0
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import json
- import re
- import shlex
- import struct
- import requests
- import requests.exceptions
- from fig.packages import six
- from .auth import auth
- from .unixconn import unixconn
- from .utils import utils
- if not six.PY3:
- import websocket
- DEFAULT_TIMEOUT_SECONDS = 60
- STREAM_HEADER_SIZE_BYTES = 8
- class APIError(requests.exceptions.HTTPError):
- def __init__(self, message, response, explanation=None):
- super(APIError, self).__init__(message, response=response)
- self.explanation = explanation
- if self.explanation is None and response.content:
- self.explanation = response.content.strip()
- def __str__(self):
- message = super(APIError, self).__str__()
- if self.is_client_error():
- message = '%s Client Error: %s' % (
- self.response.status_code, self.response.reason)
- elif self.is_server_error():
- message = '%s Server Error: %s' % (
- self.response.status_code, self.response.reason)
- if self.explanation:
- message = '%s ("%s")' % (message, self.explanation)
- return message
- def is_client_error(self):
- return 400 <= self.response.status_code < 500
- def is_server_error(self):
- return 500 <= self.response.status_code < 600
- class Client(requests.Session):
- def __init__(self, base_url=None, version="1.6",
- timeout=DEFAULT_TIMEOUT_SECONDS):
- super(Client, self).__init__()
- if base_url is None:
- base_url = "http+unix://var/run/docker.sock"
- if 'unix:///' in base_url:
- base_url = base_url.replace('unix:/', 'unix:')
- if base_url.startswith('unix:'):
- base_url = "http+" + base_url
- if base_url.startswith('tcp:'):
- base_url = base_url.replace('tcp:', 'http:')
- if base_url.endswith('/'):
- base_url = base_url[:-1]
- self.base_url = base_url
- self._version = version
- self._timeout = timeout
- self._auth_configs = auth.load_config()
- self.mount('http+unix://', unixconn.UnixAdapter(base_url, timeout))
- def _set_request_timeout(self, kwargs):
- """Prepare the kwargs for an HTTP request by inserting the timeout
- parameter, if not already present."""
- kwargs.setdefault('timeout', self._timeout)
- return kwargs
- def _post(self, url, **kwargs):
- return self.post(url, **self._set_request_timeout(kwargs))
- def _get(self, url, **kwargs):
- return self.get(url, **self._set_request_timeout(kwargs))
- def _delete(self, url, **kwargs):
- return self.delete(url, **self._set_request_timeout(kwargs))
- def _url(self, path):
- return '{0}/v{1}{2}'.format(self.base_url, self._version, path)
- def _raise_for_status(self, response, explanation=None):
- """Raises stored :class:`APIError`, if one occurred."""
- try:
- response.raise_for_status()
- except requests.exceptions.HTTPError as e:
- raise APIError(e, response, explanation=explanation)
- def _result(self, response, json=False, binary=False):
- assert not (json and binary)
- self._raise_for_status(response)
- if json:
- return response.json()
- if binary:
- return response.content
- return response.text
- def _container_config(self, image, command, hostname=None, user=None,
- detach=False, stdin_open=False, tty=False,
- mem_limit=0, ports=None, environment=None, dns=None,
- volumes=None, volumes_from=None,
- network_disabled=False, entrypoint=None,
- cpu_shares=None, working_dir=None):
- if isinstance(command, six.string_types):
- command = shlex.split(str(command))
- if isinstance(environment, dict):
- environment = [
- '{0}={1}'.format(k, v) for k, v in environment.items()
- ]
- if ports and isinstance(ports, list):
- exposed_ports = {}
- for port_definition in ports:
- port = port_definition
- proto = 'tcp'
- if isinstance(port_definition, tuple):
- if len(port_definition) == 2:
- proto = port_definition[1]
- port = port_definition[0]
- exposed_ports['{0}/{1}'.format(port, proto)] = {}
- ports = exposed_ports
- if volumes and isinstance(volumes, list):
- volumes_dict = {}
- for vol in volumes:
- volumes_dict[vol] = {}
- volumes = volumes_dict
- attach_stdin = False
- attach_stdout = False
- attach_stderr = False
- stdin_once = False
- if not detach:
- attach_stdout = True
- attach_stderr = True
- if stdin_open:
- attach_stdin = True
- stdin_once = True
- return {
- 'Hostname': hostname,
- 'ExposedPorts': ports,
- 'User': user,
- 'Tty': tty,
- 'OpenStdin': stdin_open,
- 'StdinOnce': stdin_once,
- 'Memory': mem_limit,
- 'AttachStdin': attach_stdin,
- 'AttachStdout': attach_stdout,
- 'AttachStderr': attach_stderr,
- 'Env': environment,
- 'Cmd': command,
- 'Dns': dns,
- 'Image': image,
- 'Volumes': volumes,
- 'VolumesFrom': volumes_from,
- 'NetworkDisabled': network_disabled,
- 'Entrypoint': entrypoint,
- 'CpuShares': cpu_shares,
- 'WorkingDir': working_dir
- }
- def _post_json(self, url, data, **kwargs):
- # Go <1.1 can't unserialize null to a string
- # so we do this disgusting thing here.
- data2 = {}
- if data is not None:
- for k, v in six.iteritems(data):
- if v is not None:
- data2[k] = v
- if 'headers' not in kwargs:
- kwargs['headers'] = {}
- kwargs['headers']['Content-Type'] = 'application/json'
- return self._post(url, data=json.dumps(data2), **kwargs)
- def _attach_params(self, override=None):
- return override or {
- 'stdout': 1,
- 'stderr': 1,
- 'stream': 1
- }
- def _attach_websocket(self, container, params=None):
- if six.PY3:
- raise NotImplementedError("This method is not currently supported "
- "under python 3")
- url = self._url("/containers/{0}/attach/ws".format(container))
- req = requests.Request("POST", url, params=self._attach_params(params))
- full_url = req.prepare().url
- full_url = full_url.replace("http://", "ws://", 1)
- full_url = full_url.replace("https://", "wss://", 1)
- return self._create_websocket_connection(full_url)
- def _create_websocket_connection(self, url):
- return websocket.create_connection(url)
- def _stream_result(self, response):
- """Generator for straight-out, non chunked-encoded HTTP responses."""
- self._raise_for_status(response)
- for line in response.iter_lines(chunk_size=1, decode_unicode=True):
- # filter out keep-alive new lines
- if line:
- yield line + '\n'
- def _stream_result_socket(self, response):
- self._raise_for_status(response)
- return response.raw._fp.fp._sock
- def _stream_helper(self, response):
- """Generator for data coming from a chunked-encoded HTTP response."""
- socket_fp = self._stream_result_socket(response)
- socket_fp.setblocking(1)
- socket = socket_fp.makefile()
- while True:
- size = int(socket.readline(), 16)
- if size <= 0:
- break
- data = socket.readline()
- if not data:
- break
- yield data
- def _multiplexed_buffer_helper(self, response):
- """A generator of multiplexed data blocks read from a buffered
- response."""
- buf = self._result(response, binary=True)
- walker = 0
- while True:
- if len(buf[walker:]) < 8:
- break
- _, length = struct.unpack_from('>BxxxL', buf[walker:])
- start = walker + STREAM_HEADER_SIZE_BYTES
- end = start + length
- walker = end
- yield str(buf[start:end])
- def _multiplexed_socket_stream_helper(self, response):
- """A generator of multiplexed data blocks coming from a response
- socket."""
- socket = self._stream_result_socket(response)
- def recvall(socket, size):
- data = ''
- while size > 0:
- block = socket.recv(size)
- if not block:
- return None
- data += block
- size -= len(block)
- return data
- while True:
- socket.settimeout(None)
- header = recvall(socket, STREAM_HEADER_SIZE_BYTES)
- if not header:
- break
- _, length = struct.unpack('>BxxxL', header)
- if not length:
- break
- data = recvall(socket, length)
- if not data:
- break
- yield data
- def attach(self, container, stdout=True, stderr=True,
- stream=False, logs=False):
- if isinstance(container, dict):
- container = container.get('Id')
- params = {
- 'logs': logs and 1 or 0,
- 'stdout': stdout and 1 or 0,
- 'stderr': stderr and 1 or 0,
- 'stream': stream and 1 or 0,
- }
- u = self._url("/containers/{0}/attach".format(container))
- response = self._post(u, params=params, stream=stream)
- # Stream multi-plexing was introduced in API v1.6.
- if utils.compare_version('1.6', self._version) < 0:
- return stream and self._stream_result(response) or \
- self._result(response, binary=True)
- return stream and self._multiplexed_socket_stream_helper(response) or \
- ''.join([x for x in self._multiplexed_buffer_helper(response)])
- def attach_socket(self, container, params=None, ws=False):
- if params is None:
- params = {
- 'stdout': 1,
- 'stderr': 1,
- 'stream': 1
- }
- if ws:
- return self._attach_websocket(container, params)
- if isinstance(container, dict):
- container = container.get('Id')
- u = self._url("/containers/{0}/attach".format(container))
- return self._stream_result_socket(self.post(
- u, None, params=self._attach_params(params), stream=True))
- def build(self, path=None, tag=None, quiet=False, fileobj=None,
- nocache=False, rm=False, stream=False, timeout=None):
- remote = context = headers = None
- if path is None and fileobj is None:
- raise Exception("Either path or fileobj needs to be provided.")
- if fileobj is not None:
- context = utils.mkbuildcontext(fileobj)
- elif path.startswith(('http://', 'https://', 'git://', 'github.com/')):
- remote = path
- else:
- context = utils.tar(path)
- u = self._url('/build')
- params = {
- 't': tag,
- 'remote': remote,
- 'q': quiet,
- 'nocache': nocache,
- 'rm': rm
- }
- if context is not None:
- headers = {'Content-Type': 'application/tar'}
- response = self._post(
- u,
- data=context,
- params=params,
- headers=headers,
- stream=stream,
- timeout=timeout,
- )
- if context is not None:
- context.close()
- if stream:
- return self._stream_result(response)
- else:
- output = self._result(response)
- srch = r'Successfully built ([0-9a-f]+)'
- match = re.search(srch, output)
- if not match:
- return None, output
- return match.group(1), output
- def commit(self, container, repository=None, tag=None, message=None,
- author=None, conf=None):
- params = {
- 'container': container,
- 'repo': repository,
- 'tag': tag,
- 'comment': message,
- 'author': author
- }
- u = self._url("/commit")
- return self._result(self._post_json(u, data=conf, params=params),
- json=True)
- def containers(self, quiet=False, all=False, trunc=True, latest=False,
- since=None, before=None, limit=-1):
- params = {
- 'limit': 1 if latest else limit,
- 'all': 1 if all else 0,
- 'trunc_cmd': 1 if trunc else 0,
- 'since': since,
- 'before': before
- }
- u = self._url("/containers/json")
- res = self._result(self._get(u, params=params), True)
- if quiet:
- return [{'Id': x['Id']} for x in res]
- return res
- def copy(self, container, resource):
- res = self._post_json(
- self._url("/containers/{0}/copy".format(container)),
- data={"Resource": resource},
- stream=True
- )
- self._raise_for_status(res)
- return res.raw
- def create_container(self, image, command=None, hostname=None, user=None,
- detach=False, stdin_open=False, tty=False,
- mem_limit=0, ports=None, environment=None, dns=None,
- volumes=None, volumes_from=None,
- network_disabled=False, name=None, entrypoint=None,
- cpu_shares=None, working_dir=None):
- config = self._container_config(
- image, command, hostname, user, detach, stdin_open, tty, mem_limit,
- ports, environment, dns, volumes, volumes_from, network_disabled,
- entrypoint, cpu_shares, working_dir
- )
- return self.create_container_from_config(config, name)
- def create_container_from_config(self, config, name=None):
- u = self._url("/containers/create")
- params = {
- 'name': name
- }
- res = self._post_json(u, data=config, params=params)
- return self._result(res, True)
- def diff(self, container):
- if isinstance(container, dict):
- container = container.get('Id')
- return self._result(self._get(self._url("/containers/{0}/changes".
- format(container))), True)
- def events(self):
- u = self._url("/events")
- socket = self._stream_result_socket(self.get(u, stream=True))
- while True:
- chunk = socket.recv(4096)
- if chunk:
- # Messages come in the format of length, data, newline.
- length, data = chunk.split("\n", 1)
- length = int(length, 16)
- if length > len(data):
- data += socket.recv(length - len(data))
- yield json.loads(data)
- else:
- break
- def export(self, container):
- if isinstance(container, dict):
- container = container.get('Id')
- res = self._get(self._url("/containers/{0}/export".format(container)),
- stream=True)
- self._raise_for_status(res)
- return res.raw
- def history(self, image):
- res = self._get(self._url("/images/{0}/history".format(image)))
- self._raise_for_status(res)
- return self._result(res)
- def images(self, name=None, quiet=False, all=False, viz=False):
- if viz:
- return self._result(self._get(self._url("images/viz")))
- params = {
- 'filter': name,
- 'only_ids': 1 if quiet else 0,
- 'all': 1 if all else 0,
- }
- res = self._result(self._get(self._url("/images/json"), params=params),
- True)
- if quiet:
- return [x['Id'] for x in res]
- return res
- def import_image(self, src=None, repository=None, tag=None, image=None):
- u = self._url("/images/create")
- params = {
- 'repo': repository,
- 'tag': tag
- }
- if src:
- try:
- # XXX: this is ways not optimal but the only way
- # for now to import tarballs through the API
- fic = open(src)
- data = fic.read()
- fic.close()
- src = "-"
- except IOError:
- # file does not exists or not a file (URL)
- data = None
- if isinstance(src, six.string_types):
- params['fromSrc'] = src
- return self._result(self._post(u, data=data, params=params))
- return self._result(self._post(u, data=src, params=params))
- if image:
- params['fromImage'] = image
- return self._result(self._post(u, data=None, params=params))
- raise Exception("Must specify a src or image")
- def info(self):
- return self._result(self._get(self._url("/info")),
- True)
- def insert(self, image, url, path):
- api_url = self._url("/images/" + image + "/insert")
- params = {
- 'url': url,
- 'path': path
- }
- return self._result(self._post(api_url, params=params))
- def inspect_container(self, container):
- if isinstance(container, dict):
- container = container.get('Id')
- return self._result(
- self._get(self._url("/containers/{0}/json".format(container))),
- True)
- def inspect_image(self, image_id):
- return self._result(
- self._get(self._url("/images/{0}/json".format(image_id))),
- True
- )
- def kill(self, container, signal=None):
- if isinstance(container, dict):
- container = container.get('Id')
- url = self._url("/containers/{0}/kill".format(container))
- params = {}
- if signal is not None:
- params['signal'] = signal
- res = self._post(url, params=params)
- self._raise_for_status(res)
- def login(self, username, password=None, email=None, registry=None,
- reauth=False):
- # If we don't have any auth data so far, try reloading the config file
- # one more time in case anything showed up in there.
- if not self._auth_configs:
- self._auth_configs = auth.load_config()
- registry = registry or auth.INDEX_URL
- authcfg = auth.resolve_authconfig(self._auth_configs, registry)
- # If we found an existing auth config for this registry and username
- # combination, we can return it immediately unless reauth is requested.
- if authcfg and authcfg.get('username', None) == username \
- and not reauth:
- return authcfg
- req_data = {
- 'username': username,
- 'password': password,
- 'email': email,
- 'serveraddress': registry,
- }
- response = self._post_json(self._url('/auth'), data=req_data)
- if response.status_code == 200:
- self._auth_configs[registry] = req_data
- return self._result(response, json=True)
- def logs(self, container, stdout=True, stderr=True, stream=False):
- return self.attach(
- container,
- stdout=stdout,
- stderr=stderr,
- stream=stream,
- logs=True
- )
- def port(self, container, private_port):
- if isinstance(container, dict):
- container = container.get('Id')
- res = self._get(self._url("/containers/{0}/json".format(container)))
- self._raise_for_status(res)
- json_ = res.json()
- s_port = str(private_port)
- h_ports = None
- h_ports = json_['NetworkSettings']['Ports'].get(s_port + '/udp')
- if h_ports is None:
- h_ports = json_['NetworkSettings']['Ports'].get(s_port + '/tcp')
- return h_ports
- def pull(self, repository, tag=None, stream=False):
- registry, repo_name = auth.resolve_repository_name(repository)
- if repo_name.count(":") == 1:
- repository, tag = repository.rsplit(":", 1)
- params = {
- 'tag': tag,
- 'fromImage': repository
- }
- headers = {}
- if utils.compare_version('1.5', self._version) >= 0:
- # If we don't have any auth data so far, try reloading the config
- # file one more time in case anything showed up in there.
- if not self._auth_configs:
- self._auth_configs = auth.load_config()
- authcfg = auth.resolve_authconfig(self._auth_configs, registry)
- # Do not fail here if no atuhentication exists for this specific
- # registry as we can have a readonly pull. Just put the header if
- # we can.
- if authcfg:
- headers['X-Registry-Auth'] = auth.encode_header(authcfg)
- response = self._post(self._url('/images/create'), params=params,
- headers=headers, stream=stream, timeout=None)
- if stream:
- return self._stream_helper(response)
- else:
- return self._result(response)
- def push(self, repository, stream=False):
- registry, repo_name = auth.resolve_repository_name(repository)
- u = self._url("/images/{0}/push".format(repository))
- headers = {}
- if utils.compare_version('1.5', self._version) >= 0:
- # If we don't have any auth data so far, try reloading the config
- # file one more time in case anything showed up in there.
- if not self._auth_configs:
- self._auth_configs = auth.load_config()
- authcfg = auth.resolve_authconfig(self._auth_configs, registry)
- # Do not fail here if no atuhentication exists for this specific
- # registry as we can have a readonly pull. Just put the header if
- # we can.
- if authcfg:
- headers['X-Registry-Auth'] = auth.encode_header(authcfg)
- response = self._post_json(u, None, headers=headers, stream=stream)
- else:
- response = self._post_json(u, authcfg, stream=stream)
- return stream and self._stream_helper(response) \
- or self._result(response)
- def remove_container(self, container, v=False, link=False):
- if isinstance(container, dict):
- container = container.get('Id')
- params = {'v': v, 'link': link}
- res = self._delete(self._url("/containers/" + container),
- params=params)
- self._raise_for_status(res)
- def remove_image(self, image):
- res = self._delete(self._url("/images/" + image))
- self._raise_for_status(res)
- def restart(self, container, timeout=10):
- if isinstance(container, dict):
- container = container.get('Id')
- params = {'t': timeout}
- url = self._url("/containers/{0}/restart".format(container))
- res = self._post(url, params=params)
- self._raise_for_status(res)
- def search(self, term):
- return self._result(self._get(self._url("/images/search"),
- params={'term': term}),
- True)
- def start(self, container, binds=None, port_bindings=None, lxc_conf=None,
- publish_all_ports=False, links=None, privileged=False):
- if isinstance(container, dict):
- container = container.get('Id')
- if isinstance(lxc_conf, dict):
- formatted = []
- for k, v in six.iteritems(lxc_conf):
- formatted.append({'Key': k, 'Value': str(v)})
- lxc_conf = formatted
- start_config = {
- 'LxcConf': lxc_conf
- }
- if binds:
- bind_pairs = [
- '{0}:{1}'.format(host, dest) for host, dest in binds.items()
- ]
- start_config['Binds'] = bind_pairs
- if port_bindings:
- start_config['PortBindings'] = utils.convert_port_bindings(
- port_bindings
- )
- start_config['PublishAllPorts'] = publish_all_ports
- if links:
- if isinstance(links, dict):
- links = six.iteritems(links)
- formatted_links = [
- '{0}:{1}'.format(k, v) for k, v in sorted(links)
- ]
- start_config['Links'] = formatted_links
- start_config['Privileged'] = privileged
- url = self._url("/containers/{0}/start".format(container))
- res = self._post_json(url, data=start_config)
- self._raise_for_status(res)
- def stop(self, container, timeout=10):
- if isinstance(container, dict):
- container = container.get('Id')
- params = {'t': timeout}
- url = self._url("/containers/{0}/stop".format(container))
- res = self._post(url, params=params,
- timeout=max(timeout, self._timeout))
- self._raise_for_status(res)
- def tag(self, image, repository, tag=None, force=False):
- params = {
- 'tag': tag,
- 'repo': repository,
- 'force': 1 if force else 0
- }
- url = self._url("/images/{0}/tag".format(image))
- res = self._post(url, params=params)
- self._raise_for_status(res)
- return res.status_code == 201
- def top(self, container):
- u = self._url("/containers/{0}/top".format(container))
- return self._result(self._get(u), True)
- def version(self):
- return self._result(self._get(self._url("/version")), True)
- def wait(self, container):
- if isinstance(container, dict):
- container = container.get('Id')
- url = self._url("/containers/{0}/wait".format(container))
- res = self._post(url, timeout=None)
- self._raise_for_status(res)
- json_ = res.json()
- if 'StatusCode' in json_:
- return json_['StatusCode']
- return -1
|