| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- from argparse import Action, ArgumentParser, Namespace, RawTextHelpFormatter
- from json import load as loadjson, dump as dumpjson
- from os import stat, environ, path
- from logging import error, getLevelName
- from ast import literal_eval
- import sys
# Parsed command-line arguments; rebound by init_config().
__cli_args = Namespace()
# Settings read from the JSON config file; rebound by __load_config().
__config = {}  # type: dict

# Level names joined with '|' to form the metavar of the --log.level option.
log_levels = [
    'CRITICAL',
    'FATAL',
    'ERROR',
    'WARN',
    'WARNING',
    'INFO',
    'DEBUG',
    'NOTSET',
]

# Parameters whose environment-variable value may be parsed into a list.
ARRAY_PARAMS = [
    'index4',
    'index6',
    'ipv4',
    'ipv6',
    'proxy',
]

# Subset of the array parameters that may also be written as a plain
# delimiter-separated string instead of a bracketed list literal.
SIMPLE_ARRAY_PARAMS = [
    'ipv4',
    'ipv6',
    'proxy',
]
def str2bool(v):
    """
    Parse a string into a boolean where possible.

    The spellings 'yes', 'true', 't', 'y', '1' map to True and
    'no', 'false', 'f', 'n', '0' map to False (case-insensitive).
    Anything else is handed back untouched, which lets an option such as
    ``--cache`` carry either an on/off switch or an arbitrary string.

    :param v: value to interpret (bool, string, or any other object)
    :return: True/False for a recognised spelling, otherwise ``v`` itself
    """
    if isinstance(v, bool):
        return v
    lower = getattr(v, 'lower', None)
    if lower is None:
        # Not string-like (e.g. None or a number): there is nothing to parse,
        # so return it unchanged instead of raising AttributeError.
        return v
    text = lower()
    if text in ('yes', 'true', 't', 'y', '1'):
        return True
    if text in ('no', 'false', 'f', 'n', '0'):
        return False
    return v
def log_level(value):
    """
    Convert a log level name into its numeric logging level.

    :param value: level name in any letter case (e.g. ``'debug'``), or an
        integer level which is returned as-is
    :return: the numeric level
    :raises ValueError: if the name is not a level known to ``logging``
    """
    if isinstance(value, int):
        # Already numeric (e.g. a number in a JSON config): nothing to map.
        return value
    level = getLevelName(value.upper())
    if not isinstance(level, int):
        # getLevelName() answers unknown names with the string 'Level <NAME>'
        # rather than failing; reject it here so the mistake is reported at
        # the point of parsing instead of later during logging setup.
        raise ValueError('Unknown log level: %r' % (value,))
    return level
def parse_array_string(value, enable_simple_split):
    """
    Parse a string that may describe an array.

    * A value that, once stripped, starts with '[' and ends with ']' is parsed
      with ``ast.literal_eval``; a list or tuple result is returned as a list.
      A parse failure is logged and the original string is returned.
    * Otherwise, when ``enable_simple_split`` is true, the value is split on
      ',' if it contains one, else on ';' if it contains one. Items are
      stripped and empty items are dropped.
    * In every other case the original value is returned unchanged.

    :param value: raw value; non-string values are returned as-is
    :param enable_simple_split: allow the ',' / ';' separated form
    :return: a list when the value could be parsed, otherwise ``value``
    """
    if not isinstance(value, str):
        return value
    trimmed = value.strip()
    if trimmed.startswith('[') and trimmed.endswith(']'):
        try:
            parsed_value = literal_eval(trimmed)
        except (ValueError, SyntaxError) as e:
            # Not a valid literal: report it and fall back to the raw string.
            error('Failed to parse array string: %s. Exception: %s', value, e)
        else:
            if isinstance(parsed_value, (list, tuple)):
                return list(parsed_value)
    elif enable_simple_split:
        # A comma takes precedence so that input containing a comma is split
        # exactly as before; ';' is only used when no comma is present.
        separator = next((sep for sep in (',', ';') if sep in trimmed), None)
        if separator is not None:
            return [item.strip()
                    for item in trimmed.split(separator) if item.strip()]
    return value
