Browse Source

chore(lint): replace flake8 and black with ruff for unified linting and formatting (#544)

* Initial plan

* feat: replace flake8 and black with ruff for unified linting and formatting

Co-authored-by: NewFuture <[email protected]>

* refactor: use astral-sh/ruff-action@v3 for cleaner CI workflow

Co-authored-by: NewFuture <[email protected]>

* run format

* use fmt skip

* fix cloudflare

* fix cron

* fix cron

* fix fileio tests

* fix fileio tests

* fix cron

---------

Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: NewFuture <[email protected]>
Co-authored-by: New Future <[email protected]>
Copilot 5 months ago
parent
commit
f7a92b3903
57 changed files with 365 additions and 610 deletions
  1. 2 2
      .github/copilot-instructions.md
  2. 8 0
      .github/instructions/python.instructions.md
  3. 12 12
      .github/patch.py
  4. 3 7
      .github/workflows/build.yml
  5. 1 1
      .vscode/extensions.json
  6. 17 42
      .vscode/settings.json
  7. 1 0
      ddns/__builtins__.pyi
  8. 3 7
      ddns/config/__init__.py
  9. 2 6
      ddns/config/cli.py
  10. 1 0
      ddns/config/config.py
  11. 1 0
      ddns/config/env.py
  12. 1 0
      ddns/config/file.py
  13. 0 1
      ddns/ip.py
  14. 5 4
      ddns/provider/alidns.py
  15. 5 4
      ddns/provider/aliesa.py
  16. 1 0
      ddns/provider/callback.py
  17. 2 2
      ddns/provider/cloudflare.py
  18. 0 1
      ddns/provider/debug.py
  19. 3 2
      ddns/provider/dnscom.py
  20. 1 0
      ddns/provider/edgeone.py
  21. 5 4
      ddns/provider/huaweidns.py
  22. 9 10
      ddns/provider/tencentcloud.py
  23. 5 5
      ddns/scheduler/__init__.py
  24. 8 8
      ddns/scheduler/cron.py
  25. 30 16
      ddns/scheduler/launchd.py
  26. 14 17
      ddns/scheduler/schtasks.py
  27. 3 10
      ddns/scheduler/systemd.py
  28. 0 112
      ddns/scheduler/windows.py
  29. 1 4
      ddns/util/http.py
  30. 45 43
      pyproject.toml
  31. 1 0
      tests/base_test.py
  32. 3 18
      tests/test_config_cli.py
  33. 1 0
      tests/test_config_cli_task.py
  34. 1 4
      tests/test_config_config.py
  35. 3 13
      tests/test_config_env.py
  36. 1 0
      tests/test_config_file.py
  37. 3 12
      tests/test_config_file_remote.py
  38. 1 1
      tests/test_config_init_multi.py
  39. 1 0
      tests/test_config_schema_v4_1.py
  40. 2 2
      tests/test_provider__signature.py
  41. 3 20
      tests/test_provider_alidns.py
  42. 4 27
      tests/test_provider_aliesa.py
  43. 3 7
      tests/test_provider_cloudflare.py
  44. 0 3
      tests/test_provider_dnscom.py
  45. 10 10
      tests/test_provider_edgeone.py
  46. 1 7
      tests/test_provider_namesilo.py
  47. 3 14
      tests/test_provider_tencentcloud.py
  48. 3 2
      tests/test_scheduler_base.py
  49. 58 50
      tests/test_scheduler_cron.py
  50. 9 8
      tests/test_scheduler_init.py
  51. 28 28
      tests/test_scheduler_launchd.py
  52. 11 13
      tests/test_scheduler_schtasks.py
  53. 12 11
      tests/test_scheduler_systemd.py
  54. 1 0
      tests/test_util_comment.py
  55. 11 30
      tests/test_util_fileio.py
  56. 1 6
      tests/test_util_http.py
  57. 1 4
      tests/test_util_http_retry.py

+ 2 - 2
.github/copilot-instructions.md

@@ -7,11 +7,11 @@ This is a Python-based Dynamic DNS (DDNS) client that automatically updates DNS
 - Use only Python standard library modules (no external dependencies)
 - Ensure Python 2.7 and 3.x compatibility
 - Run tests before committing to ensure all functionality works correctly
-- Check the linting and formatting using `flake8` and `black`
+- Check the linting and formatting using `ruff`
 
 ### Development Flow
 - Test: `python -m unittest discover tests` or `python -m pytest tests/`
-- Format: Use black and flake8 for code formatting
+- Format: Use ruff for code formatting and linting
 
 ### Add a New DNS Provider
 

+ 8 - 0
.github/instructions/python.instructions.md

@@ -27,6 +27,14 @@ applyTo: '**/*.py'
   - NO f-strings (not supported in Python 2.7)
   - NO `async`/`await` syntax
   - Avoid Python 3.6+ exclusive features when compatibility is needed
+- **Unicode String Handling**:
+    - **IMPORTANT**: Ruff formatter automatically converts `u"string"` to `"string"`
+    - In Python 2, `u"\n"` and `"\n"` are different types (unicode vs bytes)
+    - Prefer `# fmt: skip` on the exact line that must preserve a Unicode literal
+    - Example:
+        ```python
+        unicode_string = u"hello\nworld"  # fmt: skip
+        ```
 
 ## Project Architecture
 

+ 12 - 12
.github/patch.py

@@ -249,47 +249,47 @@ def replace_readme_links_for_release(version):
     # 替换各种链接中的 latest 为具体版本
     # GitHub releases download links
     content = re.sub(
-        r'https://github\.com/NewFuture/DDNS/releases/latest/download/',
-        f'https://github.com/NewFuture/DDNS/releases/download/{version}/',
+        r"https://github\.com/NewFuture/DDNS/releases/latest/download/",
+        f"https://github.com/NewFuture/DDNS/releases/download/{version}/",
         content,
     )
 
     # GitHub releases latest links
     content = re.sub(
-        r'https://github\.com/NewFuture/DDNS/releases/latest',
-        f'https://github.com/NewFuture/DDNS/releases/tag/{version}',
+        r"https://github\.com/NewFuture/DDNS/releases/latest",
+        f"https://github.com/NewFuture/DDNS/releases/tag/{version}",
         content,
     )
 
     # Docker tags from latest to version
-    content = re.sub(r'docker pull ([^:\s]+):latest', f'docker pull \\1:{version}', content)
+    content = re.sub(r"docker pull ([^:\s]+):latest", f"docker pull \\1:{version}", content)
 
     # PyPI version-specific links
     content = re.sub(
-        r'https://pypi\.org/project/ddns(?:/latest)?(?=[\s\)])', f'https://pypi.org/project/ddns/{version}', content
+        r"https://pypi\.org/project/ddns(?:/latest)?(?=[\s\)])", f"https://pypi.org/project/ddns/{version}", content
     )
 
     # Shield.io badges - Docker version
     content = re.sub(
-        r'https://img\.shields\.io/docker/v/newfuture/ddns/latest',
-        f'https://img.shields.io/docker/v/newfuture/ddns/{version}',
+        r"https://img\.shields\.io/docker/v/newfuture/ddns/latest",
+        f"https://img.shields.io/docker/v/newfuture/ddns/{version}",
         content,
     )
 
     # Shield.io badges - PyPI version (add version if not present)
     content = re.sub(
-        r'https://img\.shields\.io/pypi/v/ddns(?!\?)', f'https://img.shields.io/pypi/v/ddns/{version}', content
+        r"https://img\.shields\.io/pypi/v/ddns(?!\?)", f"https://img.shields.io/pypi/v/ddns/{version}", content
     )
 
     # GitHub archive links
     content = re.sub(
-        r'https://github\.com/NewFuture/DDNS/archive/refs/tags/latest\.(zip|tar\.gz)',
-        f'https://github.com/NewFuture/DDNS/archive/refs/tags/{version}.\\1',
+        r"https://github\.com/NewFuture/DDNS/archive/refs/tags/latest\.(zip|tar\.gz)",
+        f"https://github.com/NewFuture/DDNS/archive/refs/tags/{version}.\\1",
         content,
     )
 
     # PIP install commands
-    content = re.sub(r'pip install ddns(?!=)', f'pip install ddns=={version}', content)
+    content = re.sub(r"pip install ddns(?!=)", f"pip install ddns=={version}", content)
 
     with open(readme_path, "w", encoding="utf-8") as f:
         f.write(content)

+ 3 - 7
.github/workflows/build.yml

@@ -25,15 +25,11 @@ jobs:
     timeout-minutes: 3
     steps:
       - uses: actions/checkout@v4
-      - uses: actions/setup-python@v5
+      - uses: astral-sh/ruff-action@v3
         with:
-          python-version: "3.x"
-      - run: pip install flake8
+          src: "."
+          args: "check --statistics --output-format=github"
 
-      - name: check Python syntax errors or undefined names
-        run: flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
-      - name: check complexity and length # the GitHub editor is 127 chars wide
-        run: flake8 . --count --max-complexity=12 --max-line-length=127 --statistics
   python:
     strategy:
       fail-fast: false

+ 1 - 1
.vscode/extensions.json

@@ -4,7 +4,7 @@
         "github.vscode-pull-request-github",
         "ms-python.python",
         "ms-python.vscode-pylance",
-        "ms-python.black-formatter",
+        "charliermarsh.ruff",
         "ms-azuretools.vscode-containers"
     ]
 }

+ 17 - 42
.vscode/settings.json

@@ -4,7 +4,6 @@
     "python.analysis.completeFunctionParens": true,
     "python.analysis.supportAllPythonDocuments": true,
     "python.analysis.generateWithTypeAnnotation": true,
-    "python.analysis.diagnosticMode": "workspace",
     "python.analysis.indexing": true,
     "python.analysis.aiCodeActions": {
         "implementAbstractClasses": true,
@@ -16,47 +15,6 @@
         "addMissingFunctionOverloads": true,
         "addMissingFunctionOverloadsWithReturnType": true
     },
-    "editor.formatOnSaveMode": "file",
-    "python.analysis.fixAll": [
-        "source.convertImportFormat",
-    ],
-    "python.formatting.provider": "black",
-    "editor.formatOnSave": true,
-    "flake8.enabled": true,
-    "editor.bracketPairColorization.enabled": true,
-    "flake8.args": [
-        "--max-line-length=120"
-    ],
-    "github.copilot.chat.agent.terminal.allowList": {
-        "python": true,
-        "python -c": true,
-        "python -m": true,
-        "python -m unittest": true,
-        "python3": true,
-        "python3 -c": true,
-        "python3 -m": true,
-        "python3 -m unittest": true,
-        "echo": true,
-        "cd": true,
-        "ls": true,
-        "dir": true,
-        "git": true,
-        "git status": true,
-        "rm": true,
-        "del": true,
-        "delete": true,
-        "mv": true,
-        "move": true,
-    },
-    "github.copilot.chat.agent.terminal.denyList": {
-        "rm -rf": true,
-        "del /f /s /q": true,
-        "git reset --hard": true,
-        "git clean -fdx": true,
-        "git checkout -- .": true,
-        "sudo": true
-    },
-    "chat.agent.maxRequests": 64,
     "python.testing.unittestArgs": [
         "-v",
         "-s",
@@ -66,4 +24,21 @@
     ],
     "python.testing.pytestEnabled": true,
     "python.testing.unittestEnabled": true,
+    "[python]": {
+        "editor.defaultFormatter": "charliermarsh.ruff",
+        "editor.codeActionsOnSave": {
+            "source.organizeImports.ruff": "explicit",
+            "source.fixAll.ruff": "explicit"
+        }
+    },
+    "editor.formatOnSave": true,
+    "editor.formatOnSaveMode": "modificationsIfAvailable",
+    "editor.codeActionsOnSave": {},
+    "editor.bracketPairColorization.enabled": true,
+    "editor.rulers": [
+        80,
+        100,
+        120
+    ],
+    "chat.agent.maxRequests": 64,
 }

+ 1 - 0
ddns/__builtins__.pyi

@@ -1,5 +1,6 @@
 # coding=utf-8
 # flake8: noqa: F401
+# ruff: noqa: F403
 from typing import *
 from .provider import SimpleProvider
 import logging

+ 3 - 7
ddns/config/__init__.py

