main.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496
  1. #!/usr/bin/env python3
  2. import re
  3. import os
  4. import sys
  5. import time
  6. import atexit
  7. import signal
  8. import ipaddress
  9. from collections import Counter
  10. from random import randint
  11. from threading import Thread
  12. from threading import Lock
  13. import redis
  14. import json
  15. import dns.resolver
  16. import dns.exception
  17. import uuid
  18. from modules.Logger import Logger
  19. from modules.IPTables import IPTables
  20. from modules.NFTables import NFTables
  21. # globals
  22. WHITELIST = []
  23. BLACKLIST= []
  24. bans = {}
  25. quit_now = False
  26. exit_code = 0
  27. lock = Lock()
  28. chain_name = "MAILCOW"
  29. r = None
  30. pubsub = None
  31. clear_before_quit = False
  32. def refreshF2boptions():
  33. global f2boptions
  34. global quit_now
  35. global exit_code
  36. f2boptions = {}
  37. if not r.get('F2B_OPTIONS'):
  38. f2boptions['ban_time'] = r.get('F2B_BAN_TIME')
  39. f2boptions['max_ban_time'] = r.get('F2B_MAX_BAN_TIME')
  40. f2boptions['ban_time_increment'] = r.get('F2B_BAN_TIME_INCREMENT')
  41. f2boptions['max_attempts'] = r.get('F2B_MAX_ATTEMPTS')
  42. f2boptions['retry_window'] = r.get('F2B_RETRY_WINDOW')
  43. f2boptions['netban_ipv4'] = r.get('F2B_NETBAN_IPV4')
  44. f2boptions['netban_ipv6'] = r.get('F2B_NETBAN_IPV6')
  45. else:
  46. try:
  47. f2boptions = json.loads(r.get('F2B_OPTIONS'))
  48. except ValueError:
  49. logger.logCrit('Error loading F2B options: F2B_OPTIONS is not json')
  50. quit_now = True
  51. exit_code = 2
  52. verifyF2boptions(f2boptions)
  53. r.set('F2B_OPTIONS', json.dumps(f2boptions, ensure_ascii=False))
  54. def verifyF2boptions(f2boptions):
  55. verifyF2boption(f2boptions,'ban_time', 1800)
  56. verifyF2boption(f2boptions,'max_ban_time', 10000)
  57. verifyF2boption(f2boptions,'ban_time_increment', True)
  58. verifyF2boption(f2boptions,'max_attempts', 10)
  59. verifyF2boption(f2boptions,'retry_window', 600)
  60. verifyF2boption(f2boptions,'netban_ipv4', 32)
  61. verifyF2boption(f2boptions,'netban_ipv6', 128)
  62. verifyF2boption(f2boptions,'banlist_id', str(uuid.uuid4()))
  63. verifyF2boption(f2boptions,'manage_external', 0)
  64. def verifyF2boption(f2boptions, f2boption, f2bdefault):
  65. f2boptions[f2boption] = f2boptions[f2boption] if f2boption in f2boptions and f2boptions[f2boption] is not None else f2bdefault
  66. def refreshF2bregex():
  67. global f2bregex
  68. global quit_now
  69. global exit_code
  70. if not r.get('F2B_REGEX'):
  71. f2bregex = {}
  72. f2bregex[1] = 'mailcow UI: Invalid password for .+ by ([0-9a-f\.:]+)'
  73. f2bregex[2] = 'Rspamd UI: Invalid password by ([0-9a-f\.:]+)'
  74. f2bregex[3] = 'warning: .*\[([0-9a-f\.:]+)\]: SASL .+ authentication failed: (?!.*Connection lost to authentication server).+'
  75. f2bregex[4] = 'warning: non-SMTP command from .*\[([0-9a-f\.:]+)]:.+'
  76. f2bregex[5] = 'NOQUEUE: reject: RCPT from \[([0-9a-f\.:]+)].+Protocol error.+'
  77. f2bregex[6] = '-login: Disconnected.+ \(auth failed, .+\): user=.*, method=.+, rip=([0-9a-f\.:]+),'
  78. f2bregex[7] = '-login: Aborted login.+ \(auth failed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+'
  79. f2bregex[8] = '-login: Aborted login.+ \(tried to use disallowed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+'
  80. f2bregex[9] = 'SOGo.+ Login from \'([0-9a-f\.:]+)\' for user .+ might not have worked'
  81. f2bregex[10] = '([0-9a-f\.:]+) \"GET \/SOGo\/.* HTTP.+\" 403 .+'
  82. r.set('F2B_REGEX', json.dumps(f2bregex, ensure_ascii=False))
  83. else:
  84. try:
  85. f2bregex = {}
  86. f2bregex = json.loads(r.get('F2B_REGEX'))
  87. except ValueError:
  88. logger.logCrit('Error loading F2B options: F2B_REGEX is not json')
  89. quit_now = True
  90. exit_code = 2
  91. def get_ip(address):
  92. ip = ipaddress.ip_address(address)
  93. if type(ip) is ipaddress.IPv6Address and ip.ipv4_mapped:
  94. ip = ip.ipv4_mapped
  95. if ip.is_private or ip.is_loopback:
  96. return False
  97. return ip
  98. def ban(address):
  99. global f2boptions
  100. global lock
  101. refreshF2boptions()
  102. BAN_TIME = int(f2boptions['ban_time'])
  103. BAN_TIME_INCREMENT = bool(f2boptions['ban_time_increment'])
  104. MAX_ATTEMPTS = int(f2boptions['max_attempts'])
  105. RETRY_WINDOW = int(f2boptions['retry_window'])
  106. NETBAN_IPV4 = '/' + str(f2boptions['netban_ipv4'])
  107. NETBAN_IPV6 = '/' + str(f2boptions['netban_ipv6'])
  108. ip = get_ip(address)
  109. if not ip: return
  110. address = str(ip)
  111. self_network = ipaddress.ip_network(address)
  112. with lock:
  113. temp_whitelist = set(WHITELIST)
  114. if temp_whitelist:
  115. for wl_key in temp_whitelist:
  116. wl_net = ipaddress.ip_network(wl_key, False)
  117. if wl_net.overlaps(self_network):
  118. logger.logInfo('Address %s is whitelisted by rule %s' % (self_network, wl_net))
  119. return
  120. net = ipaddress.ip_network((address + (NETBAN_IPV4 if type(ip) is ipaddress.IPv4Address else NETBAN_IPV6)), strict=False)
  121. net = str(net)
  122. if not net in bans:
  123. bans[net] = {'attempts': 0, 'last_attempt': 0, 'ban_counter': 0}
  124. current_attempt = time.time()
  125. if current_attempt - bans[net]['last_attempt'] > RETRY_WINDOW:
  126. bans[net]['attempts'] = 0
  127. bans[net]['attempts'] += 1
  128. bans[net]['last_attempt'] = current_attempt
  129. if bans[net]['attempts'] >= MAX_ATTEMPTS:
  130. cur_time = int(round(time.time()))
  131. NET_BAN_TIME = BAN_TIME if not BAN_TIME_INCREMENT else BAN_TIME * 2 ** bans[net]['ban_counter']
  132. logger.logCrit('Banning %s for %d minutes' % (net, NET_BAN_TIME / 60 ))
  133. if type(ip) is ipaddress.IPv4Address and int(f2boptions['manage_external']) != 1:
  134. with lock:
  135. tables.banIPv4(net)
  136. elif int(f2boptions['manage_external']) != 1:
  137. with lock:
  138. tables.banIPv6(net)
  139. r.hset('F2B_ACTIVE_BANS', '%s' % net, cur_time + NET_BAN_TIME)
  140. else:
  141. logger.logWarn('%d more attempts in the next %d seconds until %s is banned' % (MAX_ATTEMPTS - bans[net]['attempts'], RETRY_WINDOW, net))
  142. def unban(net):
  143. global lock
  144. if not net in bans:
  145. logger.logInfo('%s is not banned, skipping unban and deleting from queue (if any)' % net)
  146. r.hdel('F2B_QUEUE_UNBAN', '%s' % net)
  147. return
  148. logger.logInfo('Unbanning %s' % net)
  149. if type(ipaddress.ip_network(net)) is ipaddress.IPv4Network:
  150. with lock:
  151. tables.unbanIPv4(net)
  152. else:
  153. with lock:
  154. tables.unbanIPv6(net)
  155. r.hdel('F2B_ACTIVE_BANS', '%s' % net)
  156. r.hdel('F2B_QUEUE_UNBAN', '%s' % net)
  157. if net in bans:
  158. bans[net]['attempts'] = 0
  159. bans[net]['ban_counter'] += 1
  160. def permBan(net, unban=False):
  161. global f2boptions
  162. global lock
  163. is_unbanned = False
  164. is_banned = False
  165. if type(ipaddress.ip_network(net, strict=False)) is ipaddress.IPv4Network:
  166. with lock:
  167. if unban:
  168. is_unbanned = tables.unbanIPv4(net)
  169. elif int(f2boptions['manage_external']) != 1:
  170. is_banned = tables.banIPv4(net)
  171. else:
  172. with lock:
  173. if unban:
  174. is_unbanned = tables.unbanIPv6(net)
  175. elif int(f2boptions['manage_external']) != 1:
  176. is_banned = tables.banIPv6(net)
  177. if is_unbanned:
  178. r.hdel('F2B_PERM_BANS', '%s' % net)
  179. logger.logCrit('Removed host/network %s from blacklist' % net)
  180. elif is_banned:
  181. r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time())))
  182. logger.logCrit('Added host/network %s to blacklist' % net)
  183. def clear():
  184. global lock
  185. logger.logInfo('Clearing all bans')
  186. for net in bans.copy():
  187. unban(net)
  188. with lock:
  189. tables.clearIPv4Table()
  190. tables.clearIPv6Table()
  191. try:
  192. if r is not None:
  193. r.delete('F2B_ACTIVE_BANS')
  194. r.delete('F2B_PERM_BANS')
  195. except Exception as ex:
  196. logger.logWarn('Error clearing redis keys F2B_ACTIVE_BANS and F2B_PERM_BANS: %s' % ex)
  197. def watch():
  198. global pubsub
  199. global quit_now
  200. global exit_code
  201. logger.logInfo('Watching Redis channel F2B_CHANNEL')
  202. pubsub.subscribe('F2B_CHANNEL')
  203. while not quit_now:
  204. try:
  205. for item in pubsub.listen():
  206. refreshF2bregex()
  207. for rule_id, rule_regex in f2bregex.items():
  208. if item['data'] and item['type'] == 'message':
  209. try:
  210. result = re.search(rule_regex, item['data'])
  211. except re.error:
  212. result = False
  213. if result:
  214. addr = result.group(1)
  215. ip = ipaddress.ip_address(addr)
  216. if ip.is_private or ip.is_loopback:
  217. continue
  218. logger.logWarn('%s matched rule id %s (%s)' % (addr, rule_id, item['data']))
  219. ban(addr)
  220. except Exception as ex:
  221. logger.logWarn('Error reading log line from pubsub: %s' % ex)
  222. pubsub = None
  223. quit_now = True
  224. exit_code = 2
  225. def snat4(snat_target):
  226. global lock
  227. global quit_now
  228. while not quit_now:
  229. time.sleep(10)
  230. with lock:
  231. tables.snat4(snat_target, os.getenv('IPV4_NETWORK', '172.22.1') + '.0/24')
  232. def snat6(snat_target):
  233. global lock
  234. global quit_now
  235. while not quit_now:
  236. time.sleep(10)
  237. with lock:
  238. tables.snat6(snat_target, os.getenv('IPV6_NETWORK', 'fd4d:6169:6c63:6f77::/64'))
  239. def autopurge():
  240. while not quit_now:
  241. time.sleep(10)
  242. refreshF2boptions()
  243. BAN_TIME = int(f2boptions['ban_time'])
  244. MAX_BAN_TIME = int(f2boptions['max_ban_time'])
  245. BAN_TIME_INCREMENT = bool(f2boptions['ban_time_increment'])
  246. MAX_ATTEMPTS = int(f2boptions['max_attempts'])
  247. QUEUE_UNBAN = r.hgetall('F2B_QUEUE_UNBAN')
  248. if QUEUE_UNBAN:
  249. for net in QUEUE_UNBAN:
  250. unban(str(net))
  251. for net in bans.copy():
  252. if bans[net]['attempts'] >= MAX_ATTEMPTS:
  253. NET_BAN_TIME = BAN_TIME if not BAN_TIME_INCREMENT else BAN_TIME * 2 ** bans[net]['ban_counter']
  254. TIME_SINCE_LAST_ATTEMPT = time.time() - bans[net]['last_attempt']
  255. if TIME_SINCE_LAST_ATTEMPT > NET_BAN_TIME or TIME_SINCE_LAST_ATTEMPT > MAX_BAN_TIME:
  256. unban(net)
  257. def mailcowChainOrder():
  258. global lock
  259. global quit_now
  260. global exit_code
  261. while not quit_now:
  262. time.sleep(10)
  263. with lock:
  264. quit_now, exit_code = tables.checkIPv4ChainOrder()
  265. if quit_now: return
  266. quit_now, exit_code = tables.checkIPv6ChainOrder()
  267. def isIpNetwork(address):
  268. try:
  269. ipaddress.ip_network(address, False)
  270. except ValueError:
  271. return False
  272. return True
  273. def genNetworkList(list):
  274. resolver = dns.resolver.Resolver()
  275. hostnames = []
  276. networks = []
  277. for key in list:
  278. if isIpNetwork(key):
  279. networks.append(key)
  280. else:
  281. hostnames.append(key)
  282. for hostname in hostnames:
  283. hostname_ips = []
  284. for rdtype in ['A', 'AAAA']:
  285. try:
  286. answer = resolver.resolve(qname=hostname, rdtype=rdtype, lifetime=3)
  287. except dns.exception.Timeout:
  288. logger.logInfo('Hostname %s timedout on resolve' % hostname)
  289. break
  290. except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
  291. continue
  292. except dns.exception.DNSException as dnsexception:
  293. logger.logInfo('%s' % dnsexception)
  294. continue
  295. for rdata in answer:
  296. hostname_ips.append(rdata.to_text())
  297. networks.extend(hostname_ips)
  298. return set(networks)
  299. def whitelistUpdate():
  300. global lock
  301. global quit_now
  302. global WHITELIST
  303. while not quit_now:
  304. start_time = time.time()
  305. list = r.hgetall('F2B_WHITELIST')
  306. new_whitelist = []
  307. if list:
  308. new_whitelist = genNetworkList(list)
  309. with lock:
  310. if Counter(new_whitelist) != Counter(WHITELIST):
  311. WHITELIST = new_whitelist
  312. logger.logInfo('Whitelist was changed, it has %s entries' % len(WHITELIST))
  313. time.sleep(60.0 - ((time.time() - start_time) % 60.0))
  314. def blacklistUpdate():
  315. global quit_now
  316. global BLACKLIST
  317. while not quit_now:
  318. start_time = time.time()
  319. list = r.hgetall('F2B_BLACKLIST')
  320. new_blacklist = []
  321. if list:
  322. new_blacklist = genNetworkList(list)
  323. if Counter(new_blacklist) != Counter(BLACKLIST):
  324. addban = set(new_blacklist).difference(BLACKLIST)
  325. delban = set(BLACKLIST).difference(new_blacklist)
  326. BLACKLIST = new_blacklist
  327. logger.logInfo('Blacklist was changed, it has %s entries' % len(BLACKLIST))
  328. if addban:
  329. for net in addban:
  330. permBan(net=net)
  331. if delban:
  332. for net in delban:
  333. permBan(net=net, unban=True)
  334. time.sleep(60.0 - ((time.time() - start_time) % 60.0))
  335. def sigterm_quit(signum, frame):
  336. global clear_before_quit
  337. clear_before_quit = True
  338. sys.exit(exit_code)
  339. def berfore_quit():
  340. if clear_before_quit:
  341. clear()
  342. if pubsub is not None:
  343. pubsub.unsubscribe()
  344. if __name__ == '__main__':
  345. atexit.register(berfore_quit)
  346. signal.signal(signal.SIGTERM, sigterm_quit)
  347. # init Logger
  348. logger = Logger()
  349. # init backend
  350. backend = sys.argv[1]
  351. if backend == "nftables":
  352. logger.logInfo('Using NFTables backend')
  353. tables = NFTables(chain_name, logger)
  354. else:
  355. logger.logInfo('Using IPTables backend')
  356. tables = IPTables(chain_name, logger)
  357. # In case a previous session was killed without cleanup
  358. clear()
  359. # Reinit MAILCOW chain
  360. # Is called before threads start, no locking
  361. logger.logInfo("Initializing mailcow netfilter chain")
  362. tables.initChainIPv4()
  363. tables.initChainIPv6()
  364. if os.getenv("DISABLE_NETFILTER_ISOLATION_RULE").lower() in ("y", "yes"):
  365. logger.logInfo(f"Skipping {chain_name} isolation")
  366. else:
  367. logger.logInfo(f"Setting {chain_name} isolation")
  368. tables.create_mailcow_isolation_rule("br-mailcow", [3306, 6379, 8983, 12345], os.getenv("MAILCOW_REPLICA_IP"))
  369. # connect to redis
  370. while True:
  371. try:
  372. redis_slaveof_ip = os.getenv('REDIS_SLAVEOF_IP', '')
  373. redis_slaveof_port = os.getenv('REDIS_SLAVEOF_PORT', '')
  374. if "".__eq__(redis_slaveof_ip):
  375. r = redis.StrictRedis(host=os.getenv('IPV4_NETWORK', '172.22.1') + '.249', decode_responses=True, port=6379, db=0)
  376. else:
  377. r = redis.StrictRedis(host=redis_slaveof_ip, decode_responses=True, port=redis_slaveof_port, db=0)
  378. r.ping()
  379. pubsub = r.pubsub()
  380. except Exception as ex:
  381. print('%s - trying again in 3 seconds' % (ex))
  382. time.sleep(3)
  383. else:
  384. break
  385. logger.set_redis(r)
  386. # rename fail2ban to netfilter
  387. if r.exists('F2B_LOG'):
  388. r.rename('F2B_LOG', 'NETFILTER_LOG')
  389. # clear bans in redis
  390. r.delete('F2B_ACTIVE_BANS')
  391. r.delete('F2B_PERM_BANS')
  392. refreshF2boptions()
  393. watch_thread = Thread(target=watch)
  394. watch_thread.daemon = True
  395. watch_thread.start()
  396. if os.getenv('SNAT_TO_SOURCE') and os.getenv('SNAT_TO_SOURCE') != 'n':
  397. try:
  398. snat_ip = os.getenv('SNAT_TO_SOURCE')
  399. snat_ipo = ipaddress.ip_address(snat_ip)
  400. if type(snat_ipo) is ipaddress.IPv4Address:
  401. snat4_thread = Thread(target=snat4,args=(snat_ip,))
  402. snat4_thread.daemon = True
  403. snat4_thread.start()
  404. except ValueError:
  405. print(os.getenv('SNAT_TO_SOURCE') + ' is not a valid IPv4 address')
  406. if os.getenv('SNAT6_TO_SOURCE') and os.getenv('SNAT6_TO_SOURCE') != 'n':
  407. try:
  408. snat_ip = os.getenv('SNAT6_TO_SOURCE')
  409. snat_ipo = ipaddress.ip_address(snat_ip)
  410. if type(snat_ipo) is ipaddress.IPv6Address:
  411. snat6_thread = Thread(target=snat6,args=(snat_ip,))
  412. snat6_thread.daemon = True
  413. snat6_thread.start()
  414. except ValueError:
  415. print(os.getenv('SNAT6_TO_SOURCE') + ' is not a valid IPv6 address')
  416. autopurge_thread = Thread(target=autopurge)
  417. autopurge_thread.daemon = True
  418. autopurge_thread.start()
  419. mailcowchainwatch_thread = Thread(target=mailcowChainOrder)
  420. mailcowchainwatch_thread.daemon = True
  421. mailcowchainwatch_thread.start()
  422. blacklistupdate_thread = Thread(target=blacklistUpdate)
  423. blacklistupdate_thread.daemon = True
  424. blacklistupdate_thread.start()
  425. whitelistupdate_thread = Thread(target=whitelistUpdate)
  426. whitelistupdate_thread.daemon = True
  427. whitelistupdate_thread.start()
  428. while not quit_now:
  429. time.sleep(0.5)
  430. sys.exit(exit_code)