Browse Source

Refactor(base): HMAC-SHA256 signature logic (#496)

* Refactor provider HMAC-SHA256 signature logic

Centralizes HMAC-SHA256 signature generation for AliDNS, HuaweiDNS, and TencentCloud providers into a unified function in _base.py. Removes provider-specific signature and hash helper methods, updating each provider to use the shared implementation. Cleans up and removes related unit tests for the old signature methods.

* Fix test config key from signing_string_template to signing_string_format

Corrects the test case configuration by renaming 'signing_string_template' to 'signing_string_format' to match the expected parameter name.

* rename hmac_sha256_digest to hmac_sha256

refactor(hmac): rename hmac_sha256_digest to hmac_sha256 and update its implementation

* Fix: Ensure Python 2/3 compatibility for HMAC-SHA256 encoding and hex conversion

* Refactor: Remove unnecessary warning log in CloudflareProvider and streamline test cases for CallbackProvider

* Enhance tests: Add random delay to avoid rate limiting in CallbackProvider integration tests

* Refactor: Simplify host extraction in AlidnsProvider's _request method
New Future 5 months ago
parent
commit
b3a5ff9817

+ 5 - 3
README.md

@@ -38,14 +38,16 @@
   - 多代理自动切换
 - 服务商支持:
   - [DNSPOD](https://www.dnspod.cn/) ([配置指南](doc/providers/dnspod.md))
-  - [阿里 DNS](http://www.alidns.com/) ([配置指南](doc/providers/alidns.md))
+  - [阿里 DNS](http://www.alidns.com/) ([配置指南](doc/providers/alidns.md))
   - [DNS.COM](https://www.dns.com/) (@loftor-git)
   - [DNSPOD 国际版](https://www.dnspod.com/)
   - [CloudFlare](https://www.cloudflare.com/) (@tongyifan)
   - [HE.net](https://dns.he.net/) (@NN708) (不支持自动创建记录)
-  - [华为云](https://huaweicloud.com/) (@cybmp3)
-  - [腾讯云](https://cloud.tencent.com/) ([配置指南](doc/providers/tencentcloud.md))
+  - [华为云](https://huaweicloud.com/) (@cybmp3)
+  - [腾讯云](https://cloud.tencent.com/) ([配置指南](doc/providers/tencentcloud.md))
   - 自定义回调 API ([配置指南](doc/providers/callback.md))
+  
+  > ⚡ 标记的服务商使用高级 HMAC-SHA256 签名认证,提供企业级安全保障
 - 其他:
   - 可设置定时任务
   - TTL 配置支持

+ 104 - 1
ddns/provider/_base.py

@@ -57,10 +57,12 @@ Defines a unified interface to support extension and adaptation across providers
 @author: NewFuture
 """
 
-from os import environ
 from abc import ABCMeta, abstractmethod
+from hashlib import sha256
+from hmac import HMAC
 from json import loads as jsondecode, dumps as jsonencode
 from logging import Logger, getLogger  # noqa:F401 # type: ignore[no-redef]
+from os import environ
 from ..util.http import send_http_request
 
 try:  # python 3
@@ -72,6 +74,107 @@ TYPE_FORM = "application/x-www-form-urlencoded"
 TYPE_JSON = "application/json"
 
 
+def hmac_sha256(key, message):
+    # type: (str | bytes, str | bytes) -> HMAC
+    """
+    计算 HMAC-SHA256 签名对象
+
+    Compute HMAC-SHA256 signature object.
+
+    Args:
+        key (str | bytes): 签名密钥 / Signing key
+        message (str | bytes): 待签名消息 / Message to sign
+
+    Returns:
+        HMAC: HMAC签名对象,可调用.digest()获取字节或.hexdigest()获取十六进制字符串
+              HMAC signature object, call .digest() for bytes or .hexdigest() for hex string
+    """
+    # Python 2/3 compatible encoding - avoid double encoding in Python 2
+    if not isinstance(key, bytes):
+        key = key.encode("utf-8")
+    if not isinstance(message, bytes):
+        message = message.encode("utf-8")
+    return HMAC(key, message, sha256)
+
+
+def sha256_hash(data):
+    # type: (str | bytes) -> str
+    """
+    计算 SHA256 哈希值
+
+    Compute SHA256 hash.
+
+    Args:
+        data (str | bytes): 待哈希数据 / Data to hash
+
+    Returns:
+        str: 十六进制哈希字符串 / Hexadecimal hash string
+    """
+    # Python 2/3 compatible encoding - avoid double encoding in Python 2
+    if not isinstance(data, bytes):
+        data = data.encode("utf-8")
+    return sha256(data).hexdigest()
+
+
+def hmac_sha256_authorization(
+    secret_key,  # type: str | bytes
+    method,  # type: str
+    path,  # type: str
+    query,  # type: str
+    headers,  # type: dict[str, str]
+    body_hash,  # type: str
+    signing_string_format,  # type: str
+    authorization_format,  # type: str
+):
+    # type: (...) -> str
+    """
+    HMAC-SHA256 云服务商通用认证签名生成器
+
+    Universal cloud provider authentication signature generator using HMAC-SHA256.
+
+    通用的云服务商API认证签名生成函数,使用HMAC-SHA256算法生成符合各云服务商规范的Authorization头部。
+
+    模板变量格式:{HashedCanonicalRequest}, {SignedHeaders}, {Signature}
+
+    Args:
+        secret_key (str | bytes): 签名密钥,已经过密钥派生处理 / Signing key (already derived if needed)
+        method (str): HTTP请求方法 / HTTP request method (GET, POST, etc.)
+        path (str): API请求路径 / API request path
+        query (str): URL查询字符串 / URL query string
+        headers (dict[str, str]): HTTP请求头部 / HTTP request headers
+        body_hash (str): 请求体的SHA256哈希值 / SHA256 hash of request payload
+        signing_string_format (str): 待签名字符串模板,包含 {HashedCanonicalRequest} 占位符
+        authorization_format (str): Authorization头部模板,包含 {SignedHeaders}, {Signature} 占位符
+
+    Returns:
+        str: 完整的Authorization头部值 / Complete Authorization header value
+    """
+    # 1. 构建规范化头部 - 所有传入的头部都参与签名
+    headers_to_sign = {k.lower(): str(v).strip() for k, v in headers.items()}
+    signed_headers_list = sorted(headers_to_sign.keys())
+
+    # 2. 构建规范请求字符串
+    canonical_headers = ""
+    for header_name in signed_headers_list:
+        canonical_headers += "{}:{}\n".format(header_name, headers_to_sign[header_name])
+
+    # 构建完整的规范请求字符串
+    signed_headers = ";".join(signed_headers_list)
+    canonical_request = "\n".join([method.upper(), path, query, canonical_headers, signed_headers, body_hash])
+
+    # 3. 构建待签名字符串 - 只需要替换 HashedCanonicalRequest
+    hashed_canonical_request = sha256_hash(canonical_request)
+    string_to_sign = signing_string_format.format(HashedCanonicalRequest=hashed_canonical_request)
+
+    # 4. 计算最终签名
+    signature = hmac_sha256(secret_key, string_to_sign).hexdigest()
+
+    # 5. 构建Authorization头部 - 只需要替换 SignedHeaders 和 Signature
+    authorization = authorization_format.format(SignedHeaders=signed_headers, Signature=signature)
+
+    return authorization
+
+
 class SimpleProvider(object):
     """
     简单DNS服务商接口的抽象基类, 必须实现 `set_record` 方法。

+ 17 - 38
ddns/provider/alidns.py

@@ -5,9 +5,7 @@ AliDNS API
 @author: NewFuture
 """
 
-from ._base import TYPE_FORM, BaseProvider
-from hashlib import sha256
-from hmac import new as hmac
+from ._base import TYPE_FORM, BaseProvider, hmac_sha256_authorization, sha256_hash
 from time import strftime, gmtime, time
 
 
@@ -17,45 +15,15 @@ class AlidnsProvider(BaseProvider):
 
     api_version = "2015-01-09"  # API版本,v3签名需要
 
-    def _signature_v3(self, method, path, headers, query="", body_hash=""):
-        # type: (str, str, dict, str, str) -> str
-        """阿里云API v3签名算法 https://help.aliyun.com/zh/sdk/product-overview/v3-request-structure-and-signature"""
-        # 构造规范化头部
-        headers_to_sign = {k.lower(): str(v).strip() for k, v in headers.items()}
-
-        # 按字母顺序排序并构造
-        signed_headers_list = sorted(headers_to_sign.keys())
-        canonical_headers = "".join("{}:{}\n".format(key, headers_to_sign[key]) for key in signed_headers_list)
-        signed_headers = ";".join(signed_headers_list)
-
-        # 构造规范化请求
-        canonical_request = "\n".join([method, path, query, canonical_headers, signed_headers, body_hash])
-
-        # 5. 构造待签名字符串
-        algorithm = "ACS3-HMAC-SHA256"
-        hashed_canonical_request = sha256(canonical_request.encode("utf-8")).hexdigest()
-        string_to_sign = "\n".join([algorithm, hashed_canonical_request])
-        self.logger.debug("String to sign: %s", string_to_sign)
-
-        # 6. 计算签名
-        signature = hmac(self.auth_token.encode("utf-8"), string_to_sign.encode("utf-8"), sha256).hexdigest()
-
-        # 7. 构造Authorization头
-        authorization = "{} Credential={},SignedHeaders={},Signature={}".format(
-            algorithm, self.auth_id, signed_headers, signature
-        )
-        return authorization
-
     def _request(self, action, **params):
         # type: (str, **(str | int | bytes | bool | None)) -> dict
+        """Aliyun v3 https://help.aliyun.com/zh/sdk/product-overview/v3-request-structure-and-signature"""
         params = {k: v for k, v in params.items() if v is not None}
-        # 从API URL中提取host
-        host = self.API.replace("https://", "").replace("http://", "").strip("/")
         body_content = self._encode(params) if len(params) > 0 else ""
-        content_hash = sha256(body_content.encode("utf-8")).hexdigest()
+        content_hash = sha256_hash(body_content)
         # 构造请求头部
         headers = {
-            "host": host,
+            "host": self.API.split("://", 1)[1].strip("/"),
             "content-type": self.content_type,
             "x-acs-action": action,
             "x-acs-content-sha256": content_hash,
@@ -64,8 +32,19 @@ class AlidnsProvider(BaseProvider):
             "x-acs-version": self.api_version,
         }
 
-        # 生成Authorization头
-        authorization = self._signature_v3("POST", "/", headers, body_hash=content_hash)
+        # 使用通用签名函数
+        authorization = hmac_sha256_authorization(
+            secret_key=self.auth_token,
+            method="POST",
+            path="/",
+            query="",
+            headers=headers,
+            body_hash=content_hash,
+            signing_string_format="ACS3-HMAC-SHA256\n{HashedCanonicalRequest}",
+            authorization_format=(
+                "ACS3-HMAC-SHA256 Credential=" + self.auth_id + ",SignedHeaders={SignedHeaders},Signature={Signature}"
+            ),
+        )
         headers["Authorization"] = authorization
         # 对于v3签名的RPC API,参数在request body中
         return self._http("POST", "/", body=body_content, headers=headers)

+ 0 - 4
ddns/provider/cloudflare.py

@@ -12,10 +12,6 @@ class CloudflareProvider(BaseProvider):
     content_type = TYPE_JSON
 
     def _validate(self):
-        self.logger.warning(
-            "Cloudflare provider 缺少充分的真实环境测试,如遇问题请及时在 GitHub Issues 中反馈: %s",
-            "https://github.com/NewFuture/DDNS/issues",
-        )
         if not self.auth_token:
             raise ValueError("token must be configured")
         if self.auth_id:

+ 1 - 1
ddns/provider/dnscom.py

@@ -21,7 +21,7 @@ class DnscomProvider(BaseProvider):
 
     def _validate(self):
         self.logger.warning(
-            "DNS.COM provider 缺少充分的真实环境测试,如遇问题请及时在 GitHub Issues 中反馈: %s",
+            "DNS.COM 缺少充分的真实环境测试,请及时在 GitHub Issues 中反馈: %s",
             "https://github.com/NewFuture/DDNS/issues",
         )
         super(DnscomProvider, self)._validate()

+ 1 - 1
ddns/provider/he.py

@@ -15,7 +15,7 @@ class HeProvider(SimpleProvider):
 
     def _validate(self):
         self.logger.warning(
-            "HE.net provider 缺少充分的真实环境测试,如遇问题请及时在 GitHub Issues 中反馈: %s",
+            "HE.net 缺少充分的真实环境测试,请及时在 GitHub Issues 中反馈: %s",
             "https://github.com/NewFuture/DDNS/issues",
         )
         if self.auth_id:

+ 34 - 50
ddns/provider/huaweidns.py

@@ -2,13 +2,10 @@
 """
 HuaweiDNS API
 华为DNS解析操作库
-https://support.huaweicloud.com/api-dns/zh-cn_topic_0037134406.html
-@author: cybmp3, NewFuture
+@author: NewFuture
 """
 
-from ._base import BaseProvider, TYPE_JSON
-from hashlib import sha256
-from hmac import new as hmac
+from ._base import BaseProvider, TYPE_JSON, hmac_sha256_authorization, sha256_hash
 from json import dumps as jsonencode
 from time import strftime, gmtime
 
@@ -20,29 +17,18 @@ class HuaweiDNSProvider(BaseProvider):
 
     def _validate(self):
         self.logger.warning(
-            "华为云 DNS provider 缺少充分的真实环境测试,如遇问题请及时在 GitHub Issues 中反馈: %s",
+            "华为云 DNS 缺少充分的真实环境测试,请及时在 GitHub Issues 中反馈: %s",
             "https://github.com/NewFuture/DDNS/issues",
         )
         super(HuaweiDNSProvider, self)._validate()
 
-    def _sign_headers(self, headers, signed_headers):
-        a = []
-        _headers = {}
-        for key in headers:
-            key_encoded = key.lower()
-            value = headers[key]
-            value_encoded = value.strip()
-            _headers[key_encoded] = value_encoded
-        for key in signed_headers:
-            a.append(key + ":" + _headers[key])
-        return "\n".join(a) + "\n"
-
-    def _hex_encode_sha256(self, data):
-        sha = sha256()
-        sha.update(data)
-        return sha.hexdigest()
-
     def _request(self, method, path, **params):
+        """
+        https://support.huaweicloud.com/api-dns/zh-cn_topic_0037134406.html
+        https://support.huaweicloud.com/devg-apisign/api-sign-algorithm-002.html
+        https://support.huaweicloud.com/devg-apisign/api-sign-algorithm-003.html
+        https://support.huaweicloud.com/devg-apisign/api-sign-algorithm-004.html
+        """
         # type: (str, str, **Any) -> dict
         params = {k: v for k, v in params.items() if v is not None}
         if method.upper() == "GET" or method.upper() == "DELETE":
@@ -52,40 +38,36 @@ class HuaweiDNSProvider(BaseProvider):
             query = ""
             body = jsonencode(params)
 
-        date_now = strftime("%Y%m%dT%H%M%SZ", gmtime())
+        now = strftime("%Y%m%dT%H%M%SZ", gmtime())
         headers = {
             "content-type": self.content_type,
             "host": self.API.split("://", 1)[1].strip("/"),
-            "X-Sdk-Date": date_now,
+            "X-Sdk-Date": now,
         }
-        sign_headers = [k.lower() for k in headers]
-        sign_headers.sort()
 
-        hex_encode = self._hex_encode_sha256(body.encode("utf-8"))
-        canonical_headers = self._sign_headers(headers, sign_headers)
-        sign_path = path if path[-1] == "/" else path + "/"
-        canonical_request = "%s\n%s\n%s\n%s\n%s\n%s" % (
-            method.upper(),
-            sign_path,
-            query,
-            canonical_headers,
-            ";".join(sign_headers),
-            hex_encode,
-        )
-        hashed_canonical_request = self._hex_encode_sha256(canonical_request.encode("utf-8"))
-
-        str_to_sign = "%s\n%s\n%s" % (self.algorithm, date_now, hashed_canonical_request)
-        secret = self.auth_token
-        signature = hmac(secret.encode("utf-8"), str_to_sign.encode("utf-8"), digestmod=sha256).hexdigest()
-        auth_header = "%s Access=%s, SignedHeaders=%s, Signature=%s" % (
+        # 使用通用签名函数
+        body_hash = sha256_hash(body)
+        # 华为云需要在签名时使用带尾斜杠的路径
+        sign_path = path if path.endswith("/") else path + "/"
+        authorization_format = "%s Access=%s, SignedHeaders={SignedHeaders}, Signature={Signature}" % (
             self.algorithm,
             self.auth_id,
-            ";".join(sign_headers),
-            signature,
         )
-        headers["Authorization"] = auth_header
-        self.logger.debug("Request headers: %s", headers)
-        data = self._http(method, path + "?" + query, headers=headers, body=body)
+        authorization = hmac_sha256_authorization(
+            secret_key=self.auth_token,
+            method=method,
+            path=sign_path,
+            query=query,
+            headers=headers,
+            body_hash=body_hash,
+            signing_string_format=self.algorithm + "\n" + now + "\n{HashedCanonicalRequest}",
+            authorization_format=authorization_format,
+        )
+        headers["Authorization"] = authorization
+
+        # 使用原始路径发送实际请求
+        path = "{}?{}".format(path, query) if query else path
+        data = self._http(method, path, headers=headers, body=body)
         return data
 
     def _query_zone_id(self, domain):
@@ -140,8 +122,10 @@ class HuaweiDNSProvider(BaseProvider):
         return False
 
     def _update_record(self, zone_id, old_record, value, record_type, ttl, line, extra):
-        """https://support.huaweicloud.com/api-dns/UpdateRecordSet.html (无 line 参数)"""
+        """https://support.huaweicloud.com/api-dns/UpdateRecordSets.html"""
         extra["description"] = extra.get("description", self.remark)
+        # Note: The v2.1 update API does not support the line parameter in the request body
+        # The line parameter is returned in the response but cannot be modified
         res = self._request(
             "PUT",
             "/v2.1/zones/" + zone_id + "/recordsets/" + old_record["id"],

+ 39 - 84
ddns/provider/tencentcloud.py

@@ -5,9 +5,7 @@ Tencent Cloud DNSPod API
 
 @author: NewFuture
 """
-from ._base import BaseProvider, TYPE_JSON
-from hashlib import sha256
-from hmac import new as hmac
+from ._base import BaseProvider, TYPE_JSON, hmac_sha256_authorization, sha256_hash, hmac_sha256
 from time import time, strftime, gmtime
 from json import dumps as jsonencode
 
@@ -28,75 +26,6 @@ class TencentCloudProvider(BaseProvider):
     service = "dnspod"
     version_date = "2021-03-23"
 
-    def _sign_tc3(self, method, uri, query, headers, payload, timestamp):
-        """
-        腾讯云 API 3.0 签名算法 (TC3-HMAC-SHA256)
-
-        API 文档: https://cloud.tencent.com/document/api/1427/56189
-
-        Args:
-            method (str): HTTP 方法
-            uri (str): URI 路径
-            query (str): 查询字符串
-            headers (dict): 请求头
-            payload (str): 请求体
-            timestamp (int): 时间戳
-
-        Returns:
-            str: Authorization 头部值
-        """
-        algorithm = "TC3-HMAC-SHA256"
-
-        # Step 1: 构建规范请求串
-        http_request_method = method.upper()
-        canonical_uri = uri
-        canonical_querystring = query or ""
-
-        # 构建规范头部
-        signed_headers_list = []
-        canonical_headers = ""
-        for key in sorted(headers.keys()):
-            if key in ["content-type", "host"]:
-                signed_headers_list.append(key)
-                canonical_headers += "{}:{}\n".format(key, headers[key])
-
-        signed_headers = ";".join(signed_headers_list)
-        hashed_request_payload = sha256(payload.encode("utf-8")).hexdigest()
-
-        canonical_request = "\n".join(
-            [
-                http_request_method,
-                canonical_uri,
-                canonical_querystring,
-                canonical_headers,
-                signed_headers,
-                hashed_request_payload,
-            ]
-        )
-
-        # Step 2: 构建待签名字符串
-        date = strftime("%Y-%m-%d", gmtime())  # 日期
-        credential_scope = "{}/{}/tc3_request".format(date, self.service)
-        hashed_canonical_request = sha256(canonical_request.encode("utf-8")).hexdigest()
-
-        string_to_sign = "\n".join([algorithm, str(timestamp), credential_scope, hashed_canonical_request])
-
-        # Step 3: 计算签名
-        def _sign(key, msg):
-            return hmac(key, msg.encode("utf-8"), sha256).digest()
-
-        secret_date = _sign(("TC3" + self.auth_token).encode("utf-8"), date)
-        secret_service = _sign(secret_date, self.service)
-        secret_signing = _sign(secret_service, "tc3_request")
-        signature = hmac(secret_signing, string_to_sign.encode("utf-8"), sha256).hexdigest()
-
-        # Step 4: 构建 Authorization 头部
-        authorization = "{} Credential={}/{}, SignedHeaders={}, Signature={}".format(
-            algorithm, self.auth_id, credential_scope, signed_headers, signature
-        )
-
-        return authorization
-
     def _request(self, action, **params):
         # type: (str, **(str | int | bytes | bool | None)) -> dict | None
         """
@@ -111,27 +40,53 @@ class TencentCloudProvider(BaseProvider):
         Returns:
             dict: API 响应结果
         """
+        # 构建请求体
         params = {k: v for k, v in params.items() if v is not None}
-        timestamp = int(time())
-        # 构建请求头,小写
+        body = jsonencode(params)
+
+        # 构建请求头,小写 腾讯云只签名特定头部
         headers = {
             "content-type": self.content_type,
             "host": self.API.split("://", 1)[1].strip("/"),
-            "X-TC-Action": action,
-            "X-TC-Version": self.version_date,
-            "X-TC-Timestamp": str(timestamp),
         }
 
-        # 构建请求体
-        payload = jsonencode(params)
+        # 腾讯云特殊的密钥派生过程
+        date = strftime("%Y-%m-%d", gmtime())
+        credential_scope = "{}/{}/tc3_request".format(date, self.service)
 
-        # 生成签名
-        authorization = self._sign_tc3("POST", "/", "", headers, payload, timestamp)
-        headers["authorization"] = authorization
+        # 派生签名密钥
+        secret_date = hmac_sha256("TC3" + self.auth_token, date).digest()
+        secret_service = hmac_sha256(secret_date, self.service).digest()
+        signing_key = hmac_sha256(secret_service, "tc3_request").digest()
 
-        # 发送请求
-        response = self._http("POST", "/", body=payload, headers=headers)
+        # 预处理模板字符串
+        auth_format = "TC3-HMAC-SHA256 Credential=%s/%s, SignedHeaders={SignedHeaders}, Signature={Signature}" % (
+            self.auth_id,
+            credential_scope,
+        )
+        timestamp = str(int(time()))
+        sign_template = "\n".join(["TC3-HMAC-SHA256", timestamp, credential_scope, "{HashedCanonicalRequest}"])
+        authorization = hmac_sha256_authorization(
+            secret_key=signing_key,
+            method="POST",
+            path="/",
+            query="",
+            headers=headers,
+            body_hash=sha256_hash(body),
+            signing_string_format=sign_template,
+            authorization_format=auth_format,
+        )
+        # X-TC 更新签名之后方可添加
+        headers.update(
+            {
+                "X-TC-Action": action,
+                "X-TC-Version": self.version_date,
+                "X-TC-Timestamp": timestamp,
+                "authorization": authorization,
+            }
+        )
 
+        response = self._http("POST", "/", body=body, headers=headers)
         if response and "Response" in response:
             if "Error" in response["Response"]:
                 error = response["Response"]["Error"]

+ 175 - 8
doc/dev/provider.md

@@ -7,12 +7,15 @@
 ```text
 ddns/
 ├── provider/
-│   ├── _base.py         # 抽象基类 SimpleProvider 和 BaseProvider
+│   ├── _base.py         # 抽象基类 SimpleProvider 和 BaseProvider,签名认证函数
 │   └── myprovider.py    # 你的新服务商实现
 tests/
 ├── base_test.py         # 共享测试工具和基类
 ├── test_provider_*.py   # 各个Provider的单元测试文件
+├── test_module_*.py     # 其他测试
 └── README.md            # 测试指南
+doc/dev/
+└── provider.md          # Provider开发指南 (本文档)
 ```
 
 ---
@@ -124,7 +127,7 @@ class MySimpleProvider(SimpleProvider):
 自定义标准 DNS 服务商示例
 @author: YourGithubUsername
 """
-from ._base import BaseProvider, TYPE_JSON
+from ._base import BaseProvider, TYPE_JSON, hmac_sha256_authorization, sha256_hash
 
 class MyProvider(BaseProvider):
     """
@@ -134,11 +137,6 @@ class MyProvider(BaseProvider):
     API = 'https://api.exampledns.com'
     ContentType = TYPE_JSON  # 或 TYPE_FORM
 
-    def _request(self, action, **params):
-        # type: (str, **(str | int | bytes | bool | None)) -> dict
-        """[推荐]封装通用请求逻辑,处理认证和公共参数"""
-
-
     def _query_zone_id(self, domain):
         # type: (str) -> str
         """查询主域名的Zone ID"""
@@ -156,6 +154,21 @@ class MyProvider(BaseProvider):
     def _update_record(self, zone_id, old_record, value, record_type, ttl=None, line=None, extra=None):
         # type: (str, str, str, str, int | None, str | None, dict | None) -> bool
         """更新现有DNS记录"""
+
+    
+    def _request(self, action, **params):
+        # type: (str, **(str | int | bytes | bool | None)) -> dict
+        """[推荐]封装通用请求逻辑,处理认证和公共参数"""
+        # 构建请求参数
+        request_params = {
+            "Action": action,
+            "Version": "2023-01-01",
+            "AccessKeyId": self.auth_id,
+            **{k: v for k, v in params.items() if v is not None}
+        }
+
+        res = self._http("POST", "/", params=request_params, headers=headers)
+        return res.get("data", {})
 ```
 
 ---
@@ -273,6 +286,160 @@ tests/
 - [`provider/alidns.py`](/ddns/provider/alidns.py) - POST+签名认证
 - [`provider/dnspod.py`](/ddns/provider/dnspod.py) - POST表单数据提交
 
+---
+
+## 🔐 云服务商认证签名算法
+
+对于需要签名认证的云服务商(如阿里云、华为云、腾讯云等),DDNS 提供了通用的 HMAC-SHA256 签名认证函数。
+
+### 签名认证工具函数
+
+#### `hmac_sha256_authorization()` - 通用签名生成器
+
+通用的云服务商API认证签名生成函数,支持阿里云、华为云、腾讯云等多种云服务商。
+使用HMAC-SHA256算法生成符合各云服务商规范的Authorization头部。
+所有云服务商的差异通过模板参数传递,实现完全的服务商无关性。
+
+```python
+from ddns.provider._base import hmac_sha256_authorization, sha256_hash
+
+# 通用签名函数调用示例
+authorization = hmac_sha256_authorization(
+    secret_key=secret_key,                    # 签名密钥(已派生处理)
+    method="POST",                            # HTTP方法
+    path="/v1/domains/records",               # API路径
+    query="limit=20&offset=0",                # 查询字符串
+    headers=request_headers,                  # 请求头部字典
+    body_hash=sha256_hash(request_body),      # 请求体哈希
+    signing_string_format=signing_template,   # 待签名字符串模板
+    authorization_format=auth_template        # Authorization头部模板
+)
+```
+
+**函数参数说明:**
+
+| 参数 | 类型 | 说明 |
+|------|------|------|
+| `secret_key` | `str \| bytes` | 签名密钥,已经过密钥派生处理 |
+| `method` | `str` | HTTP请求方法 (GET, POST, etc.) |
+| `path` | `str` | API请求路径 |
+| `query` | `str` | URL查询字符串 |
+| `headers` | `dict[str, str]` | HTTP请求头部 |
+| `body_hash` | `str` | 请求体的SHA256哈希值 |
+| `signing_string_format` | `str` | 待签名字符串模板,包含 `{HashedCanonicalRequest}` 占位符 |
+| `authorization_format` | `str` | Authorization头部模板,包含 `{SignedHeaders}`, `{Signature}` 占位符 |
+
+**模板变量:**
+
+- `{HashedCanonicalRequest}` - 规范请求的SHA256哈希值
+- `{SignedHeaders}` - 按字母顺序排列的签名头部列表
+- `{Signature}` - 最终的HMAC-SHA256签名值
+
+### 各云服务商签名实现示例
+
+#### 阿里云 (ACS3-HMAC-SHA256)
+
+```python
+def _request(self, action, **params):
+    # 构建请求头部
+    headers = {
+        "host": "alidns.aliyuncs.com",
+        "x-acs-action": action,
+        "x-acs-content-sha256": sha256_hash(body),
+        "x-acs-date": timestamp,
+        "x-acs-signature-nonce": nonce,
+        "x-acs-version": "2015-01-09"
+    }
+    
+    # 阿里云签名模板
+    auth_template = (
+        "ACS3-HMAC-SHA256 Credential={access_key},"
+        "SignedHeaders={{SignedHeaders}},Signature={{Signature}}"
+    )
+    signing_template = "ACS3-HMAC-SHA256\n{timestamp}\n{{HashedCanonicalRequest}}"
+    
+    # 生成签名
+    authorization = hmac_sha256_authorization(
+        secret_key=self.auth_token,
+        method="POST",
+        path="/",
+        query=query_string,
+        headers=headers,
+        body_hash=sha256_hash(body),
+        signing_string_format=signing_template,
+        authorization_format=auth_template
+    )
+    
+    headers["authorization"] = authorization
+    return self._http("POST", "/", body=body, headers=headers)
+```
+
+#### 腾讯云 (TC3-HMAC-SHA256)
+
+```python
+def _request(self, action, **params):
+    # 腾讯云需要派生密钥
+    derived_key = self._derive_signing_key(date, service, self.auth_token)
+    
+    # 构建请求头部
+    headers = {
+        "content-type": "application/json",
+        "host": "dnspod.tencentcloudapi.com",
+        "x-tc-action": action,
+        "x-tc-timestamp": timestamp,
+        "x-tc-version": "2021-03-23"
+    }
+    
+    # 腾讯云签名模板
+    auth_template = (
+        "TC3-HMAC-SHA256 Credential={secret_id}/{date}/{service}/tc3_request, "
+        "SignedHeaders={{SignedHeaders}}, Signature={{Signature}}"
+    )
+    signing_template = "TC3-HMAC-SHA256\n{timestamp}\n{date}/{service}/tc3_request\n{{HashedCanonicalRequest}}"
+    
+    # 生成签名
+    authorization = hmac_sha256_authorization(
+        secret_key=derived_key,  # 注意:使用派生密钥
+        method="POST",
+        path="/",
+        query="",
+        headers=headers,
+        body_hash=sha256_hash(body),
+        signing_string_format=signing_template,
+        authorization_format=auth_template
+    )
+    
+    headers["authorization"] = authorization
+    return self._http("POST", "/", body=body, headers=headers)
+```
+
+### 辅助工具函数
+
+#### `sha256_hash()` - SHA256哈希计算
+
+```python
+from ddns.provider._base import sha256_hash
+
+# 计算字符串的SHA256哈希
+hash_value = sha256_hash("request body content")
+# 计算字节数据的SHA256哈希  
+hash_value = sha256_hash(b"binary data")
+```
+
+#### `hmac_sha256()` - HMAC-SHA256签名对象
+
+```python
+from ddns.provider._base import hmac_sha256
+
+# 生成HMAC-SHA256字节签名
+# 获取 HMAC 对象,可调用 .digest() 获取字节或 .hexdigest() 获取十六进制字符串
+hmac_obj = hmac_sha256("secret_key", "message_to_sign")
+signature_bytes = hmac_obj.digest()        # 字节格式
+signature_hex = hmac_obj.hexdigest()       # 十六进制字符串格式
+```
+
+---
+
 ### 🛠️ 开发工具推荐
 
 - 本地开发环境:VSCode
@@ -296,4 +463,4 @@ tests/
 - [ ] 测试了各种边界情况和错误场景
 - [ ] 更新了相关文档
 
-**Happy Coding! 🚀**
+## Happy Coding! 🚀

+ 359 - 0
tests/test_hmac_sha256_authorization.py

@@ -0,0 +1,359 @@
+# coding=utf-8
+"""
+HMAC-SHA256 Authorization 函数单元测试
+
+针对 ddns.provider._base.hmac_sha256_authorization 函数的完整测试套件。
+测试覆盖多种典型使用场景,包括各大云服务商的认证模式,
+所有期望结果都是预先计算好的,确保测试结果的可复现性。
+
+Test suite for ddns.provider._base.hmac_sha256_authorization function.
+Covers various typical use cases including authentication patterns of major cloud providers.
+All expected results are pre-calculated to ensure reproducible test results.
+"""
+
+import unittest
+from ddns.provider._base import hmac_sha256_authorization, sha256_hash, hmac_sha256
+
+
+class TestHmacSha256Authorization(unittest.TestCase):
+    """HMAC-SHA256 Authorization 函数测试类"""
+
+    def test_basic_functionality(self):
+        """测试基本功能 - 简单的签名生成"""
+        secret_key = "test_secret_key"
+        method = "GET"
+        path = "/api/test"
+        query = ""
+        headers = {"Host": "api.example.com", "Date": "20231201T120000Z"}
+        body_hash = sha256_hash("")  # 空请求体的哈希
+
+        auth_header_template = "TEST {SignedHeaders} {Signature}"
+        signing_string_template = "TEST\n{HashedCanonicalRequest}"
+
+        result = hmac_sha256_authorization(
+            secret_key=secret_key,
+            method=method,
+            path=path,
+            query=query,
+            headers=headers,
+            body_hash=body_hash,
+            authorization_format=auth_header_template,
+            signing_string_format=signing_string_template,
+        )
+
+        # 严格验证基本功能测试的完整结果 - 精确匹配
+        expected_result = "TEST date;host b5b3ee3c39b1c749faa31c1b5bd3a5609a3e5408dfb7f90eca5ea17d8aeb1ba2"
+        self.assertEqual(result, expected_result)
+
+    def test_alibaba_cloud_official_example(self):
+        """测试阿里云官方文档示例 - ACS3-HMAC-SHA256签名算法"""
+        # 使用阿里云官方文档中的固定参数示例
+        # 来源: https://help.aliyun.com/zh/sdk/product-overview/v3-request-structure-and-signature
+        secret_key = "YourAccessKeySecret"
+        method = "POST"
+        path = "/"
+        query = "ImageId=win2019_1809_x64_dtc_zh-cn_40G_alibase_20230811.vhd&RegionId=cn-shanghai"
+        headers = {
+            "host": "ecs.cn-shanghai.aliyuncs.com",
+            "x-acs-action": "RunInstances",
+            "x-acs-content-sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
+            "x-acs-date": "2023-10-26T10:22:32Z",
+            "x-acs-signature-nonce": "3156853299f313e23d1673dc12e1703d",
+            "x-acs-version": "2014-05-26",
+        }
+        body_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
+
+        auth_header_template = (
+            "ACS3-HMAC-SHA256 Credential=YourAccessKeyId," "SignedHeaders={SignedHeaders},Signature={Signature}"
+        )
+        signing_string_template = "ACS3-HMAC-SHA256\n{HashedCanonicalRequest}"
+
+        result = hmac_sha256_authorization(
+            secret_key=secret_key,
+            method=method,
+            path=path,
+            query=query,
+            headers=headers,
+            body_hash=body_hash,
+            authorization_format=auth_header_template,
+            signing_string_format=signing_string_template,
+        )
+
+        # 严格验证阿里云官方示例的完整授权头 - 精确匹配
+        expected_result = (
+            "ACS3-HMAC-SHA256 Credential=YourAccessKeyId,"
+            "SignedHeaders=host;x-acs-action;x-acs-content-sha256;x-acs-date;x-acs-signature-nonce;x-acs-version,"
+            "Signature=06563a9e1b43f5dfe96b81484da74bceab24a1d853912eee15083a6f0f3283c0"
+        )
+        self.assertEqual(result, expected_result)
+
+    def test_huawei_cloud_official_example(self):
+        """测试华为云官方文档示例 - SDK-HMAC-SHA256签名算法"""
+        # 使用华为云官方文档中的查询VPC列表示例
+        # 来源: https://support.huaweicloud.com/devg-apisign/api-sign-algorithm-002.html
+        secret_key = "your_secret_access_key"
+        method = "GET"
+        path = "/v1/77b6a44cba5143ab91d13ab9a8ff44fd/vpcs/"  # 注意官方示例要求以/结尾
+        query = "limit=2&marker=13551d6b-755d-4757-b956-536f674975c0"
+        headers = {
+            "content-type": "application/json",
+            "host": "service.region.example.com",
+            "x-sdk-date": "20191115T033655Z",
+        }
+        body_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
+
+        auth_header_template = (
+            "SDK-HMAC-SHA256 Access=your_access_key_id, " "SignedHeaders={SignedHeaders}, Signature={Signature}"
+        )
+        signing_string_template = "SDK-HMAC-SHA256\n20191115T033655Z\n{HashedCanonicalRequest}"
+
+        result = hmac_sha256_authorization(
+            secret_key=secret_key,
+            method=method,
+            path=path,
+            query=query,
+            headers=headers,
+            body_hash=body_hash,
+            authorization_format=auth_header_template,
+            signing_string_format=signing_string_template,
+        )
+
+        # 严格验证华为云官方示例的完整授权头 - 精确匹配
+        expected_result = (
+            "SDK-HMAC-SHA256 Access=your_access_key_id, "
+            "SignedHeaders=content-type;host;x-sdk-date, "
+            "Signature=8037a231336d8904f667c082dd84fc06d7e6c7c278c2d8599db31d14e8ee19f9"
+        )
+        self.assertEqual(result, expected_result)
+
+    def test_tencent_cloud_official_example(self):
+        """测试腾讯云官方文档示例 - TC3-HMAC-SHA256签名算法"""
+        # 使用腾讯云官方文档中的固定参数示例
+        # 来源: https://cloud.tencent.com/document/api/213/30654
+        # 注意:腾讯云使用派生密钥,这里模拟最终的签名密钥
+        # Python 2/3 compatible hex to bytes conversion
+        hex_string = "b596b923aad85185e2d1f6659d2a062e0a86731226e021e61bfe06f7ed05f5af"
+        try:
+            final_signing_key = bytes.fromhex(hex_string)
+        except AttributeError:  # Python 2.7
+            final_signing_key = hex_string.decode("hex")  # type: ignore[attr-defined]
+        method = "POST"
+        path = "/"
+        query = ""  # POST请求,查询字符串为空
+        headers = {
+            "content-type": "application/json; charset=utf-8",
+            "host": "cvm.tencentcloudapi.com",
+            "x-tc-action": "describeinstances",
+        }
+        # 官方示例中的请求体
+        # body = '{"Limit": 1, "Filters": [{"Values": ["\\u672a\\u547d\\u540d"], "Name": "instance-name"}]}'
+        body_hash = "35e9c5b0e3ae67532d3c9f17ead6c90222632e5b1ff7f6e89887f1398934f064"
+
+        auth_header_template = (
+            "TC3-HMAC-SHA256 Credential=AKID********************************/2019-02-25/cvm/tc3_request, "
+            "SignedHeaders={SignedHeaders}, Signature={Signature}"
+        )
+        signing_string_template = "TC3-HMAC-SHA256\n1551113065\n2019-02-25/cvm/tc3_request\n{HashedCanonicalRequest}"
+
+        result = hmac_sha256_authorization(
+            secret_key=final_signing_key,
+            method=method,
+            path=path,
+            query=query,
+            headers=headers,
+            body_hash=body_hash,
+            authorization_format=auth_header_template,
+            signing_string_format=signing_string_template,
+        )
+
+        # 严格验证腾讯云官方示例的完整授权头 - 精确匹配
+        expected_result = (
+            "TC3-HMAC-SHA256 Credential=AKID********************************/2019-02-25/cvm/tc3_request, "
+            "SignedHeaders=content-type;host;x-tc-action, "
+            "Signature=10b1a37a7301a02ca19a647ad722d5e43b4b3cff309d421d85b46093f6ab6c4f"
+        )
+        self.assertEqual(result, expected_result)
+
+    def test_edge_cases(self):
+        """测试边界情况"""
+
+        # 测试空字符串参数 - 严格验证
+        result1 = hmac_sha256_authorization(
+            secret_key="key",
+            method="GET",
+            path="",
+            query="",
+            headers={},
+            body_hash=sha256_hash(""),
+            authorization_format="AUTH {Signature}",
+            signing_string_format="{HashedCanonicalRequest}",
+        )
+        expected_result1 = "AUTH 1d29fda5ce641f10c7e16b1e607ce10f1cad4fd4b071f1b3a4465e051b9a7d6d"
+        self.assertEqual(result1, expected_result1)
+
+        # 测试包含特殊字符的参数 - 严格验证
+        result2 = hmac_sha256_authorization(
+            secret_key="special!@#$%^&*()key",
+            method="POST",
+            path="/path/with spaces",
+            query="param1=value with spaces&param2=value%20encoded",
+            headers={"Custom-Header": "value with spaces and symbols!@#"},
+            body_hash=sha256_hash("body with 中文 and symbols"),
+            authorization_format="SPECIAL {SignedHeaders} {Signature}",
+            signing_string_format="SPECIAL\n{HashedCanonicalRequest}",
+        )
+        expected_result2 = "SPECIAL custom-header edbf4d68ebb33f305e8d0e2f72c012997543a0bdc6a9f42142d1dfa236fa1dd5"
+        self.assertEqual(result2, expected_result2)
+
+    def test_header_normalization(self):
+        """测试头部规范化处理"""
+        secret_key = "test_key"
+        method = "GET"
+        path = "/test"
+        query = ""
+        body_hash = sha256_hash("")
+
+        # 测试头部大小写混合和值的前后空格处理
+        headers = {
+            "Host": "  example.com  ",  # 前后有空格
+            "X-Custom-Header": "value",
+            "x-another-header": "another_value",
+            "CONTENT-TYPE": "application/json",
+        }
+
+        auth_header_template = "TEST {SignedHeaders} {Signature}"
+        signing_string_template = "{HashedCanonicalRequest}"
+
+        result = hmac_sha256_authorization(
+            secret_key=secret_key,
+            method=method,
+            path=path,
+            query=query,
+            headers=headers,
+            body_hash=body_hash,
+            authorization_format=auth_header_template,
+            signing_string_format=signing_string_template,
+        )
+
+        # 验证头部已被正确规范化(小写且按字母顺序排列)
+        # 模板格式是 "TEST {SignedHeaders} {Signature}",所以直接检查signed headers部分
+        self.assertIn("content-type;host;x-another-header;x-custom-header", result)
+
+    def test_reproducible_signatures(self):
+        """测试签名结果的可复现性"""
+        params = {
+            "secret_key": "reproducible_test_key",
+            "method": "POST",
+            "path": "/api/v1/test",
+            "query": "param1=value1&param2=value2",
+            "headers": {"Host": "api.test.com", "Content-Type": "application/json", "Date": "20231201T120000Z"},
+            "body_hash": sha256_hash('{"test": "data"}'),
+            "authorization_format": "REPRO {SignedHeaders} {Signature}",
+            "signing_string_format": "REPRO\n{HashedCanonicalRequest}",
+        }
+
+        # 多次调用应该产生相同的结果
+        result1 = hmac_sha256_authorization(**params)
+        result2 = hmac_sha256_authorization(**params)
+        result3 = hmac_sha256_authorization(**params)
+
+        self.assertEqual(result1, result2)
+        self.assertEqual(result2, result3)
+
+        # 验证结果包含预期的组件
+        self.assertIn("REPRO", result1)
+        self.assertIn("content-type;date;host", result1)
+
+        # 提取签名部分进行验证 - 格式是 "REPRO {SignedHeaders} {Signature}"
+        parts = result1.split()
+        self.assertEqual(len(parts), 3)  # "REPRO", signed_headers, signature
+        signature_part = parts[2]
+        self.assertEqual(len(signature_part), 64)  # SHA256签名应该是64个十六进制字符
+        self.assertTrue(all(c in "0123456789abcdef" for c in signature_part))
+
+    def test_different_key_types(self):
+        """测试不同类型的密钥输入"""
+        base_params = {
+            "method": "GET",
+            "path": "/test",
+            "query": "",
+            "headers": {"Host": "example.com"},
+            "body_hash": sha256_hash(""),
+            "authorization_format": "TYPE {Signature}",
+            "signing_string_format": "{HashedCanonicalRequest}",
+        }
+
+        # 字符串密钥
+        result_str = hmac_sha256_authorization(secret_key="string_key", **base_params)
+
+        # 字节密钥
+        result_bytes = hmac_sha256_authorization(secret_key=b"string_key", **base_params)
+
+        # 相同内容的字符串和字节密钥应该产生相同的签名
+        self.assertEqual(result_str, result_bytes)
+
+    def test_hmac_sha256_basic_functionality(self):
+        """测试 hmac_sha256 基础功能"""
+        key = "test_key"
+        message = "test_message"
+
+        # 测试返回的是 HMAC 对象
+        hmac_obj = hmac_sha256(key, message)
+
+        # 验证可以调用 digest() 和 hexdigest() 方法
+        digest_result = hmac_obj.digest()
+        hexdigest_result = hmac_obj.hexdigest()
+
+        # 验证结果类型
+        self.assertIsInstance(digest_result, bytes)
+        self.assertIsInstance(hexdigest_result, str)
+
+        # 验证 hexdigest 结果长度 (SHA256 产生64个十六进制字符)
+        self.assertEqual(len(hexdigest_result), 64)
+
+        # 验证结果的可复现性
+        hmac_obj2 = hmac_sha256(key, message)
+        self.assertEqual(hmac_obj.hexdigest(), hmac_obj2.hexdigest())
+
+    def test_hmac_sha256_different_key_types(self):
+        """测试 hmac_sha256 不同密钥类型"""
+        message = "test_message"
+
+        # 字符串密钥
+        hmac_str = hmac_sha256("test_key", message)
+
+        # 字节密钥
+        hmac_bytes = hmac_sha256(b"test_key", message)
+
+        # 相同内容的字符串和字节密钥应该产生相同的结果
+        self.assertEqual(hmac_str.hexdigest(), hmac_bytes.hexdigest())
+
+    def test_hmac_sha256_different_message_types(self):
+        """测试 hmac_sha256 不同消息类型"""
+        key = "test_key"
+
+        # 字符串消息
+        hmac_str = hmac_sha256(key, "test_message")
+
+        # 字节消息
+        hmac_bytes = hmac_sha256(key, b"test_message")
+
+        # 相同内容的字符串和字节消息应该产生相同的结果
+        self.assertEqual(hmac_str.hexdigest(), hmac_bytes.hexdigest())
+
+    def test_hmac_sha256_known_vector(self):
+        """测试 hmac_sha256 已知测试向量"""
+        # 使用已知的测试向量验证实现正确性
+        key = "key"
+        message = "The quick brown fox jumps over the lazy dog"
+
+        hmac_obj = hmac_sha256(key, message)
+        result = hmac_obj.hexdigest()
+
+        # 预期结果(可以通过其他实现验证)
+        expected = "f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8"
+        self.assertEqual(result, expected)
+
+
+if __name__ == "__main__":
+    unittest.main()

+ 0 - 28
tests/test_provider_alidns.py

@@ -31,34 +31,6 @@ class TestAlidnsProvider(BaseProviderTestCase):
         self.assertEqual(provider.auth_token, self.auth_token)
         self.assertEqual(provider.API, "https://alidns.aliyuncs.com")
 
-    def test_signature_v3_generation(self):
-        """Test _signature_v3 method generates correct v3 signature format"""
-        provider = AlidnsProvider(self.auth_id, self.auth_token)
-
-        # Test headers for v3 signature
-        headers = {
-            "host": "alidns.aliyuncs.com",
-            "content-type": "application/x-www-form-urlencoded",
-            "x-acs-action": "TestAction",
-            "x-acs-content-sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
-            "x-acs-date": "2023-01-01T12:00:00Z",
-            "x-acs-signature-nonce": "23456789",
-            "x-acs-version": "2015-01-09",
-        }
-
-        body_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
-        authorization = provider._signature_v3("POST", "/", headers, body_hash=body_hash)
-
-        # Verify v3 authorization header format - only test the structure, not exact values
-        self.assertTrue(authorization.startswith("ACS3-HMAC-SHA256 Credential="))
-        self.assertIn("Credential={}".format(self.auth_id), authorization)
-        self.assertIn("SignedHeaders=", authorization)
-        self.assertIn("Signature=", authorization)
-        # Verify all headers are included in signed headers
-        self.assertIn("content-type", authorization)
-        self.assertIn("host", authorization)
-        self.assertIn("x-acs-action", authorization)
-
     def test_request_basic(self):
         """Test _request method with basic parameters"""
         provider = AlidnsProvider(self.auth_id, self.auth_token)

+ 40 - 85
tests/test_provider_callback.py

@@ -5,8 +5,11 @@ Unit tests for CallbackProvider
 @author: GitHub Copilot
 """
 
+import os
 import ssl
 import logging
+import random
+from time import sleep
 from base_test import BaseProviderTestCase, unittest, patch
 from ddns.provider.callback import CallbackProvider
 
@@ -284,6 +287,15 @@ class TestCallbackProviderRealIntegration(BaseProviderTestCase):
         mock_logger.setLevel(logging.INFO)
         return mock_logger
 
+    def _random_delay(self):
+        """Add a random delay of 0-3 seconds to avoid rate limiting"""
+        if os.environ.get("CI") or os.environ.get("GITHUB_ACTIONS") or os.environ.get("GITHUB_REF_NAME"):
+            # In CI environments, use a shorter delay to speed up tests
+            delay = random.uniform(0, 3)
+        else:
+            delay = random.uniform(0, 1)
+        sleep(delay)
+
     def _assert_callback_result_logged(self, mock_logger, *expected_strings):
         """
         Helper to assert that 'Callback result: %s' was logged with expected content.
@@ -303,19 +315,17 @@ class TestCallbackProviderRealIntegration(BaseProviderTestCase):
 
     def test_real_callback_get_method(self):
         """Test real callback using GET method with httpbin.org and verify logger calls"""
-        # Use httpbin.org/get endpoint for GET requests
         auth_id = "https://httpbin.org/get?domain=__DOMAIN__&ip=__IP__&record_type=__RECORDTYPE__"
-        provider = CallbackProvider(auth_id, "")
+        domain = "test.example.com"
+        ip = "111.111.111.111"
 
-        # Setup provider with mock logger
+        provider = CallbackProvider(auth_id, "")
         mock_logger = self._setup_provider_with_mock_logger(provider)
 
-        result = provider.set_record("test.example.com", "111.111.111.111", "A")
-        # httpbin.org returns JSON with our parameters, so it should be truthy
+        self._random_delay()  # Add random delay before real request
+        result = provider.set_record(domain, ip, "A")
         self.assertTrue(result)
-
-        # Verify that logger.info was called with response containing domain and IP
-        self._assert_callback_result_logged(mock_logger, "test.example.com", "111.111.111.111")
+        self._assert_callback_result_logged(mock_logger, domain, ip)
 
     def test_real_callback_post_method_with_json(self):
         """Test real callback using POST method with JSON data and verify logger calls"""
@@ -326,6 +336,7 @@ class TestCallbackProviderRealIntegration(BaseProviderTestCase):
         # Setup provider with mock logger
         mock_logger = self._setup_provider_with_mock_logger(provider)
 
+        self._random_delay()  # Add random delay before real request
         result = provider.set_record("test.example.com", "203.0.113.2", "A", 300)
         # httpbin.org returns JSON with our posted data, so it should be truthy
         self.assertTrue(result)
@@ -339,79 +350,44 @@ class TestCallbackProviderRealIntegration(BaseProviderTestCase):
         auth_id = "https://httpbin.org/status/500"  # This returns HTTP 500
         provider = CallbackProvider(auth_id, "")
 
+        self._random_delay()  # Add random delay before real request
         result = provider.set_record("test.example.com", "203.0.113.5")
         self.assertFalse(result)
 
     def test_real_callback_redirects_handling(self):
-        """Test real callback with HTTP redirects and verify logger calls"""
-        # Use httpbin.org redirect endpoint
+        """Test real callback with various HTTP redirect scenarios and verify logger calls"""
+        # Test simple redirect
         auth_id = "https://httpbin.org/redirect-to?url=https://httpbin.org/get&domain=__DOMAIN__&ip=__IP__"
-        provider = CallbackProvider(auth_id, "")
+        domain = "redirect.test.example.com"
+        ip = "203.0.113.21"
 
+        provider = CallbackProvider(auth_id, "")
         try:
-            # Setup provider with mock logger
             mock_logger = self._setup_provider_with_mock_logger(provider)
-
-            result = provider.set_record("redirect.test.example.com", "203.0.113.21", "A")
-            # Should follow redirects and succeed
+            self._random_delay()  # Add random delay before real request
+            result = provider.set_record(domain, ip, "A")
             self.assertTrue(result)
-
-            # Verify that logger.info was called with response containing domain and IP
-            self._assert_callback_result_logged(mock_logger, "redirect.test.example.com", "203.0.113.21")
+            self._assert_callback_result_logged(mock_logger, domain, ip)
 
         except Exception as e:
             error_str = str(e).lower()
-            if "certificate verify failed" in error_str and "basic constraints" in error_str:
-                self.skipTest("SSL Basic Constraints issue (common in test environments): {}".format(e))
-            elif "ssl" in error_str or "certificate" in error_str:
-                self.skipTest("SSL-related issue: {}".format(e))
-
-    def test_real_callback_simple_http_endpoint(self):
-        """Test with a simple endpoint that doesn't require special headers and verify logger calls"""
-        # Use a very simple endpoint that usually has good SSL
-        auth_id = "http://httpbin.org/get?domain=__DOMAIN__&ip=__IP__"
-        provider = CallbackProvider(auth_id, "")
-        # Setup provider with mock logger
-        mock_logger = self._setup_provider_with_mock_logger(provider)
-
-        result = provider.set_record("httpstat.test.example.com", "111.111.111.111", "A")
-        # httpstat.us returns simple status messages, should be truthy
-        self.assertTrue(result)
-
-        # Verify that logger.info was called with the successful result
-        self._assert_callback_result_logged(mock_logger, "httpstat.test.example.com", "111.111.111.111")
-
-    def test_real_callback_redirect_following(self):
-        """Test real callback with HTTP redirects using the improved _send_request method and verify logger calls"""
-        # Use httpbin.org redirect endpoint that returns 302
-        auth_id = "https://httpbin.org/redirect-to?url=https://httpbin.org/get&domain=__DOMAIN__&ip=__IP__"
-        provider = CallbackProvider(auth_id, "")
-
-        # Setup provider with mock logger
-        mock_logger = self._setup_provider_with_mock_logger(provider)
-
-        result = provider.set_record("redirect.follow.test.com", "203.0.113.30", "A")
-        self.assertTrue(result)
+            if "ssl" in error_str or "certificate" in error_str:
+                self.skipTest("SSL certificate issue: {}".format(e))
 
-        # Verify that logger.info was called with the final response after redirection
-        self._assert_callback_result_logged(mock_logger, "redirect.follow.test.com", "203.0.113.30")
+    def test_real_callback_redirects_handling_relative(self):
+        """Test real callback with relative redirect scenarios and verify logger calls"""
+        # Test relative redirect
+        auth_id = "https://httpbin.org/relative-redirect/1?domain=__DOMAIN__&ip=__IP__"
+        domain = "relative-redirect.example.com"
+        ip = "203.0.113.203"
 
-    def test_real_callback_multiple_redirects(self):
-        """Test callback with multiple consecutive redirects and verify logger calls"""
-        # Test with 2 consecutive redirects: httpbin.org/redirect/2
-        auth_id = "https://httpbin.org/redirect/2?domain=__DOMAIN__&ip=__IP__"
         provider = CallbackProvider(auth_id, "")
-
         try:
-            # Setup provider with mock logger
             mock_logger = self._setup_provider_with_mock_logger(provider)
-
-            result = provider.set_record("multi-redirect.example.com", "203.0.113.201", "A")
-            # Should follow multiple redirects and succeed
+            self._random_delay()  # Add random delay before real request
+            result = provider.set_record(domain, ip, "A")
             self.assertTrue(result)
-
-            # Verify that logger.info was called with response containing domain and IP
-            self._assert_callback_result_logged(mock_logger, "multi-redirect.example.com", "203.0.113.201")
+            self._assert_callback_result_logged(mock_logger, domain, ip)
 
         except Exception as e:
             error_str = str(e).lower()
@@ -429,6 +405,7 @@ class TestCallbackProviderRealIntegration(BaseProviderTestCase):
             # Setup provider with mock logger
             mock_logger = self._setup_provider_with_mock_logger(provider)
 
+            self._random_delay()  # Add random delay before real request
             result = provider.set_record("post-redirect.example.com", "203.0.113.202", "A")
             # POST should be redirected as GET and succeed
             self.assertTrue(result)
@@ -441,28 +418,6 @@ class TestCallbackProviderRealIntegration(BaseProviderTestCase):
             if "ssl" in error_str or "certificate" in error_str:
                 self.skipTest("SSL certificate issue: {}".format(e))
 
-    def test_real_callback_absolute_vs_relative_redirects(self):
-        """Test both absolute and relative URL redirects and verify logger calls"""
-        # Test relative redirect (should work with improved _send_request)
-        auth_id = "https://httpbin.org/relative-redirect/1?domain=__DOMAIN__&ip=__IP__"
-        provider = CallbackProvider(auth_id, "")
-
-        try:
-            # Setup provider with mock logger
-            mock_logger = self._setup_provider_with_mock_logger(provider)
-
-            result = provider.set_record("relative-redirect.example.com", "203.0.113.203", "A")
-            # Should handle relative redirects correctly
-            self.assertTrue(result)
-
-            # Verify that logger.info was called with response containing domain and IP
-            self._assert_callback_result_logged(mock_logger, "relative-redirect.example.com", "203.0.113.203")
-
-        except Exception as e:
-            error_str = str(e).lower()
-            if "ssl" in error_str or "certificate" in error_str:
-                self.skipTest("SSL certificate issue: {}".format(e))
-
 
 if __name__ == "__main__":
     unittest.main()

+ 0 - 26
tests/test_provider_huaweidns.py

@@ -42,32 +42,6 @@ class TestHuaweiDNSProvider(BaseProviderTestCase):
         self.assertEqual(self.provider.auth_token, self.auth_token)
         self.assertEqual(self.provider.API, "https://dns.myhuaweicloud.com")
 
-    def test_hex_encode_sha256(self):
-        """Test _hex_encode_sha256 method"""
-        test_data = b"test data"
-        result = self.provider._hex_encode_sha256(test_data)
-
-        # Should return a 64-character hex string (SHA256)
-        self.assertEqual(len(result), 64)
-        self.assertIsInstance(result, str)
-        # SHA256 of "test data"
-        expected_hash = "916f0027a575074ce72a331777c3478d6513f786a591bd892da1a577bf2335f9"
-        self.assertEqual(result, expected_hash)
-
-    def test_sign_headers(self):
-        """Test _sign_headers method"""
-        headers = {
-            "Content-Type": "application/json",
-            "Host": "dns.myhuaweicloud.com",
-            "X-Sdk-Date": "20230101T000000Z",
-        }
-        signed_headers = ["content-type", "host", "x-sdk-date"]
-
-        result = self.provider._sign_headers(headers, signed_headers)
-
-        expected = "content-type:application/json\nhost:dns.myhuaweicloud.com\nx-sdk-date:20230101T000000Z\n"
-        self.assertEqual(result, expected)
-
     def test_request_get_method(self):
         """Test _request method with GET method"""
         with patch.object(self.provider, "_http") as mock_http:

+ 0 - 40
tests/test_provider_tencentcloud.py

@@ -44,29 +44,6 @@ class TestTencentCloudProvider(BaseProviderTestCase):
             TencentCloudProvider(self.auth_id, "", self.logger)
         self.assertIn("token", str(context.exception))
 
-    @patch("ddns.provider.tencentcloud.strftime")
-    @patch("ddns.provider.tencentcloud.time")
-    @patch.object(TencentCloudProvider, "_http")
-    def test_sign_tc3(self, mock_http, mock_time, mock_strftime):
-        """Test TC3 signature generation"""
-        mock_time.return_value = 1609459200  # 2021-01-01
-        mock_strftime.return_value = "2021-01-01"
-
-        self.provider._request("DescribeDomains")
-
-        self.assertTrue(mock_http.called)
-        call_args = mock_http.call_args[1]
-        headers = call_args.get("headers", {})
-        authorization = headers.get("authorization")
-
-        self.assertIn("TC3-HMAC-SHA256", authorization)
-        self.assertIn("Credential=test_id/2021-01-01/dnspod/tc3_request", authorization)
-        self.assertIn("SignedHeaders=", authorization)
-        self.assertIn("content-type", authorization)
-        self.assertIn("host", authorization)
-        self.assertIn("Signature=", authorization)
-        self.assertIn(self.auth_id, authorization)
-
     @patch.object(TencentCloudProvider, "_http")
     def test_query_zone_id_success(self, mock_http):
         """Test successful zone ID query"""
@@ -370,23 +347,6 @@ class TestTencentCloudProvider(BaseProviderTestCase):
         self.assertTrue(result)
         self.assertEqual(mock_http.call_count, 3)
 
-    @patch("ddns.provider.tencentcloud.strftime")
-    def test_sign_tc3_date_format(self, mock_strftime):
-        """Test that the TC3 signature uses the current date in credential scope"""
-        mock_strftime.return_value = "20210323"  # Mock strftime to return a specific date
-
-        method = "POST"
-        uri = "/"
-        query = ""
-        headers = {"content-type": "application/json", "host": "dnspod.tencentcloudapi.com"}
-        payload = "{}"
-        timestamp = 1609459200  # 2021-01-01
-
-        authorization = self.provider._sign_tc3(method, uri, query, headers, payload, timestamp)
-
-        # Check that the mocked date is used in the credential scope
-        self.assertIn("20210323/dnspod/tc3_request", authorization)
-
 
 class TestTencentCloudProviderIntegration(BaseProviderTestCase):
     """Integration tests for TencentCloudProvider"""