@@ -7,6 +7,7 @@ JSON configuration files, and environment variables.
 
 @author: NewFuture
 """
+
 import os
 import sys
 import logging
@@ -116,9 +117,7 @@ ddns [v{version}@{date}]
 (i) homepage or docs [文档主页]: https://ddns.newfuture.cc/
 (?) issues or bugs [问题和反馈]: https://github.com/NewFuture/DDNS/issues
 Copyright (c) NewFuture (MIT License)
-""".format(
-        version=version, date=date
-    )
+""".format(version=version, date=date)
     # Load CLI configuration first
     cli_config = load_cli_config(description, doc, version, date)
     env_config = load_env_config()
@@ -169,7 +168,4 @@ Copyright (c) NewFuture (MIT License)
     return configs
 
 
-__all__ = [
-    "load_configs",
-    "Config",
-]
+__all__ = ["load_configs", "Config"]

+ 2 - 6
ddns/config/cli.py

@@ -3,6 +3,7 @@
 Configuration loader for DDNS command-line interface.
 @author: NewFuture
 """
+
 import sys
 import platform
 from argparse import Action, ArgumentParser, RawTextHelpFormatter, SUPPRESS
@@ -74,7 +75,6 @@ class NewConfigAction(Action):
     """生成配置文件并退出程序"""
 
     def __call__(self, parser, namespace, values, option_string=None):
-
         # 获取配置文件路径
         if values and values != "true":
             config_path = str(values)  # type: str
@@ -161,11 +161,7 @@ def _add_ddns_args(arg):  # type: (ArgumentParser) -> None
         "--cache", type=str_bool, nargs="?", const=True, help="set cache [启用缓存开关,或传入保存路径]"
     )
     advanced.add_argument(
-        "--no-cache",
-        dest="cache",
-        action="store_const",
-        const=False,
-        help="disable cache [关闭缓存等效 --cache=false]",
+        "--no-cache", dest="cache", action="store_const", const=False, help="disable cache [关闭缓存等效 --cache=false]"
     )
     advanced.add_argument(
         "--ssl",

+ 1 - 0
ddns/config/config.py

@@ -3,6 +3,7 @@
 Configuration class merged from CLI, JSON, and environment variables.
 @author: NewFuture
 """
+
 from hashlib import md5
 from .cli import str_bool, log_level as get_log_level
 

+ 1 - 0
ddns/config/env.py

@@ -3,6 +3,7 @@
 Configuration loader for DDNS environment variables.
 @author: NewFuture
 """
+
 from ast import literal_eval
 from os import environ
 from sys import stderr

+ 1 - 0
ddns/config/file.py

@@ -3,6 +3,7 @@
 Configuration file loader for DDNS. supports both JSON and AST parsing.
 @author: NewFuture
 """
+
 from ast import literal_eval
 from json import loads as json_decode, dumps as json_encode
 from sys import stderr, stdout

+ 0 - 1
ddns/ip.py

@@ -69,7 +69,6 @@ def public_v6(url="https://api-ipv6.ip.sb/ip", reg=IPV6_REG):  # 公网IPV6地
 
 
 def _ip_regex_match(parrent_regex, match_regex):
-
     ip_pattern = compile(parrent_regex)
     matcher = compile(match_regex)
 

+ 5 - 4
ddns/provider/alidns.py

@@ -5,9 +5,10 @@ AliDNS API
 @author: NewFuture
 """
 
-from ._base import TYPE_FORM, BaseProvider, join_domain, encode_params
+from time import gmtime, strftime, time
+
+from ._base import TYPE_FORM, BaseProvider, encode_params, join_domain
 from ._signature import hmac_sha256_authorization, sha256_hash
-from time import strftime, gmtime, time
 
 
 class AliBaseProvider(BaseProvider):
@@ -116,7 +117,7 @@ class AlidnsProvider(AliBaseProvider):
             TTL=ttl,
             Line=line,
             **extra
-        )
+        )  # fmt: skip
         if data and data.get("RecordId"):
             self.logger.info("Record created: %s", data)
             return True
@@ -143,7 +144,7 @@ class AlidnsProvider(AliBaseProvider):
             TTL=ttl,
             Line=line or old_record.get("Line"),
             **extra
-        )
+        )  # fmt: skip
         if data and data.get("RecordId"):
             self.logger.info("Record updated: %s", data)
             return True

+ 5 - 4
ddns/provider/aliesa.py

@@ -5,10 +5,11 @@ AliESA API
 @author: NewFuture, GitHub Copilot
 """
 
-from .alidns import AliBaseProvider
-from ._base import join_domain, TYPE_JSON
 from time import strftime
 
+from ._base import TYPE_JSON, join_domain
+from .alidns import AliBaseProvider
+
 
 class AliesaProvider(AliBaseProvider):
     """阿里云边缘安全加速(ESA) DNS Provider"""
@@ -82,7 +83,7 @@ class AliesaProvider(AliBaseProvider):
             Data={"Value": value},
             Ttl=ttl or 1,
             **extra
-        )
+        )  # fmt: skip
 
         if data and data.get("RecordId"):
             self.logger.info("Record created: %s", data)
@@ -115,7 +116,7 @@ class AliesaProvider(AliBaseProvider):
             Data={"Value": value},
             Ttl=ttl,
             **extra
-        )
+        )  # fmt: skip
 
         if data and data.get("RecordId"):
             self.logger.info("Record updated: %s", data)

+ 1 - 0
ddns/provider/callback.py

@@ -5,6 +5,7 @@ Custom Callback API
 
 @author: 老周部落, NewFuture
 """
+
 from ._base import TYPE_JSON, SimpleProvider
 from time import time
 from json import loads as jsondecode

+ 2 - 2
ddns/provider/cloudflare.py

@@ -4,7 +4,7 @@ CloudFlare API
 @author: TongYifan, NewFuture
 """
 
-from ._base import BaseProvider, TYPE_JSON, join_domain
+from ._base import TYPE_JSON, BaseProvider, join_domain
 
 
 class CloudflareProvider(BaseProvider):
@@ -92,7 +92,7 @@ class CloudflareProvider(BaseProvider):
             content=value,
             ttl=ttl,
             **extra
-        )
+        )  # fmt: skip
         self.logger.debug("Record updated: %s", data)
         if data:
             return True

+ 0 - 1
ddns/provider/debug.py

@@ -8,7 +8,6 @@ from ._base import SimpleProvider
 
 
 class DebugProvider(SimpleProvider):
-
     def _validate(self):
         """无需任何验证"""
         pass

+ 3 - 2
ddns/provider/dnscom.py

@@ -5,10 +5,11 @@ www.51dns.com (原dns.com)
 @author: Bigjin<[email protected]>, NewFuture
 """
 
-from ._base import BaseProvider, TYPE_FORM, encode_params
 from hashlib import md5
 from time import time
 
+from ._base import TYPE_FORM, BaseProvider, encode_params
+
 
 class DnscomProvider(BaseProvider):
     """
@@ -82,7 +83,7 @@ class DnscomProvider(BaseProvider):
             TTL=ttl,
             viewID=line,
             **extra
-        )
+        )  # fmt: skip
         if res and res.get("recordID"):
             self.logger.info("Record created: %s", res)
             return True

+ 1 - 0
ddns/provider/edgeone.py

@@ -5,6 +5,7 @@ Tencent Cloud EdgeOne API
 API Documentation: https://cloud.tencent.com/document/api/1552/80731
 @author: NewFuture
 """
+
 from ddns.provider._base import join_domain
 from .tencentcloud import TencentCloudProvider
 

+ 5 - 4
ddns/provider/huaweidns.py

@@ -5,9 +5,10 @@ HuaweiDNS API
 @author: NewFuture
 """
 
-from ._base import BaseProvider, TYPE_JSON, join_domain, encode_params
+from time import gmtime, strftime
+
+from ._base import TYPE_JSON, BaseProvider, encode_params, join_domain
 from ._signature import hmac_sha256_authorization, sha256_hash
-from time import strftime, gmtime
 
 
 class HuaweiDNSProvider(BaseProvider):
@@ -114,7 +115,7 @@ class HuaweiDNSProvider(BaseProvider):
             ttl=ttl,
             line=line,
             **extra
-        )
+        )  # fmt: skip
         if res and res.get("id"):
             self.logger.info("Record created: %s", res)
             return True
@@ -134,7 +135,7 @@ class HuaweiDNSProvider(BaseProvider):
             records=[value],
             ttl=ttl if ttl is not None else old_record.get("ttl"),
             **extra
-        )
+        )  # fmt: skip
         if res and res.get("id"):
             self.logger.info("Record updated: %s", res)
             return True

+ 9 - 10
ddns/provider/tencentcloud.py

@@ -5,9 +5,11 @@ Tencent Cloud DNSPod API
 
 @author: NewFuture
 """
-from ._base import BaseProvider, TYPE_JSON
-from ._signature import hmac_sha256_authorization, sha256_hash, hmac_sha256
-from time import time, strftime, gmtime
+
+from time import gmtime, strftime, time
+
+from ._base import TYPE_JSON, BaseProvider
+from ._signature import hmac_sha256, hmac_sha256_authorization, sha256_hash
 
 
 class TencentCloudProvider(BaseProvider):
@@ -45,10 +47,7 @@ class TencentCloudProvider(BaseProvider):
         body = self._encode_body(params)
 
         # 构建请求头,小写 腾讯云只签名特定头部
-        headers = {
-            "content-type": self.content_type,
-            "host": self.endpoint.split("://", 1)[1].strip("/"),
-        }
+        headers = {"content-type": self.content_type, "host": self.endpoint.split("://", 1)[1].strip("/")}
 
         # 腾讯云特殊的密钥派生过程
         date = strftime("%Y-%m-%d", gmtime())
@@ -132,7 +131,7 @@ class TencentCloudProvider(BaseProvider):
             RecordType=record_type,
             RecordLine=line,
             **extra
-        )
+        )  # fmt: skip
         if not response or "RecordList" not in response:
             self.logger.debug("No records found or query failed")
             return None
@@ -165,7 +164,7 @@ class TencentCloudProvider(BaseProvider):
             RecordLine=line or "默认",
             TTL=int(ttl) if ttl else None,
             **extra
-        )
+        )  # fmt: skip
         if response and "RecordId" in response:
             self.logger.info("Record created successfully with ID: %s", response["RecordId"])
             return True
@@ -186,7 +185,7 @@ class TencentCloudProvider(BaseProvider):
             Value=value,
             TTL=int(ttl) if ttl else None,
             **extra
-        )
+        )  # fmt: skip
         if response and "RecordId" in response:
             self.logger.info("Record updated successfully")
             return True

+ 5 - 5
ddns/scheduler/__init__.py

