Răsfoiți Sursa

Implement subnet config validation (fixes #4552)

Signed-off-by: Drew Romanyk <[email protected]>
Drew Romanyk 8 ani în urmă
părinte
comite
5691b8241d

+ 1 - 1
compose/config/config_schema_v3.5.json

@@ -418,7 +418,7 @@
               "items": {
                 "type": "object",
                 "properties": {
-                  "subnet": {"type": "string"}
+                  "subnet": {"type": "string", "format": "subnet_ip_address"}
                 },
                 "additionalProperties": false
               }

+ 29 - 1
compose/config/validation.py

@@ -5,6 +5,7 @@ import json
 import logging
 import os
 import re
+import socket
 import sys
 
 import six
@@ -43,6 +44,9 @@ DOCKER_CONFIG_HINTS = {
 
 VALID_NAME_CHARS = '[a-zA-Z0-9\._\-]'
 VALID_EXPOSE_FORMAT = r'^\d+(\-\d+)?(\/[a-zA-Z]+)?$'
+VALID_IPV4_FORMAT = r'^(\d{1,3}.){3}\d{1,3}$'
+VALID_IPV4_CIDR_FORMAT = r'^(\d|[1-2]\d|3[0-2])$'
+VALID_IPV6_CIDR_FORMAT = r'^(\d|[1-9]\d|1[0-1]\d|12[0-8])$'
 
 
 @FormatChecker.cls_checks(format="ports", raises=ValidationError)
@@ -64,6 +68,30 @@ def format_expose(instance):
     return True
 
 
[email protected]_checks("subnet_ip_address", raises=ValidationError)
+def format_subnet_ip_address(instance):
+    if isinstance(instance, six.string_types):
+        if '/' not in instance:
+            raise ValidationError("should be of the format 'IP_ADDRESS/CIDR'")
+
+        ip_address, cidr = instance.split('/')
+
+        if re.match(VALID_IPV4_FORMAT, ip_address):
+            if not (re.match(VALID_IPV4_CIDR_FORMAT, cidr) and
+                    all(0 <= int(component) <= 255 for component in ip_address.split("."))):
+                raise ValidationError("should be of the format 'IP_ADDRESS/CIDR'")
+        elif re.match(VALID_IPV6_CIDR_FORMAT, cidr) and hasattr(socket, "inet_pton"):
+            try:
+                if not (socket.inet_pton(socket.AF_INET6, ip_address)):
+                    raise ValidationError("should be of the format 'IP_ADDRESS/CIDR'")
+            except socket.error as e:
+                raise ValidationError(six.text_type(e))
+        else:
+            raise ValidationError("should be of the format 'IP_ADDRESS/CIDR'")
+
+    return True
+
+
 def match_named_volumes(service_dict, project_volumes):
     service_volumes = service_dict.get('volumes', [])
     for volume_spec in service_volumes:
@@ -391,7 +419,7 @@ def process_config_schema_errors(error):
 
 def validate_against_config_schema(config_file):
     schema = load_jsonschema(config_file)
-    format_checker = FormatChecker(["ports", "expose"])
+    format_checker = FormatChecker(["ports", "expose", "subnet_ip_address"])
     validator = Draft4Validator(
         schema,
         resolver=RefResolver(get_resolver_path(), schema),

+ 82 - 0
tests/unit/config/config_test.py

@@ -2846,6 +2846,88 @@ class PortsTest(unittest.TestCase):
         )
 
 
+class SubnetTest(unittest.TestCase):
+    INVALID_SUBNET_TYPES = [
+        None,
+        False,
+        10,
+    ]
+
+    INVALID_SUBNET_MAPPINGS = [
+        "",
+        "192.168.0.1/sdfsdfs",
+        "192.168.0.1/",
+        "192.168.0.1/33",
+        "192.168.0.1/01",
+        "192.168.0.1",
+        "fe80:0000:0000:0000:0204:61ff:fe9d:f156/sdfsdfs",
+        "fe80:0000:0000:0000:0204:61ff:fe9d:f156/",
+        "fe80:0000:0000:0000:0204:61ff:fe9d:f156/129",
+        "fe80:0000:0000:0000:0204:61ff:fe9d:f156/01",
+        "fe80:0000:0000:0000:0204:61ff:fe9d:f156",
+    ]
+
+    ILLEGAL_SUBNET_MAPPINGS = [
+        "ge80:0000:0000:0000:0204:61ff:fe9d:f156/128"
+    ]
+
+    VALID_SUBNET_MAPPINGS = [
+        "192.168.0.1/0",
+        "192.168.0.1/32",
+        "fe80:0000:0000:0000:0204:61ff:fe9d:f156/0",
+        "fe80:0000:0000:0000:0204:61ff:fe9d:f156/128",
+    ]
+
+    def test_config_invalid_subnet_type_validation(self):
+        for invalid_subnet in self.INVALID_SUBNET_TYPES:
+            with pytest.raises(ConfigurationError) as exc:
+                self.check_config(invalid_subnet)
+
+            assert "contains an invalid type" in exc.value.msg
+
+    def test_config_invalid_subnet_format_validation(self):
+        for invalid_subnet in self.INVALID_SUBNET_MAPPINGS:
+            with pytest.raises(ConfigurationError) as exc:
+                self.check_config(invalid_subnet)
+
+            assert "should be of the format 'IP_ADDRESS/CIDR'" in exc.value.msg
+
+    def test_config_illegal_subnet_type_validation(self):
+        for invalid_subnet in self.ILLEGAL_SUBNET_MAPPINGS:
+            with pytest.raises(ConfigurationError) as exc:
+                self.check_config(invalid_subnet)
+
+            assert "illegal IP address string" in exc.value.msg
+
+    def test_config_valid_subnet_format_validation(self):
+        for valid_subnet in self.VALID_SUBNET_MAPPINGS:
+            self.check_config(valid_subnet)
+
+    def check_config(self, subnet):
+        config.load(
+            build_config_details({
+                'version': '3.5',
+                'services': {
+                    'web': {
+                        'image': 'busybox'
+                    }
+                },
+                'networks': {
+                    'default': {
+                        'ipam': {
+                            'config': [
+                                {
+                                    'subnet': subnet
+                                }
+                            ],
+                            'driver': 'default'
+                        }
+                    }
+                }
+            })
+        )
+
+
 class InterpolationTest(unittest.TestCase):
 
     @mock.patch.dict(os.environ)