main.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. #!/usr/bin/env python3
  2. DEBUG = False
  3. import re
  4. import os
  5. import sys
  6. import time
  7. import atexit
  8. import signal
  9. import ipaddress
  10. from collections import Counter
  11. from random import randint
  12. from threading import Thread
  13. from threading import Lock
  14. import redis
  15. import json
  16. import dns.resolver
  17. import dns.exception
  18. import uuid
  19. from modules.Logger import Logger
  20. from modules.IPTables import IPTables
  21. from modules.NFTables import NFTables
  22. def logdebug(msg):
  23. if DEBUG:
  24. logger.logInfo("DEBUG: %s" % msg)
  25. # Globals
  26. WHITELIST = []
  27. BLACKLIST = []
  28. bans = {}
  29. quit_now = False
  30. exit_code = 0
  31. lock = Lock()
  32. chain_name = "MAILCOW"
  33. r = None
  34. pubsub = None
  35. clear_before_quit = False
  36. def refreshF2boptions():
  37. global f2boptions
  38. global quit_now
  39. global exit_code
  40. f2boptions = {}
  41. if not r.get('F2B_OPTIONS'):
  42. f2boptions['ban_time'] = r.get('F2B_BAN_TIME')
  43. f2boptions['max_ban_time'] = r.get('F2B_MAX_BAN_TIME')
  44. f2boptions['ban_time_increment'] = r.get('F2B_BAN_TIME_INCREMENT')
  45. f2boptions['max_attempts'] = r.get('F2B_MAX_ATTEMPTS')
  46. f2boptions['retry_window'] = r.get('F2B_RETRY_WINDOW')
  47. f2boptions['netban_ipv4'] = r.get('F2B_NETBAN_IPV4')
  48. f2boptions['netban_ipv6'] = r.get('F2B_NETBAN_IPV6')
  49. else:
  50. try:
  51. f2boptions = json.loads(r.get('F2B_OPTIONS'))
  52. except ValueError as e:
  53. logger.logCrit(
  54. 'Error loading F2B options: F2B_OPTIONS is not json. Exception: %s' % e)
  55. quit_now = True
  56. exit_code = 2
  57. verifyF2boptions(f2boptions)
  58. r.set('F2B_OPTIONS', json.dumps(f2boptions, ensure_ascii=False))
  59. def verifyF2boptions(f2boptions):
  60. verifyF2boption(f2boptions, 'ban_time', 1800)
  61. verifyF2boption(f2boptions, 'max_ban_time', 10000)
  62. verifyF2boption(f2boptions, 'ban_time_increment', True)
  63. verifyF2boption(f2boptions, 'max_attempts', 10)
  64. verifyF2boption(f2boptions, 'retry_window', 600)
  65. verifyF2boption(f2boptions, 'netban_ipv4', 32)
  66. verifyF2boption(f2boptions, 'netban_ipv6', 128)
  67. verifyF2boption(f2boptions, 'banlist_id', str(uuid.uuid4()))
  68. verifyF2boption(f2boptions, 'manage_external', 0)
  69. def verifyF2boption(f2boptions, f2boption, f2bdefault):
  70. f2boptions[f2boption] = f2boptions[f2boption] if f2boption in f2boptions and f2boptions[f2boption] is not None else f2bdefault
  71. def refreshF2bregex():
  72. global f2bregex
  73. global quit_now
  74. global exit_code
  75. if not r.get('F2B_REGEX'):
  76. f2bregex = {}
  77. f2bregex[1] = r'mailcow UI: Invalid password for .+ by ([0-9a-f\.:]+)'
  78. f2bregex[2] = r'Rspamd UI: Invalid password by ([0-9a-f\.:]+)'
  79. f2bregex[3] = r'warning: .*\[([0-9a-f\.:]+)\]: SASL .+ authentication failed: (?!.*Connection lost to authentication server).+'
  80. f2bregex[4] = r'warning: non-SMTP command from .*\[([0-9a-f\.:]+)]:.+'
  81. f2bregex[5] = r'NOQUEUE: reject: RCPT from \[([0-9a-f\.:]+)].+Protocol error.+'
  82. f2bregex[6] = r'\w+\([^,]+,([0-9a-f\.:]+),<[^>]+>\): Password mismatch \(SHA1 of given password: [a-f0-9]+\)'
  83. f2bregex[7] = r'\w+\([^,]+,([0-9a-f\.:]+),<[^>]+>\): unknown user \(SHA1 of given password: [a-f0-9]+\)'
  84. f2bregex[8] = r'SOGo.+ Login from \'([0-9a-f\.:]+)\' for user .+ might not have worked'
  85. f2bregex[9] = r'([0-9a-f\.:]+) \"GET \/SOGo\/.* HTTP.+\" 403 .+'
  86. r.set('F2B_REGEX', json.dumps(f2bregex, ensure_ascii=False))
  87. else:
  88. try:
  89. f2bregex = {}
  90. f2bregex = json.loads(r.get('F2B_REGEX'))
  91. except ValueError:
  92. logger.logCrit('Error loading F2B options: F2B_REGEX is not json')
  93. quit_now = True
  94. exit_code = 2
  95. def get_ip(address):
  96. ip = ipaddress.ip_address(address)
  97. if type(ip) is ipaddress.IPv6Address and ip.ipv4_mapped:
  98. ip = ip.ipv4_mapped
  99. if ip.is_private or ip.is_loopback:
  100. return False
  101. return ip
  102. def ban(address):
  103. global f2boptions
  104. global lock
  105. logdebug("ban() called with address=%s" % address)
  106. refreshF2boptions()
  107. MAX_ATTEMPTS = int(f2boptions['max_attempts'])
  108. RETRY_WINDOW = int(f2boptions['retry_window'])
  109. NETBAN_IPV4 = '/' + str(f2boptions['netban_ipv4'])
  110. NETBAN_IPV6 = '/' + str(f2boptions['netban_ipv6'])
  111. ip = get_ip(address)
  112. if not ip:
  113. logdebug("No valid IP -- skipping ban()")
  114. return
  115. address = str(ip)
  116. self_network = ipaddress.ip_network(address)
  117. with lock:
  118. temp_whitelist = set(WHITELIST)
  119. logdebug("Checking if %s overlaps with any WHITELIST entries" % self_network)
  120. if temp_whitelist:
  121. for wl_key in temp_whitelist:
  122. wl_net = ipaddress.ip_network(wl_key, False)
  123. logdebug("Checking overlap between %s and %s" % (self_network, wl_net))
  124. if wl_net.overlaps(self_network):
  125. logger.logInfo(
  126. 'Address %s is allowlisted by rule %s' % (self_network, wl_net))
  127. return
  128. net = ipaddress.ip_network(
  129. (address + (NETBAN_IPV4 if type(ip) is ipaddress.IPv4Address else NETBAN_IPV6)), strict=False)
  130. net = str(net)
  131. logdebug("Ban net: %s" % net)
  132. if not net in bans:
  133. bans[net] = {'attempts': 0, 'last_attempt': 0, 'ban_counter': 0}
  134. logdebug("Initing new ban counter for %s" % net)
  135. current_attempt = time.time()
  136. logdebug("Current attempt ts=%s, previous: %s, retry_window: %s" %
  137. (current_attempt, bans[net]['last_attempt'], RETRY_WINDOW))
  138. if current_attempt - bans[net]['last_attempt'] > RETRY_WINDOW:
  139. bans[net]['attempts'] = 0
  140. logdebug("Ban counter for %s reset as window expired" % net)
  141. bans[net]['attempts'] += 1
  142. bans[net]['last_attempt'] = current_attempt
  143. logdebug("%s attempts now %d" % (net, bans[net]['attempts']))
  144. if bans[net]['attempts'] >= MAX_ATTEMPTS:
  145. cur_time = int(round(time.time()))
  146. NET_BAN_TIME = calcNetBanTime(bans[net]['ban_counter'])
  147. logger.logCrit('Banning %s for %d minutes' % (net, NET_BAN_TIME / 60 ))
  148. if type(ip) is ipaddress.IPv4Address and int(f2boptions['manage_external']) != 1:
  149. with lock:
  150. logdebug("Calling tables.banIPv4(%s)" % net)
  151. tables.banIPv4(net)
  152. elif int(f2boptions['manage_external']) != 1:
  153. with lock:
  154. logdebug("Calling tables.banIPv6(%s)" % net)
  155. tables.banIPv6(net)
  156. logdebug("Updating F2B_ACTIVE_BANS[%s]=%d" %
  157. (net, cur_time + NET_BAN_TIME))
  158. r.hset('F2B_ACTIVE_BANS', '%s' % net, cur_time + NET_BAN_TIME)
  159. else:
  160. logger.logWarn('%d more attempts in the next %d seconds until %s is banned' % (
  161. MAX_ATTEMPTS - bans[net]['attempts'], RETRY_WINDOW, net))
  162. def unban(net):
  163. global lock
  164. logdebug("Calling unban() with net=%s" % net)
  165. if not net in bans:
  166. logger.logInfo(
  167. '%s is not banned, skipping unban and deleting from queue (if any)' % net)
  168. r.hdel('F2B_QUEUE_UNBAN', '%s' % net)
  169. return
  170. logger.logInfo('Unbanning %s' % net)
  171. if type(ipaddress.ip_network(net)) is ipaddress.IPv4Network:
  172. with lock:
  173. logdebug("Calling tables.unbanIPv4(%s)" % net)
  174. tables.unbanIPv4(net)
  175. else:
  176. with lock:
  177. logdebug("Calling tables.unbanIPv6(%s)" % net)
  178. tables.unbanIPv6(net)
  179. r.hdel('F2B_ACTIVE_BANS', '%s' % net)
  180. r.hdel('F2B_QUEUE_UNBAN', '%s' % net)
  181. if net in bans:
  182. logdebug("Unban for %s, setting attempts=0, ban_counter+=1" % net)
  183. bans[net]['attempts'] = 0
  184. bans[net]['ban_counter'] += 1
  185. def permBan(net, unban=False):
  186. global f2boptions
  187. global lock
  188. is_unbanned = False
  189. is_banned = False
  190. if type(ipaddress.ip_network(net, strict=False)) is ipaddress.IPv4Network:
  191. with lock:
  192. if unban:
  193. is_unbanned = tables.unbanIPv4(net)
  194. elif int(f2boptions['manage_external']) != 1:
  195. is_banned = tables.banIPv4(net)
  196. else:
  197. with lock:
  198. if unban:
  199. is_unbanned = tables.unbanIPv6(net)
  200. elif int(f2boptions['manage_external']) != 1:
  201. is_banned = tables.banIPv6(net)
  202. if is_unbanned:
  203. r.hdel('F2B_PERM_BANS', '%s' % net)
  204. logger.logCrit('Removed host/network %s from denylist' % net)
  205. elif is_banned:
  206. r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time())))
  207. logger.logCrit('Added host/network %s to denylist' % net)
  208. def clear():
  209. global lock
  210. logger.logInfo('Clearing all bans')
  211. for net in bans.copy():
  212. logdebug("Unbanning net: %s" % net)
  213. unban(net)
  214. with lock:
  215. logdebug("Clearing IPv4/IPv6 table")
  216. tables.clearIPv4Table()
  217. tables.clearIPv6Table()
  218. try:
  219. if r is not None:
  220. r.delete('F2B_ACTIVE_BANS')
  221. r.delete('F2B_PERM_BANS')
  222. except Exception as ex:
  223. logger.logWarn('Error clearing redis keys F2B_ACTIVE_BANS and F2B_PERM_BANS: %s' % ex)
  224. def watch():
  225. global pubsub
  226. global quit_now
  227. global exit_code
  228. logger.logInfo('Watching Redis channel F2B_CHANNEL')
  229. pubsub.subscribe('F2B_CHANNEL')
  230. while not quit_now:
  231. try:
  232. for item in pubsub.listen():
  233. refreshF2bregex()
  234. for rule_id, rule_regex in f2bregex.items():
  235. if item['data'] and item['type'] == 'message':
  236. try:
  237. result = re.search(rule_regex, item['data'])
  238. except re.error:
  239. result = False
  240. if result:
  241. addr = result.group(1)
  242. ip = ipaddress.ip_address(addr)
  243. if ip.is_private or ip.is_loopback:
  244. continue
  245. logger.logWarn('%s matched rule id %s (%s)' % (addr, rule_id, item['data']))
  246. ban(addr)
  247. except Exception as ex:
  248. logger.logWarn('Error reading log line from pubsub: %s' % ex)
  249. pubsub = None
  250. quit_now = True
  251. exit_code = 2
  252. def snat4(snat_target):
  253. global lock
  254. global quit_now
  255. while not quit_now:
  256. time.sleep(10)
  257. with lock:
  258. tables.snat4(snat_target, os.getenv('IPV4_NETWORK', '172.22.1') + '.0/24')
  259. def snat6(snat_target):
  260. global lock
  261. global quit_now
  262. while not quit_now:
  263. time.sleep(10)
  264. with lock:
  265. tables.snat6(snat_target, os.getenv('IPV6_NETWORK', 'fd4d:6169:6c63:6f77::/64'))
  266. def autopurge():
  267. global f2boptions
  268. logdebug("autopurge thread started")
  269. while not quit_now:
  270. logdebug("autopurge tick")
  271. time.sleep(10)
  272. refreshF2boptions()
  273. MAX_ATTEMPTS = int(f2boptions['max_attempts'])
  274. QUEUE_UNBAN = r.hgetall('F2B_QUEUE_UNBAN')
  275. logdebug("QUEUE_UNBAN: %s" % QUEUE_UNBAN)
  276. if QUEUE_UNBAN:
  277. for net in QUEUE_UNBAN:
  278. logdebug("Autopurge: unbanning queued net: %s" % net)
  279. unban(str(net))
  280. # Only check expiry for actively banned IPs:
  281. active_bans = r.hgetall('F2B_ACTIVE_BANS')
  282. now = time.time()
  283. for net_str, expire_str in active_bans.items():
  284. logdebug("Checking ban expiry for (actively banned): %s" % net_str)
  285. # Defensive: always process if timer missing or expired
  286. try:
  287. expire = float(expire_str)
  288. except Exception:
  289. logdebug("Invalid expire time for %s; unbanning" % net_str)
  290. unban(net_str)
  291. continue
  292. time_left = expire - now
  293. logdebug("Time left for %s: %.1f seconds" % (net_str, time_left))
  294. if time_left <= 0:
  295. logdebug("Ban expired for %s" % net_str)
  296. unban(net_str)
  297. def mailcowChainOrder():
  298. global lock
  299. global quit_now
  300. global exit_code
  301. while not quit_now:
  302. time.sleep(10)
  303. with lock:
  304. quit_now, exit_code = tables.checkIPv4ChainOrder()
  305. if quit_now: return
  306. quit_now, exit_code = tables.checkIPv6ChainOrder()
  307. def calcNetBanTime(ban_counter):
  308. global f2boptions
  309. BAN_TIME = int(f2boptions['ban_time'])
  310. MAX_BAN_TIME = int(f2boptions['max_ban_time'])
  311. BAN_TIME_INCREMENT = bool(f2boptions['ban_time_increment'])
  312. NET_BAN_TIME = BAN_TIME if not BAN_TIME_INCREMENT else BAN_TIME * 2 ** ban_counter
  313. NET_BAN_TIME = max([BAN_TIME, min([NET_BAN_TIME, MAX_BAN_TIME])])
  314. return NET_BAN_TIME
  315. def isIpNetwork(address):
  316. try:
  317. ipaddress.ip_network(address, False)
  318. except ValueError:
  319. return False
  320. return True
  321. def genNetworkList(list):
  322. resolver = dns.resolver.Resolver()
  323. hostnames = []
  324. networks = []
  325. for key in list:
  326. if isIpNetwork(key):
  327. networks.append(key)
  328. else:
  329. hostnames.append(key)
  330. for hostname in hostnames:
  331. hostname_ips = []
  332. for rdtype in ['A', 'AAAA']:
  333. try:
  334. answer = resolver.resolve(qname=hostname, rdtype=rdtype, lifetime=3)
  335. except dns.exception.Timeout:
  336. logger.logInfo('Hostname %s timedout on resolve' % hostname)
  337. break
  338. except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
  339. continue
  340. except dns.exception.DNSException as dnsexception:
  341. logger.logInfo('%s' % dnsexception)
  342. continue
  343. for rdata in answer:
  344. hostname_ips.append(rdata.to_text())
  345. networks.extend(hostname_ips)
  346. return set(networks)
  347. def whitelistUpdate():
  348. global lock
  349. global quit_now
  350. global WHITELIST
  351. while not quit_now:
  352. start_time = time.time()
  353. list = r.hgetall('F2B_WHITELIST')
  354. new_whitelist = []
  355. if list:
  356. new_whitelist = genNetworkList(list)
  357. with lock:
  358. if Counter(new_whitelist) != Counter(WHITELIST):
  359. WHITELIST = new_whitelist
  360. logger.logInfo('Allowlist was changed, it has %s entries' % len(WHITELIST))
  361. time.sleep(60.0 - ((time.time() - start_time) % 60.0))
  362. def blacklistUpdate():
  363. global quit_now
  364. global BLACKLIST
  365. while not quit_now:
  366. start_time = time.time()
  367. list = r.hgetall('F2B_BLACKLIST')
  368. new_blacklist = []
  369. if list:
  370. new_blacklist = genNetworkList(list)
  371. if Counter(new_blacklist) != Counter(BLACKLIST):
  372. addban = set(new_blacklist).difference(BLACKLIST)
  373. delban = set(BLACKLIST).difference(new_blacklist)
  374. BLACKLIST = new_blacklist
  375. logger.logInfo('Denylist was changed, it has %s entries' % len(BLACKLIST))
  376. if addban:
  377. for net in addban:
  378. permBan(net=net)
  379. if delban:
  380. for net in delban:
  381. permBan(net=net, unban=True)
  382. time.sleep(60.0 - ((time.time() - start_time) % 60.0))
  383. def sigterm_quit(signum, frame):
  384. global clear_before_quit
  385. logdebug("SIGTERM received, setting clear_before_quit to True and exiting")
  386. clear_before_quit = True
  387. sys.exit(exit_code)
  388. def before_quit():
  389. logdebug("before_quit called, clear_before_quit=%s" % clear_before_quit)
  390. if clear_before_quit:
  391. clear()
  392. if pubsub is not None:
  393. pubsub.unsubscribe()
  394. if __name__ == '__main__':
  395. logger = Logger()
  396. logdebug("Sys.argv: %s" % sys.argv)
  397. atexit.register(before_quit)
  398. signal.signal(signal.SIGTERM, sigterm_quit)
  399. backend = sys.argv[1]
  400. logdebug("Backend: %s" % backend)
  401. if backend == "nftables":
  402. logger.logInfo('Using NFTables backend')
  403. tables = NFTables(chain_name, logger)
  404. else:
  405. logger.logInfo('Using IPTables backend')
  406. logger.logWarn(
  407. "DEPRECATION: iptables-legacy is deprecated and will be removed in future releases. "
  408. "Please switch to nftables on your host to ensure complete compatibility."
  409. )
  410. time.sleep(5)
  411. tables = IPTables(chain_name, logger)
  412. clear()
  413. logger.logInfo("Initializing mailcow netfilter chain")
  414. tables.initChainIPv4()
  415. tables.initChainIPv6()
  416. if os.getenv("DISABLE_NETFILTER_ISOLATION_RULE", "").lower() in ("y", "yes"):
  417. logger.logInfo(f"Skipping {chain_name} isolation")
  418. else:
  419. logger.logInfo(f"Setting {chain_name} isolation")
  420. tables.create_mailcow_isolation_rule("br-mailcow", [3306, 6379, 8983, 12345], os.getenv("MAILCOW_REPLICA_IP"))
  421. # connect to redis
  422. while True:
  423. try:
  424. redis_slaveof_ip = os.getenv('REDIS_SLAVEOF_IP', '')
  425. redis_slaveof_port = os.getenv('REDIS_SLAVEOF_PORT', '')
  426. logdebug(
  427. "Connecting redis (SLAVEOF_IP:%s, PORT:%s)" % (redis_slaveof_ip, redis_slaveof_port))
  428. if "".__eq__(redis_slaveof_ip):
  429. r = redis.StrictRedis(
  430. host=os.getenv('IPV4_NETWORK', '172.22.1') + '.249', decode_responses=True, port=6379, db=0, password=os.environ['REDISPASS'])
  431. else:
  432. r = redis.StrictRedis(
  433. host=redis_slaveof_ip, decode_responses=True, port=redis_slaveof_port, db=0, password=os.environ['REDISPASS'])
  434. r.ping()
  435. pubsub = r.pubsub()
  436. except Exception as ex:
  437. logdebug(
  438. 'Redis connection failed: %s - trying again in 3 seconds' % (ex))
  439. time.sleep(3)
  440. else:
  441. break
  442. logger.set_redis(r)
  443. logdebug("Redis connection established, setting up F2B keys")
  444. if r.exists('F2B_LOG'):
  445. logdebug("Renaming F2B_LOG to NETFILTER_LOG")
  446. r.rename('F2B_LOG', 'NETFILTER_LOG')
  447. r.delete('F2B_ACTIVE_BANS')
  448. r.delete('F2B_PERM_BANS')
  449. refreshF2boptions()
  450. watch_thread = Thread(target=watch)
  451. watch_thread.daemon = True
  452. watch_thread.start()
  453. if os.getenv('SNAT_TO_SOURCE') and os.getenv('SNAT_TO_SOURCE') != 'n':
  454. try:
  455. snat_ip = os.getenv('SNAT_TO_SOURCE')
  456. snat_ipo = ipaddress.ip_address(snat_ip)
  457. if type(snat_ipo) is ipaddress.IPv4Address:
  458. snat4_thread = Thread(target=snat4, args=(snat_ip,))
  459. snat4_thread.daemon = True
  460. snat4_thread.start()
  461. except ValueError:
  462. print(os.getenv('SNAT_TO_SOURCE') + ' is not a valid IPv4 address')
  463. if os.getenv('SNAT6_TO_SOURCE') and os.getenv('SNAT6_TO_SOURCE') != 'n':
  464. try:
  465. snat_ip = os.getenv('SNAT6_TO_SOURCE')
  466. snat_ipo = ipaddress.ip_address(snat_ip)
  467. if type(snat_ipo) is ipaddress.IPv6Address:
  468. snat6_thread = Thread(target=snat6,args=(snat_ip,))
  469. snat6_thread.daemon = True
  470. snat6_thread.start()
  471. except ValueError:
  472. print(os.getenv('SNAT6_TO_SOURCE') + ' is not a valid IPv6 address')
  473. autopurge_thread = Thread(target=autopurge)
  474. autopurge_thread.daemon = True
  475. autopurge_thread.start()
  476. mailcowchainwatch_thread = Thread(target=mailcowChainOrder)
  477. mailcowchainwatch_thread.daemon = True
  478. mailcowchainwatch_thread.start()
  479. blacklistupdate_thread = Thread(target=blacklistUpdate)
  480. blacklistupdate_thread.daemon = True
  481. blacklistupdate_thread.start()
  482. whitelistupdate_thread = Thread(target=whitelistUpdate)
  483. whitelistupdate_thread.daemon = True
  484. whitelistupdate_thread.start()
  485. while not quit_now:
  486. time.sleep(0.5)
  487. logdebug("Exiting with code %s" % exit_code)
  488. sys.exit(exit_code)