main.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. #!/usr/bin/env python3
  2. import re
  3. import os
  4. import sys
  5. import time
  6. import atexit
  7. import signal
  8. import ipaddress
  9. from collections import Counter
  10. from random import randint
  11. from threading import Thread
  12. from threading import Lock
  13. import redis
  14. import json
  15. import dns.resolver
  16. import dns.exception
  17. import uuid
  18. from modules.Logger import Logger
  19. from modules.IPTables import IPTables
  20. from modules.NFTables import NFTables
  21. # globals
  22. WHITELIST = []
  23. BLACKLIST= []
  24. bans = {}
  25. quit_now = False
  26. exit_code = 0
  27. lock = Lock()
  28. chain_name = "MAILCOW"
  29. r = None
  30. pubsub = None
  31. clear_before_quit = False
  32. def refreshF2boptions():
  33. global f2boptions
  34. global quit_now
  35. global exit_code
  36. f2boptions = {}
  37. if not r.get('F2B_OPTIONS'):
  38. f2boptions['ban_time'] = r.get('F2B_BAN_TIME')
  39. f2boptions['max_ban_time'] = r.get('F2B_MAX_BAN_TIME')
  40. f2boptions['ban_time_increment'] = r.get('F2B_BAN_TIME_INCREMENT')
  41. f2boptions['max_attempts'] = r.get('F2B_MAX_ATTEMPTS')
  42. f2boptions['retry_window'] = r.get('F2B_RETRY_WINDOW')
  43. f2boptions['netban_ipv4'] = r.get('F2B_NETBAN_IPV4')
  44. f2boptions['netban_ipv6'] = r.get('F2B_NETBAN_IPV6')
  45. else:
  46. try:
  47. f2boptions = json.loads(r.get('F2B_OPTIONS'))
  48. except ValueError:
  49. logger.logCrit('Error loading F2B options: F2B_OPTIONS is not json')
  50. quit_now = True
  51. exit_code = 2
  52. verifyF2boptions(f2boptions)
  53. r.set('F2B_OPTIONS', json.dumps(f2boptions, ensure_ascii=False))
  54. def verifyF2boptions(f2boptions):
  55. verifyF2boption(f2boptions,'ban_time', 1800)
  56. verifyF2boption(f2boptions,'max_ban_time', 10000)
  57. verifyF2boption(f2boptions,'ban_time_increment', True)
  58. verifyF2boption(f2boptions,'max_attempts', 10)
  59. verifyF2boption(f2boptions,'retry_window', 600)
  60. verifyF2boption(f2boptions,'netban_ipv4', 32)
  61. verifyF2boption(f2boptions,'netban_ipv6', 128)
  62. verifyF2boption(f2boptions,'banlist_id', str(uuid.uuid4()))
  63. verifyF2boption(f2boptions,'manage_external', 0)
  64. def verifyF2boption(f2boptions, f2boption, f2bdefault):
  65. f2boptions[f2boption] = f2boptions[f2boption] if f2boption in f2boptions and f2boptions[f2boption] is not None else f2bdefault
  66. def refreshF2bregex():
  67. global f2bregex
  68. global quit_now
  69. global exit_code
  70. if not r.get('F2B_REGEX'):
  71. f2bregex = {}
  72. f2bregex[1] = r'mailcow UI: Invalid password for .+ by ([0-9a-f\.:]+)'
  73. f2bregex[2] = r'Rspamd UI: Invalid password by ([0-9a-f\.:]+)'
  74. f2bregex[3] = r'warning: .*\[([0-9a-f\.:]+)\]: SASL .+ authentication failed: (?!.*Connection lost to authentication server).+'
  75. f2bregex[4] = r'warning: non-SMTP command from .*\[([0-9a-f\.:]+)]:.+'
  76. f2bregex[5] = r'NOQUEUE: reject: RCPT from \[([0-9a-f\.:]+)].+Protocol error.+'
  77. f2bregex[6] = r'\w+\([^,]+,([0-9a-f\.:]+),<[^>]+>\): Password mismatch \(SHA1 of given password: [a-f0-9]+\)'
  78. f2bregex[7] = r'\w+\([^,]+,([0-9a-f\.:]+),<[^>]+>\): unknown user \(SHA1 of given password: [a-f0-9]+\)'
  79. f2bregex[8] = r'SOGo.+ Login from \'([0-9a-f\.:]+)\' for user .+ might not have worked'
  80. f2bregex[9] = r'([0-9a-f\.:]+) \"GET \/SOGo\/.* HTTP.+\" 403 .+'
  81. r.set('F2B_REGEX', json.dumps(f2bregex, ensure_ascii=False))
  82. else:
  83. try:
  84. f2bregex = {}
  85. f2bregex = json.loads(r.get('F2B_REGEX'))
  86. except ValueError:
  87. logger.logCrit('Error loading F2B options: F2B_REGEX is not json')
  88. quit_now = True
  89. exit_code = 2
  90. def get_ip(address):
  91. ip = ipaddress.ip_address(address)
  92. if type(ip) is ipaddress.IPv6Address and ip.ipv4_mapped:
  93. ip = ip.ipv4_mapped
  94. if ip.is_private or ip.is_loopback:
  95. return False
  96. return ip
  97. def ban(address):
  98. global f2boptions
  99. global lock
  100. refreshF2boptions()
  101. MAX_ATTEMPTS = int(f2boptions['max_attempts'])
  102. RETRY_WINDOW = int(f2boptions['retry_window'])
  103. NETBAN_IPV4 = '/' + str(f2boptions['netban_ipv4'])
  104. NETBAN_IPV6 = '/' + str(f2boptions['netban_ipv6'])
  105. ip = get_ip(address)
  106. if not ip: return
  107. address = str(ip)
  108. self_network = ipaddress.ip_network(address)
  109. with lock:
  110. temp_whitelist = set(WHITELIST)
  111. if temp_whitelist:
  112. for wl_key in temp_whitelist:
  113. wl_net = ipaddress.ip_network(wl_key, False)
  114. if wl_net.overlaps(self_network):
  115. logger.logInfo('Address %s is whitelisted by rule %s' % (self_network, wl_net))
  116. return
  117. net = ipaddress.ip_network((address + (NETBAN_IPV4 if type(ip) is ipaddress.IPv4Address else NETBAN_IPV6)), strict=False)
  118. net = str(net)
  119. if not net in bans:
  120. bans[net] = {'attempts': 0, 'last_attempt': 0, 'ban_counter': 0}
  121. current_attempt = time.time()
  122. if current_attempt - bans[net]['last_attempt'] > RETRY_WINDOW:
  123. bans[net]['attempts'] = 0
  124. bans[net]['attempts'] += 1
  125. bans[net]['last_attempt'] = current_attempt
  126. if bans[net]['attempts'] >= MAX_ATTEMPTS:
  127. cur_time = int(round(time.time()))
  128. NET_BAN_TIME = calcNetBanTime(bans[net]['ban_counter'])
  129. logger.logCrit('Banning %s for %d minutes' % (net, NET_BAN_TIME / 60 ))
  130. if type(ip) is ipaddress.IPv4Address and int(f2boptions['manage_external']) != 1:
  131. with lock:
  132. tables.banIPv4(net)
  133. elif int(f2boptions['manage_external']) != 1:
  134. with lock:
  135. tables.banIPv6(net)
  136. r.hset('F2B_ACTIVE_BANS', '%s' % net, cur_time + NET_BAN_TIME)
  137. else:
  138. logger.logWarn('%d more attempts in the next %d seconds until %s is banned' % (MAX_ATTEMPTS - bans[net]['attempts'], RETRY_WINDOW, net))
  139. def unban(net):
  140. global lock
  141. if not net in bans:
  142. logger.logInfo('%s is not banned, skipping unban and deleting from queue (if any)' % net)
  143. r.hdel('F2B_QUEUE_UNBAN', '%s' % net)
  144. return
  145. logger.logInfo('Unbanning %s' % net)
  146. if type(ipaddress.ip_network(net)) is ipaddress.IPv4Network:
  147. with lock:
  148. tables.unbanIPv4(net)
  149. else:
  150. with lock:
  151. tables.unbanIPv6(net)
  152. r.hdel('F2B_ACTIVE_BANS', '%s' % net)
  153. r.hdel('F2B_QUEUE_UNBAN', '%s' % net)
  154. if net in bans:
  155. bans[net]['attempts'] = 0
  156. bans[net]['ban_counter'] += 1
  157. def permBan(net, unban=False):
  158. global f2boptions
  159. global lock
  160. is_unbanned = False
  161. is_banned = False
  162. if type(ipaddress.ip_network(net, strict=False)) is ipaddress.IPv4Network:
  163. with lock:
  164. if unban:
  165. is_unbanned = tables.unbanIPv4(net)
  166. elif int(f2boptions['manage_external']) != 1:
  167. is_banned = tables.banIPv4(net)
  168. else:
  169. with lock:
  170. if unban:
  171. is_unbanned = tables.unbanIPv6(net)
  172. elif int(f2boptions['manage_external']) != 1:
  173. is_banned = tables.banIPv6(net)
  174. if is_unbanned:
  175. r.hdel('F2B_PERM_BANS', '%s' % net)
  176. logger.logCrit('Removed host/network %s from blacklist' % net)
  177. elif is_banned:
  178. r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time())))
  179. logger.logCrit('Added host/network %s to blacklist' % net)
  180. def clear():
  181. global lock
  182. logger.logInfo('Clearing all bans')
  183. for net in bans.copy():
  184. unban(net)
  185. with lock:
  186. tables.clearIPv4Table()
  187. tables.clearIPv6Table()
  188. try:
  189. if r is not None:
  190. r.delete('F2B_ACTIVE_BANS')
  191. r.delete('F2B_PERM_BANS')
  192. except Exception as ex:
  193. logger.logWarn('Error clearing redis keys F2B_ACTIVE_BANS and F2B_PERM_BANS: %s' % ex)
  194. def watch():
  195. global pubsub
  196. global quit_now
  197. global exit_code
  198. logger.logInfo('Watching Redis channel F2B_CHANNEL')
  199. pubsub.subscribe('F2B_CHANNEL')
  200. while not quit_now:
  201. try:
  202. for item in pubsub.listen():
  203. refreshF2bregex()
  204. for rule_id, rule_regex in f2bregex.items():
  205. if item['data'] and item['type'] == 'message':
  206. try:
  207. result = re.search(rule_regex, item['data'])
  208. except re.error:
  209. result = False
  210. if result:
  211. addr = result.group(1)
  212. ip = ipaddress.ip_address(addr)
  213. if ip.is_private or ip.is_loopback:
  214. continue
  215. logger.logWarn('%s matched rule id %s (%s)' % (addr, rule_id, item['data']))
  216. ban(addr)
  217. except Exception as ex:
  218. logger.logWarn('Error reading log line from pubsub: %s' % ex)
  219. pubsub = None
  220. quit_now = True
  221. exit_code = 2
  222. def snat4(snat_target):
  223. global lock
  224. global quit_now
  225. while not quit_now:
  226. time.sleep(10)
  227. with lock:
  228. tables.snat4(snat_target, os.getenv('IPV4_NETWORK', '172.22.1') + '.0/24')
  229. def snat6(snat_target):
  230. global lock
  231. global quit_now
  232. while not quit_now:
  233. time.sleep(10)
  234. with lock:
  235. tables.snat6(snat_target, os.getenv('IPV6_NETWORK', 'fd4d:6169:6c63:6f77::/64'))
  236. def autopurge():
  237. global f2boptions
  238. while not quit_now:
  239. time.sleep(10)
  240. refreshF2boptions()
  241. MAX_ATTEMPTS = int(f2boptions['max_attempts'])
  242. QUEUE_UNBAN = r.hgetall('F2B_QUEUE_UNBAN')
  243. if QUEUE_UNBAN:
  244. for net in QUEUE_UNBAN:
  245. unban(str(net))
  246. for net in bans.copy():
  247. if bans[net]['attempts'] >= MAX_ATTEMPTS:
  248. NET_BAN_TIME = calcNetBanTime(bans[net]['ban_counter'])
  249. TIME_SINCE_LAST_ATTEMPT = time.time() - bans[net]['last_attempt']
  250. if TIME_SINCE_LAST_ATTEMPT > NET_BAN_TIME:
  251. unban(net)
  252. def mailcowChainOrder():
  253. global lock
  254. global quit_now
  255. global exit_code
  256. while not quit_now:
  257. time.sleep(10)
  258. with lock:
  259. quit_now, exit_code = tables.checkIPv4ChainOrder()
  260. if quit_now: return
  261. quit_now, exit_code = tables.checkIPv6ChainOrder()
  262. def calcNetBanTime(ban_counter):
  263. global f2boptions
  264. BAN_TIME = int(f2boptions['ban_time'])
  265. MAX_BAN_TIME = int(f2boptions['max_ban_time'])
  266. BAN_TIME_INCREMENT = bool(f2boptions['ban_time_increment'])
  267. NET_BAN_TIME = BAN_TIME if not BAN_TIME_INCREMENT else BAN_TIME * 2 ** ban_counter
  268. NET_BAN_TIME = max([BAN_TIME, min([NET_BAN_TIME, MAX_BAN_TIME])])
  269. return NET_BAN_TIME
  270. def isIpNetwork(address):
  271. try:
  272. ipaddress.ip_network(address, False)
  273. except ValueError:
  274. return False
  275. return True
  276. def genNetworkList(list):
  277. resolver = dns.resolver.Resolver()
  278. hostnames = []
  279. networks = []
  280. for key in list:
  281. if isIpNetwork(key):
  282. networks.append(key)
  283. else:
  284. hostnames.append(key)
  285. for hostname in hostnames:
  286. hostname_ips = []
  287. for rdtype in ['A', 'AAAA']:
  288. try:
  289. answer = resolver.resolve(qname=hostname, rdtype=rdtype, lifetime=3)
  290. except dns.exception.Timeout:
  291. logger.logInfo('Hostname %s timedout on resolve' % hostname)
  292. break
  293. except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
  294. continue
  295. except dns.exception.DNSException as dnsexception:
  296. logger.logInfo('%s' % dnsexception)
  297. continue
  298. for rdata in answer:
  299. hostname_ips.append(rdata.to_text())
  300. networks.extend(hostname_ips)
  301. return set(networks)
  302. def whitelistUpdate():
  303. global lock
  304. global quit_now
  305. global WHITELIST
  306. while not quit_now:
  307. start_time = time.time()
  308. list = r.hgetall('F2B_WHITELIST')
  309. new_whitelist = []
  310. if list:
  311. new_whitelist = genNetworkList(list)
  312. with lock:
  313. if Counter(new_whitelist) != Counter(WHITELIST):
  314. WHITELIST = new_whitelist
  315. logger.logInfo('Whitelist was changed, it has %s entries' % len(WHITELIST))
  316. time.sleep(60.0 - ((time.time() - start_time) % 60.0))
  317. def blacklistUpdate():
  318. global quit_now
  319. global BLACKLIST
  320. while not quit_now:
  321. start_time = time.time()
  322. list = r.hgetall('F2B_BLACKLIST')
  323. new_blacklist = []
  324. if list:
  325. new_blacklist = genNetworkList(list)
  326. if Counter(new_blacklist) != Counter(BLACKLIST):
  327. addban = set(new_blacklist).difference(BLACKLIST)
  328. delban = set(BLACKLIST).difference(new_blacklist)
  329. BLACKLIST = new_blacklist
  330. logger.logInfo('Blacklist was changed, it has %s entries' % len(BLACKLIST))
  331. if addban:
  332. for net in addban:
  333. permBan(net=net)
  334. if delban:
  335. for net in delban:
  336. permBan(net=net, unban=True)
  337. time.sleep(60.0 - ((time.time() - start_time) % 60.0))
  338. def sigterm_quit(signum, frame):
  339. global clear_before_quit
  340. clear_before_quit = True
  341. sys.exit(exit_code)
  342. def berfore_quit():
  343. if clear_before_quit:
  344. clear()
  345. if pubsub is not None:
  346. pubsub.unsubscribe()
  347. if __name__ == '__main__':
  348. atexit.register(berfore_quit)
  349. signal.signal(signal.SIGTERM, sigterm_quit)
  350. # init Logger
  351. logger = Logger()
  352. # init backend
  353. backend = sys.argv[1]
  354. if backend == "nftables":
  355. logger.logInfo('Using NFTables backend')
  356. tables = NFTables(chain_name, logger)
  357. else:
  358. logger.logInfo('Using IPTables backend')
  359. tables = IPTables(chain_name, logger)
  360. # In case a previous session was killed without cleanup
  361. clear()
  362. # Reinit MAILCOW chain
  363. # Is called before threads start, no locking
  364. logger.logInfo("Initializing mailcow netfilter chain")
  365. tables.initChainIPv4()
  366. tables.initChainIPv6()
  367. if os.getenv("DISABLE_NETFILTER_ISOLATION_RULE").lower() in ("y", "yes"):
  368. logger.logInfo(f"Skipping {chain_name} isolation")
  369. else:
  370. logger.logInfo(f"Setting {chain_name} isolation")
  371. tables.create_mailcow_isolation_rule("br-mailcow", [3306, 6379, 8983, 12345], os.getenv("MAILCOW_REPLICA_IP"))
  372. # connect to redis
  373. while True:
  374. try:
  375. redis_slaveof_ip = os.getenv('REDIS_SLAVEOF_IP', '')
  376. redis_slaveof_port = os.getenv('REDIS_SLAVEOF_PORT', '')
  377. if "".__eq__(redis_slaveof_ip):
  378. r = redis.StrictRedis(host=os.getenv('IPV4_NETWORK', '172.22.1') + '.249', decode_responses=True, port=6379, db=0, password=os.environ['REDISPASS'])
  379. else:
  380. r = redis.StrictRedis(host=redis_slaveof_ip, decode_responses=True, port=redis_slaveof_port, db=0, password=os.environ['REDISPASS'])
  381. r.ping()
  382. pubsub = r.pubsub()
  383. except Exception as ex:
  384. print('%s - trying again in 3 seconds' % (ex))
  385. time.sleep(3)
  386. else:
  387. break
  388. logger.set_redis(r)
  389. # rename fail2ban to netfilter
  390. if r.exists('F2B_LOG'):
  391. r.rename('F2B_LOG', 'NETFILTER_LOG')
  392. # clear bans in redis
  393. r.delete('F2B_ACTIVE_BANS')
  394. r.delete('F2B_PERM_BANS')
  395. refreshF2boptions()
  396. watch_thread = Thread(target=watch)
  397. watch_thread.daemon = True
  398. watch_thread.start()
  399. if os.getenv('SNAT_TO_SOURCE') and os.getenv('SNAT_TO_SOURCE') != 'n':
  400. try:
  401. snat_ip = os.getenv('SNAT_TO_SOURCE')
  402. snat_ipo = ipaddress.ip_address(snat_ip)
  403. if type(snat_ipo) is ipaddress.IPv4Address:
  404. snat4_thread = Thread(target=snat4,args=(snat_ip,))
  405. snat4_thread.daemon = True
  406. snat4_thread.start()
  407. except ValueError:
  408. print(os.getenv('SNAT_TO_SOURCE') + ' is not a valid IPv4 address')
  409. if os.getenv('SNAT6_TO_SOURCE') and os.getenv('SNAT6_TO_SOURCE') != 'n':
  410. try:
  411. snat_ip = os.getenv('SNAT6_TO_SOURCE')
  412. snat_ipo = ipaddress.ip_address(snat_ip)
  413. if type(snat_ipo) is ipaddress.IPv6Address:
  414. snat6_thread = Thread(target=snat6,args=(snat_ip,))
  415. snat6_thread.daemon = True
  416. snat6_thread.start()
  417. except ValueError:
  418. print(os.getenv('SNAT6_TO_SOURCE') + ' is not a valid IPv6 address')
  419. autopurge_thread = Thread(target=autopurge)
  420. autopurge_thread.daemon = True
  421. autopurge_thread.start()
  422. mailcowchainwatch_thread = Thread(target=mailcowChainOrder)
  423. mailcowchainwatch_thread.daemon = True
  424. mailcowchainwatch_thread.start()
  425. blacklistupdate_thread = Thread(target=blacklistUpdate)
  426. blacklistupdate_thread.daemon = True
  427. blacklistupdate_thread.start()
  428. whitelistupdate_thread = Thread(target=whitelistUpdate)
  429. whitelistupdate_thread.daemon = True
  430. whitelistupdate_thread.start()
  431. while not quit_now:
  432. time.sleep(0.5)
  433. sys.exit(exit_code)