@@ -39,7 +39,7 @@ def get_scheduler(scheduler=None):
         NotImplementedError: If scheduler not available on current platform
     """
     # Auto-detect if not specified
-    if scheduler is None or scheduler == 'auto':
+    if scheduler is None or scheduler == "auto":
         system = platform.system().lower()
         if system == "windows":
             return SchtasksScheduler()
@@ -51,13 +51,13 @@ def get_scheduler(scheduler=None):
         elif system == "linux" and read_file_safely("/proc/1/comm", default="").strip() == "systemd":  # type: ignore
             return SystemdScheduler()
         return CronScheduler()  # Other Unix-like systems, use cron
-    elif scheduler == 'systemd':
+    elif scheduler == "systemd":
         return SystemdScheduler()
-    elif scheduler == 'cron':
+    elif scheduler == "cron":
         return CronScheduler()
-    elif scheduler == 'launchd' or scheduler == 'mac':
+    elif scheduler == "launchd" or scheduler == "mac":
         return LaunchdScheduler()
-    elif scheduler == 'schtasks' or scheduler == 'windows':
+    elif scheduler == "schtasks" or scheduler == "windows":
         return SchtasksScheduler()
     else:
         raise ValueError("Invalid scheduler: {}. ".format(scheduler))

+ 8 - 8
ddns/scheduler/cron.py

@@ -9,9 +9,9 @@ import subprocess
 import tempfile
 from datetime import datetime
 
-from ._base import BaseScheduler
-from ..util.fileio import write_file
 from .. import __version__ as version
+from ..util.fileio import write_file
+from ._base import BaseScheduler
 
 
 class CronScheduler(BaseScheduler):
@@ -21,11 +21,11 @@ class CronScheduler(BaseScheduler):
 
     KEY = "# DDNS:"
 
-    def _update_crontab(self, new_cron):
+    def _update_crontab(self, lines):  # type: (list[str]) -> bool
         """Update crontab with new content"""
         try:
-            temp_path = tempfile.mktemp(suffix='.cron')
-            write_file(temp_path, new_cron)
+            temp_path = tempfile.mktemp(suffix=".cron")
+            write_file(temp_path, u"\n".join(lines) + u"\n")  # fmt: skip
             subprocess.check_call(["crontab", temp_path])
             os.unlink(temp_path)
             return True
@@ -51,7 +51,7 @@ class CronScheduler(BaseScheduler):
             status["enabled"] = False
 
         cmd_groups = line.split(self.KEY, 1) if line else ["", ""]
-        parts = cmd_groups[0].strip(' #\t').split() if cmd_groups[0] else []
+        parts = cmd_groups[0].strip(" #\t").split() if cmd_groups[0] else []
         status["interval"] = int(parts[0][2:]) if len(parts) >= 5 and parts[0].startswith("*/") else None
         status["command"] = " ".join(parts[5:]) if len(parts) >= 6 else None
         status["description"] = cmd_groups[1].strip() if len(cmd_groups) > 1 else None
@@ -69,7 +69,7 @@ class CronScheduler(BaseScheduler):
         lines = [line for line in crontext.splitlines() if self.KEY not in line]
         lines.append(cron_entry)
 
-        if self._update_crontab(u"\n".join(lines) + u"\n"):
+        if self._update_crontab(lines):
             return True
         else:
             self.logger.error("Failed to install DDNS cron job")
@@ -105,7 +105,7 @@ class CronScheduler(BaseScheduler):
             else:
                 raise ValueError("Invalid action: {}".format(action))
 
-        if self._update_crontab(u"\n".join(modified_lines) + u"\n"):
+        if self._update_crontab(modified_lines):
             return True
         else:
             self.logger.error("Failed to %s DDNS cron job", action)

+ 30 - 16
ddns/scheduler/launchd.py

@@ -7,9 +7,10 @@ macOS launchd-based task scheduler
 import os
 import re
 from datetime import datetime
-from ._base import BaseScheduler
-from ..util.fileio import read_file_safely, write_file
+
 from .. import __version__ as version
+from ..util.fileio import read_file_safely, write_file
+from ._base import BaseScheduler
 
 
 class LaunchdScheduler(BaseScheduler):
@@ -59,24 +60,37 @@ class LaunchdScheduler(BaseScheduler):
         plist_path = self._get_plist_path()
         ddns_command = self._build_ddns_command(ddns_args)
         program_args = ddns_command.split()
-        program_args_xml = u"\n".join(u"        <string>{}</string>".format(arg.strip('"')) for arg in program_args)
+        program_args_xml = "\n".join("        <string>{}</string>".format(arg.strip('"')) for arg in program_args)
 
         # Create comment with version and install date (consistent with Windows)
         date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
         comment = "auto-update v{} installed on {}".format(version, date)
-
-        plist_content = (
-            u'<?xml version="1.0" encoding="UTF-8"?>\n'
-            + u'<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" '
-            + u'"http://www.apple.com/DTDs/PropertyList-1.0.dtd">\n'
-            + u'<plist version="1.0">\n<dict>\n'
-            + u"    <key>Label</key>\n    <string>{}</string>\n".format(self.LABEL)
-            + u"    <key>Description</key>\n    <string>{}</string>\n".format(comment)
-            + u"    <key>ProgramArguments</key>\n    <array>\n{}\n    </array>\n".format(program_args_xml)
-            + u"    <key>StartInterval</key>\n    <integer>{}</integer>\n".format(interval * 60)
-            + u"    <key>RunAtLoad</key>\n    <true/>\n"
-            + u"    <key>WorkingDirectory</key>\n    <string>{}</string>\n".format(os.getcwd())
-            + u"</dict>\n</plist>\n"
+        plist_content = """<?xml version="1.0" encoding="UTF-8"?>
+    <!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
+    <plist version="1.0">
+    <dict>
+        <key>Label</key>
+        <string>{label}</string>
+        <key>Description</key>
+        <string>{comment}</string>
+        <key>ProgramArguments</key>
+        <array>
+    {program_args_xml}
+        </array>
+        <key>StartInterval</key>
+        <integer>{interval}</integer>
+        <key>RunAtLoad</key>
+        <true/>
+        <key>WorkingDirectory</key>
+        <string>{cwd}</string>
+    </dict>
+    </plist>
+    """.format(
+            label=self.LABEL,
+            comment=comment,
+            program_args_xml=program_args_xml,
+            interval=interval * 60,
+            cwd=os.getcwd(),
         )
 
         write_file(plist_path, plist_content)

+ 14 - 17
ddns/scheduler/schtasks.py

@@ -6,8 +6,9 @@ schtasks-based task scheduler
 
 import os
 import re
-from ._base import BaseScheduler
+
 from ..util.fileio import write_file
+from ._base import BaseScheduler
 
 # Constants
 VBS_SCRIPT = "~\\AppData\\Local\\DDNS\\ddns_silent.vbs"
@@ -29,11 +30,10 @@ class SchtasksScheduler(BaseScheduler):
         work_dir = os.getcwd()
 
         # Build VBS content with proper escaping
-        vbs_content = (
-            u'Set objShell = CreateObject("WScript.Shell")\n'
-            u'objShell.CurrentDirectory = "{}"\n'
-            u'objShell.Run "{}", 0, False\n'
-        ).format(work_dir.replace("\\", "\\\\"), ddns_command.replace('"', '""'))
+        vbs_content = """Set objShell = CreateObject("WScript.Shell")
+    objShell.CurrentDirectory = "{work_dir}"
+    objShell.Run "{ddns_command}", 0, False
+    """.format(work_dir=work_dir.replace("\\", "\\\\"), ddns_command=ddns_command.replace('"', '""'))
 
         # Try locations in order: AppData, then working directory
         vbs_paths = [os.path.expanduser(VBS_SCRIPT), os.path.join(work_dir, ".ddns_silent.vbs")]
@@ -49,7 +49,7 @@ class SchtasksScheduler(BaseScheduler):
 
     def _extract_xml(self, xml_text, tag_name):  # type: (str, str) -> str | None
         """Extract XML tag content using regex for better performance and flexibility"""
-        pattern = r'<{0}[^>]*>(.*?)</{0}>'.format(re.escape(tag_name))
+        pattern = r"<{0}[^>]*>(.*?)</{0}>".format(re.escape(tag_name))
         match = re.search(pattern, xml_text, re.DOTALL)
         return match.group(1).strip() if match else None
 
@@ -60,26 +60,23 @@ class SchtasksScheduler(BaseScheduler):
     def get_status(self):
         # Use XML format for language-independent parsing
         task_xml = self._run_command(["schtasks", "/query", "/tn", self.NAME, "/xml"])
-        status = {
-            "scheduler": "schtasks",
-            "installed": bool(task_xml),
-        }
+        status = {"scheduler": "schtasks", "installed": bool(task_xml)}
 
         if not task_xml:
             return status  # Task not installed, return minimal status
 
-        status['enabled'] = self._extract_xml(task_xml, 'Enabled') != 'false'
-        command = self._extract_xml(task_xml, 'Command')
-        arguments = self._extract_xml(task_xml, 'Arguments')
+        status["enabled"] = self._extract_xml(task_xml, "Enabled") != "false"
+        command = self._extract_xml(task_xml, "Command")
+        arguments = self._extract_xml(task_xml, "Arguments")
         status["command"] = "{} {}".format(command, arguments) if command and arguments else command
 
         # Parse interval: PT10M -> 10, fallback to original string
-        interval_str = self._extract_xml(task_xml, 'Interval')
-        interval_match = re.search(r'PT(\d+)M', interval_str) if interval_str else None
+        interval_str = self._extract_xml(task_xml, "Interval")
+        interval_match = re.search(r"PT(\d+)M", interval_str) if interval_str else None
         status["interval"] = int(interval_match.group(1)) if interval_match else interval_str
 
         # Show description if exists, otherwise show installation date
-        description = self._extract_xml(task_xml, 'Description') or self._extract_xml(task_xml, 'Date')
+        description = self._extract_xml(task_xml, "Description") or self._extract_xml(task_xml, "Date")
         if description:
             status["description"] = description
         return status

+ 3 - 10
ddns/scheduler/systemd.py

@@ -37,10 +37,7 @@ class SystemdScheduler(BaseScheduler):
     def get_status(self):
         """Get comprehensive status information"""
         installed = self.is_installed()
-        status = {
-            "scheduler": "systemd",
-            "installed": installed,
-        }
+        status = {"scheduler": "systemd", "installed": installed}
         if not installed:
             return status
 
@@ -77,9 +74,7 @@ After=network.target
 Type=oneshot
 WorkingDirectory={}
 ExecStart={}
-""".format(
-            version, date, work_dir, ddns_command
-        )
+""".format(version, date, work_dir, ddns_command)  # fmt: skip
 
         # Create timer file content
         timer_content = u"""[Unit]
@@ -92,9 +87,7 @@ Unit={}
 
 [Install]
 WantedBy=multi-user.target
-""".format(
-            self.SERVICE_NAME, interval, self.SERVICE_NAME
-        )
+""".format(self.SERVICE_NAME, interval, self.SERVICE_NAME)  # fmt: skip
 
         try:
             # Write service and timer files

+ 0 - 112
ddns/scheduler/windows.py

@@ -1,112 +0,0 @@
-# -*- coding:utf-8 -*-
-"""
-schtasks-based task scheduler
-@author: NewFuture
-"""
-
-import os
-import re
-import subprocess
-from ._base import BaseScheduler
-from ..util.fileio import write_file
-
-# Constants
-VBS_SCRIPT = "~\\AppData\\Local\\DDNS\\ddns_silent.vbs"
-
-
-class WindowsScheduler(BaseScheduler):
-    """schtasks-based task scheduler"""
-
-    NAME = "DDNS"
-
-    def _schtasks(self, *args):  # type: (*str) -> bool
-        """Helper to run schtasks commands with consistent error handling"""
-        subprocess.check_call(["schtasks"] + list(args))
-        return True
-
-    def _create_vbs_script(self, ddns_args=None):  # type: (dict | None) -> str
-        """Create VBS script for silent execution and return its path"""
-        ddns_command = self._build_ddns_command(ddns_args)
-        work_dir = os.getcwd()
-
-        # Build VBS content with proper escaping
-        vbs_content = (
-            u'Set objShell = CreateObject("WScript.Shell")\n'
-            u'objShell.CurrentDirectory = "{}"\n'
-            u'objShell.Run "{}", 0, False\n'
-        ).format(work_dir.replace("\\", "\\\\"), ddns_command.replace('"', '""'))
-
-        # Try locations in order: AppData, then working directory
-        vbs_paths = [os.path.expanduser(VBS_SCRIPT), os.path.join(work_dir, ".ddns_silent.vbs")]
-
-        for path in vbs_paths:
-            try:
-                write_file(path, vbs_content)
-                return path
-            except Exception as e:
-                self.logger.warning("Failed to create VBS in %s: %s", path, e)
-
-        raise Exception("Failed to create VBS script in any location")
-
-    def _extract_xml(self, xml_text, tag_name):  # type: (str, str) -> str | None
-        """Extract XML tag content using regex for better performance and flexibility"""
-        pattern = r'<{0}[^>]*>(.*?)</{0}>'.format(re.escape(tag_name))
-        match = re.search(pattern, xml_text, re.DOTALL)
-        return match.group(1).strip() if match else None
-
-    def is_installed(self):
-        result = self._run_command(["schtasks", "/query", "/tn", self.NAME]) or ""
-        return self.NAME in result
-
-    def get_status(self):
-        # Use XML format for language-independent parsing
-        task_xml = self._run_command(["schtasks", "/query", "/tn", self.NAME, "/xml"])
-        status = {
-            "scheduler": "schtasks",
-            "installed": bool(task_xml),
-        }
-
-        if not task_xml:
-            return status  # Task not installed, return minimal status
-
-        status['enabled'] = self._extract_xml(task_xml, 'Enabled') != 'false'
-        command = self._extract_xml(task_xml, 'Command')
-        arguments = self._extract_xml(task_xml, 'Arguments')
-        status["command"] = "{} {}".format(command, arguments) if command and arguments else command
-
-        # Parse interval: PT10M -> 10, fallback to original string
-        interval_str = self._extract_xml(task_xml, 'Interval')
-        interval_match = re.search(r'PT(\d+)M', interval_str) if interval_str else None
-        status["interval"] = int(interval_match.group(1)) if interval_match else interval_str
-
-        # Show description if exists, otherwise show installation date
-        description = self._extract_xml(task_xml, 'Description') or self._extract_xml(task_xml, 'Date')
-        if description:
-            status["description"] = description
-        return status
-
-    def install(self, interval, ddns_args=None):
-        vbs_path = self._create_vbs_script(ddns_args)
-        cmd = 'wscript.exe "{}"'.format(vbs_path)
-        return self._schtasks("/Create", "/SC", "MINUTE", "/MO", str(interval), "/TR", cmd, "/TN", self.NAME, "/F")
-
-    def uninstall(self):
-        success = self._schtasks("/Delete", "/TN", self.NAME, "/F")
-        if success:
-            # Clean up VBS script files
-            vbs_paths = [os.path.expanduser(VBS_SCRIPT), os.path.join(os.getcwd(), ".ddns_silent.vbs")]
-            for vbs_path in vbs_paths:
-                if os.path.exists(vbs_path):
-                    try:
-                        os.remove(vbs_path)
-                        self.logger.info("Cleaned up VBS script file: %s", vbs_path)
-                    except OSError:
-                        self.logger.info("fail to remove %s", vbs_path)
-                        pass  # Ignore cleanup failures
-        return success
-
-    def enable(self):
-        return self._schtasks("/Change", "/TN", self.NAME, "/Enable")
-
-    def disable(self):
-        return self._schtasks("/Change", "/TN", self.NAME, "/Disable")

