cloudflare.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. # coding=utf-8
  2. """
  3. CloudFlare API
  4. @author: TongYifan, NewFuture
  5. """
  6. from ._base import BaseProvider, TYPE_JSON
  7. class CloudflareProvider(BaseProvider):
  8. API = "https://api.cloudflare.com"
  9. content_type = TYPE_JSON
  10. def _validate(self):
  11. self.logger.warning(
  12. "Cloudflare provider 缺少充分的真实环境测试,如遇问题请及时在 GitHub Issues 中反馈: %s",
  13. "https://github.com/NewFuture/DDNS/issues",
  14. )
  15. if not self.auth_token:
  16. raise ValueError("token must be configured")
  17. if self.auth_id:
  18. # must be email for Cloudflare API v4
  19. if "@" not in self.auth_id:
  20. self.logger.critical("ID 必须为空或有效的邮箱地址")
  21. raise ValueError("ID must be a valid email or Empty for Cloudflare API v4")
  22. def _request(self, method, action, **params):
  23. """发送请求数据"""
  24. headers = {}
  25. if self.auth_id:
  26. headers["X-Auth-Email"] = self.auth_id
  27. headers["X-Auth-Key"] = self.auth_token
  28. else:
  29. headers["Authorization"] = "Bearer " + self.auth_token
  30. params = {k: v for k, v in params.items() if v is not None} # 过滤掉None参数
  31. data = self._http(method, "/client/v4/zones" + action, headers=headers, params=params)
  32. if data and data.get("success"):
  33. return data.get("result") # 返回结果或原始数据
  34. else:
  35. self.logger.warning("Cloudflare API error: %s", data.get("errors", "Unknown error"))
  36. return data
  37. def _query_zone_id(self, domain):
  38. """https://developers.cloudflare.com/api/resources/zones/methods/list/"""
  39. params = {"name.exact": domain, "per_page": 50}
  40. zones = self._request("GET", "", **params)
  41. zone = next((z for z in zones if domain == z.get("name", "")), None)
  42. self.logger.debug("Queried zone: %s", zone)
  43. if zone:
  44. return zone["id"]
  45. return None
  46. def _query_record(self, zone_id, subdomain, main_domain, record_type, line, extra):
  47. # type: (str, str, str, str, str | None, dict) -> dict | None
  48. """https://developers.cloudflare.com/api/resources/dns/subresources/records/methods/list/"""
  49. # cloudflare的域名查询需要完整域名
  50. name = self._join_domain(subdomain, main_domain)
  51. query = {"name.exact": name} # type: dict[str, str|None]
  52. if extra:
  53. query["proxied"] = extra.get("proxied", None) # 代理状态
  54. data = self._request("GET", "/{}/dns_records".format(zone_id), type=record_type, per_page=10000, **query)
  55. record = next((r for r in data if r.get("name") == name and r.get("type") == record_type), None)
  56. self.logger.debug("Record queried: %s", record)
  57. if record:
  58. return record
  59. self.logger.warning("Failed to query record: %s", data)
  60. return None
  61. def _create_record(self, zone_id, subdomain, main_domain, value, record_type, ttl, line, extra):
  62. # type: (str, str, str, str, str, int | str | None, str | None, dict ) -> bool
  63. """https://developers.cloudflare.com/api/resources/dns/subresources/records/methods/create/"""
  64. name = self._join_domain(subdomain, main_domain)
  65. extra["comment"] = extra.get("comment", self.remark) # 添加注释
  66. data = self._request(
  67. "POST", "/{}/dns_records".format(zone_id), name=name, type=record_type, content=value, ttl=ttl, **extra
  68. )
  69. if data:
  70. self.logger.info("Record created: %s", data)
  71. return True
  72. self.logger.error("Failed to create record: %s", data)
  73. return False
  74. def _update_record(self, zone_id, old_record, value, record_type, ttl, line, extra):
  75. # type: (str, dict, str, str, int | str | None, str | None, dict) -> bool
  76. """https://developers.cloudflare.com/api/resources/dns/subresources/records/methods/edit/"""
  77. extra["comment"] = extra.get("comment", self.remark) # 注释
  78. extra["proxied"] = old_record.get("proxied", extra.get("proxied")) # 保持原有的代理状态
  79. extra["tags"] = old_record.get("tags", extra.get("tags")) # 保持原有的标签
  80. extra["settings"] = old_record.get("settings", extra.get("settings")) # 保持原有的设置
  81. data = self._request(
  82. "PUT",
  83. "/{}/dns_records/{}".format(zone_id, old_record["id"]),
  84. type=record_type,
  85. name=old_record.get("name"),
  86. content=value,
  87. ttl=ttl,
  88. **extra
  89. )
  90. self.logger.debug("Record updated: %s", data)
  91. if data:
  92. return True
  93. return False