def init_config(description, doc, version):
    """
    Parse the command line and load the configuration file.

    The parsed arguments are stored in the module-level ``__cli_args``.
    If no config file is given, ``config.json`` in the working directory,
    ``~/.ddns/config.json`` and ``/etc/ddns/config.json`` are tried in that
    order, falling back to the first path when none exists.

    When neither ``token`` nor ``id`` can be resolved and the config file
    does not exist, a default config file is generated and the process exits
    with status 1; if generation fails it exits with an error message.

    :param description: program description shown in the help output
    :param doc: epilog text shown after the option list
    :param version: string printed by ``-v`` / ``--version``
    """
    global __cli_args
    arg_parser = ArgumentParser(
        description=description,
        epilog=doc,
        formatter_class=RawTextHelpFormatter
    )
    add = arg_parser.add_argument
    # Options that collect values from repeated / multi-value occurrences.
    list_option = {'nargs': '*', 'action': ExtendAction}

    add('-v', '--version', action='version', version=version)
    add('-c', '--config', help='run with config file [配置文件路径]')

    # Option definitions
    add('--dns', help='DNS Provider [DNS服务提供商]', choices=[
        'alidns', 'cloudflare', 'dnscom', 'dnspod', 'dnspod_com', 'he', 'huaweidns', 'callback'])
    add('--id', help='api ID [授权账户]')
    add('--token', help='api token or Secret key [授权访问凭证或密钥]')
    add('--index4', help='list to get ipv4 [IPV4 获取方式]', **list_option)
    add('--index6', help='list to get ipv6 [IPV6获取方式]', **list_option)
    add('--ipv4', help='ipv4 domain list [IPV4域名列表]', **list_option)
    add('--ipv6', help='ipv6 domain list [IPV6域名列表]', **list_option)
    add('--ttl', type=int, help='ttl for DNS [DNS 解析 TTL 时间]')
    add('--proxy', help='https proxy [设置http 代理,多代理逐个尝试直到成功]',
        **list_option)
    add('--cache', type=str2bool, nargs='?', const=True,
        help='cache flag [启用缓存,可配配置路径或开关]')
    add('--debug', action='store_true',
        help='debug mode [调试模式,等同log.level=DEBUG]')
    add('--log.file', metavar='LOG_FILE',
        help='log file [日志文件,默认标准输出]')
    add('--log.level', type=log_level, metavar='|'.join(log_levels))
    add('--log.format', metavar='LOG_FORMAT',
        help='log format [日志格式字符串]')
    add('--log.datefmt', metavar='DATE_FORMAT',
        help='date format [日期格式字符串]')

    __cli_args = arg_parser.parse_args()
    if __cli_args.debug:
        # Debug mode forces the log level to DEBUG.
        setattr(__cli_args, 'log.level', log_level('DEBUG'))

    # A config file is mandatory only when no credentials were found so far.
    needs_config_file = not (get_config("token") or get_config("id"))

    config_file = get_config("config")
    if not config_file:
        # No explicit path: probe the well-known locations in order and fall
        # back to the first one when none of them exists.
        candidates = [
            path.abspath('config.json'),
            path.expanduser('~/.ddns/config.json'),
            '/etc/ddns/config.json'
        ]
        config_file = candidates[0]
        for candidate in candidates:
            if path.isfile(candidate):
                config_file = candidate
                break

    if path.isfile(config_file):
        __load_config(config_file)
        __cli_args.config = config_file
        return
    if not needs_config_file:
        return

    error('Config file is required, but not found: %s', config_file)
    # The required config file is missing: try to generate a default one.
    if not generate_config(config_file):
        sys.exit('fail to load config from file: %s\n' % config_file)
    sys.stdout.write(
        'Default configure file %s is generated.\n' % config_file)
    sys.exit(1)