+ 1 - 4
ddns/util/http.py

@@ -219,10 +219,7 @@ class AutoSSLHandler(HTTPSHandler):  # type: ignore[misc]
         try:
             return self._open(req)
         except OSError as e:  # SSL auto模式:处理本地证书错误
-            ssl_errors = (
-                "unable to get local issuer certificate",
-                "Basic Constraints of CA cert not marked critical",
-            )
+            ssl_errors = ("unable to get local issuer certificate", "Basic Constraints of CA cert not marked critical")
             if self._verify == "auto" and any(err in str(e) for err in ssl_errors):
                 logger.warning("SSL error (%s), switching to unverified connection", str(e))
                 self._verify = False  # 不验证SSL

+ 45 - 43
pyproject.toml

@@ -49,8 +49,7 @@ ddns = "ddns.__main__:main"
 # Optional dependencies
 [project.optional-dependencies]
 dev = [
-    "black",
-    "flake8",
+    "ruff",
     "mock;python_version<'3.3'",  # For Python 2.7 compatibility
 ]
 # 可选的 pytest 支持(不是默认测试框架)
@@ -93,26 +92,51 @@ addopts = ["-v", "--tb=short"]
 # 确保 pytest 可以找到 test_base 模块
 pythonpath = [".", "tests"]
 
-# 代码格式化配置
-[tool.black]
+# Ruff configuration - unified formatting and linting
+[tool.ruff]
+# Same line length as black was using
 line-length = 120
-skip-string-normalization = true
-# py34删除尾部空格兼容py2
-target-version = ['py34', 'py38', 'py311']
-include = '\.py$'
-extend-exclude = '''
-/(
-  # directories
-  \.eggs
-  | \.git
-  | \.hg
-  | \.mypy_cache
-  | \.tox
-  | \.venv
-  | build
-  | dist
-)/
-'''
+# Ruff minimum supported version is py37, but project supports py27+ 
+target-version = "py37"
+
+exclude = [
+    ".eggs",
+    ".git", 
+    ".hg",
+    ".mypy_cache",
+    ".tox",
+    ".venv",
+    "build",
+    "dist",
+    "__pycache__",
+    "*.egg-info",
+]
+
+[tool.ruff.lint]
+# Enable pycodestyle (E, W), pyflakes (F), and mccabe (C) - same as flake8 defaults
+# Deliberately exclude pyupgrade (UP) rules to maintain Python 2 compatibility
+# (UP rules would convert u"" strings to "" which breaks py2 compatibility)
+select = ["E", "W", "F", "C"]
+# Same ignores as flake8 configuration, but using ruff rule codes
+ignore = [
+    # "E203",  # whitespace before ':' - not needed in ruff, formatter handles this
+    "E501",  # line too long (handled by formatter)
+    "UP025", # unicode-kind-prefix (keep u"..." for Py2 compatibility)
+]
+# Same max complexity as flake8
+mccabe = { max-complexity = 12 }
+
+[tool.ruff.lint.per-file-ignores]
+# Allow unused imports and redefined names in tests (same as flake8)
+"tests/*" = ["F401", "F811"]
+
+[tool.ruff.format]
+line-ending = "auto"
+indent-style = "space"
+quote-style = "double"
+skip-magic-trailing-comma = true # py2
+docstring-code-format = true
+docstring-code-line-length = "dynamic"
 
 # 类型检查配置
 [tool.pyright]
@@ -154,25 +178,3 @@ exclude_lines = [
 ]
 show_missing = true
 precision = 2
-
-# Flake8 configuration
-[tool.flake8]
-max-line-length = 118
-extend-ignore = [
-    "E203",  # whitespace before ':'
-    "E501",  # line too long (handled by black)
-    "W503",  # line break before binary operator
-]
-exclude = [
-    ".git",
-    "__pycache__",
-    "build",
-    "dist",
-    ".eggs",
-    "*.egg-info",
-    ".tox",
-    ".venv",
-]
-per-file-ignores = [
-    "tests/*:F401,F811",  # Allow unused imports and redefined names in tests
-]

+ 1 - 0
tests/base_test.py

@@ -3,6 +3,7 @@
 Base test utilities and common imports for all provider tests
 @author: NewFuture
 """
+
 from __init__ import unittest, patch, MagicMock  # noqa: F401 # Ensure the package is initialized
 
 

+ 3 - 18
tests/test_config_cli.py

@@ -3,6 +3,7 @@
 Unit tests for ddns.config.cli module
 @author: GitHub Copilot
 """
+
 from __init__ import unittest
 import sys
 import io
@@ -10,7 +11,6 @@ from ddns.config.cli import load_config, str_bool, log_level  # noqa: E402
 
 
 class TestCliConfig(unittest.TestCase):
-
     def setUp(self):
         encode = sys.stdout.encoding
         if encode is not None and encode.lower() != "utf-8" and hasattr(sys.stdout, "buffer"):
@@ -150,15 +150,7 @@ class TestCliConfig(unittest.TestCase):
         self.assertEqual(config["log_file"], "/var/log/ddns.log")
         self.assertEqual(config["log_format"], "%(asctime)s %(message)s")
 
-        sys.argv = [
-            "ddns",
-            "--log_level",
-            "INFO",
-            "--log_file",
-            "/tmp/ddns.log",
-            "--log_datefmt",
-            "%Y-%m-%d %H:%M:%S",
-        ]
+        sys.argv = ["ddns", "--log_level", "INFO", "--log_file", "/tmp/ddns.log", "--log_datefmt", "%Y-%m-%d %H:%M:%S"]
         config = load_config("Test DDNS", "Test doc", "1.0.0", "2025-07-04")
         self.assertEqual(config["log_level"], 20)
         self.assertEqual(config["log_file"], "/tmp/ddns.log")
@@ -186,7 +178,6 @@ class TestCliConfig(unittest.TestCase):
         self.assertEqual(action.nargs, "?")
 
     def test_load_config_other_flags(self):
-
         sys.argv = ["ddns", "--ttl", "300", "--line", "unicom"]
         config = load_config("Test DDNS", "Test doc", "1.0.0", "2025-07-04")
         self.assertEqual(config["ttl"], 300)
@@ -402,13 +393,7 @@ class TestCliConfig(unittest.TestCase):
         self.assertEqual(config["ssl"], "~/.ssl/cert.pem")
 
         # Test multiple configs
-        sys.argv = [
-            "ddns",
-            "--config",
-            "/path/to/config1.json",
-            "--config",
-            "/path/to/config2.json",
-        ]
+        sys.argv = ["ddns", "--config", "/path/to/config1.json", "--config", "/path/to/config2.json"]
         config = load_config("Test DDNS", "Test doc", "1.0.0", "2025-07-04")
         self.assertEqual(config["config"], ["/path/to/config1.json", "/path/to/config2.json"])
 

+ 1 - 0
tests/test_config_cli_task.py

@@ -3,6 +3,7 @@
 Unit tests for ddns task subcommand functionality
 @author: GitHub Copilot
 """
+
 from __init__ import unittest, patch
 import sys
 import io

+ 1 - 4
tests/test_config_config.py

@@ -331,10 +331,7 @@ class TestConfig(unittest.TestCase):
         normal_cases = [
             (
                 {"index4": "public,regex:192\\.168\\..*", "index6": "public,cmd:curl -s ipv6.icanhazip.com"},
-                {
-                    "index4": ["public", "regex:192\\.168\\..*"],
-                    "index6": ["public", "cmd:curl -s ipv6.icanhazip.com"],
-                },
+                {"index4": ["public", "regex:192\\.168\\..*"], "index6": ["public", "cmd:curl -s ipv6.icanhazip.com"]},
             ),
             (
                 {"index4": "public,default", "index6": "public;default"},

+ 3 - 13
tests/test_config_env.py

@@ -2,6 +2,7 @@
 """
 Configuration loader tests for environment variables.
 """
+
 import os
 import unittest
 from ddns.config.env import load_config
@@ -78,13 +79,7 @@ class TestConfigEnv(unittest.TestCase):
 
     def test_pythonhttpsverify_values(self):
         """Test PYTHONHTTPSVERIFY with different values"""
-        test_cases = [
-            ("0", "0"),
-            ("1", "1"),
-            ("false", "false"),
-            ("true", "true"),
-            ("anything", "anything"),
-        ]
+        test_cases = [("0", "0"), ("1", "1"), ("false", "false"), ("true", "true"), ("anything", "anything")]
 
         for env_value, expected in test_cases:
             os.environ["PYTHONHTTPSVERIFY"] = env_value
@@ -206,12 +201,7 @@ class TestConfigEnv(unittest.TestCase):
 
     def test_numeric_and_boolean_strings(self):
         """Test that numeric and boolean strings are preserved"""
-        test_cases = [
-            ("123", "123"),
-            ("true", "true"),
-            ("false", "false"),
-            ("3.14", "3.14"),
-        ]
+        test_cases = [("123", "123"), ("true", "true"), ("false", "false"), ("3.14", "3.14")]
 
         for value, expected in test_cases:
             os.environ["DDNS_TEST_VALUE"] = value

+ 1 - 0
tests/test_config_file.py

@@ -4,6 +4,7 @@
 Unit tests for ddns.config.file module
 @author: GitHub Copilot
 """
+
 from __future__ import unicode_literals
 from __init__ import unittest
 import tempfile

+ 3 - 12
tests/test_config_file_remote.py

@@ -4,6 +4,7 @@
 Unit tests for remote configuration loading in ddns.config.file module
 @author: GitHub Copilot
 """
+
 from __future__ import unicode_literals
 from __init__ import unittest, patch
 import tempfile
@@ -189,18 +190,8 @@ class TestRemoteConfigFile(unittest.TestCase):
             "ssl": "auto",
             "cache": True,
             "providers": [
-                {
-                    "provider": "cloudflare",
-                    "id": "[email protected]",
-                    "token": "token1",
-                    "ipv4": ["test1.example.com"],
-                },
-                {
-                    "provider": "dnspod",
-                    "id": "[email protected]",
-                    "token": "token2",
-                    "ipv4": ["test2.example.com"],
-                },
+                {"provider": "cloudflare", "id": "[email protected]", "token": "token1", "ipv4": ["test1.example.com"]},
+                {"provider": "dnspod", "id": "[email protected]", "token": "token2", "ipv4": ["test2.example.com"]},
             ],
         }
         mock_http.return_value = HttpResponse(200, "OK", {}, json.dumps(config_data))

+ 1 - 1
tests/test_config_init_multi.py

@@ -4,6 +4,7 @@
 Unit tests for multi-config functionality
 @author: GitHub Copilot
 """
+
 from __init__ import unittest, patch
 import tempfile
 import json
@@ -14,7 +15,6 @@ from ddns.config.file import load_config as load_file_config, _process_multi_pro
 
 
 class TestMultiConfig(unittest.TestCase):
-
     def setUp(self):
         self.temp_dir = tempfile.mkdtemp()
         self.original_argv = sys.argv[:]

+ 1 - 0
tests/test_config_schema_v4_1.py

