| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 | from __future__ import absolute_importfrom __future__ import unicode_literalsimport osimport platformimport sslimport dockerimport pytestimport composefrom compose.cli import errorsfrom compose.cli.docker_client import docker_clientfrom compose.cli.docker_client import get_tls_versionfrom compose.cli.docker_client import tls_config_from_optionsfrom compose.config.environment import Environmentfrom tests import mockfrom tests import unittestclass DockerClientTestCase(unittest.TestCase):    def test_docker_client_no_home(self):        with mock.patch.dict(os.environ):            try:                del os.environ['HOME']            except KeyError:                pass            docker_client(os.environ)    @mock.patch.dict(os.environ)    def test_docker_client_with_custom_timeout(self):        os.environ['COMPOSE_HTTP_TIMEOUT'] = '123'        client = docker_client(os.environ)        assert client.timeout == 123    @mock.patch.dict(os.environ)    def test_custom_timeout_error(self):        os.environ['COMPOSE_HTTP_TIMEOUT'] = '123'        client = docker_client(os.environ)        with mock.patch('compose.cli.errors.log') as fake_log:            with pytest.raises(errors.ConnectionError):                with errors.handle_connection_errors(client):                    raise errors.RequestsConnectionError(                        errors.ReadTimeoutError(None, None, None))        assert fake_log.error.call_count == 1        assert '123' in fake_log.error.call_args[0][0]        with mock.patch('compose.cli.errors.log') as fake_log:            with pytest.raises(errors.ConnectionError):                with errors.handle_connection_errors(client):                    raise errors.ReadTimeout()        assert fake_log.error.call_count == 1        assert '123' in fake_log.error.call_args[0][0]    def test_user_agent(self):        client = docker_client(os.environ)        expected = "docker-compose/{0} docker-py/{1} {2}/{3}".format(            compose.__version__,            docker.__version__,            platform.system(),            platform.release()        )        assert client.headers['User-Agent'] == expectedclass TLSConfigTestCase(unittest.TestCase):    cert_path = 'tests/fixtures/tls/'    ca_cert = os.path.join(cert_path, 'ca.pem')    client_cert = os.path.join(cert_path, 'cert.pem')    key = os.path.join(cert_path, 'key.pem')    def test_simple_tls(self):        options = {'--tls': True}        result = tls_config_from_options(options)        assert result is True    def test_tls_ca_cert(self):        options = {            '--tlscacert': self.ca_cert, '--tlsverify': True        }        result = tls_config_from_options(options)        assert isinstance(result, docker.tls.TLSConfig)        assert result.ca_cert == options['--tlscacert']        assert result.verify is True    def test_tls_ca_cert_explicit(self):        options = {            '--tlscacert': self.ca_cert, '--tls': True,            '--tlsverify': True        }        result = tls_config_from_options(options)        assert isinstance(result, docker.tls.TLSConfig)        assert result.ca_cert == options['--tlscacert']        assert result.verify is True    def test_tls_client_cert(self):        options = {            '--tlscert': self.client_cert, '--tlskey': self.key        }        result = tls_config_from_options(options)        assert isinstance(result, docker.tls.TLSConfig)        assert result.cert == (options['--tlscert'], options['--tlskey'])    def test_tls_client_cert_explicit(self):        options = {            '--tlscert': self.client_cert, '--tlskey': self.key,            '--tls': True        }        result = tls_config_from_options(options)        assert isinstance(result, docker.tls.TLSConfig)        assert result.cert == (options['--tlscert'], options['--tlskey'])    def test_tls_client_and_ca(self):        options = {            '--tlscert': self.client_cert, '--tlskey': self.key,            '--tlsverify': True, '--tlscacert': self.ca_cert        }        result = tls_config_from_options(options)        assert isinstance(result, docker.tls.TLSConfig)        assert result.cert == (options['--tlscert'], options['--tlskey'])        assert result.ca_cert == options['--tlscacert']        assert result.verify is True    def test_tls_client_and_ca_explicit(self):        options = {            '--tlscert': self.client_cert, '--tlskey': self.key,            '--tlsverify': True, '--tlscacert': self.ca_cert,            '--tls': True        }        result = tls_config_from_options(options)        assert isinstance(result, docker.tls.TLSConfig)        assert result.cert == (options['--tlscert'], options['--tlskey'])        assert result.ca_cert == options['--tlscacert']        assert result.verify is True    def test_tls_client_missing_key(self):        options = {'--tlscert': self.client_cert}        with pytest.raises(docker.errors.TLSParameterError):            tls_config_from_options(options)        options = {'--tlskey': self.key}        with pytest.raises(docker.errors.TLSParameterError):            tls_config_from_options(options)    def test_assert_hostname_explicit_skip(self):        options = {'--tlscacert': self.ca_cert, '--skip-hostname-check': True}        result = tls_config_from_options(options)        assert isinstance(result, docker.tls.TLSConfig)        assert result.assert_hostname is False    def test_tls_client_and_ca_quoted_paths(self):        options = {            '--tlscacert': '"{0}"'.format(self.ca_cert),            '--tlscert': '"{0}"'.format(self.client_cert),            '--tlskey': '"{0}"'.format(self.key),            '--tlsverify': True        }        result = tls_config_from_options(options)        assert isinstance(result, docker.tls.TLSConfig)        assert result.cert == (self.client_cert, self.key)        assert result.ca_cert == self.ca_cert        assert result.verify is True    def test_tls_simple_with_tls_version(self):        tls_version = 'TLSv1'        options = {'--tls': True}        environment = Environment({'COMPOSE_TLS_VERSION': tls_version})        result = tls_config_from_options(options, environment)        assert isinstance(result, docker.tls.TLSConfig)        assert result.ssl_version == ssl.PROTOCOL_TLSv1    def test_tls_mixed_environment_and_flags(self):        options = {'--tls': True, '--tlsverify': False}        environment = Environment({'DOCKER_CERT_PATH': 'tests/fixtures/tls/'})        result = tls_config_from_options(options, environment)        assert isinstance(result, docker.tls.TLSConfig)        assert result.cert == (self.client_cert, self.key)        assert result.ca_cert == self.ca_cert        assert result.verify is False    def test_tls_flags_override_environment(self):        environment = Environment({            'DOCKER_CERT_PATH': '/completely/wrong/path',            'DOCKER_TLS_VERIFY': 'false'        })        options = {            '--tlscacert': '"{0}"'.format(self.ca_cert),            '--tlscert': '"{0}"'.format(self.client_cert),            '--tlskey': '"{0}"'.format(self.key),            '--tlsverify': True        }        result = tls_config_from_options(options, environment)        assert isinstance(result, docker.tls.TLSConfig)        assert result.cert == (self.client_cert, self.key)        assert result.ca_cert == self.ca_cert        assert result.verify is True    def test_tls_verify_flag_no_override(self):        environment = Environment({            'DOCKER_TLS_VERIFY': 'true',            'COMPOSE_TLS_VERSION': 'TLSv1',            'DOCKER_CERT_PATH': self.cert_path        })        options = {'--tls': True, '--tlsverify': False}        result = tls_config_from_options(options, environment)        assert isinstance(result, docker.tls.TLSConfig)        assert result.ssl_version == ssl.PROTOCOL_TLSv1        # verify is a special case - since `--tlsverify` = False means it        # wasn't used, we set it if either the environment or the flag is True        # see https://github.com/docker/compose/issues/5632        assert result.verify is True    def test_tls_verify_env_falsy_value(self):        environment = Environment({'DOCKER_TLS_VERIFY': '0'})        options = {'--tls': True}        assert tls_config_from_options(options, environment) is True    def test_tls_verify_default_cert_path(self):        environment = Environment({'DOCKER_TLS_VERIFY': '1'})        options = {'--tls': True}        with mock.patch('compose.cli.docker_client.default_cert_path') as dcp:            dcp.return_value = 'tests/fixtures/tls/'            result = tls_config_from_options(options, environment)        assert isinstance(result, docker.tls.TLSConfig)        assert result.verify is True        assert result.ca_cert == self.ca_cert        assert result.cert == (self.client_cert, self.key)class TestGetTlsVersion(object):    def test_get_tls_version_default(self):        environment = {}        assert get_tls_version(environment) is None    @pytest.mark.skipif(not hasattr(ssl, 'PROTOCOL_TLSv1_2'), reason='TLS v1.2 unsupported')    def test_get_tls_version_upgrade(self):        environment = {'COMPOSE_TLS_VERSION': 'TLSv1_2'}        assert get_tls_version(environment) == ssl.PROTOCOL_TLSv1_2    def test_get_tls_version_unavailable(self):        environment = {'COMPOSE_TLS_VERSION': 'TLSv5_5'}        with mock.patch('compose.cli.docker_client.log') as mock_log:            tls_version = get_tls_version(environment)        mock_log.warn.assert_called_once_with(mock.ANY)        assert tls_version is None
 |