def __load_config(config_path):
    """
    Load the JSON config file into the module-level ``__config``.

    Besides the file content, the result carries the file's modification
    time under ``config_modified_time``. A nested ``log`` object is flattened
    into ``log.level`` / ``log.file`` / ``log.format`` / ``log.datefmt``;
    without one, a flat ``log.level`` entry is converted via ``log_level``.
    A level of ``null`` (nested or flat) is left untouched, and a ``log``
    entry that is not an object is treated as if it were absent.

    ``__config`` is only replaced once the whole file has been processed, so
    a failure never leaves a half-initialised configuration behind.

    :param config_path: path of the JSON file to read
    :raises Exception: any error while reading or processing the file is
        logged and then re-raised
    """
    global __config
    try:
        with open(config_path, 'r') as configfile:
            loaded = loadjson(configfile)
        loaded["config_modified_time"] = stat(config_path).st_mtime
        log_section = loaded.get('log')
        if isinstance(log_section, dict):
            if log_section.get('level') is not None:
                loaded['log.level'] = log_level(log_section['level'])
            for name in ('file', 'format', 'datefmt'):
                if name in log_section:
                    loaded['log.' + name] = log_section[name]
        elif loaded.get('log.level') is not None:
            loaded['log.level'] = log_level(loaded['log.level'])
        __config = loaded
    except Exception as e:
        error('Failed to load config file `%s`: %s', config_path, e)
        raise  # re-raise so the caller sees the original failure
def get_config(key, default=None):
    """
    Look up a configuration value.

    Sources are consulted in this order:
    1. command-line arguments (ignored when the value is None)
    2. the loaded config file (any stored value, including None)
    3. environment variables named ``DDNS_<key>`` with '.' replaced by '_',
       tried as written, then upper-cased, then lower-cased

    An environment value for a key listed in ``ARRAY_PARAMS`` is passed
    through ``parse_array_string``.

    :param key: configuration key, e.g. ``'token'`` or ``'log.level'``
    :param default: value returned when no source provides the key
    :return: the resolved value, or ``default``
    """
    cli_value = getattr(__cli_args, key, None)
    if cli_value is not None:
        return cli_value

    if key in __config:
        return __config[key]

    # Fall back to the environment.
    base_name = 'DDNS_' + key.replace('.', '_')  # type: str
    env_value = None
    for candidate in (base_name, base_name.upper(), base_name.lower()):
        if candidate in environ:
            env_value = environ.get(candidate)
            break

    if env_value is None:
        return default
    if key in ARRAY_PARAMS:
        # Array-capable parameter: try to turn the string into a list.
        return parse_array_string(env_value, key in SIMPLE_ARRAY_PARAMS)
    return env_value
class ExtendAction(Action):
    """
    Backport of argparse's ``extend`` action for Python < 3.8.

    Each occurrence of the option adds its value(s) to a list stored on the
    namespace under the option's destination.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """
        Add ``values`` to the list stored at ``self.dest``.

        :param parser: the ArgumentParser invoking this action (unused)
        :param namespace: namespace object receiving the result
        :param values: a list of values, or a single value
        :param option_string: option string that triggered the action (unused)
        """
        current = getattr(namespace, self.dest, None)
        # Work on a copy: the object on the namespace may be the very list
        # passed as ``default=``, and extending it in place would alter that
        # default for every later parse.
        items = [] if current is None else list(current)
        # values may be a single value or a list
        if isinstance(values, list):
            items.extend(values)
        else:
            items.append(values)
        setattr(namespace, self.dest, items)
def generate_config(config_path):
    """
    Write a template configuration file.

    The template is dumped as JSON with a two-space indent and sorted keys.
    Failures are logged rather than raised.

    :param config_path: path of the file to create or overwrite
    :return: True when the file was written, False on any error
    """
    ipv4_domains = ['newfuture.cc', 'ddns.newfuture.cc']
    ipv6_domains = ['newfuture.cc', 'ipv6.ddns.newfuture.cc']
    template = {
        '$schema': 'https://ddns.newfuture.cc/schema/v4.0.json',
        'id': 'YOUR ID or EMAIL for DNS Provider',
        'token': 'YOUR TOKEN or KEY for DNS Provider',
        'dns': 'dnspod',
        'ipv4': ipv4_domains,
        'ipv6': ipv6_domains,
        'index4': 'default',
        'index6': 'default',
        'ttl': None,
        'proxy': None,
        'log': {'level': 'INFO'},
    }
    try:
        with open(config_path, 'w') as handle:
            dumpjson(template, handle, indent=2, sort_keys=True)
    except IOError:
        error('Cannot open config file to write: `%s`!', config_path)
        return False
    except Exception as exc:
        error('Failed to write config file `%s`: %s', config_path, exc)
        return False
    return True
|