@@ -4,6 +4,7 @@
 Integration test for all config formats including v4.1 providers
 @author: GitHub Copilot
 """
+
 from __future__ import unicode_literals
 from __init__ import unittest
 import tempfile

+ 2 - 2
tests/test_provider__signature.py

@@ -64,7 +64,7 @@ class TestHmacSha256Authorization(unittest.TestCase):
         body_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
 
         auth_header_template = (
-            "ACS3-HMAC-SHA256 Credential=YourAccessKeyId," "SignedHeaders={SignedHeaders},Signature={Signature}"
+            "ACS3-HMAC-SHA256 Credential=YourAccessKeyId,SignedHeaders={SignedHeaders},Signature={Signature}"
         )
         signing_string_template = "ACS3-HMAC-SHA256\n{HashedCanonicalRequest}"
 
@@ -103,7 +103,7 @@ class TestHmacSha256Authorization(unittest.TestCase):
         body_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
 
         auth_header_template = (
-            "SDK-HMAC-SHA256 Access=your_access_key_id, " "SignedHeaders={SignedHeaders}, Signature={Signature}"
+            "SDK-HMAC-SHA256 Access=your_access_key_id, SignedHeaders={SignedHeaders}, Signature={Signature}"
         )
         signing_string_template = "SDK-HMAC-SHA256\n20191115T033655Z\n{HashedCanonicalRequest}"
 

+ 3 - 20
tests/test_provider_alidns.py

@@ -126,11 +126,7 @@ class TestAlidnsProvider(BaseProviderTestCase):
 
         with patch.object(provider, "_http") as mock_http:
             mock_http.return_value = {
-                "DomainRecords": {
-                    "Record": [
-                        {"RR": "www", "RecordId": "123", "Value": "1.2.3.4", "Type": "A"},
-                    ]
-                }
+                "DomainRecords": {"Record": [{"RR": "www", "RecordId": "123", "Value": "1.2.3.4", "Type": "A"}]}
             }
 
             result = provider._query_record("example.com", "www", "example.com", "A", None, {})
@@ -299,14 +295,7 @@ class TestAlidnsProvider(BaseProviderTestCase):
         """Test _update_record method when no changes are detected"""
         provider = AlidnsProvider(self.authid, self.token)
 
-        old_record = {
-            "RecordId": "123456",
-            "RR": "www",
-            "Value": "1.2.3.4",
-            "Type": "A",
-            "TTL": 300,
-            "Line": "default",
-        }
+        old_record = {"RecordId": "123456", "RR": "www", "Value": "1.2.3.4", "Type": "A", "TTL": 300, "Line": "default"}
 
         with patch.object(provider, "_request") as mock_request:
             # Same value, type, and TTL should skip update
@@ -374,13 +363,7 @@ class TestAlidnsProvider(BaseProviderTestCase):
             result = provider._create_record("example.com", "www", "example.com", "1.2.3.4", "A", 300, "unicom", {})
 
             mock_request.assert_called_once_with(
-                "AddDomainRecord",
-                DomainName="example.com",
-                RR="www",
-                Value="1.2.3.4",
-                Type="A",
-                TTL=300,
-                Line="unicom",
+                "AddDomainRecord", DomainName="example.com", RR="www", Value="1.2.3.4", Type="A", TTL=300, Line="unicom"
             )
             self.assertTrue(result)
 

+ 4 - 27
tests/test_provider_aliesa.py

@@ -433,27 +433,9 @@ class TestAliesaProviderIntegration(BaseProviderTestCase):
         # Mock API response format
         api_response = {
             "Records": [
-                {
-                    "RecordId": "111",
-                    "RecordName": "www.example.com",
-                    "Type": "A",
-                    "Value": "192.168.1.1",
-                    "TTL": 300,
-                },
-                {
-                    "RecordId": "222",
-                    "RecordName": "www.example.com",
-                    "Type": "A",
-                    "Value": "192.168.1.2",
-                    "TTL": 600,
-                },
-                {
-                    "RecordId": "333",
-                    "RecordName": "mail.example.com",
-                    "Type": "A",
-                    "Value": "192.168.1.3",
-                    "TTL": 300,
-                },
+                {"RecordId": "111", "RecordName": "www.example.com", "Type": "A", "Value": "192.168.1.1", "TTL": 300},
+                {"RecordId": "222", "RecordName": "www.example.com", "Type": "A", "Value": "192.168.1.2", "TTL": 600},
+                {"RecordId": "333", "RecordName": "mail.example.com", "Type": "A", "Value": "192.168.1.3", "TTL": 300},
             ]
         }
 
@@ -505,12 +487,7 @@ class TestAliesaProviderAPIResponse(BaseProviderTestCase):
         self.assertTrue(":" in record_params["Value"])  # IPv6 format
 
         # Test CNAME record parameters
-        record_params = {
-            "RecordName": "alias.example.com",
-            "Type": "CNAME",
-            "Value": "target.example.com",
-            "TTL": 300,
-        }
+        record_params = {"RecordName": "alias.example.com", "Type": "CNAME", "Value": "target.example.com", "TTL": 300}
         self.assertEqual(record_params["Type"], "CNAME")
         self.assertTrue(record_params["Value"].endswith(".com"))
 

+ 3 - 7
tests/test_provider_cloudflare.py

@@ -5,7 +5,8 @@ Unit tests for CloudflareProvider
 @author: GitHub Copilot
 """
 
-from base_test import BaseProviderTestCase, unittest, patch
+from base_test import BaseProviderTestCase, patch, unittest
+
 from ddns.provider.cloudflare import CloudflareProvider
 
 
@@ -182,7 +183,6 @@ class TestCloudflareProvider(BaseProviderTestCase):
         with patch("ddns.provider.cloudflare.join_domain", autospec=True) as mock_join, patch.object(
             provider, "_request", autospec=True
         ) as mock_request:
-
             mock_join.return_value = "www.example.com"
             mock_request.return_value = [
                 {"id": "rec456", "name": "mail.example.com", "type": "A", "content": "5.6.7.8"}
@@ -199,7 +199,6 @@ class TestCloudflareProvider(BaseProviderTestCase):
         with patch("ddns.provider.cloudflare.join_domain") as mock_join, patch.object(
             provider, "_request"
         ) as mock_request:
-
             mock_join.return_value = "www.example.com"
             mock_request.return_value = []
 
@@ -212,7 +211,7 @@ class TestCloudflareProvider(BaseProviderTestCase):
                 type="A",
                 per_page=10000,
                 **{"name.exact": "www.example.com", "proxied": True}
-            )
+            )  # fmt: skip
 
     def test_create_record_success(self):
         """Test _create_record method with successful creation"""
@@ -221,7 +220,6 @@ class TestCloudflareProvider(BaseProviderTestCase):
         with patch("ddns.provider.cloudflare.join_domain", autospec=True) as mock_join, patch.object(
             provider, "_request"
         ) as mock_request:
-
             mock_join.return_value = "www.example.com"
             mock_request.return_value = {"id": "rec123", "name": "www.example.com"}
 
@@ -246,7 +244,6 @@ class TestCloudflareProvider(BaseProviderTestCase):
         with patch("ddns.provider.cloudflare.join_domain") as mock_join, patch.object(
             provider, "_request"
         ) as mock_request:
-
             mock_join.return_value = "www.example.com"
             mock_request.return_value = None  # API request failed
 
@@ -261,7 +258,6 @@ class TestCloudflareProvider(BaseProviderTestCase):
         with patch("ddns.provider.cloudflare.join_domain") as mock_join, patch.object(
             provider, "_request"
         ) as mock_request:
-
             mock_join.return_value = "www.example.com"
             mock_request.return_value = {"id": "rec123"}
 

+ 0 - 3
tests/test_provider_dnscom.py

@@ -76,7 +76,6 @@ class TestDnscomProvider(BaseProviderTestCase):
         provider = DnscomProvider(self.authid, self.token)
 
         with patch.object(provider, "_signature") as mock_signature, patch.object(provider, "_http") as mock_http:
-
             mock_signature.return_value = {"apiKey": self.authid, "hash": "test_hash"}
             mock_http.return_value = {"code": 0, "data": {"result": "success"}}
 
@@ -91,7 +90,6 @@ class TestDnscomProvider(BaseProviderTestCase):
         provider = DnscomProvider(self.authid, self.token)
 
         with patch.object(provider, "_signature") as mock_signature, patch.object(provider, "_http") as mock_http:
-
             mock_signature.return_value = {"apiKey": self.authid}
             mock_http.return_value = None
 
@@ -105,7 +103,6 @@ class TestDnscomProvider(BaseProviderTestCase):
         provider = DnscomProvider(self.authid, self.token)
 
         with patch.object(provider, "_signature") as mock_signature, patch.object(provider, "_http") as mock_http:
-
             mock_signature.return_value = {"apiKey": self.authid}
             mock_http.return_value = {"code": 1, "message": "Invalid API key"}
 

+ 10 - 10
tests/test_provider_edgeone.py

@@ -111,9 +111,7 @@ class TestEdgeOneProvider(BaseProviderTestCase):
         """Test acceleration domain query when domain not found"""
         mock_request.return_value = {"AccelerationDomains": []}
 
-        record = self.provider._query_record(
-            "zone-123456789", "www", "example.com", "A", None, {}
-        )  # type: dict # type: ignore
+        record = self.provider._query_record("zone-123456789", "www", "example.com", "A", None, {})  # type: dict # type: ignore
 
         self.assertIsNone(record)
 
@@ -131,9 +129,7 @@ class TestEdgeOneProvider(BaseProviderTestCase):
             ]
         }
 
-        record = self.provider._query_record(
-            "zone-123456789", "@", "example.com", "A", None, {}
-        )  # type: dict # type: ignore
+        record = self.provider._query_record("zone-123456789", "@", "example.com", "A", None, {})  # type: dict # type: ignore
 
         self.assertIsNotNone(record)
         self.assertEqual(record["DomainName"], "example.com")
@@ -266,7 +262,8 @@ class TestEdgeOneProvider(BaseProviderTestCase):
         mock_http.return_value = {"Response": {"ZoneId": "zone-123456", "RequestId": "test-request-id"}}
 
         result = self.provider._request(
-            "DescribeZones", Filters=[{"Name": "zone-name", "Values": ["example.com"]}]  # type: ignore[arg-type]
+            "DescribeZones",
+            Filters=[{"Name": "zone-name", "Values": ["example.com"]}],  # type: ignore[arg-type]
         )
 
         self.assertIsNotNone(result)
@@ -284,7 +281,8 @@ class TestEdgeOneProvider(BaseProviderTestCase):
         mock_http.return_value = {"Response": {"Error": {"Code": "InvalidParameter", "Message": "Invalid zone name"}}}
 
         result = self.provider._request(
-            "DescribeZones", Filters=[{"Name": "zone-name", "Values": ["invalid"]}]  # type: ignore[arg-type]
+            "DescribeZones",
+            Filters=[{"Name": "zone-name", "Values": ["invalid"]}],  # type: ignore[arg-type]
         )
 
         self.assertIsNone(result)
@@ -299,7 +297,8 @@ class TestEdgeOneProvider(BaseProviderTestCase):
         mock_http.return_value = {"UnexpectedField": "value"}
 
         result = self.provider._request(
-            "DescribeZones", Filters=[{"Name": "zone-name", "Values": ["example.com"]}]  # type: ignore[arg-type]
+            "DescribeZones",
+            Filters=[{"Name": "zone-name", "Values": ["example.com"]}],  # type: ignore[arg-type]
         )
 
         self.assertIsNone(result)
@@ -316,7 +315,8 @@ class TestEdgeOneProvider(BaseProviderTestCase):
         # The implementation doesn't catch exceptions, so it will propagate
         with self.assertRaises(Exception) as cm:
             self.provider._request(
-                "DescribeZones", Filters=[{"Name": "zone-name", "Values": ["example.com"]}]  # type: ignore[arg-type]
+                "DescribeZones",
+                Filters=[{"Name": "zone-name", "Values": ["example.com"]}],  # type: ignore[arg-type]
             )
 
         self.assertEqual(str(cm.exception), "Network error")

+ 1 - 7
tests/test_provider_namesilo.py

@@ -263,13 +263,7 @@ class TestNamesiloProvider(BaseProviderTestCase):
                 "reply": {
                     "code": "300",
                     "resource_record": [
-                        {
-                            "record_id": "12345",
-                            "host": "test",
-                            "type": "A",
-                            "value": "1.2.3.4",
-                            "ttl": "3600",
-                        }
+                        {"record_id": "12345", "host": "test", "type": "A", "value": "1.2.3.4", "ttl": "3600"}
                     ],
                 }
             },

