Browse Source

feat(AliESA) 阿里云边缘安全加速 DNS provider support (#498)

* Initial plan

* Implement AliESA (阿里云边缘安全加速) DNS provider with comprehensive tests and documentation

Co-authored-by: NewFuture <[email protected]>

* Add manual site ID specification support for minimal permissions and enhanced documentation

Co-authored-by: NewFuture <[email protected]>

* Refactor AliesaProvider to extend AlidnsProvider and use BaseProvider's _split_zone_and_sub with manual site ID support

Co-authored-by: NewFuture <[email protected]>

* Add comprehensive unit tests following official ESA API documentation

Co-authored-by: NewFuture <[email protected]>

* fix pipeline

* Refactor aliesa inheritance structure and address review comments

Co-authored-by: NewFuture <[email protected]>

* Address review comments: fix lint issues, remove unsupported features from docs, clean up schema

Co-authored-by: NewFuture <[email protected]>

* Address review comments: fix Python 2.7 compatibility, simplify tests, remove unsupported features, add configurable endpoints

Co-authored-by: NewFuture <[email protected]>

* Address review comments: fix Python 2.7 compatibility, implement _validate method, fix lint issues, update docs

Co-authored-by: NewFuture <[email protected]>

* Remove _split_zone_and_sub override as requested - focus on format checking

Co-authored-by: NewFuture <[email protected]>

* Fix ESA API HTTP method issue and imports

Co-authored-by: NewFuture <[email protected]>

* Refactor request method to avoid code duplication and improve TTL handling

Co-authored-by: NewFuture <[email protected]>

* fix aliesa

* Refactor API attribute to endpoint in provider classes for consistency

* Remove redundant validation method from AliesaProvider class

* Update author attribution and improve documentation formatting in AliesaProvider

* Remove outdated configuration examples and tests for region endpoints in AliesaProvider documentation and tests

* Add Cloudflare DNS configuration guides in English and Chinese

* remove json content

* Refactor encoding methods and update domain splitting function across providers

* fix py2

* fix

* Update type hints and logging in AliDNS and AliESA providers

* 修改请求参数

* lower case

* update

* fix true

* improve query

* fix bug

* fix typos

* override  remark

* fix lint

---------

Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: NewFuture <[email protected]>
Co-authored-by: New Future <[email protected]>
Copilot 6 months ago
parent
commit
78b0e953da

+ 5 - 2
README.md

@@ -39,6 +39,7 @@
 - 服务商支持:
   - [DNSPOD](https://www.dnspod.cn/) ([配置指南](doc/providers/dnspod.md))
   - [阿里 DNS](http://www.alidns.com/) ([配置指南](doc/providers/alidns.md)) ⚡
+  - [阿里云边缘安全加速(ESA)](https://esa.console.aliyun.com/) ([配置指南](doc/providers/aliesa.md)) ⚡
   - [DNS.COM](https://www.dns.com/) (@loftor-git)
   - [DNSPOD 国际版](https://www.dnspod.com/)
   - [CloudFlare](https://www.cloudflare.com/) (@tongyifan)
@@ -117,6 +118,7 @@
 
    - **DNSPOD(中国版)**: [创建 token](https://support.dnspod.cn/Kb/showarticle/tsid/227/) | [详细配置文档](doc/providers/dnspod.md)
    - **阿里云 DNS**: [申请 accesskey](https://help.aliyun.com/document_detail/87745.htm) | [详细配置文档](doc/providers/alidns.md)
+   - **阿里云边缘安全加速(ESA)**: [申请 accesskey](https://help.aliyun.com/document_detail/87745.htm) | [详细配置文档](doc/providers/aliesa.md)
    - **DNS.COM**: [API Key/Secret](https://www.dns.com/member/apiSet)
    - **DNSPOD(国际版)**: [获取 token](https://www.dnspod.com/docs/info.html#get-the-user-token)
    - **CloudFlare**: [API Key](https://support.cloudflare.com/hc/en-us/articles/200167836-Where-do-I-find-my-Cloudflare-API-key-)(除了 `email + API KEY`,也可使用 `Token`,**需要list Zone 权限**)
@@ -174,7 +176,7 @@ python -m ddns -c /path/to/config.json
 | :----: | :----------------: | :------: | :---------: | :----------------: | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
 |   id   |       string       |    √     |     无      |    api 访问 ID     | Cloudflare 为邮箱(使用 Token 时留空)<br>HE.net 可留空<br>华为云为 Access Key ID (AK)                                                                                                   |
 | token  |       string       |    √     |     无      |   api 授权 token   | 部分平台叫 secret key,**反馈粘贴时删除**                                                                                                                                                |
-|  dns   |       string       |    No    | `"dnspod"`  |     dns 服务商     | 阿里 DNS 为 `alidns`,Cloudflare 为 `cloudflare`,dns.com 为 `dnscom`,DNSPOD 国内为 `dnspod`,DNSPOD 国际为 `dnspod_com`,HE.net 为 `he`,华为云为 `huaweidns`,腾讯云为 `tencentcloud`,No-IP 为 `noip`,自定义回调为 `callback`。部分服务商有[详细配置文档](doc/providers/) |
+|  dns   |       string       |    No    | `"dnspod"`  |     dns 服务商     | 阿里 DNS 为 `alidns`,阿里ESA为 `aliesa`,Cloudflare 为 `cloudflare`,dns.com 为 `dnscom`,DNSPOD 国内为 `dnspod`,DNSPOD 国际为 `dnspod_com`,HE.net 为 `he`,华为云为 `huaweidns`,腾讯云为 `tencentcloud`,No-IP 为 `noip`,自定义回调为 `callback`。部分服务商有[详细配置文档](doc/providers/) |
 |  ipv4  |       array        |    No    |    `[]`     |   ipv4 域名列表    | 为 `[]` 时,不会获取和更新 IPv4 地址                                                                                                                                                     |
 |  ipv6  |       array        |    No    |    `[]`     |   ipv6 域名列表    | 为 `[]` 时,不会获取和更新 IPv6 地址                                                                                                                                                     |
 | index4 | string\|int\|array |    No    | `"default"` |   ipv4 获取方式    | 可设置 `网卡`、`内网`、`公网`、`正则` 等方式                                                                                                                                             |
@@ -223,7 +225,7 @@ python -m ddns -c /path/to/config.json
   "$schema": "https://ddns.newfuture.cc/schema/v4.0.json",
   "id": "12345",
   "token": "mytokenkey",
-  "dns": "dnspod 或 dnspod_com 或 alidns 或 dnscom 或 cloudflare 或 he 或 huaweidns 或 tencentcloud 或 noip 或 callback",
+  "dns": "dnspod 或 dnspod_com 或 alidns 或 aliesa 或 dnscom 或 cloudflare 或 he 或 huaweidns 或 tencentcloud 或 noip 或 callback",
   "ipv4": ["ddns.newfuture.cc", "ipv4.ddns.newfuture.cc"],
   "ipv6": ["ddns.newfuture.cc", "ipv6.ddns.newfuture.cc"],
   "index4": 0,
@@ -287,6 +289,7 @@ Docker 镜像在无额外参数的情况下,已默认启用每 5 分钟执行
 使用系统自带的 IE 浏览器访问一次对应的 API 即可
 
 - alidns 打开: <https://alidns.aliyuncs.com>
+- aliesa 打开: <https://esa.cn-hangzhou.aliyuncs.com>
 - cloudflare 打开: <https://api.cloudflare.com>
 - dns.com 打开: <https://www.dns.com>
 - dnspod.cn 打开: <https://dnsapi.cn>

+ 4 - 0
ddns/provider/__init__.py

@@ -1,6 +1,7 @@
 # coding=utf-8
 from ._base import SimpleProvider  # noqa: F401
 from .alidns import AlidnsProvider
+from .aliesa import AliesaProvider
 from .callback import CallbackProvider
 from .cloudflare import CloudflareProvider
 from .dnscom import DnscomProvider
@@ -38,6 +39,9 @@ def get_provider_class(provider_name):
         # aliyun alidns
         "alidns": AlidnsProvider,
         "aliyun": AlidnsProvider,  # 兼容aliyun
+        # aliyun esa
+        "aliesa": AliesaProvider,
+        "esa": AliesaProvider,  # 兼容esa
         # dns.com
         "dnscom": DnscomProvider,
         "51dns": DnscomProvider,  # 兼容旧的51dns

+ 40 - 45
ddns/provider/_base.py

@@ -63,17 +63,30 @@ from hmac import HMAC
 from json import loads as jsondecode, dumps as jsonencode
 from logging import Logger, getLogger  # noqa:F401 # type: ignore[no-redef]
 from os import environ
-from ..util.http import send_http_request
-
-try:  # python 3
-    from urllib.parse import quote, urlencode
-except ImportError:  # python 2
-    from urllib import urlencode, quote  # type: ignore[no-redef,import-untyped]
+from ..util.http import send_http_request, quote, urlencode
 
 TYPE_FORM = "application/x-www-form-urlencoded"
 TYPE_JSON = "application/json"
 
 
+def encode_params(params):
+    # type: (dict|list|str|bytes|None) -> str
+    """
+    编码参数为 URL 查询字符串,参数顺序会排序
+
+    Args:
+        params (dict|list|str|bytes|None): 参数字典、列表或字符串
+    Returns:
+        str: 编码后的查询字符串
+    """
+    if not params:
+        return ""
+    elif isinstance(params, (str, bytes)):
+        return params  # type: ignore[return-value]
+    items = params.items() if isinstance(params, dict) else params
+    return urlencode(sorted(items), doseq=True)
+
+
 def hmac_sha256(key, message):
     # type: (str | bytes, str | bytes) -> HMAC
     """
@@ -188,7 +201,7 @@ class SimpleProvider(object):
     __metaclass__ = ABCMeta
 
     # API endpoint domain (to be defined in subclass)
-    API = ""  # type: str # https://exampledns.com
+    endpoint = ""  # type: str # https://exampledns.com
     # Content-Type for requests (to be defined in subclass)
     content_type = TYPE_FORM  # type: Literal["application/x-www-form-urlencoded"] | Literal["application/json"]
     # 默认 accept 头部, 空则不设置
@@ -275,7 +288,7 @@ class SimpleProvider(object):
             raise ValueError("id must be configured")
         if not self.auth_token:
             raise ValueError("token must be configured")
-        if not self.API:
+        if not self.endpoint:
             raise ValueError("API endpoint must be defined in {}".format(self.__class__.__name__))
 
     def _http(self, method, url, params=None, body=None, queries=None, headers=None):  # noqa: C901
@@ -314,13 +327,13 @@ class SimpleProvider(object):
 
         # 构建查询字符串
         if len(query_params) > 0:
-            url += ("&" if "?" in url else "?") + self._encode(query_params)
+            url += ("&" if "?" in url else "?") + encode_params(query_params)
 
         # 构建完整URL
         if not url.startswith("http://") and not url.startswith("https://"):
-            if not url.startswith("/") and self.API.endswith("/"):
+            if not url.startswith("/") and self.endpoint.endswith("/"):
                 url = "/" + url
-            url = self.API + url
+            url = self.endpoint + url
 
         # 记录请求日志
         self.logger.info("%s %s", method, self._mask_sensitive_data(url))
@@ -330,12 +343,7 @@ class SimpleProvider(object):
         if body:
             if "content-type" not in headers:
                 headers["content-type"] = self.content_type
-            if isinstance(body, (str, bytes)):
-                body_data = body
-            elif self.content_type == TYPE_FORM:
-                body_data = self._encode(body)
-            else:
-                body_data = jsonencode(body)
+            body_data = self._encode_body(body)
             self.logger.debug("body:\n%s", self._mask_sensitive_data(body_data))
 
         # 处理headers
@@ -368,7 +376,7 @@ class SimpleProvider(object):
             elif status_code == 401:
                 raise RuntimeError("认证失败 [401]: " + response.reason)
             elif status_code == 403:
-                raise RuntimeError("权限不足 [403]: " + response.reason)
+                raise RuntimeError("禁止访问 [403]: " + response.reason)
             else:
                 raise RuntimeError("服务器错误 [{}]: {}".format(status_code, response.reason))
 
@@ -382,36 +390,23 @@ class SimpleProvider(object):
             self.logger.error("fail to decode response: %s", e)
         return res
 
-    @staticmethod
-    def _encode(params):
-        # type: (dict|list|str|bytes|None) -> str
+    def _encode_body(self, data):
+        # type: (dict | list | str | bytes | None) -> str
         """
-        编码参数为 URL 查询字符串
-
+        自动编码数据为字符串或字节, 根据 content_type 选择编码方式。
         Args:
-            params (dict|list|str|bytes|None): 参数字典、列表或字符串
-        Returns:
-            str: 编码后的查询字符串
-        """
-        if not params:
-            return ""
-        elif isinstance(params, (str, bytes)):
-            return params  # type: ignore[return-value]
-        return urlencode(params, doseq=True)
-
-    @staticmethod
-    def _quote(data, safe="/"):
-        # type: (str, str) -> str
-        """
-        对字符串进行 URL 编码
-
-        Args:
-            data (str): 待编码字符串
+            data (dict | list | str | bytes | None): 待编码数据
 
         Returns:
-            str: 编码后的字符串
+            str | bytes | None: 编码后的数据
         """
-        return quote(data, safe=safe)
+        if isinstance(data, (str, bytes)):
+            return data  # type: ignore[return-value]
+        if not data:
+            return ""
+        if self.content_type == TYPE_FORM:
+            return encode_params(data)
+        return jsonencode(data)
 
     def _mask_sensitive_data(self, data):
         # type: (str | bytes | None) -> str | bytes | None
@@ -472,7 +467,7 @@ class BaseProvider(SimpleProvider):
         """
         domain = domain.lower()
         self.logger.info("%s => %s(%s)", domain, value, record_type)
-        sub, main = split_custom_domain(domain)
+        sub, main = _split_custom_domain(domain)
         try:
             if sub is not None:
                 # 使用自定义分隔符格式
@@ -619,7 +614,7 @@ class BaseProvider(SimpleProvider):
         return None, None, main
 
 
-def split_custom_domain(domain):
+def _split_custom_domain(domain):
     # type: (str) -> tuple[str | None, str]
     """
     拆分支持 ~ 或 + 的自定义格式域名为 (子域, 主域)

+ 28 - 12
ddns/provider/alidns.py

@@ -5,25 +5,36 @@ AliDNS API
 @author: NewFuture
 """
 
-from ._base import TYPE_FORM, BaseProvider, hmac_sha256_authorization, sha256_hash, join_domain
+from ._base import TYPE_FORM, BaseProvider, hmac_sha256_authorization, sha256_hash, join_domain, encode_params
 from time import strftime, gmtime, time
 
 
-class AlidnsProvider(BaseProvider):
-    API = "https://alidns.aliyuncs.com"
-    content_type = TYPE_FORM  # 阿里云DNS API使用表单格式
+class AliBaseProvider(BaseProvider):
+    """阿里云基础Provider,提供通用的_request方法"""
 
+    endpoint = "https://alidns.aliyuncs.com"
+    content_type = TYPE_FORM  # 阿里云DNS API使用表单格式
     api_version = "2015-01-09"  # API版本,v3签名需要
 
-    def _request(self, action, **params):
-        # type: (str, **(str | int | bytes | bool | None)) -> dict
+    def _request(self, action, method="POST", **params):
+        # type: (str, str, **(Any)) -> dict
         """Aliyun v3 https://help.aliyun.com/zh/sdk/product-overview/v3-request-structure-and-signature"""
         params = {k: v for k, v in params.items() if v is not None}
-        body_content = self._encode(params) if len(params) > 0 else ""
+
+        if method in ("GET", "DELETE"):
+            # For GET and DELETE requests, parameters go in query string
+            query_string = encode_params(params) if len(params) > 0 else ""
+            body_content = ""
+        else:
+            # For POST requests, parameters go in body
+            body_content = self._encode_body(params)
+            query_string = ""
+
+        path = "/"
         content_hash = sha256_hash(body_content)
         # 构造请求头部
         headers = {
-            "host": self.API.split("://", 1)[1].strip("/"),
+            "host": self.endpoint.split("://", 1)[1].strip("/"),
             "content-type": self.content_type,
             "x-acs-action": action,
             "x-acs-content-sha256": content_hash,
@@ -35,9 +46,9 @@ class AlidnsProvider(BaseProvider):
         # 使用通用签名函数
         authorization = hmac_sha256_authorization(
             secret_key=self.auth_token,
-            method="POST",
-            path="/",
-            query="",
+            method=method,
+            path=path,
+            query=query_string,
             headers=headers,
             body_hash=content_hash,
             signing_string_format="ACS3-HMAC-SHA256\n{HashedCanonicalRequest}",
@@ -47,7 +58,12 @@ class AlidnsProvider(BaseProvider):
         )
         headers["Authorization"] = authorization
         # 对于v3签名的RPC API,参数在request body中
-        return self._http("POST", "/", body=body_content, headers=headers)
+        path = path if not query_string else path + "?" + format(query_string)
+        return self._http(method, path, body=body_content, headers=headers)
+
+
+class AlidnsProvider(AliBaseProvider):
+    """阿里云DNS Provider"""
 
     def _split_zone_and_sub(self, domain):
         # type: (str) -> tuple[str | None, str | None, str]

+ 128 - 0
ddns/provider/aliesa.py

@@ -0,0 +1,128 @@
+# coding=utf-8
+"""
+AliESA API
+阿里云边缘安全加速(ESA) DNS 解析操作库
+@author: NewFuture, GitHub Copilot
+"""
+
+from .alidns import AliBaseProvider
+from ._base import join_domain, TYPE_JSON
+
+
+class AliesaProvider(AliBaseProvider):
+    """阿里云边缘安全加速(ESA) DNS Provider"""
+
+    endpoint = "https://esa.cn-hangzhou.aliyuncs.com"
+    api_version = "2024-09-10"  # ESA API版本
+    content_type = TYPE_JSON
+    remark = "Managed by DDNS"  # ESA comment 不能包含符号
+
+    def _query_zone_id(self, domain):
+        # type: (str) -> str | None
+        """
+        查询站点ID
+        https://help.aliyun.com/zh/edge-security-acceleration/esa/api-esa-2024-09-10-listsites
+        """
+        res = self._request(method="GET", action="ListSites", SiteName=domain, PageSize=500)
+        sites = res.get("Sites", [])
+
+        for site in sites:
+            if site.get("SiteName") == domain:
+                site_id = site.get("SiteId")
+                self.logger.debug("Found site ID %s for domain %s", site_id, domain)
+                return site_id
+
+        self.logger.error("Site not found for domain: %s", domain)
+        return None
+
+    def _query_record(self, zone_id, subdomain, main_domain, record_type, line, extra):
+        # type: (str, str, str, str, str | None, dict) -> dict | None
+        """
+        查询DNS记录
+        https://help.aliyun.com/zh/edge-security-acceleration/esa/api-esa-2024-09-10-listrecords
+        """
+        full_domain = join_domain(subdomain, main_domain)
+        res = self._request(
+            method="GET",
+            action="ListRecords",
+            SiteId=int(zone_id),
+            RecordName=full_domain,
+            Type=self._get_type(record_type),
+            RecordMatchType="exact",  # 精确匹配
+            PageSize=100,
+        )
+
+        records = res.get("Records", [])
+        if len(records) == 0:
+            self.logger.warning("No records found for [%s] with %s <%s>", zone_id, full_domain, record_type)
+            return None
+
+        # 返回第一个匹配的记录
+        record = records[0]
+        self.logger.debug("Found record: %s", record)
+        return record
+
+    def _create_record(self, zone_id, subdomain, main_domain, value, record_type, ttl, line, extra):
+        # type: (str, str, str, str, str, int | str | None, str | None, dict) -> bool
+        """
+        创建DNS记录
+        https://help.aliyun.com/zh/edge-security-acceleration/esa/api-esa-2024-09-10-createrecord
+        """
+        full_domain = join_domain(subdomain, main_domain)
+        extra["Comment"] = extra.get("Comment", self.remark)
+        extra["BizName"] = extra.get("BizName", "web")
+        extra["Proxied"] = extra.get("Proxied", True)
+        data = self._request(
+            method="POST",
+            action="CreateRecord",
+            SiteId=int(zone_id),
+            RecordName=full_domain,
+            Type=self._get_type(record_type),
+            Data={"Value": value},
+            Ttl=ttl or 1,
+            **extra
+        )
+
+        if data and data.get("RecordId"):
+            self.logger.info("Record created: %s", data)
+            return True
+
+        self.logger.error("Failed to create record: %s", data)
+        return False
+
+    def _update_record(self, zone_id, old_record, value, record_type, ttl, line, extra):
+        # type: (str, dict, str, str, int | str | None, str | None, dict) -> bool
+        """
+        更新DNS记录
+        https://help.aliyun.com/zh/edge-security-acceleration/esa/api-esa-2024-09-10-updaterecord
+        """
+        # 检查是否需要更新
+        if (
+            old_record.get("Data", {}).get("Value") == value
+            and old_record.get("RecordType") == self._get_type(record_type)
+            and (not ttl or old_record.get("Ttl") == ttl)
+        ):
+            self.logger.warning("No changes detected, skipping update for record: %s", old_record.get("RecordName"))
+            return True
+
+        extra["Comment"] = extra.get("Comment", self.remark)
+        extra["Proxied"] = extra.get("Proxied", old_record.get("Proxied"))
+        data = self._request(
+            method="POST",
+            action="UpdateRecord",
+            RecordId=old_record.get("RecordId"),
+            Data={"Value": value},
+            Ttl=ttl,
+            **extra
+        )
+
+        if data and data.get("RecordId"):
+            self.logger.info("Record updated: %s", data)
+            return True
+
+        self.logger.error("Failed to update record: %s", data)
+        return False
+
+    def _get_type(self, record_type):
+        # type: (str) -> str
+        return "A/AAAA" if record_type in ("A", "AAAA") else record_type

+ 1 - 1
ddns/provider/callback.py

@@ -16,7 +16,7 @@ class CallbackProvider(SimpleProvider):
     Generic custom callback provider, supports GET/POST arbitrary API.
     """
 
-    API = ""  # CallbackProvider uses auth_id as URL, no fixed API endpoint
+    endpoint = ""  # CallbackProvider uses auth_id as URL, no fixed API endpoint
     content_type = TYPE_JSON
     decode_response = False  # Callback response is not JSON, it's a custom response
 

+ 1 - 1
ddns/provider/cloudflare.py

@@ -8,7 +8,7 @@ from ._base import BaseProvider, TYPE_JSON, join_domain
 
 
 class CloudflareProvider(BaseProvider):
-    API = "https://api.cloudflare.com"
+    endpoint = "https://api.cloudflare.com"
     content_type = TYPE_JSON
 
     def _validate(self):

+ 3 - 3
ddns/provider/dnscom.py

@@ -5,7 +5,7 @@ www.51dns.com (原dns.com)
 @author: Bigjin<[email protected]>, NewFuture
 """
 
-from ._base import BaseProvider, TYPE_FORM
+from ._base import BaseProvider, TYPE_FORM, encode_params
 from hashlib import md5
 from time import time
 
@@ -16,7 +16,7 @@ class DnscomProvider(BaseProvider):
     https://www.51dns.com/document/api/index.html
     """
 
-    API = "https://www.51dns.com"
+    endpoint = "https://www.51dns.com"
     content_type = TYPE_FORM
 
     def _validate(self):
@@ -35,7 +35,7 @@ class DnscomProvider(BaseProvider):
                 "timestamp": time(),  # 时间戳
             }
         )
-        query = self._encode(sorted(params.items()))
+        query = encode_params(params)
         sign = md5((query + self.auth_token).encode("utf-8")).hexdigest()
         params["hash"] = sign
         return params

+ 1 - 1
ddns/provider/dnspod.py

@@ -14,7 +14,7 @@ class DnspodProvider(BaseProvider):
     DNSPOD 接口解析操作库
     """
 
-    API = "https://dnsapi.cn"
+    endpoint = "https://dnsapi.cn"
     content_type = TYPE_FORM
 
     DefaultLine = "默认"

+ 1 - 1
ddns/provider/dnspod_com.py

@@ -14,5 +14,5 @@ class DnspodComProvider(DnspodProvider):
     This class extends the DnspodProvider to use the global DNSPOD API.
     """
 
-    API = "https://api.dnspod.com"
+    endpoint = "https://api.dnspod.com"
     DefaultLine = "default"

+ 1 - 1
ddns/provider/he.py

@@ -8,7 +8,7 @@ from ._base import SimpleProvider, TYPE_FORM
 
 
 class HeProvider(SimpleProvider):
-    API = "https://dyn.dns.he.net"
+    endpoint = "https://dyn.dns.he.net"
     content_type = TYPE_FORM
     accept = None  # he.net does not require a specific Accept header
     decode_response = False  # he.net response is plain text, not JSON

+ 5 - 6
ddns/provider/huaweidns.py

@@ -5,13 +5,12 @@ HuaweiDNS API
 @author: NewFuture
 """
 
-from ._base import BaseProvider, TYPE_JSON, hmac_sha256_authorization, sha256_hash, join_domain
-from json import dumps as jsonencode
+from ._base import BaseProvider, TYPE_JSON, hmac_sha256_authorization, sha256_hash, join_domain, encode_params
 from time import strftime, gmtime
 
 
 class HuaweiDNSProvider(BaseProvider):
-    API = "https://dns.myhuaweicloud.com"
+    endpoint = "https://dns.myhuaweicloud.com"
     content_type = TYPE_JSON
     algorithm = "SDK-HMAC-SHA256"
 
@@ -32,16 +31,16 @@ class HuaweiDNSProvider(BaseProvider):
         # type: (str, str, **Any) -> dict
         params = {k: v for k, v in params.items() if v is not None}
         if method.upper() == "GET" or method.upper() == "DELETE":
-            query = self._encode(sorted(params.items()))
+            query = encode_params(params)
             body = ""
         else:
             query = ""
-            body = jsonencode(params)
+            body = self._encode_body(params)
 
         now = strftime("%Y%m%dT%H%M%SZ", gmtime())
         headers = {
             "content-type": self.content_type,
-            "host": self.API.split("://", 1)[1].strip("/"),
+            "host": self.endpoint.split("://", 1)[1].strip("/"),
             "X-Sdk-Date": now,
         }
 

+ 14 - 27
ddns/provider/noip.py

@@ -17,7 +17,7 @@ class NoipProvider(SimpleProvider):
     No-IP update protocol.
     """
 
-    API = "https://dynupdate.no-ip.com"
+    endpoint = "https://dynupdate.no-ip.com"
     content_type = TYPE_FORM
     accept = None  # No-IP returns plain text response
     decode_response = False  # Response is plain text, not JSON
@@ -31,8 +31,7 @@ class NoipProvider(SimpleProvider):
         if not self.auth_token:
             raise ValueError("No-IP requires password as 'token'")
 
-    def set_record(self, domain, value, record_type="A", ttl=None,
-                   line=None, **extra):
+    def set_record(self, domain, value, record_type="A", ttl=None, line=None, **extra):
         """
         Update DNS record using No-IP Dynamic Update API
 
@@ -58,60 +57,48 @@ class NoipProvider(SimpleProvider):
         self.logger.info("%s => %s(%s)", domain, value, record_type)
 
         # Prepare request parameters
-        params = {
-            "hostname": domain,
-            "myip": value
-        }
+        params = {"hostname": domain, "myip": value}
 
         # Prepare HTTP Basic Authentication headers
         auth_string = "{0}:{1}".format(self.auth_id, self.auth_token)
-        if hasattr(auth_string, 'encode'):  # Python 3
-            auth_bytes = auth_string.encode('utf-8')
+        if not isinstance(auth_string, bytes):  # Python 3
+            auth_bytes = auth_string.encode("utf-8")
         else:  # Python 2
             auth_bytes = auth_string
 
-        auth_b64 = base64.b64encode(auth_bytes).decode('ascii')
+        auth_b64 = base64.b64encode(auth_bytes).decode("ascii")
         headers = {
             "Authorization": "Basic {0}".format(auth_b64),
-            "User-Agent": "DDNS/{0} ([email protected])".format(
-                self.version)
+            "User-Agent": "DDNS/{0} ([email protected])".format(self.version),
         }
 
         try:
             # Use GET request as it's the most common method for DDNS
-            response = self._http("GET", "/nic/update", queries=params,
-                                  headers=headers)
+            response = self._http("GET", "/nic/update", queries=params, headers=headers)
 
             if response is not None:
                 response_str = str(response).strip()
                 self.logger.info("No-IP API response: %s", response_str)
 
                 # Check for successful responses
-                if (response_str.startswith("good") or
-                        response_str.startswith("nochg")):
+                if response_str.startswith("good") or response_str.startswith("nochg"):
                     return True
                 elif response_str.startswith("nohost"):
-                    self.logger.error(
-                        "Hostname %s does not exist under No-IP account",
-                        domain)
+                    self.logger.error("Hostname %s does not exist under No-IP account", domain)
                 elif response_str.startswith("badauth"):
-                    self.logger.error(
-                        "Invalid No-IP username/password combination")
+                    self.logger.error("Invalid No-IP username/password combination")
                 elif response_str.startswith("badagent"):
                     self.logger.error("No-IP client disabled")
                 elif response_str.startswith("!donator"):
-                    self.logger.error(
-                        "Feature not available for No-IP free account")
+                    self.logger.error("Feature not available for No-IP free account")
                 elif response_str.startswith("abuse"):
                     self.logger.error("No-IP account blocked due to abuse")
                 else:
-                    self.logger.error("Unexpected No-IP API response: %s",
-                                      response_str)
+                    self.logger.error("Unexpected No-IP API response: %s", response_str)
             else:
                 self.logger.error("Empty response from No-IP API")
 
         except Exception as e:
-            self.logger.error("Error updating No-IP record for %s: %s",
-                              domain, e)
+            self.logger.error("Error updating No-IP record for %s: %s", domain, e)
 
         return False

+ 3 - 4
ddns/provider/tencentcloud.py

@@ -7,7 +7,6 @@ Tencent Cloud DNSPod API
 """
 from ._base import BaseProvider, TYPE_JSON, hmac_sha256_authorization, sha256_hash, hmac_sha256
 from time import time, strftime, gmtime
-from json import dumps as jsonencode
 
 
 class TencentCloudProvider(BaseProvider):
@@ -19,7 +18,7 @@ class TencentCloudProvider(BaseProvider):
     Documentation: https://cloud.tencent.com/document/api/1427
     """
 
-    API = "https://dnspod.tencentcloudapi.com"
+    endpoint = "https://dnspod.tencentcloudapi.com"
     content_type = TYPE_JSON
 
     # 腾讯云 DNSPod API 配置
@@ -42,12 +41,12 @@ class TencentCloudProvider(BaseProvider):
         """
         # 构建请求体
         params = {k: v for k, v in params.items() if v is not None}
-        body = jsonencode(params)
+        body = self._encode_body(params)
 
         # 构建请求头,小写 腾讯云只签名特定头部
         headers = {
             "content-type": self.content_type,
-            "host": self.API.split("://", 1)[1].strip("/"),
+            "host": self.endpoint.split("://", 1)[1].strip("/"),
         }
 
         # 腾讯云特殊的密钥派生过程

+ 4 - 6
ddns/util/http.py

@@ -14,12 +14,13 @@ import os
 
 try:  # python 3
     from http.client import HTTPSConnection, HTTPConnection, HTTPException
-    from urllib.parse import urlparse
+    from urllib.parse import quote, urlencode, urlparse
 except ImportError:  # python 2
     from httplib import HTTPSConnection, HTTPConnection, HTTPException  # type: ignore[no-redef]
     from urlparse import urlparse  # type: ignore[no-redef]
+    from urllib import urlencode, quote  # type: ignore[no-redef]
 
-__all__ = ["send_http_request", "HttpResponse"]
+__all__ = ["send_http_request", "HttpResponse", "quote", "urlencode"]
 
 logger = getLogger().getChild(__name__)
 
@@ -123,10 +124,7 @@ def _load_system_ca_certs(ssl_context):
                 loaded_count += 1
                 logger.debug("Loaded CA certificates from: %s", ca_path)
             except Exception as e:
-                logger.debug("Failed to load CA certificates from %s: %s", ca_path, e)
-
-    if loaded_count > 0:
-        logger.debug("Successfully loaded CA certificates from %d locations", loaded_count)
+                logger.info("Failed to load CA certificates from %s: %s", ca_path, e)
 
 
 def _close_connection(conn):

+ 2 - 1
doc/providers/README.md

@@ -10,7 +10,9 @@
 |----------|--------|----------|----------|------|
 | `dnspod` | [DNSPod 中国版](https://www.dnspod.cn/) | [dnspod.md](dnspod.md) | [dnspod.en.md](dnspod.en.md) | 国内最大DNS服务商 |
 | `alidns` | [阿里云 DNS](https://dns.console.aliyun.com/) | [alidns.md](alidns.md) | [alidns.en.md](alidns.en.md) | 阿里云生态集成 |
+| `aliesa` | [阿里云 ESA](https://esa.console.aliyun.com/) | [aliesa.md](aliesa.md) | [aliesa.en.md](aliesa.en.md) | 阿里云边缘安全加速 |
 | `tencentcloud` | [腾讯云 DNSPod](https://cloud.tencent.com/product/cns) | [tencentcloud.md](tencentcloud.md) | [tencentcloud.en.md](tencentcloud.en.md) | 腾讯云DNSPod服务 |
+| `cloudflare` | [Cloudflare](https://www.cloudflare.com/) | [cloudflare.md](cloudflare.md) | [cloudflare.en.md](cloudflare.en.md) | 全球CDN和DNS服务 |
 | `noip` | [No-IP](https://www.noip.com/) | [noip.md](noip.md) | [noip.en.md](noip.en.md) | 流行的动态DNS服务 |
 | `callback` | 自定义API (Webhook) | [callback.md](callback.md) | [callback.en.md](callback.en.md) | 自定义HTTP API |
 
@@ -18,7 +20,6 @@
 
 | Provider | 服务商 | 官方文档 | 状态 |
 |----------|--------|----------|------|
-| `cloudflare` | [Cloudflare](https://www.cloudflare.com/) | [API文档](https://developers.cloudflare.com/api/) | ⚠️ 缺少充分测试 |
 | `dnscom` | [DNS.COM](https://www.dns.com/) | [API文档](https://www.dns.com/member/apiSet) | ⚠️ 缺少充分测试 |
 | `dnspod_com` | [DNSPod 国际版](https://www.dnspod.com/) | [API文档](https://www.dnspod.com/docs/info.html) | 国际版DNSPod |
 | `he` | [HE.net](https://dns.he.net/) | [DDNS文档](https://dns.he.net/docs.html) | ⚠️ 缺少充分测试,不支持自动创建记录 |

+ 133 - 0
doc/providers/aliesa.en.md

@@ -0,0 +1,133 @@
+# Alibaba Cloud Edge Security Acceleration (ESA) Configuration Guide
+
+## Overview
+
+Alibaba Cloud Edge Security Acceleration (ESA) is an edge security acceleration service provided by Alibaba Cloud, supporting CDN acceleration and edge security protection. This DDNS project supports ESA DNS record management through Alibaba Cloud AccessKey.
+
+## Authentication
+
+### AccessKey Authentication
+
+ESA API uses the same AccessKey authentication as other Alibaba Cloud services, requiring AccessKey ID and AccessKey Secret.
+
+```json
+{
+    "id": "your_access_key_id",
+    "token": "your_access_key_secret",
+    "dns": "aliesa"
+}
+```
+
+## Complete Configuration Examples
+
+### Basic Configuration
+
+```json
+{
+    "id": "LTAI4xxx",
+    "token": "xxx",
+    "dns": "aliesa",
+    "ipv4": ["www.example.com", "api.example.com"],
+    "ipv6": ["ipv6.example.com"]
+}
+```
+
+### Advanced Configuration
+
+```json
+{
+    "id": "LTAI4xxx", 
+    "token": "xxx",
+    "dns": "aliesa",
+    "ipv4": ["www.example.com", "api.example.com"],
+    "ipv6": ["ipv6.example.com"],
+    "ttl": 300
+}
+```
+
+## Optional Parameters
+
+| Parameter | Description | Type | Default | Example |
+|-----------|-------------|------|---------|---------|
+| `ttl` | DNS record TTL value | Integer | 600 | 300 |
+
+## Use Cases
+
+### Dynamic IP CDN Origin
+
+When your NAS or other services act as ESA CDN origin, you can use this DDNS to automatically update origin IP:
+
+```json
+{
+    "id": "LTAI4xxx",
+    "token": "xxx", 
+    "dns": "aliesa",
+    "ipv4": ["origin.example.com"]
+}
+```
+
+## Permission Requirements
+
+Ensure the Alibaba Cloud account has the following ESA permissions:
+
+- **ESA Site Query Permission**: Used to query site IDs (`esa:ListSites`)
+- **ESA DNS Record Management Permission**: Used to query, create, and update DNS records (`esa:ListRecords`, `esa:CreateRecord`, `esa:UpdateRecord`)
+
+It's recommended to create a dedicated RAM sub-account with only the necessary ESA permissions.
+
+### How to Get Site ID
+
+1. Log in to [Alibaba Cloud ESA Console](https://esa.console.aliyun.com/)
+2. Select the corresponding site
+3. The site ID can be seen in the URL of the site details page, or use API call `ListSites` to get it
+
+## API Limitations
+
+- **Site Count**: Varies by ESA package
+- **DNS Record Count**: Varies by ESA package
+- **API Call Frequency**: Follows Alibaba Cloud ESA API limits
+
+## Troubleshooting
+
+### Common Issues
+
+#### "Site not found for domain"
+
+- Check if the domain has been added to ESA service
+- Confirm domain format is correct (no protocol prefix)
+- Verify AccessKey permissions
+
+#### "Failed to create/update record"
+
+- Check if DNS record type is supported
+- Confirm record value format is correct
+- Verify TTL value is within allowed range
+
+#### "API call failed"
+
+- Check if AccessKey ID and Secret are correct
+- Confirm network connection is normal
+- View detailed error logs
+
+### Debug Mode
+
+Enable debug mode to view detailed API interaction information:
+
+```json
+{
+    "id": "LTAI4xxx",
+    "token": "xxx",
+    "dns": "aliesa",
+    "debug": true,
+    "ipv4": ["www.example.com"]
+}
+```
+
+## Support and Resources
+
+- [Alibaba Cloud ESA Product Documentation](https://help.aliyun.com/product/122312.html)
+- [Alibaba Cloud ESA API Documentation](https://help.aliyun.com/zh/edge-security-acceleration/esa/api-esa-2024-09-10-overview)
+- [Alibaba Cloud ESA Console](https://esa.console.aliyun.com/)
+- [Alibaba Cloud Technical Support](https://selfservice.console.aliyun.com/ticket)
+
+> It's recommended to use RAM sub-accounts with only necessary ESA permissions to improve security. Regularly rotate AccessKeys to ensure account security.

+ 133 - 0
doc/providers/aliesa.md

@@ -0,0 +1,133 @@
+# 阿里云边缘安全加速 (ESA) 配置指南
+
+## 概述
+
+阿里云边缘安全加速(ESA)是阿里云提供的边缘安全加速服务,支持CDN加速和边缘安全防护。本 DDNS 项目支持通过阿里云AccessKey进行ESA DNS记录的管理。
+
+## 认证方式
+
+### AccessKey 认证
+
+ESA API使用与阿里云其他服务相同的AccessKey认证方式,需要提供AccessKey ID和AccessKey Secret。
+
+```json
+{
+    "id": "your_access_key_id",
+    "token": "your_access_key_secret",
+    "dns": "aliesa"
+}
+```
+
+## 完整配置示例
+
+### 基础配置
+
+```json
+{
+    "id": "LTAI4xxx",
+    "token": "xxx",
+    "dns": "aliesa",
+    "ipv4": ["www.example.com", "api.example.com"],
+    "ipv6": ["ipv6.example.com"]
+}
+```
+
+### 高级配置
+
+```json
+{
+    "id": "LTAI4xxx", 
+    "token": "xxx",
+    "dns": "aliesa",
+    "ipv4": ["www.example.com", "api.example.com"],
+    "ipv6": ["ipv6.example.com"],
+    "ttl": 300
+}
+```
+
+## 可选参数
+
+| 参数 | 说明 | 类型 | 默认值 | 示例 |
+|------|------|------|--------|------|
+| `ttl` | DNS记录的TTL值 | 整数 | 600 | 300 |
+
+## 使用场景
+
+### 动态IP的CDN源站
+
+当你的NAS或其他服务作为ESA的CDN源站时,可以使用此DDNS来自动更新源站IP:
+
+```json
+{
+    "id": "LTAI4xxx",
+    "token": "xxx", 
+    "dns": "aliesa",
+    "ipv4": ["origin.example.com"]
+}
+```
+
+## 权限要求
+
+确保使用的阿里云账号具有以下ESA权限:
+
+- **ESA站点查询权限**:用于查询站点ID (`esa:ListSites`)
+- **ESA DNS记录管理权限**:用于查询、创建和更新DNS记录 (`esa:ListRecords`, `esa:CreateRecord`, `esa:UpdateRecord`)
+
+推荐创建专门的RAM子账号并仅授予必要的ESA权限。
+
+### 如何获取站点ID
+
+1. 登录[阿里云ESA控制台](https://esa.console.aliyun.com/)
+2. 选择对应的站点
+3. 在站点详情页面的URL中可以看到站点ID,或使用API调用`ListSites`获取
+
+## API限制
+
+- **站点数量**:根据ESA套餐不同
+- **DNS记录数量**:根据ESA套餐不同
+- **API调用频率**:遵循阿里云ESA API限制
+
+## 故障排除
+
+### 常见问题
+
+#### "Site not found for domain"
+
+- 检查域名是否已添加到ESA服务
+- 确认域名格式正确(不包含协议前缀)
+- 验证AccessKey权限
+
+#### "Failed to create/update record"
+
+- 检查DNS记录类型是否支持
+- 确认记录值格式正确
+- 验证TTL值在允许范围内
+
+#### "API调用失败"
+
+- 检查AccessKey ID和Secret是否正确
+- 确认网络连接正常
+- 查看详细错误日志
+
+### 调试模式
+
+启用调试模式查看详细的API交互信息:
+
+```json
+{
+    "id": "LTAI4xxx",
+    "token": "xxx",
+    "dns": "aliesa",
+    "debug": true,
+    "ipv4": ["www.example.com"]
+}
+```
+
+## 支持与资源
+
+- [阿里云ESA产品文档](https://help.aliyun.com/product/122312.html)
+- [阿里云ESA API文档](https://help.aliyun.com/zh/edge-security-acceleration/esa/api-esa-2024-09-10-overview)
+- [阿里云ESA控制台](https://esa.console.aliyun.com/)
+- [阿里云技术支持](https://selfservice.console.aliyun.com/ticket)
+
+> 建议使用RAM子账号并仅授予必要的ESA权限,以提高安全性。定期轮换AccessKey以确保账号安全。

+ 102 - 0
doc/providers/cloudflare.en.md

@@ -0,0 +1,102 @@
+# Cloudflare DNS Configuration Guide
+
+## Overview
+
+Cloudflare is a leading global CDN and network security service provider. This DDNS project supports automatic DNS record management through the Cloudflare API.
+
+## Authentication Methods
+
+### API Token Authentication (Recommended)
+
+```json
+{
+    "dns": "cloudflare",
+    "token": "your_api_token_here",
+    "ipv4": ["ddns.example.com"]
+}
+```
+
+### API Key Authentication
+
+```json
+{
+    "id": "[email protected]",
+    "token": "your_global_api_key",
+    "dns": "cloudflare",
+    "ipv4": ["ddns.example.com"]
+}
+```
+
+## Getting Authentication Credentials
+
+### API Token
+
+1. Log in to [Cloudflare Dashboard](https://dash.cloudflare.com/)
+2. Go to "My Profile" → "API Tokens"
+3. Create custom token with permissions:
+   - **Zone:Read** and **DNS:Edit**
+4. Select domains to manage
+
+### Global API Key
+
+1. Log in to [Cloudflare Dashboard](https://dash.cloudflare.com/)
+2. Go to "My Profile" → "API Tokens"
+3. View "Global API Key"
+
+## Configuration Examples
+
+### Basic Configuration
+
+```json
+{
+    "dns": "cloudflare",
+    "token": "your_api_token_here",
+    "ipv4": ["ddns.example.com", "www.example.com"],
+    "ipv6": ["ddns.example.com"]
+}
+```
+
+### Advanced Configuration
+
+```json
+{
+    "dns": "cloudflare",
+    "token": "your_api_token_here",
+    "ipv4": ["ddns.example.com"],
+    "ttl": 300,
+    "comment": "Dynamic DNS update"
+}
+```
+
+## Optional Parameters
+
+| Parameter | Description | Default |
+|-----------|-------------|---------|
+| `ttl` | DNS record TTL value | 300 |
+| `comment` | DNS record comment | "DDNS" |
+
+## Troubleshooting
+
+### Common Errors
+
+- **"Invalid API token"** - Check token validity and permissions
+- **"Zone not found"** - Ensure domain is added to Cloudflare
+- **"Record creation failed"** - Check record format and TTL value (60-86400 seconds)
+
+### Debug Mode
+
+```json
+{
+    "dns": "cloudflare",
+    "token": "your_api_token_here",
+    "debug": true,
+    "ipv4": ["ddns.example.com"]
+}
+```
+
+## Resources
+
+- [Cloudflare API Documentation](https://developers.cloudflare.com/api/)
+- [Cloudflare Dashboard](https://dash.cloudflare.com/)
+
+> It's recommended to use API Token instead of Global API Key for better security with finer-grained permissions.

+ 102 - 0
doc/providers/cloudflare.md

@@ -0,0 +1,102 @@
+# Cloudflare DNS 配置指南
+
+## 概述
+
+Cloudflare 是全球领先的 CDN 和网络安全服务提供商。本 DDNS 项目支持通过 Cloudflare API 进行 DNS 记录的自动管理。
+
+## 认证方式
+
+### API Token 认证(推荐)
+
+```json
+{
+    "dns": "cloudflare",
+    "token": "your_api_token_here",
+    "ipv4": ["ddns.example.com"]
+}
+```
+
+### API Key 认证
+
+```json
+{
+    "id": "[email protected]",
+    "token": "your_global_api_key",
+    "dns": "cloudflare",
+    "ipv4": ["ddns.example.com"]
+}
+```
+
+## 获取认证信息
+
+### API Token
+
+1. 登录 [Cloudflare 控制台](https://dash.cloudflare.com/)
+2. 进入「我的个人资料」→「API 令牌」
+3. 创建自定义令牌,配置权限:
+   - **区域:读取** 和 **DNS:编辑**
+4. 选择要管理的域名
+
+### Global API Key
+
+1. 登录 [Cloudflare 控制台](https://dash.cloudflare.com/)
+2. 进入「我的个人资料」→「API 令牌」
+3. 查看「Global API Key」
+
+## 配置示例
+
+### 基础配置
+
+```json
+{
+    "dns": "cloudflare",
+    "token": "your_api_token_here",
+    "ipv4": ["ddns.example.com", "www.example.com"],
+    "ipv6": ["ddns.example.com"]
+}
+```
+
+### 高级配置
+
+```json
+{
+    "dns": "cloudflare",
+    "token": "your_api_token_here",
+    "ipv4": ["ddns.example.com"],
+    "ttl": 300,
+    "comment": "动态DNS更新"
+}
+```
+
+## 可选参数
+
+| 参数 | 说明 | 默认值 |
+|------|------|--------|
+| `ttl` | DNS记录的TTL值 | 300 |
+| `comment` | DNS记录备注 | "DDNS" |
+
+## 故障排除
+
+### 常见错误
+
+- **"Invalid API token"** - 检查 Token 是否正确及权限
+- **"Zone not found"** - 确认域名已添加到 Cloudflare
+- **"Record creation failed"** - 检查记录格式和 TTL 值(60-86400秒)
+
+### 调试模式
+
+```json
+{
+    "dns": "cloudflare",
+    "token": "your_api_token_here",
+    "debug": true,
+    "ipv4": ["ddns.example.com"]
+}
+```
+
+## 相关链接
+
+- [Cloudflare API 文档](https://developers.cloudflare.com/api/)
+- [Cloudflare 控制台](https://dash.cloudflare.com/)
+
+> 建议使用 API Token 而非 Global API Key,权限更精细更安全。

+ 2 - 1
schema/v4.0.json

@@ -34,7 +34,7 @@
       "$id": "/properties/dns",
       "type": "string",
       "title": "DNS Provider",
-      "description": "dns服务商:阿里为alidns,DNS.COM为dnscom,DNSPOD国际版为(dnspod_com),cloudflare,HE.net为he,华为DNS为huaweidns,自定义回调为callback",
+      "description": "dns服务商:阿里为alidns,阿里ESA为aliesa,DNS.COM为dnscom,DNSPOD国际版为(dnspod_com),cloudflare,HE.net为he,华为DNS为huaweidns,自定义回调为callback",
       "default": "dnspod",
       "examples": [
         "dnspod",
@@ -43,6 +43,7 @@
       ],
       "enum": [
         "alidns",
+        "aliesa",
         "callback",
         "cloudflare",
         "debug",

+ 12 - 0
tests/__init__.py

@@ -6,7 +6,19 @@ DDNS Tests Package
 import sys
 import os
 
+try:
+    from unittest.mock import patch, MagicMock
+except ImportError:  # Python 2
+    from mock import patch, MagicMock  # type: ignore
+
+__all__ = ["patch", "MagicMock"]
+
 # 添加当前目录到 Python 路径,这样就可以直接导入 test_base
 current_dir = os.path.dirname(__file__)
 if current_dir not in sys.path:
     sys.path.insert(0, current_dir)
+
+# 添加上级目录到 Python 路径,这样就可以导入 ddns 模块
+parent_dir = os.path.dirname(current_dir)
+if parent_dir not in sys.path:
+    sys.path.insert(0, parent_dir)

+ 1 - 12
tests/base_test.py

@@ -4,19 +4,8 @@ Base test utilities and common imports for all provider tests
 
 @author: Github Copilot
 """
-
 import unittest
-import sys
-import os
-
-# Add the parent directory to the path so we can import the ddns module
-sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
-try:
-    from unittest.mock import patch, MagicMock
-except ImportError:
-    # Python 2.7 compatibility
-    from mock import patch, MagicMock  # type: ignore
+from __init__ import patch, MagicMock  # noqa: F401 # Ensure the package is initialized
 
 
 class BaseProviderTestCase(unittest.TestCase):

+ 78 - 79
tests/test_config_ssl.py

@@ -9,15 +9,7 @@ import unittest
 import sys
 import os
 
-# Add parent directory to path for imports
-sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
-try:
-    from unittest.mock import patch
-except ImportError:
-    # Python 2.7 compatibility
-    from mock import patch  # type: ignore
-
+from __init__ import patch
 from ddns.util.config import init_config, get_config  # noqa: E402
 from ddns.__init__ import __version__, __description__, __doc__, build_date  # noqa
 from ddns.provider._base import SimpleProvider  # noqa: E402
@@ -26,10 +18,9 @@ from ddns.provider._base import SimpleProvider  # noqa: E402
 class _TestSSLProvider(SimpleProvider):
     """Test provider to verify SSL configuration"""
 
-    API = "https://api.example.com"
+    endpoint = "https://api.example.com"
 
-    def set_record(self, domain, value, record_type="A", ttl=None,
-                   line=None, **extra):
+    def set_record(self, domain, value, record_type="A", ttl=None, line=None, **extra):
         return True
 
 
@@ -41,13 +32,13 @@ class TestSSLConfiguration(unittest.TestCase):
         # Clear global state before each test
         import ddns.util.config
         from argparse import Namespace
+
         ddns.util.config.__cli_args = Namespace()
         ddns.util.config.__config = {}
         # Clear environment variables that might affect tests
         # Check both uppercase and lowercase variants
         for key in list(os.environ.keys()):
-            if (key.upper().startswith('DDNS_') or
-                    key.lower().startswith('ddns_')):
+            if key.upper().startswith("DDNS_") or key.lower().startswith("ddns_"):
                 del os.environ[key]
 
     def tearDown(self):
@@ -55,137 +46,145 @@ class TestSSLConfiguration(unittest.TestCase):
         # Clear global state after each test
         import ddns.util.config
         from argparse import Namespace
+
         ddns.util.config.__cli_args = Namespace()
         ddns.util.config.__config = {}
         # Clear environment variables that might affect future tests
         for key in list(os.environ.keys()):
-            if (key.upper().startswith('DDNS_') or
-                    key.lower().startswith('ddns_')):
+            if key.upper().startswith("DDNS_") or key.lower().startswith("ddns_"):
                 del os.environ[key]
 
     def test_cli_ssl_false(self):
         """Test SSL configuration via CLI argument --ssl false"""
-        args = ['test', '--token', 'test', '--ssl', 'false']
-        with patch.object(sys, 'argv', args):
+        args = ["test", "--token", "test", "--ssl", "false"]
+        with patch.object(sys, "argv", args):
             init_config(__description__, __doc__, __version__, build_date)
-            ssl_config = get_config('ssl')
-            self.assertEqual(ssl_config, 'false')
+            ssl_config = get_config("ssl")
+            self.assertEqual(ssl_config, "false")
 
     def test_cli_ssl_true(self):
         """Test SSL configuration via CLI argument --ssl true"""
-        args = ['test', '--token', 'test', '--ssl', 'true']
-        with patch.object(sys, 'argv', args):
+        args = ["test", "--token", "test", "--ssl", "true"]
+        with patch.object(sys, "argv", args):
             init_config(__description__, __doc__, __version__, build_date)
-            ssl_config = get_config('ssl')
-            self.assertEqual(ssl_config, 'true')
+            ssl_config = get_config("ssl")
+            self.assertEqual(ssl_config, "true")
 
     def test_cli_ssl_auto(self):
         """Test SSL configuration via CLI argument --ssl auto"""
-        args = ['test', '--token', 'test', '--ssl', 'auto']
-        with patch.object(sys, 'argv', args):
+        args = ["test", "--token", "test", "--ssl", "auto"]
+        with patch.object(sys, "argv", args):
             init_config(__description__, __doc__, __version__, build_date)
-            ssl_config = get_config('ssl')
-            self.assertEqual(ssl_config, 'auto')
+            ssl_config = get_config("ssl")
+            self.assertEqual(ssl_config, "auto")
 
     def test_cli_ssl_custom_path(self):
         """Test SSL configuration via CLI argument --ssl /path/to/cert.pem"""
-        args = ['test', '--token', 'test', '--ssl', '/path/to/cert.pem']
-        with patch.object(sys, 'argv', args):
+        args = ["test", "--token", "test", "--ssl", "/path/to/cert.pem"]
+        with patch.object(sys, "argv", args):
             init_config(__description__, __doc__, __version__, build_date)
-            ssl_config = get_config('ssl')
-            self.assertEqual(ssl_config, '/path/to/cert.pem')
+            ssl_config = get_config("ssl")
+            self.assertEqual(ssl_config, "/path/to/cert.pem")
 
     def test_env_ssl_false(self):
         """Test SSL configuration via environment variable DDNS_SSL=false"""
         # Ensure completely clean environment
-        clean_env = {k: v for k, v in os.environ.items()
-                     if not (k.upper().startswith('DDNS_') or
-                             k.lower().startswith('ddns_'))}
-        clean_env.update({'DDNS_SSL': 'false', 'DDNS_TOKEN': 'test'})
+        clean_env = {
+            k: v
+            for k, v in os.environ.items()
+            if not (k.upper().startswith("DDNS_") or k.lower().startswith("ddns_"))
+        }
+        clean_env.update({"DDNS_SSL": "false", "DDNS_TOKEN": "test"})
 
         with patch.dict(os.environ, clean_env, clear=True):
-            with patch.object(sys, 'argv', ['test']):
+            with patch.object(sys, "argv", ["test"]):
                 init_config(__description__, __doc__, __version__, build_date)
-                ssl_config = get_config('ssl')
-                self.assertEqual(ssl_config, 'false')
+                ssl_config = get_config("ssl")
+                self.assertEqual(ssl_config, "false")
 
     def test_env_ssl_true(self):
         """Test SSL configuration via environment variable DDNS_SSL=true"""
         # Ensure completely clean environment
-        clean_env = {k: v for k, v in os.environ.items()
-                     if not (k.upper().startswith('DDNS_') or
-                             k.lower().startswith('ddns_'))}
-        clean_env.update({'DDNS_SSL': 'true', 'DDNS_TOKEN': 'test'})
+        clean_env = {
+            k: v
+            for k, v in os.environ.items()
+            if not (k.upper().startswith("DDNS_") or k.lower().startswith("ddns_"))
+        }
+        clean_env.update({"DDNS_SSL": "true", "DDNS_TOKEN": "test"})
 
         with patch.dict(os.environ, clean_env, clear=True):
-            with patch.object(sys, 'argv', ['test']):
+            with patch.object(sys, "argv", ["test"]):
                 init_config(__description__, __doc__, __version__, build_date)
-                ssl_config = get_config('ssl')
-                self.assertEqual(ssl_config, 'true')
+                ssl_config = get_config("ssl")
+                self.assertEqual(ssl_config, "true")
 
     def test_env_ssl_auto(self):
         """Test SSL configuration via environment variable DDNS_SSL=auto"""
         # Ensure completely clean environment
-        clean_env = {k: v for k, v in os.environ.items()
-                     if not (k.upper().startswith('DDNS_') or
-                             k.lower().startswith('ddns_'))}
-        clean_env.update({'DDNS_SSL': 'auto', 'DDNS_TOKEN': 'test'})
+        clean_env = {
+            k: v
+            for k, v in os.environ.items()
+            if not (k.upper().startswith("DDNS_") or k.lower().startswith("ddns_"))
+        }
+        clean_env.update({"DDNS_SSL": "auto", "DDNS_TOKEN": "test"})
 
         with patch.dict(os.environ, clean_env, clear=True):
-            with patch.object(sys, 'argv', ['test']):
+            with patch.object(sys, "argv", ["test"]):
                 init_config(__description__, __doc__, __version__, build_date)
-                ssl_config = get_config('ssl')
-                self.assertEqual(ssl_config, 'auto')
+                ssl_config = get_config("ssl")
+                self.assertEqual(ssl_config, "auto")
 
     def test_cli_overrides_env(self):
         """Test that CLI argument overrides environment variable"""
         # Ensure completely clean environment
-        clean_env = {k: v for k, v in os.environ.items()
-                     if not (k.upper().startswith('DDNS_') or
-                             k.lower().startswith('ddns_'))}
-        clean_env.update({'DDNS_SSL': 'false', 'DDNS_TOKEN': 'test'})
+        clean_env = {
+            k: v
+            for k, v in os.environ.items()
+            if not (k.upper().startswith("DDNS_") or k.lower().startswith("ddns_"))
+        }
+        clean_env.update({"DDNS_SSL": "false", "DDNS_TOKEN": "test"})
 
         with patch.dict(os.environ, clean_env, clear=True):
-            with patch.object(sys, 'argv', ['test', '--ssl', 'true']):
+            with patch.object(sys, "argv", ["test", "--ssl", "true"]):
                 init_config(__description__, __doc__, __version__, build_date)
-                ssl_config = get_config('ssl')
-                self.assertEqual(ssl_config, 'true')
+                ssl_config = get_config("ssl")
+                self.assertEqual(ssl_config, "true")
 
     def test_default_ssl_config(self):
         """Test default SSL configuration when none provided"""
-        with patch.object(sys, 'argv', ['test', '--token', 'test']):
+        with patch.object(sys, "argv", ["test", "--token", "test"]):
             init_config(__description__, __doc__, __version__, build_date)
-            ssl_config = get_config('ssl', 'auto')
-            self.assertEqual(ssl_config, 'auto')
+            ssl_config = get_config("ssl", "auto")
+            self.assertEqual(ssl_config, "auto")
 
     def test_provider_ssl_integration(self):
         """Test that SSL configuration is passed to provider correctly"""
-        provider = _TestSSLProvider('test_id', 'test_token',
-                                    verify_ssl='false')
-        self.assertEqual(provider.verify_ssl, 'false')
+        provider = _TestSSLProvider("test_id", "test_token", verify_ssl="false")
+        self.assertEqual(provider.verify_ssl, "false")
 
-        provider = _TestSSLProvider('test_id', 'test_token', verify_ssl=True)
+        provider = _TestSSLProvider("test_id", "test_token", verify_ssl=True)
         self.assertTrue(provider.verify_ssl)
 
-        cert_path = '/path/to/cert.pem'
-        provider = _TestSSLProvider('test_id', 'test_token',
-                                    verify_ssl=cert_path)
-        self.assertEqual(provider.verify_ssl, '/path/to/cert.pem')
+        cert_path = "/path/to/cert.pem"
+        provider = _TestSSLProvider("test_id", "test_token", verify_ssl=cert_path)
+        self.assertEqual(provider.verify_ssl, "/path/to/cert.pem")
 
     def test_case_insensitive_env_vars(self):
         """Test that environment variables are case insensitive"""
         # Ensure completely clean environment
-        clean_env = {k: v for k, v in os.environ.items()
-                     if not (k.upper().startswith('DDNS_') or
-                             k.lower().startswith('ddns_'))}
-        clean_env.update({'ddns_ssl': 'false', 'ddns_token': 'test'})
+        clean_env = {
+            k: v
+            for k, v in os.environ.items()
+            if not (k.upper().startswith("DDNS_") or k.lower().startswith("ddns_"))
+        }
+        clean_env.update({"ddns_ssl": "false", "ddns_token": "test"})
 
         with patch.dict(os.environ, clean_env, clear=True):
-            with patch.object(sys, 'argv', ['test']):
+            with patch.object(sys, "argv", ["test"]):
                 init_config(__description__, __doc__, __version__, build_date)
-                ssl_config = get_config('ssl')
-                self.assertEqual(ssl_config, 'false')
+                ssl_config = get_config("ssl")
+                self.assertEqual(ssl_config, "false")
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main()

+ 2 - 2
tests/test_provider_alidns.py

@@ -20,7 +20,7 @@ class TestAlidnsProvider(BaseProviderTestCase):
 
     def test_class_constants(self):
         """Test AlidnsProvider class constants"""
-        self.assertEqual(AlidnsProvider.API, "https://alidns.aliyuncs.com")
+        self.assertEqual(AlidnsProvider.endpoint, "https://alidns.aliyuncs.com")
         self.assertEqual(AlidnsProvider.content_type, "application/x-www-form-urlencoded")
         self.assertTrue(AlidnsProvider.decode_response)
 
@@ -29,7 +29,7 @@ class TestAlidnsProvider(BaseProviderTestCase):
         provider = AlidnsProvider(self.auth_id, self.auth_token)
         self.assertEqual(provider.auth_id, self.auth_id)
         self.assertEqual(provider.auth_token, self.auth_token)
-        self.assertEqual(provider.API, "https://alidns.aliyuncs.com")
+        self.assertEqual(provider.endpoint, "https://alidns.aliyuncs.com")
 
     def test_request_basic(self):
         """Test _request method with basic parameters"""

+ 534 - 0
tests/test_provider_aliesa.py

@@ -0,0 +1,534 @@
+# coding=utf-8
+"""
+测试 AliESA Provider
+@author: NewFuture
+"""
+
+import unittest
+from base_test import BaseProviderTestCase
+from ddns.provider.aliesa import AliesaProvider
+
+
+class TestAliesaProvider(BaseProviderTestCase):
+    """AliESA Provider 测试类"""
+
+    def setUp(self):
+        """Setup test provider with mock credentials"""
+        super(TestAliesaProvider, self).setUp()
+        self.provider = AliesaProvider(auth_id="test_access_key", auth_token="test_secret_key")
+
+    def test_class_constants(self):
+        """Test AliesaProvider class constants"""
+        self.assertEqual(self.provider.endpoint, "https://esa.cn-hangzhou.aliyuncs.com")
+        self.assertEqual(self.provider.api_version, "2024-09-10")
+
+    def test_init_with_basic_config(self):
+        """Test AliesaProvider initialization with basic configuration"""
+        provider = AliesaProvider(auth_id="test_access_key", auth_token="test_secret_key")
+        self.assertEqual(provider.auth_id, "test_access_key")
+        self.assertEqual(provider.auth_token, "test_secret_key")
+        self.assertEqual(provider.endpoint, "https://esa.cn-hangzhou.aliyuncs.com")
+
+    def test_init_with_empty_credentials(self):
+        """Test AliesaProvider initialization with empty credentials raises ValueError"""
+        with self.assertRaises(ValueError):
+            AliesaProvider(auth_id="", auth_token="")
+
+    def test_init_with_long_credentials(self):
+        """Test AliesaProvider initialization with long credentials"""
+        long_id = "a" * 50
+        long_token = "b" * 100
+        provider = AliesaProvider(auth_id=long_id, auth_token=long_token)
+        self.assertEqual(provider.auth_id, long_id)
+        self.assertEqual(provider.auth_token, long_token)
+
+    def test_zone_id_query_logic(self):
+        """Test zone ID query logic with various domain formats"""
+        # Test with exact domain match
+        sites_response = {"Sites": [{"SiteId": 12345, "SiteName": "example.com"}]}
+        # Simulate the logic from _query_zone_id
+        sites = sites_response.get("Sites", [])
+        zone_id = None
+        for site in sites:
+            if site.get("SiteName") == "example.com":
+                zone_id = site.get("SiteId")
+                break
+        self.assertEqual(zone_id, 12345)
+
+        # Test with multiple sites
+        sites_response = {
+            "Sites": [
+                {"SiteId": 12346, "SiteName": "other.com"},
+                {"SiteId": 12345, "SiteName": "example.com"},
+                {"SiteId": 12347, "SiteName": "test.com"},
+            ]
+        }
+        sites = sites_response.get("Sites", [])
+        zone_id = None
+        for site in sites:
+            if site.get("SiteName") == "example.com":
+                zone_id = site.get("SiteId")
+                break
+        self.assertEqual(zone_id, 12345)
+
+        # Test with no matching site
+        sites_response = {"Sites": [{"SiteId": 12346, "SiteName": "other.com"}]}
+        sites = sites_response.get("Sites", [])
+        zone_id = None
+        for site in sites:
+            if site.get("SiteName") == "example.com":
+                zone_id = site.get("SiteId")
+                break
+        self.assertIsNone(zone_id)
+
+        # Test with empty sites list
+        sites_response = {"Sites": []}
+        sites = sites_response.get("Sites", [])
+        zone_id = None
+        for site in sites:
+            if site.get("SiteName") == "example.com":
+                zone_id = site.get("SiteId")
+                break
+        self.assertIsNone(zone_id)
+
+    def test_record_query_logic(self):
+        """Test record query logic with various response formats"""
+        # Test with single record
+        records_response = {
+            "Records": [
+                {
+                    "RecordId": "123456",
+                    "RecordName": "www.example.com",
+                    "Type": "A",
+                    "Value": "192.168.1.100",
+                    "TTL": 300,
+                }
+            ]
+        }
+        records = records_response.get("Records", [])  # type: list[dict] # type: ignore
+        record = records[0] if records else None
+        self.assertIsNotNone(record)
+        self.assertEqual(record["RecordId"], "123456")  # type: ignore
+        self.assertEqual(record["Value"], "192.168.1.100")  # type: ignore
+
+        # Test with multiple records (should return first one)
+        records_response = {
+            "Records": [
+                {
+                    "RecordId": "123456",
+                    "RecordName": "www.example.com",
+                    "Type": "A",
+                    "Value": "192.168.1.100",
+                    "TTL": 300,
+                },
+                {
+                    "RecordId": "123457",
+                    "RecordName": "www.example.com",
+                    "Type": "A",
+                    "Value": "192.168.1.101",
+                    "TTL": 600,
+                },
+            ]
+        }
+        records = records_response.get("Records", [])
+        record = records[0] if records else None
+        self.assertIsNotNone(record)
+        self.assertEqual(record["RecordId"], "123456")  # type: ignore
+        self.assertEqual(record["Value"], "192.168.1.100")  # type: ignore
+
+        # Test with no records
+        records_response = {"Records": []}
+        records = records_response.get("Records", [])
+        record = records[0] if records else None
+        self.assertIsNone(record)
+
+    def test_create_record_logic(self):
+        """Test create record logic with various response formats"""
+        # Test successful creation
+        create_response = {"RecordId": "123456"}
+        result = bool(create_response and create_response.get("RecordId"))
+        self.assertTrue(result)
+
+        # Test creation with additional fields
+        create_response = {"RecordId": "123456", "Status": "success", "Message": "Record created"}
+        result = bool(create_response and create_response.get("RecordId"))
+        self.assertTrue(result)
+
+        # Test failed creation (no RecordId)
+        create_response = {"Error": "Invalid domain"}
+        result = bool(create_response and create_response.get("RecordId"))
+        self.assertFalse(result)
+
+        # Test failed creation (empty response)
+        create_response = {}
+        result = bool(create_response and create_response.get("RecordId"))
+        self.assertFalse(result)
+
+        # Test failed creation (None response)
+        create_response = None
+        result = bool(create_response and create_response.get("RecordId"))
+        self.assertFalse(result)
+
+    def test_update_record_logic(self):
+        """Test update record logic with various response formats"""
+        # Test successful update
+        update_response = {"RecordId": "123456"}
+        result = bool(update_response and update_response.get("RecordId"))
+        self.assertTrue(result)
+
+        # Test update with additional fields
+        update_response = {"RecordId": "123456", "Status": "success", "Message": "Record updated"}
+        result = bool(update_response and update_response.get("RecordId"))
+        self.assertTrue(result)
+
+        # Test failed update (no RecordId)
+        update_response = {"Error": "Record not found"}
+        result = bool(update_response and update_response.get("RecordId"))
+        self.assertFalse(result)
+
+        # Test failed update (empty response)
+        update_response = {}
+        result = bool(update_response and update_response.get("RecordId"))
+        self.assertFalse(result)
+
+    def test_update_record_change_detection(self):
+        """Test update record change detection logic"""
+        old_record = {
+            "RecordId": "123456",
+            "RecordName": "www.example.com",
+            "Type": "A",
+            "Value": "192.168.1.100",
+            "TTL": 300,
+        }
+
+        # Test no changes detected - simulate _update_record logic
+        value = "192.168.1.100"
+        record_type = "A"
+        ttl = 300
+        needs_update = not (
+            old_record.get("Value") == value
+            and old_record.get("Type") == record_type
+            and (not ttl or old_record.get("TTL") == ttl)
+        )
+        self.assertFalse(needs_update)
+
+        # Test value change
+        value = "192.168.1.200"
+        needs_update = not (
+            old_record.get("Value") == value
+            and old_record.get("Type") == record_type
+            and (not ttl or old_record.get("TTL") == ttl)
+        )
+        self.assertTrue(needs_update)
+
+        # Test TTL change
+        value = "192.168.1.100"
+        ttl = 600
+        needs_update = not (
+            old_record.get("Value") == value
+            and old_record.get("Type") == record_type
+            and (not ttl or old_record.get("TTL") == ttl)
+        )
+        self.assertTrue(needs_update)
+
+        # Test type change
+        ttl = 300
+        record_type = "AAAA"
+        needs_update = not (
+            old_record.get("Value") == value
+            and old_record.get("Type") == record_type
+            and (not ttl or old_record.get("TTL") == ttl)
+        )
+        self.assertTrue(needs_update)
+
+        # Test TTL None (should be ignored)
+        record_type = "A"
+        ttl = None
+        needs_update = not (
+            old_record.get("Value") == value
+            and old_record.get("Type") == record_type
+            and (not ttl or old_record.get("TTL") == ttl)
+        )
+        self.assertFalse(needs_update)
+
+    def test_domain_formatting(self):
+        """Test domain formatting for various subdomain and main domain combinations"""
+        # Import the helper function
+        from ddns.provider._base import join_domain
+
+        # Test with standard subdomain
+        formatted = join_domain("www", "example.com")
+        self.assertEqual(formatted, "www.example.com")
+
+        # Test with @ (root domain)
+        formatted = join_domain("@", "example.com")
+        self.assertEqual(formatted, "example.com")
+
+        # Test with empty subdomain
+        formatted = join_domain("", "example.com")
+        self.assertEqual(formatted, "example.com")
+
+        # Test with nested subdomain
+        formatted = join_domain("mail.server", "example.com")
+        self.assertEqual(formatted, "mail.server.example.com")
+
+    def test_record_type_validation(self):
+        """Test record type validation and handling"""
+        valid_types = ["A", "AAAA", "CNAME", "MX", "TXT", "NS", "PTR", "SRV"]
+
+        # Test that record types are strings and uppercase
+        for record_type in valid_types:
+            self.assertIsInstance(record_type, str)
+            self.assertEqual(record_type, record_type.upper())
+
+        # Test case insensitive handling
+        self.assertEqual("A", "a".upper())
+        self.assertEqual("CNAME", "cname".upper())
+
+    def test_ttl_validation(self):
+        """Test TTL validation and conversion"""
+        # Test valid TTL values
+        valid_ttls = [60, 300, 600, 3600, 86400]
+        for ttl in valid_ttls:
+            converted = int(ttl) if ttl else None
+            self.assertEqual(converted, ttl)
+
+        # Test string TTL conversion
+        ttl_str = "300"
+        converted = int(ttl_str) if ttl_str else None
+        self.assertEqual(converted, 300)
+
+        # Test None TTL (should return None)
+        ttl_none = None
+        converted = int(ttl_none) if ttl_none else None
+        self.assertIsNone(converted)
+
+    def test_error_handling(self):
+        """Test error handling in various scenarios"""
+        # Test empty response handling
+        response = {}
+        result = response.get("RecordId")
+        self.assertIsNone(result)
+
+        # Test error response handling
+        error_response = {"Error": "Invalid request", "Code": "400"}
+        result = error_response.get("RecordId")
+        self.assertIsNone(result)
+
+        # Test None response handling
+        none_response = None
+        result = none_response.get("RecordId") if none_response else None
+        self.assertIsNone(result)
+
+    def test_comment_handling(self):
+        """Test comment parameter handling"""
+        # Test default comment from provider
+        default_comment = self.provider.remark
+        self.assertIn("DDNS", default_comment)
+
+        # Test custom comment
+        extra = {"Comment": "Custom DNS record"}
+        comment = extra.get("Comment", self.provider.remark)
+        self.assertEqual(comment, "Custom DNS record")
+
+        # Test fallback to default
+        extra = {}
+        comment = extra.get("Comment", self.provider.remark)
+        self.assertEqual(comment, self.provider.remark)
+
+
+class TestAliesaProviderIntegration(BaseProviderTestCase):
+    """AliESA Provider 集成测试类 - 使用最少的 mock"""
+
+    def setUp(self):
+        """Setup test provider with mock credentials"""
+        super(TestAliesaProviderIntegration, self).setUp()
+        self.provider = AliesaProvider(auth_id="test_access_key", auth_token="test_secret_key")
+
+    def test_full_workflow_logic(self):
+        """Test complete workflow logic without mocking internal methods"""
+        # Test scenario: updating an existing record
+        # Simulate the decision logic in set_record method
+
+        # Mock data that would be returned from API calls
+        existing_record = {
+            "RecordId": "123456",
+            "RecordName": "www.example.com",
+            "Type": "A",
+            "Value": "192.168.1.1",
+            "TTL": 300,
+        }
+        new_value = "192.168.1.100"
+
+        # Test the logic: should update because value changed
+        needs_update = not (
+            existing_record.get("Value") == new_value
+            and existing_record.get("Type") == "A"
+            and (not 300 or existing_record.get("TTL") == 300)
+        )
+        self.assertTrue(needs_update)
+
+        # Test scenario: no changes needed
+        same_value = "192.168.1.1"
+        needs_update = not (
+            existing_record.get("Value") == same_value
+            and existing_record.get("Type") == "A"
+            and (not 300 or existing_record.get("TTL") == 300)
+        )
+        self.assertFalse(needs_update)
+
+    def test_api_parameter_construction(self):
+        """Test API parameter construction logic"""
+        # Test zone_id conversion to int
+        zone_id_str = "12345"
+        zone_id_int = int(zone_id_str)
+        self.assertEqual(zone_id_int, 12345)
+
+        # Test TTL conversion
+        ttl_str = "600"
+        ttl_int = int(ttl_str) if ttl_str else None
+        self.assertEqual(ttl_int, 600)
+
+        # Test comment handling
+        extra_with_comment = {"Comment": "Custom comment"}
+        comment = extra_with_comment.get("Comment", self.provider.remark)
+        self.assertEqual(comment, "Custom comment")
+
+        extra_without_comment = {}
+        comment = extra_without_comment.get("Comment", self.provider.remark)
+        self.assertEqual(comment, self.provider.remark)
+
+    def test_site_matching_logic(self):
+        """Test site matching logic used in _query_zone_id"""
+        # Mock API response format
+        api_response = {
+            "Sites": [
+                {"SiteId": 11111, "SiteName": "other.com"},
+                {"SiteId": 22222, "SiteName": "example.com"},
+                {"SiteId": 33333, "SiteName": "test.com"},
+            ]
+        }
+
+        # Test exact match logic
+        target_domain = "example.com"
+        found_site_id = None
+        for site in api_response.get("Sites", []):
+            if site.get("SiteName") == target_domain:
+                found_site_id = site.get("SiteId")
+                break
+
+        self.assertEqual(found_site_id, 22222)
+
+        # Test no match
+        target_domain = "nonexistent.com"
+        found_site_id = None
+        for site in api_response.get("Sites", []):
+            if site.get("SiteName") == target_domain:
+                found_site_id = site.get("SiteId")
+                break
+
+        self.assertIsNone(found_site_id)
+
+    def test_record_filtering_logic(self):
+        """Test record filtering logic used in _query_record"""
+        # Mock API response format
+        api_response = {
+            "Records": [
+                {
+                    "RecordId": "111",
+                    "RecordName": "www.example.com",
+                    "Type": "A",
+                    "Value": "192.168.1.1",
+                    "TTL": 300,
+                },
+                {
+                    "RecordId": "222",
+                    "RecordName": "www.example.com",
+                    "Type": "A",
+                    "Value": "192.168.1.2",
+                    "TTL": 600,
+                },
+                {
+                    "RecordId": "333",
+                    "RecordName": "mail.example.com",
+                    "Type": "A",
+                    "Value": "192.168.1.3",
+                    "TTL": 300,
+                },
+            ]
+        }
+
+        # Test getting first matching record (current behavior)
+        records = api_response.get("Records", [])
+        first_record = records[0] if records else None
+
+        self.assertIsNotNone(first_record)
+        if first_record:  # Add type guard for static analysis
+            self.assertEqual(first_record["RecordId"], "111")
+            self.assertEqual(first_record["Value"], "192.168.1.1")
+
+
+class TestAliesaProviderAPIResponse(BaseProviderTestCase):
+    """Test AliesaProvider API response handling - minimal mocking"""
+
+    def setUp(self):
+        """Setup test provider with mock credentials"""
+        super(TestAliesaProviderAPIResponse, self).setUp()
+        self.provider = AliesaProvider(auth_id="test_access_key", auth_token="test_secret_key")
+
+    def test_successful_response_detection(self):
+        """Test successful API response detection logic"""
+        # Test create/update success detection
+        success_response = {"RecordId": "123456", "RequestId": "ABC-123"}
+        is_success = bool(success_response and success_response.get("RecordId"))
+        self.assertTrue(is_success)
+
+        # Test failure detection
+        failure_response = {"Error": "InvalidDomain", "Code": "400"}
+        is_success = bool(failure_response and failure_response.get("RecordId"))
+        self.assertFalse(is_success)
+
+        # Test empty response
+        empty_response = {}
+        is_success = bool(empty_response and empty_response.get("RecordId"))
+        self.assertFalse(is_success)
+
+    def test_different_record_types_parameters(self):
+        """Test parameter handling for different record types"""
+        # Test A record parameters
+        record_params = {"RecordName": "www.example.com", "Type": "A", "Value": "192.168.1.100", "TTL": 300}
+        self.assertEqual(record_params["Type"], "A")
+        self.assertTrue(record_params["Value"].count(".") == 3)  # IPv4 format
+
+        # Test AAAA record parameters
+        record_params = {"RecordName": "www.example.com", "Type": "AAAA", "Value": "2001:db8::1", "TTL": 600}
+        self.assertEqual(record_params["Type"], "AAAA")
+        self.assertTrue(":" in record_params["Value"])  # IPv6 format
+
+        # Test CNAME record parameters
+        record_params = {
+            "RecordName": "alias.example.com",
+            "Type": "CNAME",
+            "Value": "target.example.com",
+            "TTL": 300,
+        }
+        self.assertEqual(record_params["Type"], "CNAME")
+        self.assertTrue(record_params["Value"].endswith(".com"))
+
+    def test_parameter_filtering_logic(self):
+        """Test parameter filtering logic (removes None values)"""
+        # Test with None values
+        params = {"SiteName": "example.com", "PageSize": 500, "Filter": None, "ExtraParam": None}
+
+        # Simulate the filtering logic from _request method
+        filtered_params = {k: v for k, v in params.items() if v is not None}
+
+        expected_params = {"SiteName": "example.com", "PageSize": 500}
+
+        self.assertEqual(filtered_params, expected_params)
+        self.assertNotIn("Filter", filtered_params)
+        self.assertNotIn("ExtraParam", filtered_params)
+
+
+if __name__ == "__main__":
+    unittest.main()

+ 15 - 15
tests/test_provider_base.py

@@ -6,13 +6,13 @@ BaseProvider 单元测试
 """
 
 from base_test import BaseProviderTestCase, unittest
-from ddns.provider._base import BaseProvider
+from ddns.provider._base import BaseProvider, encode_params
 
 
 class _TestProvider(BaseProvider):
     """测试用的具体Provider实现"""
 
-    API = "https://api.example.com"
+    endpoint = "https://api.example.com"
 
     def __init__(self, auth_id="test_id", auth_token="test_token_123456789", **options):
         super(_TestProvider, self).__init__(auth_id, auth_token, **options)
@@ -85,34 +85,39 @@ class TestBaseProvider(BaseProviderTestCase):
 
     def test_split_custom_domain_with_tilde(self):
         """测试用~分隔的自定义域名"""
-        from ddns.provider._base import split_custom_domain
-        sub, main = split_custom_domain("www~example.com")
+        from ddns.provider._base import _split_custom_domain
+
+        sub, main = _split_custom_domain("www~example.com")
         self.assertEqual(sub, "www")
         self.assertEqual(main, "example.com")
 
     def test_split_custom_domain_with_plus(self):
         """测试用+分隔的自定义域名"""
-        from ddns.provider._base import split_custom_domain
-        sub, main = split_custom_domain("api+test.com")
+        from ddns.provider._base import _split_custom_domain
+
+        sub, main = _split_custom_domain("api+test.com")
         self.assertEqual(sub, "api")
         self.assertEqual(main, "test.com")
 
     def test_split_custom_domain_no_separator(self):
         """测试没有分隔符的域名"""
-        from ddns.provider._base import split_custom_domain
-        sub, main = split_custom_domain("example.com")
+        from ddns.provider._base import _split_custom_domain
+
+        sub, main = _split_custom_domain("example.com")
         self.assertIsNone(sub)
         self.assertEqual(main, "example.com")
 
     def test_join_domain_normal(self):
         """测试正常合并域名"""
         from ddns.provider._base import join_domain
+
         domain = join_domain("www", "example.com")
         self.assertEqual(domain, "www.example.com")
 
     def test_join_domain_empty_sub(self):
         """测试空子域名合并"""
         from ddns.provider._base import join_domain
+
         domain = join_domain("", "example.com")
         self.assertEqual(domain, "example.com")
 
@@ -122,21 +127,16 @@ class TestBaseProvider(BaseProviderTestCase):
     def test_encode_dict(self):
         """测试编码字典参数"""
         params = {"key1": "value1", "key2": "value2"}
-        result = BaseProvider._encode(params)
+        result = encode_params(params)
         # 由于字典顺序可能不同,我们检查包含关系
         self.assertIn("key1=value1", result)
         self.assertIn("key2=value2", result)
 
     def test_encode_none(self):
         """测试编码None参数"""
-        result = BaseProvider._encode(None)
+        result = encode_params(None)
         self.assertEqual(result, "")
 
-    def test_quote_basic(self):
-        """测试基本URL编码"""
-        result = BaseProvider._quote("hello world")
-        self.assertEqual(result, "hello%20world")
-
     def test_mask_sensitive_data_empty(self):
         """测试空数据打码"""
         result = self.provider._mask_sensitive_data("")

+ 2 - 2
tests/test_provider_cloudflare.py

@@ -20,7 +20,7 @@ class TestCloudflareProvider(BaseProviderTestCase):
 
     def test_class_constants(self):
         """Test CloudflareProvider class constants"""
-        self.assertEqual(CloudflareProvider.API, "https://api.cloudflare.com")
+        self.assertEqual(CloudflareProvider.endpoint, "https://api.cloudflare.com")
         self.assertEqual(CloudflareProvider.content_type, "application/json")
         self.assertTrue(CloudflareProvider.decode_response)
 
@@ -29,7 +29,7 @@ class TestCloudflareProvider(BaseProviderTestCase):
         provider = CloudflareProvider(self.auth_id, self.auth_token)
         self.assertEqual(provider.auth_id, self.auth_id)
         self.assertEqual(provider.auth_token, self.auth_token)
-        self.assertEqual(provider.API, "https://api.cloudflare.com")
+        self.assertEqual(provider.endpoint, "https://api.cloudflare.com")
 
     def test_init_with_token_only(self):
         """Test CloudflareProvider initialization with token only (Bearer auth)"""

+ 2 - 2
tests/test_provider_dnscom.py

@@ -20,7 +20,7 @@ class TestDnscomProvider(BaseProviderTestCase):
 
     def test_class_constants(self):
         """Test DnscomProvider class constants"""
-        self.assertEqual(DnscomProvider.API, "https://www.51dns.com")
+        self.assertEqual(DnscomProvider.endpoint, "https://www.51dns.com")
         self.assertEqual(DnscomProvider.content_type, "application/x-www-form-urlencoded")
         self.assertTrue(DnscomProvider.decode_response)
 
@@ -29,7 +29,7 @@ class TestDnscomProvider(BaseProviderTestCase):
         provider = DnscomProvider(self.auth_id, self.auth_token)
         self.assertEqual(provider.auth_id, self.auth_id)
         self.assertEqual(provider.auth_token, self.auth_token)
-        self.assertEqual(provider.API, "https://www.51dns.com")
+        self.assertEqual(provider.endpoint, "https://www.51dns.com")
 
     @patch("ddns.provider.dnscom.time")
     @patch("ddns.provider.dnscom.md5")

+ 5 - 3
tests/test_provider_dnspod.py

@@ -21,12 +21,12 @@ class TestDnspodProvider(BaseProviderTestCase):
         provider = DnspodProvider(self.auth_id, self.auth_token)
         self.assertEqual(provider.auth_id, self.auth_id)
         self.assertEqual(provider.auth_token, self.auth_token)
-        self.assertEqual(provider.API, "https://dnsapi.cn")
+        self.assertEqual(provider.endpoint, "https://dnsapi.cn")
         self.assertEqual(provider.DefaultLine, "默认")
 
     def test_class_constants(self):
         """Test DnspodProvider class constants"""
-        self.assertEqual(DnspodProvider.API, "https://dnsapi.cn")
+        self.assertEqual(DnspodProvider.endpoint, "https://dnsapi.cn")
         self.assertEqual(DnspodProvider.DefaultLine, "默认")
         # ContentType should be TYPE_FORM
         from ddns.provider._base import TYPE_FORM
@@ -367,7 +367,9 @@ class TestDnspodProvider(BaseProviderTestCase):
             mock_request.return_value = {"record": {"id": "12345", "name": "www", "value": "192.168.1.1"}}
 
             # Test create record with line parameter
-            result = self.provider._create_record("zone123", "www", "example.com", "192.168.1.1", "A", 600, "电信", {})
+            result = self.provider._create_record(
+                "zone123", "www", "example.com", "192.168.1.1", "A", 600, "电信", {}
+            )
 
             self.assertTrue(result)
             mock_request.assert_called_once_with(

+ 4 - 4
tests/test_provider_dnspod_com.py

@@ -20,7 +20,7 @@ class TestDnspodComProvider(BaseProviderTestCase):
 
     def test_class_constants(self):
         """Test DnspodComProvider class constants"""
-        self.assertEqual(DnspodComProvider.API, "https://api.dnspod.com")
+        self.assertEqual(DnspodComProvider.endpoint, "https://api.dnspod.com")
         self.assertEqual(DnspodComProvider.DefaultLine, "default")
 
     def test_init_with_basic_config(self):
@@ -28,7 +28,7 @@ class TestDnspodComProvider(BaseProviderTestCase):
         provider = DnspodComProvider(self.auth_id, self.auth_token)
         self.assertEqual(provider.auth_id, self.auth_id)
         self.assertEqual(provider.auth_token, self.auth_token)
-        self.assertEqual(provider.API, "https://api.dnspod.com")
+        self.assertEqual(provider.endpoint, "https://api.dnspod.com")
 
     def test_inheritance_from_dnspod(self):
         """Test that DnspodComProvider properly inherits from DnspodProvider"""
@@ -61,8 +61,8 @@ class TestDnspodComProviderIntegration(BaseProviderTestCase):
         dnspod_com_provider = DnspodComProvider(self.auth_id, self.auth_token)
 
         # Should use different API endpoints
-        self.assertNotEqual(dnspod_provider.API, dnspod_com_provider.API)
-        self.assertEqual(dnspod_com_provider.API, "https://api.dnspod.com")
+        self.assertNotEqual(dnspod_provider.endpoint, dnspod_com_provider.endpoint)
+        self.assertEqual(dnspod_com_provider.endpoint, "https://api.dnspod.com")
 
     def test_default_line_setting(self):
         """Test that DnspodComProvider uses correct default line"""

+ 2 - 2
tests/test_provider_he.py

@@ -25,13 +25,13 @@ class TestHeProvider(BaseProviderTestCase):
         provider = HeProvider("", self.auth_token)
         self.assertEqual(provider.auth_id, "")
         self.assertEqual(provider.auth_token, self.auth_token)
-        self.assertEqual(provider.API, "https://dyn.dns.he.net")
+        self.assertEqual(provider.endpoint, "https://dyn.dns.he.net")
         self.assertFalse(provider.decode_response)
 
     def test_class_constants(self):
         """Test HeProvider class constants"""
         provider = HeProvider("", self.auth_token)
-        self.assertEqual(provider.API, "https://dyn.dns.he.net")
+        self.assertEqual(provider.endpoint, "https://dyn.dns.he.net")
         self.assertFalse(provider.decode_response)
         # ContentType should be form-encoded
         from ddns.provider._base import TYPE_FORM

+ 2 - 2
tests/test_provider_huaweidns.py

@@ -31,7 +31,7 @@ class TestHuaweiDNSProvider(BaseProviderTestCase):
 
     def test_class_constants(self):
         """Test HuaweiDNSProvider class constants"""
-        self.assertEqual(HuaweiDNSProvider.API, "https://dns.myhuaweicloud.com")
+        self.assertEqual(HuaweiDNSProvider.endpoint, "https://dns.myhuaweicloud.com")
         self.assertEqual(HuaweiDNSProvider.content_type, "application/json")
         self.assertTrue(HuaweiDNSProvider.decode_response)
         self.assertEqual(HuaweiDNSProvider.algorithm, "SDK-HMAC-SHA256")
@@ -40,7 +40,7 @@ class TestHuaweiDNSProvider(BaseProviderTestCase):
         """Test HuaweiDNSProvider initialization with basic configuration"""
         self.assertEqual(self.provider.auth_id, self.auth_id)
         self.assertEqual(self.provider.auth_token, self.auth_token)
-        self.assertEqual(self.provider.API, "https://dns.myhuaweicloud.com")
+        self.assertEqual(self.provider.endpoint, "https://dns.myhuaweicloud.com")
 
     def test_request_get_method(self):
         """Test _request method with GET method"""

+ 9 - 22
tests/test_provider_noip.py

@@ -24,17 +24,18 @@ class TestNoipProvider(BaseProviderTestCase):
         provider = NoipProvider(self.auth_id, self.auth_token)
         self.assertEqual(provider.auth_id, self.auth_id)
         self.assertEqual(provider.auth_token, self.auth_token)
-        self.assertEqual(provider.API, "https://dynupdate.no-ip.com")
+        self.assertEqual(provider.endpoint, "https://dynupdate.no-ip.com")
         self.assertFalse(provider.decode_response)
 
     def test_class_constants(self):
         """Test NoipProvider class constants"""
         provider = NoipProvider(self.auth_id, self.auth_token)
-        self.assertEqual(provider.API, "https://dynupdate.no-ip.com")
+        self.assertEqual(provider.endpoint, "https://dynupdate.no-ip.com")
         self.assertFalse(provider.decode_response)
         self.assertIsNone(provider.accept)
         # ContentType should be form-encoded
         from ddns.provider._base import TYPE_FORM
+
         self.assertEqual(provider.content_type, TYPE_FORM)
 
     def test_validate_success_with_credentials(self):
@@ -138,12 +139,7 @@ class TestNoipProvider(BaseProviderTestCase):
         provider = NoipProvider(self.auth_id, self.auth_token)
 
         result = provider.set_record(
-            domain="full.example.com",
-            value="10.0.0.1",
-            record_type="A",
-            ttl=300,
-            line="default",
-            extra_param="test"
+            domain="full.example.com", value="10.0.0.1", record_type="A", ttl=300, line="default", extra_param="test"
         )
 
         # Verify the result
@@ -307,14 +303,8 @@ class TestNoipProvider(BaseProviderTestCase):
         provider = NoipProvider("test_user", "test_pass")
 
         # Test the auth header creation manually
-        import base64
-        auth_string = "test_user:test_pass"
-        if hasattr(auth_string, 'encode'):  # Python 3
-            auth_bytes = auth_string.encode('utf-8')
-        else:  # Python 2
-            auth_bytes = auth_string
-
-        expected_auth_b64 = base64.b64encode(auth_bytes).decode('ascii')
+
+        expected_auth_b64 = "dGVzdF91c2VyOnRlc3RfcGFzcw=="
         expected_header = "Basic {0}".format(expected_auth_b64)
 
         with patch.object(provider, "_http") as mock_http:
@@ -336,8 +326,7 @@ class TestNoipProvider(BaseProviderTestCase):
             provider.set_record("example.com", "192.168.1.1", "A")
 
         # Verify logger.info was called for initial log
-        provider.logger.info.assert_any_call(
-            "%s => %s(%s)", "example.com", "192.168.1.1", "A")
+        provider.logger.info.assert_any_call("%s => %s(%s)", "example.com", "192.168.1.1", "A")
 
 
 class TestNoipProviderIntegration(BaseProviderTestCase):
@@ -350,8 +339,7 @@ class TestNoipProviderIntegration(BaseProviderTestCase):
         with patch.object(provider, "_http") as mock_http:
             mock_http.return_value = "good 1.2.3.4"
 
-            result = provider.set_record(
-                "test.com", "1.2.3.4", "A", 300, "default")
+            result = provider.set_record("test.com", "1.2.3.4", "A", 300, "default")
 
             self.assertTrue(result)
             mock_http.assert_called_once()
@@ -374,8 +362,7 @@ class TestNoipProviderIntegration(BaseProviderTestCase):
         with patch.object(provider, "_http") as mock_http:
             mock_http.return_value = "good ::1"
 
-            result = provider.set_record(
-                "test.com", "::1", "AAAA", 600, "telecom")
+            result = provider.set_record("test.com", "::1", "AAAA", 600, "telecom")
 
             self.assertTrue(result)
             mock_http.assert_called_once()

+ 4 - 47
tests/test_provider_simple.py

@@ -6,13 +6,13 @@ Unit tests for SimpleProvider
 """
 
 from base_test import BaseProviderTestCase, unittest, MagicMock
-from ddns.provider._base import SimpleProvider, TYPE_FORM
+from ddns.provider._base import SimpleProvider, TYPE_FORM, encode_params
 
 
 class _TestableSimpleProvider(SimpleProvider):
     """Test implementation of SimpleProvider for testing purposes"""
 
-    API = "https://api.example.com"
+    endpoint = "https://api.example.com"
 
     def set_record(self, domain, value, record_type="A", ttl=None, line=None, **extra):
         """Test implementation of set_record"""
@@ -32,7 +32,7 @@ class _TestableSimpleProviderClass(BaseProviderTestCase):
         provider = _TestableSimpleProvider(self.auth_id, self.auth_token)
         self.assertEqual(provider.auth_id, self.auth_id)
         self.assertEqual(provider.auth_token, self.auth_token)
-        self.assertEqual(provider.API, "https://api.example.com")
+        self.assertEqual(provider.endpoint, "https://api.example.com")
         self.assertEqual(provider.content_type, TYPE_FORM)
         self.assertTrue(provider.decode_response)
         self.assertEqual(provider.verify_ssl, "auto")  # Default verify_ssl should be "auto"
@@ -118,56 +118,13 @@ class _TestableSimpleProviderClass(BaseProviderTestCase):
     def test_encode_dict(self):
         """Test _encode method with dictionary"""
         params = {"key1": "value1", "key2": "value2"}
-        result = _TestableSimpleProvider._encode(params)
+        result = encode_params(params)
 
         # Result should be URL-encoded string
         self.assertIn("key1=value1", result)
         self.assertIn("key2=value2", result)
         self.assertIn("&", result)
 
-    def test_encode_list(self):
-        """Test _encode method with list"""
-        params = [("key1", "value1"), ("key2", "value2")]
-        result = _TestableSimpleProvider._encode(params)
-
-        self.assertIn("key1=value1", result)
-        self.assertIn("key2=value2", result)
-
-    def test_encode_string(self):
-        """Test _encode method with string"""
-        params = "key1=value1&key2=value2"
-        result = _TestableSimpleProvider._encode(params)
-
-        self.assertEqual(result, params)
-
-    def test_encode_none(self):
-        """Test _encode method with None"""
-        result = _TestableSimpleProvider._encode(None)
-        self.assertEqual(result, "")
-
-    def test_encode_empty_dict(self):
-        """Test _encode method with empty dictionary"""
-        result = _TestableSimpleProvider._encode({})
-        self.assertEqual(result, "")
-
-    def test_quote_basic(self):
-        """Test _quote method with basic string"""
-        data = "hello world"
-        result = _TestableSimpleProvider._quote(data)
-        self.assertEqual(result, "hello%20world")
-
-    def test_quote_with_safe_chars(self):
-        """Test _quote method with safe characters"""
-        data = "hello/world"
-        result = _TestableSimpleProvider._quote(data, safe="/")
-        self.assertEqual(result, "hello/world")
-
-    def test_quote_without_safe_chars(self):
-        """Test _quote method without safe characters"""
-        data = "hello/world"
-        result = _TestableSimpleProvider._quote(data, safe="")
-        self.assertEqual(result, "hello%2Fworld")
-
     def test_mask_sensitive_data_basic(self):
         """Test _mask_sensitive_data method with basic token"""
         provider = _TestableSimpleProvider(self.auth_id, "secret123")

+ 2 - 2
tests/test_provider_tencentcloud.py

@@ -24,7 +24,7 @@ class TestTencentCloudProvider(BaseProviderTestCase):
         self.assertProviderInitialized(self.provider)
         self.assertEqual(self.provider.service, "dnspod")
         self.assertEqual(self.provider.version_date, "2021-03-23")
-        self.assertEqual(self.provider.API, "https://dnspod.tencentcloudapi.com")
+        self.assertEqual(self.provider.endpoint, "https://dnspod.tencentcloudapi.com")
         self.assertEqual(self.provider.content_type, "application/json")
 
     def test_validate_success(self):
@@ -375,7 +375,7 @@ class TestTencentCloudProvider(BaseProviderTestCase):
             "Name": "www",
             "Line": "默认",
             "Domain": "example.com",
-            "DomainId": 12345678
+            "DomainId": 12345678,
         }
 
         with patch.object(self.provider, "_request") as mock_request:

+ 2 - 10
tests/test_util_cache.py

@@ -6,20 +6,12 @@ Test cases for cache module
 """
 
 import unittest
-import sys
+
 import os
 import tempfile
 from time import sleep
 
-try:
-    from unittest.mock import patch
-except ImportError:
-    # Python 2.7 compatibility
-    from mock import patch  # type: ignore
-
-# Add the parent directory to the path so we can import the ddns module
-sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
+from __init__ import patch
 from ddns.util.cache import Cache  # noqa: E402