+ 3 - 14
tests/test_provider_tencentcloud.py

@@ -64,10 +64,7 @@ class TestTencentCloudProvider(BaseProviderTestCase):
 
         mock_http.return_value = {
             "Response": {
-                "Error": {
-                    "Code": "InvalidParameterValue.DomainNotExists",
-                    "Message": "当前域名有误,请返回重新操作。",
-                }
+                "Error": {"Code": "InvalidParameterValue.DomainNotExists", "Message": "当前域名有误,请返回重新操作。"}
             }
         }
 
@@ -114,9 +111,7 @@ class TestTencentCloudProvider(BaseProviderTestCase):
         """Test record query when record not found"""
         mock_http.return_value = {"Response": {"RecordList": []}}
 
-        record = self.provider._query_record(
-            "12345678", "www", "example.com", "A", None, {}
-        )  # type: dict # type: ignore
+        record = self.provider._query_record("12345678", "www", "example.com", "A", None, {})  # type: dict # type: ignore
 
         self.assertIsNone(record)
 
@@ -366,13 +361,7 @@ class TestTencentCloudProvider(BaseProviderTestCase):
 
     def test_update_record_with_line(self):
         """Test _update_record method with line parameter"""
-        old_record = {
-            "RecordId": 123456,
-            "Name": "www",
-            "Line": "默认",
-            "Domain": "example.com",
-            "DomainId": 12345678,
-        }
+        old_record = {"RecordId": 123456, "Name": "www", "Line": "默认", "Domain": "example.com", "DomainId": 12345678}
 
         with patch.object(self.provider, "_request") as mock_request:
             mock_request.return_value = {"RecordId": 123456}

+ 3 - 2
tests/test_scheduler_base.py

@@ -3,6 +3,7 @@
 Unit tests for ddns.scheduler._base module
 @author: NewFuture
 """
+
 from __init__ import unittest, patch
 from ddns.scheduler._base import BaseScheduler
 
@@ -44,7 +45,7 @@ class TestBaseScheduler(unittest.TestCase):
 
     def test_abstract_methods_exist(self):
         """Test that all abstract methods are implemented"""
-        required_methods = ['get_status', 'is_installed', 'install', 'uninstall', 'enable', 'disable']
+        required_methods = ["get_status", "is_installed", "install", "uninstall", "enable", "disable"]
 
         for method_name in required_methods:
             self.assertTrue(hasattr(self.scheduler, method_name))
@@ -145,7 +146,7 @@ class TestBaseScheduler(unittest.TestCase):
         self.assertNotIn("--ipv6", command)
         self.assertNotIn("--config", command)
 
-    @patch('sys.executable', '/usr/bin/python3.9')
+    @patch("sys.executable", "/usr/bin/python3.9")
     def test_build_ddns_command_uses_current_python(self):
         """Test _build_ddns_command uses current Python executable"""
         ddns_args = {"dns": "debug", "ipv4": ["test.com"]}

+ 58 - 50
tests/test_scheduler_cron.py

@@ -3,8 +3,11 @@
 Unit tests for ddns.scheduler.cron module
 @author: NewFuture
 """
+
 import platform
-from __init__ import unittest, patch
+
+from __init__ import patch, unittest
+
 from ddns.scheduler.cron import CronScheduler
 
 
@@ -22,57 +25,58 @@ class TestCronScheduler(unittest.TestCase):
         mock_datetime.now.return_value.strftime.return_value = "2025-08-01 14:30:00"
 
         # Mock the methods to avoid actual system calls
-        with patch.object(self.scheduler, '_run_command') as mock_run:
-            with patch.object(self.scheduler, '_update_crontab') as mock_update:
-                with patch.object(self.scheduler, '_build_ddns_command') as mock_build:
+        with patch.object(self.scheduler, "_run_command") as mock_run:
+            with patch.object(self.scheduler, "_update_crontab") as mock_update:
+                with patch.object(self.scheduler, "_build_ddns_command") as mock_build:
                     mock_run.return_value = ""
                     mock_update.return_value = True
                     mock_build.return_value = "python3 -m ddns -c test.json"
 
-                    result = self.scheduler.install(5, {'config': ['test.json']})
+                    result = self.scheduler.install(5, {"config": ["test.json"]})
 
                     self.assertTrue(result)
                     mock_update.assert_called_once()
 
                     # Verify the cron entry contains version and date
                     call_args = mock_update.call_args[0][0]
-                    self.assertIn("# DDNS: auto-update vtest-version installed on 2025-08-01 14:30:00", call_args)
+                    cron_entry = u"\n".join(call_args)  # fmt: skip
+                    self.assertIn("# DDNS: auto-update vtest-version installed on 2025-08-01 14:30:00", cron_entry)
 
     def test_get_status_extracts_comments(self):
         """Test get_status method extracts comments from cron entry"""
         cron_entry = (
             '*/10 * * * * cd "/home/user" && python3 -m ddns -c test.json '
-            '# DDNS: auto-update v4.0 installed on 2025-08-01 14:30:00'
+            "# DDNS: auto-update v4.0 installed on 2025-08-01 14:30:00"
         )
 
-        with patch.object(self.scheduler, '_run_command') as mock_run:
+        with patch.object(self.scheduler, "_run_command") as mock_run:
 
             def mock_command(cmd):
-                if cmd == ['crontab', '-l']:
+                if cmd == ["crontab", "-l"]:
                     return cron_entry
-                elif cmd == ['pgrep', '-f', 'cron']:
-                    return '12345'
+                elif cmd == ["pgrep", "-f", "cron"]:
+                    return "12345"
                 return None
 
             mock_run.side_effect = mock_command
 
             status = self.scheduler.get_status()
 
-            self.assertEqual(status['scheduler'], 'cron')
-            self.assertTrue(status['enabled'])
-            self.assertEqual(status['interval'], 10)
-            self.assertEqual(status['description'], 'auto-update v4.0 installed on 2025-08-01 14:30:00')
+            self.assertEqual(status["scheduler"], "cron")
+            self.assertTrue(status["enabled"])
+            self.assertEqual(status["interval"], 10)
+            self.assertEqual(status["description"], "auto-update v4.0 installed on 2025-08-01 14:30:00")
 
     def test_get_status_handles_missing_comment_info(self):
         """Test get_status handles cron entries without full comment info gracefully"""
         cron_entry = '*/5 * * * * cd "/home/user" && python3 -m ddns -c test.json # DDNS: auto-update'
 
-        with patch.object(self.scheduler, '_run_command') as mock_run:
+        with patch.object(self.scheduler, "_run_command") as mock_run:
 
             def mock_command(cmd):
-                if cmd == ['crontab', '-l']:
+                if cmd == ["crontab", "-l"]:
                     return cron_entry
-                elif cmd == ['pgrep', '-f', 'cron']:
+                elif cmd == ["pgrep", "-f", "cron"]:
                     return None
                 return None
 
@@ -80,41 +84,42 @@ class TestCronScheduler(unittest.TestCase):
 
             status = self.scheduler.get_status()
 
-            self.assertEqual(status['scheduler'], 'cron')
-            self.assertTrue(status['enabled'])
-            self.assertEqual(status['interval'], 5)
-            self.assertEqual(status['description'], 'auto-update')
+            self.assertEqual(status["scheduler"], "cron")
+            self.assertTrue(status["enabled"])
+            self.assertEqual(status["interval"], 5)
+            self.assertEqual(status["description"], "auto-update")
 
     def test_version_in_cron_entry(self):
         """Test that install method includes version in cron entry"""
         with patch("ddns.scheduler.cron.datetime") as mock_datetime:
             mock_datetime.now.return_value.strftime.return_value = "2025-08-01 14:30:00"
 
-            with patch.object(self.scheduler, '_run_command') as mock_run:
-                with patch.object(self.scheduler, '_update_crontab') as mock_update:
-                    with patch.object(self.scheduler, '_build_ddns_command') as mock_build:
+            with patch.object(self.scheduler, "_run_command") as mock_run:
+                with patch.object(self.scheduler, "_update_crontab") as mock_update:
+                    with patch.object(self.scheduler, "_build_ddns_command") as mock_build:
                         mock_run.return_value = ""
                         mock_update.return_value = True
                         mock_build.return_value = "python3 -m ddns"
 
                         # Test that version is included in cron entry
-                        with patch('ddns.scheduler.cron.version', 'test-version'):
+                        with patch("ddns.scheduler.cron.version", "test-version"):
                             result = self.scheduler.install(10)
 
                             self.assertTrue(result)
                             call_args = mock_update.call_args[0][0]
-                            self.assertIn("vtest-version", call_args)  # Should include the version
+                            cron_entry = u"\n".join(call_args)  # fmt: skip
+                            self.assertIn("vtest-version", cron_entry)  # Should include the version
 
     def test_get_status_with_no_comment(self):
         """Test get_status handles cron entries with no DDNS comment"""
         cron_entry = '*/15 * * * * cd "/home/user" && python3 -m ddns -c test.json'
 
-        with patch.object(self.scheduler, '_run_command') as mock_run:
+        with patch.object(self.scheduler, "_run_command") as mock_run:
 
             def mock_command(cmd):
-                if cmd == ['crontab', '-l']:
+                if cmd == ["crontab", "-l"]:
                     return cron_entry
-                elif cmd == ['pgrep', '-f', 'cron']:
+                elif cmd == ["pgrep", "-f", "cron"]:
                     return None
                 return None
 
@@ -122,19 +127,19 @@ class TestCronScheduler(unittest.TestCase):
 
             status = self.scheduler.get_status()
 
-            self.assertEqual(status['scheduler'], 'cron')
-            self.assertEqual(status['enabled'], False)  # False when no DDNS line found
+            self.assertEqual(status["scheduler"], "cron")
+            self.assertEqual(status["enabled"], False)  # False when no DDNS line found
             # When no DDNS line is found, the method still tries to parse the empty line
             # This results in None values for interval, command, and empty string for comments
-            self.assertIsNone(status.get('interval'))
-            self.assertIsNone(status.get('command'))
-            self.assertEqual(status.get('description'), '')
+            self.assertIsNone(status.get("interval"))
+            self.assertIsNone(status.get("command"))
+            self.assertEqual(status.get("description"), "")
 
     def test_modify_cron_lines_enable_disable(self):
         """Test _modify_cron_lines method for enable and disable operations"""
         # Test enable operation on commented line
-        with patch.object(self.scheduler, '_run_command') as mock_run:
-            with patch.object(self.scheduler, '_update_crontab') as mock_update:
+        with patch.object(self.scheduler, "_run_command") as mock_run:
+            with patch.object(self.scheduler, "_update_crontab") as mock_update:
                 mock_run.return_value = "# */5 * * * * cd /path && python3 -m ddns # DDNS: auto-update"
                 mock_update.return_value = True
 
@@ -142,11 +147,12 @@ class TestCronScheduler(unittest.TestCase):
                 self.assertTrue(result)
                 mock_update.assert_called_once()
                 call_args = mock_update.call_args[0][0]
-                self.assertIn("*/5 * * * * cd /path && python3 -m ddns # DDNS: auto-update", call_args)
+                cron_entry = u"\n".join(call_args)  # fmt: skip
+                self.assertIn("*/5 * * * * cd /path && python3 -m ddns # DDNS: auto-update", cron_entry)
 
         # Test disable operation on active line
-        with patch.object(self.scheduler, '_run_command') as mock_run:
-            with patch.object(self.scheduler, '_update_crontab') as mock_update:
+        with patch.object(self.scheduler, "_run_command") as mock_run:
+            with patch.object(self.scheduler, "_update_crontab") as mock_update:
                 mock_run.return_value = "*/5 * * * * cd /path && python3 -m ddns # DDNS: auto-update"
                 mock_update.return_value = True
 
@@ -154,12 +160,13 @@ class TestCronScheduler(unittest.TestCase):
                 self.assertTrue(result)
                 mock_update.assert_called_once()
                 call_args = mock_update.call_args[0][0]
-                self.assertIn("# */5 * * * * cd /path && python3 -m ddns # DDNS: auto-update", call_args)
+                cron_entry = u"\n".join(call_args)  # fmt: skip
+                self.assertIn("# */5 * * * * cd /path && python3 -m ddns # DDNS: auto-update", cron_entry)
 
     def test_modify_cron_lines_uninstall(self):
         """Test _modify_cron_lines method for uninstall operation"""
-        with patch.object(self.scheduler, '_run_command') as mock_run:
-            with patch.object(self.scheduler, '_update_crontab') as mock_update:
+        with patch.object(self.scheduler, "_run_command") as mock_run:
+            with patch.object(self.scheduler, "_update_crontab") as mock_update:
                 mock_run.return_value = "*/5 * * * * cd /path && python3 -m ddns # DDNS: auto-update\nother cron job"
                 mock_update.return_value = True
 
@@ -167,8 +174,9 @@ class TestCronScheduler(unittest.TestCase):
                 self.assertTrue(result)
                 mock_update.assert_called_once()
                 call_args = mock_update.call_args[0][0]
-                self.assertNotIn("DDNS", call_args)
-                self.assertIn("other cron job", call_args)
+                cron_entry = u"\n".join(call_args)  # fmt: skip
+                self.assertNotIn("DDNS", cron_entry)
+                self.assertIn("other cron job", cron_entry)
 
     @unittest.skipIf(platform.system().lower() == "windows", "Unix/Linux/macOS-specific test")
     def test_real_cron_integration(self):
@@ -289,8 +297,8 @@ class TestCronScheduler(unittest.TestCase):
                 self.assertIn("*/5", crontab_content, "Crontab should contain correct interval")
 
             # Validate cron entry format
-            lines = crontab_content.strip().split('\n') if crontab_content else []
-            ddns_lines = [line for line in lines if 'DDNS' in line and not line.strip().startswith('#')]
+            lines = crontab_content.strip().split("\n") if crontab_content else []
+            ddns_lines = [line for line in lines if "DDNS" in line and not line.strip().startswith("#")]
             self.assertTrue(len(ddns_lines) > 0, "Should have active DDNS cron entry")
 
             ddns_line = ddns_lines[0]
@@ -312,9 +320,9 @@ class TestCronScheduler(unittest.TestCase):
             # Verify cron entry is commented out
             disabled_crontab = self.scheduler._run_command(["crontab", "-l"])
             if disabled_crontab:
-                disabled_lines = [line for line in disabled_crontab.split('\n') if 'DDNS' in line]
+                disabled_lines = [line for line in disabled_crontab.split("\n") if "DDNS" in line]
                 self.assertTrue(
-                    all(line.strip().startswith('#') for line in disabled_lines),
+                    all(line.strip().startswith("#") for line in disabled_lines),
                     "All DDNS lines should be commented when disabled",
                 )
 
@@ -348,5 +356,5 @@ class TestCronScheduler(unittest.TestCase):
             self._cleanup_real_cron_test()
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main()

+ 9 - 8
tests/test_scheduler_init.py

@@ -3,6 +3,7 @@
 Test scheduler initialization and real functionality
 @author: NewFuture
 """
+
 import os
 import platform
 import subprocess
@@ -85,7 +86,7 @@ class TestSchedulerRealFunctionality(unittest.TestCase):
                 # Check if systemd is running and we have systemctl
                 return (
                     self.current_system == "linux"
-                    and os.path.exists('/proc/1/comm')
+                    and os.path.exists("/proc/1/comm")
                     and self._is_command_available("systemctl")
                 )
             elif scheduler_name == "cron":
@@ -227,7 +228,7 @@ class TestSchedulerRealFunctionality(unittest.TestCase):
 
     def test_scheduler_methods_exist(self):
         """Test that required scheduler methods exist and are callable"""
-        required_methods = ['get_status', 'is_installed', 'install', 'uninstall', 'enable', 'disable']
+        required_methods = ["get_status", "is_installed", "install", "uninstall", "enable", "disable"]
 
         for method_name in required_methods:
             try:
@@ -261,18 +262,18 @@ class TestSchedulerRealFunctionality(unittest.TestCase):
 
         # Test basic properties
         self.assertIsNotNone(current_scheduler)
-        self.assertTrue(hasattr(current_scheduler, 'get_status'))
-        self.assertTrue(hasattr(current_scheduler, 'is_installed'))
+        self.assertTrue(hasattr(current_scheduler, "get_status"))
+        self.assertTrue(hasattr(current_scheduler, "is_installed"))
 
         # Test status method returns valid structure
         status = current_scheduler.get_status()
         self.assertIsInstance(status, dict)
-        self.assertIn('scheduler', status)
-        self.assertIn('installed', status)
+        self.assertIn("scheduler", status)
+        self.assertIn("installed", status)
 
         # Additional keys are only present when installed
-        if status.get('installed', False):
-            self.assertIn('enabled', status)
+        if status.get("installed", False):
+            self.assertIn("enabled", status)
 
         # Test is_installed returns boolean
         installed = current_scheduler.is_installed()

+ 28 - 28
tests/test_scheduler_launchd.py

@@ -3,6 +3,7 @@
 Unit tests for ddns.scheduler.launchd module
 @author: NewFuture
 """
+
 import os
 import platform
 import sys
@@ -11,11 +12,11 @@ from ddns.scheduler.launchd import LaunchdScheduler
 
 # Handle builtins import for Python 2/3 compatibility
 if sys.version_info[0] >= 3:
-    builtins_module = 'builtins'
+    builtins_module = "builtins"
     permission_error = PermissionError
 else:
     # Python 2
-    builtins_module = '__builtin__'
+    builtins_module = "__builtin__"
     permission_error = OSError
 
 
@@ -39,7 +40,7 @@ class TestLaunchdScheduler(unittest.TestCase):
     def test_get_status_loaded_enabled(self):
         """Test get_status when service is loaded and enabled"""
         # Mock plist file exists and has content
-        plist_content = '''<?xml version="1.0" encoding="UTF-8"?>
+        plist_content = """<?xml version="1.0" encoding="UTF-8"?>
 <plist version="1.0">
 <dict>
     <key>Label</key>
@@ -47,14 +48,13 @@ class TestLaunchdScheduler(unittest.TestCase):
     <key>StartInterval</key>
     <integer>300</integer>
 </dict>
-</plist>'''
-
-        with patch('os.path.exists', return_value=True), patch(
-            'ddns.scheduler.launchd.read_file_safely', return_value=plist_content
-        ), patch.object(self.scheduler, '_run_command') as mock_run_command:
+</plist>"""
 
+        with patch("os.path.exists", return_value=True), patch(
+            "ddns.scheduler.launchd.read_file_safely", return_value=plist_content
+        ), patch.object(self.scheduler, "_run_command") as mock_run_command:
             # Mock _run_command to return service is loaded
-            mock_run_command.return_value = '123\t0\tcc.newfuture.ddns'
+            mock_run_command.return_value = "123\t0\tcc.newfuture.ddns"
 
             status = self.scheduler.get_status()
 
@@ -72,8 +72,8 @@ class TestLaunchdScheduler(unittest.TestCase):
     def test_get_status_not_loaded(self):
         """Test get_status when service is not loaded"""
         # Mock plist file doesn't exist
-        with patch('os.path.exists', return_value=False), patch(
-            'ddns.scheduler.launchd.read_file_safely', return_value=None
+        with patch("os.path.exists", return_value=False), patch(
+            "ddns.scheduler.launchd.read_file_safely", return_value=None
         ):
             status = self.scheduler.get_status()
 
@@ -84,7 +84,7 @@ class TestLaunchdScheduler(unittest.TestCase):
             self.assertNotIn("enabled", status)
             self.assertNotIn("interval", status)
 
-    @patch('os.path.exists')
+    @patch("os.path.exists")
     def test_is_installed_true(self, mock_exists):
         """Test is_installed returns True when plist file exists"""
         mock_exists.return_value = True
@@ -92,7 +92,7 @@ class TestLaunchdScheduler(unittest.TestCase):
         result = self.scheduler.is_installed()
         self.assertTrue(result)
 
-    @patch('os.path.exists')
+    @patch("os.path.exists")
     def test_is_installed_false(self, mock_exists):
         """Test is_installed returns False when plist file doesn't exist"""
         mock_exists.return_value = False
@@ -100,12 +100,12 @@ class TestLaunchdScheduler(unittest.TestCase):
         result = self.scheduler.is_installed()
         self.assertFalse(result)
 
-    @patch('ddns.scheduler.launchd.write_file')
+    @patch("ddns.scheduler.launchd.write_file")
     def test_install_with_sudo_fallback(self, mock_write_file):
         """Test install with sudo fallback for permission issues"""
         mock_write_file.return_value = None  # write_file succeeds
 
-        with patch.object(self.scheduler, '_run_command', return_value="loaded successfully"):
+        with patch.object(self.scheduler, "_run_command", return_value="loaded successfully"):
             ddns_args = {"dns": "debug", "ipv4": ["test.com"]}
             result = self.scheduler.install(5, ddns_args)
             self.assertTrue(result)
@@ -113,30 +113,30 @@ class TestLaunchdScheduler(unittest.TestCase):
     @unittest.skipUnless(platform.system().lower() == "darwin", "macOS-specific test")
     def test_launchctl_with_sudo_retry(self):
         """Test launchctl command with automatic sudo retry on permission error"""
-        with patch.object(self.scheduler, '_run_command') as mock_run_cmd:
+        with patch.object(self.scheduler, "_run_command") as mock_run_cmd:
             # Test that launchctl operations use _run_command directly
             mock_run_cmd.return_value = "success"
 
             # Test enable which uses launchctl load
             plist_path = self.scheduler._get_plist_path()
-            with patch('os.path.exists', return_value=True):
+            with patch("os.path.exists", return_value=True):
                 result = self.scheduler.enable()
                 self.assertTrue(result)
                 mock_run_cmd.assert_called_with(["launchctl", "load", plist_path])
 
-    @patch('os.path.exists')
-    @patch('os.remove')
+    @patch("os.path.exists")
+    @patch("os.remove")
     def test_uninstall_success(self, mock_remove, mock_exists):
         """Test successful uninstall"""
         mock_exists.return_value = True
 
-        with patch.object(self.scheduler, '_run_command', return_value="unloaded successfully"):
+        with patch.object(self.scheduler, "_run_command", return_value="unloaded successfully"):
             result = self.scheduler.uninstall()
             self.assertTrue(result)
             mock_remove.assert_called_once()
 
-    @patch('os.path.exists')
-    @patch('os.remove')
+    @patch("os.path.exists")
+    @patch("os.remove")
     def test_uninstall_with_permission_handling(self, mock_remove, mock_exists):
         """Test uninstall handles permission errors gracefully"""
         mock_exists.return_value = True
@@ -144,7 +144,7 @@ class TestLaunchdScheduler(unittest.TestCase):
         # Mock file removal failure - use appropriate error type for Python version
         mock_remove.side_effect = permission_error("Permission denied")
 
-        with patch.object(self.scheduler, '_run_command', return_value="") as mock_run:
+        with patch.object(self.scheduler, "_run_command", return_value="") as mock_run:
             result = self.scheduler.uninstall()
 
             # Should handle permission error gracefully and still return True
@@ -156,14 +156,14 @@ class TestLaunchdScheduler(unittest.TestCase):
 
     def test_enable_success(self):
         """Test successful enable"""
-        with patch.object(self.scheduler, '_run_command', return_value="loaded successfully"):
-            with patch('os.path.exists', return_value=True):
+        with patch.object(self.scheduler, "_run_command", return_value="loaded successfully"):
+            with patch("os.path.exists", return_value=True):
                 result = self.scheduler.enable()
                 self.assertTrue(result)
 
     def test_disable_success(self):
         """Test successful disable"""
-        with patch.object(self.scheduler, '_run_command', return_value="unloaded successfully"):
+        with patch.object(self.scheduler, "_run_command", return_value="unloaded successfully"):
             result = self.scheduler.disable()
             self.assertTrue(result)
 
@@ -388,7 +388,7 @@ class TestLaunchdScheduler(unittest.TestCase):
             self.assertTrue(os.access(plist_path, os.R_OK), "Plist file should be readable")
 
             # Validate plist content
-            with open(plist_path, 'r') as f:
+            with open(plist_path, "r") as f:
                 content = f.read()
             self.assertIn(test_service_label, content, "Plist should contain correct service label")
             self.assertIn("StartInterval", content, "Plist should contain StartInterval")
@@ -421,7 +421,7 @@ class TestLaunchdScheduler(unittest.TestCase):
                 # Test file creation/removal
                 test_file = os.path.join(agents_dir, "test_write_access.tmp")
                 try:
-                    with open(test_file, 'w') as f:
+                    with open(test_file, "w") as f:
                         f.write("test")
                     self.assertTrue(os.path.exists(test_file), "Should be able to create test file")
                     os.remove(test_file)

+ 11 - 13
tests/test_scheduler_schtasks.py

@@ -3,6 +3,7 @@
 Unit tests for ddns.scheduler.schtasks module
 @author: NewFuture
 """
+
 import platform
 from __init__ import unittest, patch
 from ddns.scheduler.schtasks import SchtasksScheduler
@@ -20,7 +21,7 @@ class TestSchtasksScheduler(unittest.TestCase):
         expected_name = "DDNS"
         self.assertEqual(self.scheduler.NAME, expected_name)
 
-    @patch.object(SchtasksScheduler, '_run_command')
+    @patch.object(SchtasksScheduler, "_run_command")
     def test_get_status_installed_enabled(self, mock_run_command):
         """Test get_status when task is installed and enabled"""
         # Mock XML output from schtasks query
@@ -61,7 +62,7 @@ class TestSchtasksScheduler(unittest.TestCase):
         self.assertEqual(status["enabled"], expected_status["enabled"])
         self.assertEqual(status["interval"], expected_status["interval"])
 
-    @patch.object(SchtasksScheduler, '_run_command')
+    @patch.object(SchtasksScheduler, "_run_command")
     def test_get_status_not_installed(self, mock_run_command):
         """Test get_status when task is not installed"""
         # Mock _run_command to return None (task not found)
@@ -69,17 +70,14 @@ class TestSchtasksScheduler(unittest.TestCase):
 
         status = self.scheduler.get_status()
 
-        expected_status = {
-            "scheduler": "schtasks",
-            "installed": False,
-        }
+        expected_status = {"scheduler": "schtasks", "installed": False}
 
         # Check main fields - only scheduler and installed are included when task not found
         self.assertEqual(status["scheduler"], expected_status["scheduler"])
         self.assertEqual(status["installed"], expected_status["installed"])
         # When task is not installed, enabled and interval are not included in status
 
-    @patch.object(SchtasksScheduler, '_run_command')
+    @patch.object(SchtasksScheduler, "_run_command")
     def test_is_installed_true(self, mock_run_command):
         """Test is_installed returns True when task exists"""
         mock_run_command.return_value = "TaskName: DDNS\nStatus: Ready"
@@ -87,7 +85,7 @@ class TestSchtasksScheduler(unittest.TestCase):
         result = self.scheduler.is_installed()
         self.assertTrue(result)
 
-    @patch.object(SchtasksScheduler, '_run_command')
+    @patch.object(SchtasksScheduler, "_run_command")
     def test_is_installed_false(self, mock_run_command):
         """Test is_installed returns False when task doesn't exist"""
         mock_run_command.return_value = None
@@ -95,8 +93,8 @@ class TestSchtasksScheduler(unittest.TestCase):
         result = self.scheduler.is_installed()
         self.assertFalse(result)
 
-    @patch.object(SchtasksScheduler, '_schtasks')
-    @patch.object(SchtasksScheduler, '_create_vbs_script')
+    @patch.object(SchtasksScheduler, "_schtasks")
+    @patch.object(SchtasksScheduler, "_create_vbs_script")
     def test_install_success(self, mock_vbs, mock_schtasks):
         """Test successful installation"""
         mock_vbs.return_value = "test.vbs"
@@ -108,7 +106,7 @@ class TestSchtasksScheduler(unittest.TestCase):
         mock_schtasks.assert_called_once()
         mock_vbs.assert_called_once()
 
-    @patch.object(SchtasksScheduler, '_schtasks')
+    @patch.object(SchtasksScheduler, "_schtasks")
     def test_uninstall_success(self, mock_schtasks):
         """Test successful uninstall"""
         mock_schtasks.return_value = True
@@ -117,7 +115,7 @@ class TestSchtasksScheduler(unittest.TestCase):
         self.assertTrue(result)
         mock_schtasks.assert_called_once()
 
-    @patch.object(SchtasksScheduler, '_schtasks')
+    @patch.object(SchtasksScheduler, "_schtasks")
     def test_enable_success(self, mock_schtasks):
         """Test successful enable"""
         mock_schtasks.return_value = True
@@ -126,7 +124,7 @@ class TestSchtasksScheduler(unittest.TestCase):
         self.assertTrue(result)
         mock_schtasks.assert_called_once()
 
-    @patch.object(SchtasksScheduler, '_schtasks')
+    @patch.object(SchtasksScheduler, "_schtasks")
     def test_disable_success(self, mock_schtasks):
         """Test successful disable"""
         mock_schtasks.return_value = True

+ 12 - 11
tests/test_scheduler_systemd.py

@@ -3,6 +3,7 @@
 Unit tests for ddns.scheduler.systemd module
 @author: NewFuture
 """
+
 import os
 import platform
 from __init__ import unittest, patch
@@ -24,23 +25,23 @@ class TestSystemdScheduler(unittest.TestCase):
         """Test timer name constant"""
         self.assertEqual(self.scheduler.TIMER_NAME, "ddns.timer")
 
-    @patch('os.path.exists')
+    @patch("os.path.exists")
     def test_is_installed_true(self, mock_exists):
         """Test is_installed returns True when service exists"""
         mock_exists.return_value = True
         result = self.scheduler.is_installed()
         self.assertTrue(result)
 
-    @patch('os.path.exists')
+    @patch("os.path.exists")
     def test_is_installed_false(self, mock_exists):
         """Test is_installed returns False when service doesn't exist"""
         mock_exists.return_value = False
         result = self.scheduler.is_installed()
         self.assertFalse(result)
 
-    @patch('subprocess.check_output')
-    @patch('ddns.scheduler.systemd.read_file_safely')
-    @patch('os.path.exists')
+    @patch("subprocess.check_output")
+    @patch("ddns.scheduler.systemd.read_file_safely")
+    @patch("os.path.exists")
     def test_get_status_success(self, mock_exists, mock_read_file, mock_check_output):
         """Test get_status with proper file reading"""
         mock_exists.return_value = True
@@ -55,7 +56,7 @@ class TestSystemdScheduler(unittest.TestCase):
 
         mock_read_file.side_effect = mock_read_side_effect
         # Mock subprocess.check_output to return "enabled" status
-        mock_check_output.return_value = 'enabled'
+        mock_check_output.return_value = "enabled"
 
         status = self.scheduler.get_status()
 
@@ -64,8 +65,8 @@ class TestSystemdScheduler(unittest.TestCase):
         self.assertTrue(status["enabled"])
         self.assertEqual(status["interval"], 5)
 
-    @patch('ddns.scheduler.systemd.write_file')
-    @patch.object(SystemdScheduler, '_systemctl')
+    @patch("ddns.scheduler.systemd.write_file")
+    @patch.object(SystemdScheduler, "_systemctl")
     def test_install_with_sudo_fallback(self, mock_systemctl, mock_write_file):
         """Test install with sudo fallback for permission issues"""
         # Mock successful file writing and systemctl calls
@@ -84,14 +85,14 @@ class TestSystemdScheduler(unittest.TestCase):
     def test_systemctl_with_sudo_retry(self):
         """Test systemctl command with automatic sudo retry on permission error"""
         # Test that systemctl automatically retries with sudo when permission fails
-        with patch.object(self.scheduler, '_run_command') as mock_run_cmd:
+        with patch.object(self.scheduler, "_run_command") as mock_run_cmd:
             mock_run_cmd.side_effect = [None, "success"]  # First fails, sudo succeeds
             self.scheduler._systemctl("enable", "ddns.timer")
             # Should still return success after sudo retry
             mock_run_cmd.assert_called()
 
-    @patch('os.remove')
-    @patch.object(SystemdScheduler, '_systemctl')
+    @patch("os.remove")
+    @patch.object(SystemdScheduler, "_systemctl")
     def test_uninstall_with_permission_handling(self, mock_systemctl, mock_remove):
         """Test uninstall with proper permission handling"""
         mock_systemctl.return_value = True  # disable() succeeds

+ 1 - 0
tests/test_util_comment.py

@@ -3,6 +3,7 @@
 Unit tests for ddns.util.comment module
 @author: GitHub Copilot
 """
+
 from __future__ import unicode_literals
 from __init__ import unittest
 from ddns.util.comment import remove_comment

+ 11 - 30
tests/test_util_fileio.py

@@ -2,25 +2,21 @@
 """
 Tests for ddns.util.fileio module
 """
-from __init__ import unittest, patch, MagicMock
-import tempfile
+
 import os
 import shutil
+import tempfile
 from io import open  # Python 2/3 compatible UTF-8 file operations
 
-import ddns.util.fileio as fileio
+from __init__ import MagicMock, patch, unittest
 
+import ddns.util.fileio as fileio
 
 # Test constants
 TEST_ENCODING_UTF8 = "utf-8"
 TEST_ENCODING_ASCII = "ascii"
-# Ensure content is unicode for Python 2 compatibility
-try:
-    # Python 2
-    TEST_CONTENT_MULTILINGUAL = u"Hello World! 测试内容"
-except NameError:
-    # Python 3
-    TEST_CONTENT_MULTILINGUAL = "Hello World! 测试内容"
+# Ensure content is unicode f
+TEST_CONTENT_MULTILINGUAL = u"Hello World! 测试内容"  # fmt: skip
 
 
 class TestFileIOModule(unittest.TestCase):
@@ -110,12 +106,7 @@ class TestFileIOModule(unittest.TestCase):
     def test_read_file_different_encoding(self):
         """Test read_file with different encoding"""
         # Use ASCII-safe content for encoding test
-        try:
-            # Python 2 - ensure unicode
-            content = u"ASCII content"
-        except NameError:
-            # Python 3
-            content = "ASCII content"
+        content = u"ASCII content"  # fmt: skip
 
         temp_fd, temp_path = tempfile.mkstemp()
         try:
@@ -254,7 +245,7 @@ class TestFileIOModule(unittest.TestCase):
         # Create UnicodeEncodeError with proper arguments for Python 2/3 compatibility
         try:
             # Python 2 - need unicode objects
-            error = UnicodeEncodeError("utf-8", u"", 0, 1, "invalid")
+            error = UnicodeEncodeError("utf-8", u"", 0, 1, "invalid")  # fmt: skip
         except TypeError:
             # Python 3 - accepts str objects
             error = UnicodeEncodeError("utf-8", "", 0, 1, "invalid")
@@ -324,12 +315,7 @@ class TestFileIOModule(unittest.TestCase):
             self.assertEqual(content, self.test_content)
 
             # 4. Safe operations
-            try:
-                # Python 2 - ensure unicode
-                updated_content_str = u"Updated content"
-            except NameError:
-                # Python 3
-                updated_content_str = "Updated content"
+            updated_content_str = u"Updated content"  # fmt: skip
 
             self.assertTrue(fileio.write_file_safely(test_file, updated_content_str))
             updated_content = fileio.read_file_safely(test_file)
@@ -368,7 +354,7 @@ class TestFileIOModule(unittest.TestCase):
                 u"العربية",
                 u"🌟✨🎉",
                 u"Mixed: English 中文 🎉",
-            ]
+            ]  # fmt: skip
         except NameError:
             # Python 3
             test_contents = ["English text", "中文测试", "Русский текст", "العربية", "🌟✨🎉", "Mixed: English 中文 🎉"]
@@ -393,12 +379,7 @@ class TestFileIOModule(unittest.TestCase):
     def test_different_encodings(self):
         """Test support for different encodings"""
         # Use ASCII-safe content for encoding test
-        try:
-            # Python 2 - ensure unicode
-            ascii_content = u"ASCII only content"
-        except NameError:
-            # Python 3
-            ascii_content = "ASCII only content"
+        ascii_content = u"ASCII only content"  # fmt: skip
 
         temp_dir = tempfile.mkdtemp()
         try:

+ 1 - 6
tests/test_util_http.py

@@ -11,12 +11,7 @@ import json
 import socket
 import random
 
-from ddns.util.http import (
-    HttpResponse,
-    _decode_response_body,
-    quote,
-    USER_AGENT,
-)
+from ddns.util.http import HttpResponse, _decode_response_body, quote, USER_AGENT
 
 # Python 2/3 compatibility
 if sys.version_info[0] == 2:  # python 2

+ 1 - 4
tests/test_util_http_retry.py

@@ -18,10 +18,7 @@ except ImportError:
     from StringIO import StringIO  # type: ignore[no-redef]
     from urllib2 import URLError  # type: ignore[no-redef]
 
-from ddns.util.http import (
-    RetryHandler,
-    request,
-)
+from ddns.util.http import RetryHandler, request
 
 
 class TestRetryHandler(unittest.TestCase):