main.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503
  1. #!/usr/bin/env python3
  2. import re
  3. import os
  4. import sys
  5. import time
  6. import atexit
  7. import signal
  8. import ipaddress
  9. from collections import Counter
  10. from random import randint
  11. from threading import Thread
  12. from threading import Lock
  13. import redis
  14. import json
  15. import dns.resolver
  16. import dns.exception
  17. import uuid
  18. from modules.Logger import Logger
  19. from modules.IPTables import IPTables
  20. from modules.NFTables import NFTables
  21. # globals
  22. WHITELIST = []
  23. BLACKLIST= []
  24. bans = {}
  25. quit_now = False
  26. exit_code = 0
  27. lock = Lock()
  28. chain_name = "MAILCOW"
  29. r = None
  30. pubsub = None
  31. clear_before_quit = False
  32. def refreshF2boptions():
  33. global f2boptions
  34. global quit_now
  35. global exit_code
  36. f2boptions = {}
  37. if not r.get('F2B_OPTIONS'):
  38. f2boptions['ban_time'] = r.get('F2B_BAN_TIME')
  39. f2boptions['max_ban_time'] = r.get('F2B_MAX_BAN_TIME')
  40. f2boptions['ban_time_increment'] = r.get('F2B_BAN_TIME_INCREMENT')
  41. f2boptions['max_attempts'] = r.get('F2B_MAX_ATTEMPTS')
  42. f2boptions['retry_window'] = r.get('F2B_RETRY_WINDOW')
  43. f2boptions['netban_ipv4'] = r.get('F2B_NETBAN_IPV4')
  44. f2boptions['netban_ipv6'] = r.get('F2B_NETBAN_IPV6')
  45. else:
  46. try:
  47. f2boptions = json.loads(r.get('F2B_OPTIONS'))
  48. except ValueError:
  49. logger.logCrit('Error loading F2B options: F2B_OPTIONS is not json')
  50. quit_now = True
  51. exit_code = 2
  52. verifyF2boptions(f2boptions)
  53. r.set('F2B_OPTIONS', json.dumps(f2boptions, ensure_ascii=False))
  54. def verifyF2boptions(f2boptions):
  55. verifyF2boption(f2boptions,'ban_time', 1800)
  56. verifyF2boption(f2boptions,'max_ban_time', 10000)
  57. verifyF2boption(f2boptions,'ban_time_increment', True)
  58. verifyF2boption(f2boptions,'max_attempts', 10)
  59. verifyF2boption(f2boptions,'retry_window', 600)
  60. verifyF2boption(f2boptions,'netban_ipv4', 32)
  61. verifyF2boption(f2boptions,'netban_ipv6', 128)
  62. verifyF2boption(f2boptions,'banlist_id', str(uuid.uuid4()))
  63. verifyF2boption(f2boptions,'manage_external', 0)
  64. def verifyF2boption(f2boptions, f2boption, f2bdefault):
  65. f2boptions[f2boption] = f2boptions[f2boption] if f2boption in f2boptions and f2boptions[f2boption] is not None else f2bdefault
  66. def refreshF2bregex():
  67. global f2bregex
  68. global quit_now
  69. global exit_code
  70. if not r.get('F2B_REGEX'):
  71. f2bregex = {}
  72. f2bregex[1] = r'mailcow UI: Invalid password for .+ by ([0-9a-f\.:]+)'
  73. f2bregex[2] = r'Rspamd UI: Invalid password by ([0-9a-f\.:]+)'
  74. f2bregex[3] = r'warning: .*\[([0-9a-f\.:]+)\]: SASL .+ authentication failed: (?!.*Connection lost to authentication server).+'
  75. f2bregex[4] = r'warning: non-SMTP command from .*\[([0-9a-f\.:]+)]:.+'
  76. f2bregex[5] = r'NOQUEUE: reject: RCPT from \[([0-9a-f\.:]+)].+Protocol error.+'
  77. f2bregex[6] = r'-login: Disconnected.+ \(auth failed, .+\): user=.*, method=.+, rip=([0-9a-f\.:]+),'
  78. f2bregex[7] = r'-login: Aborted login.+ \(auth failed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+'
  79. f2bregex[8] = r'-login: Aborted login.+ \(tried to use disallowed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+'
  80. f2bregex[9] = r'SOGo.+ Login from \'([0-9a-f\.:]+)\' for user .+ might not have worked'
  81. f2bregex[10] = r'([0-9a-f\.:]+) \"GET \/SOGo\/.* HTTP.+\" 403 .+'
  82. r.set('F2B_REGEX', json.dumps(f2bregex, ensure_ascii=False))
  83. else:
  84. try:
  85. f2bregex = {}
  86. f2bregex = json.loads(r.get('F2B_REGEX'))
  87. except ValueError:
  88. logger.logCrit('Error loading F2B options: F2B_REGEX is not json')
  89. quit_now = True
  90. exit_code = 2
  91. def get_ip(address):
  92. ip = ipaddress.ip_address(address)
  93. if type(ip) is ipaddress.IPv6Address and ip.ipv4_mapped:
  94. ip = ip.ipv4_mapped
  95. if ip.is_private or ip.is_loopback:
  96. return False
  97. return ip
  98. def ban(address):
  99. global f2boptions
  100. global lock
  101. refreshF2boptions()
  102. MAX_ATTEMPTS = int(f2boptions['max_attempts'])
  103. RETRY_WINDOW = int(f2boptions['retry_window'])
  104. NETBAN_IPV4 = '/' + str(f2boptions['netban_ipv4'])
  105. NETBAN_IPV6 = '/' + str(f2boptions['netban_ipv6'])
  106. ip = get_ip(address)
  107. if not ip: return
  108. address = str(ip)
  109. self_network = ipaddress.ip_network(address)
  110. with lock:
  111. temp_whitelist = set(WHITELIST)
  112. if temp_whitelist:
  113. for wl_key in temp_whitelist:
  114. wl_net = ipaddress.ip_network(wl_key, False)
  115. if wl_net.overlaps(self_network):
  116. logger.logInfo('Address %s is whitelisted by rule %s' % (self_network, wl_net))
  117. return
  118. net = ipaddress.ip_network((address + (NETBAN_IPV4 if type(ip) is ipaddress.IPv4Address else NETBAN_IPV6)), strict=False)
  119. net = str(net)
  120. if not net in bans:
  121. bans[net] = {'attempts': 0, 'last_attempt': 0, 'ban_counter': 0}
  122. current_attempt = time.time()
  123. if current_attempt - bans[net]['last_attempt'] > RETRY_WINDOW:
  124. bans[net]['attempts'] = 0
  125. bans[net]['attempts'] += 1
  126. bans[net]['last_attempt'] = current_attempt
  127. if bans[net]['attempts'] >= MAX_ATTEMPTS:
  128. cur_time = int(round(time.time()))
  129. NET_BAN_TIME = calcNetBanTime(bans[net]['ban_counter'])
  130. logger.logCrit('Banning %s for %d minutes' % (net, NET_BAN_TIME / 60 ))
  131. if type(ip) is ipaddress.IPv4Address and int(f2boptions['manage_external']) != 1:
  132. with lock:
  133. tables.banIPv4(net)
  134. elif int(f2boptions['manage_external']) != 1:
  135. with lock:
  136. tables.banIPv6(net)
  137. r.hset('F2B_ACTIVE_BANS', '%s' % net, cur_time + NET_BAN_TIME)
  138. else:
  139. logger.logWarn('%d more attempts in the next %d seconds until %s is banned' % (MAX_ATTEMPTS - bans[net]['attempts'], RETRY_WINDOW, net))
  140. def unban(net):
  141. global lock
  142. if not net in bans:
  143. logger.logInfo('%s is not banned, skipping unban and deleting from queue (if any)' % net)
  144. r.hdel('F2B_QUEUE_UNBAN', '%s' % net)
  145. return
  146. logger.logInfo('Unbanning %s' % net)
  147. if type(ipaddress.ip_network(net)) is ipaddress.IPv4Network:
  148. with lock:
  149. tables.unbanIPv4(net)
  150. else:
  151. with lock:
  152. tables.unbanIPv6(net)
  153. r.hdel('F2B_ACTIVE_BANS', '%s' % net)
  154. r.hdel('F2B_QUEUE_UNBAN', '%s' % net)
  155. if net in bans:
  156. bans[net]['attempts'] = 0
  157. bans[net]['ban_counter'] += 1
  158. def permBan(net, unban=False):
  159. global f2boptions
  160. global lock
  161. is_unbanned = False
  162. is_banned = False
  163. if type(ipaddress.ip_network(net, strict=False)) is ipaddress.IPv4Network:
  164. with lock:
  165. if unban:
  166. is_unbanned = tables.unbanIPv4(net)
  167. elif int(f2boptions['manage_external']) != 1:
  168. is_banned = tables.banIPv4(net)
  169. else:
  170. with lock:
  171. if unban:
  172. is_unbanned = tables.unbanIPv6(net)
  173. elif int(f2boptions['manage_external']) != 1:
  174. is_banned = tables.banIPv6(net)
  175. if is_unbanned:
  176. r.hdel('F2B_PERM_BANS', '%s' % net)
  177. logger.logCrit('Removed host/network %s from blacklist' % net)
  178. elif is_banned:
  179. r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time())))
  180. logger.logCrit('Added host/network %s to blacklist' % net)
  181. def clear():
  182. global lock
  183. logger.logInfo('Clearing all bans')
  184. for net in bans.copy():
  185. unban(net)
  186. with lock:
  187. tables.clearIPv4Table()
  188. tables.clearIPv6Table()
  189. try:
  190. if r is not None:
  191. r.delete('F2B_ACTIVE_BANS')
  192. r.delete('F2B_PERM_BANS')
  193. except Exception as ex:
  194. logger.logWarn('Error clearing redis keys F2B_ACTIVE_BANS and F2B_PERM_BANS: %s' % ex)
  195. def watch():
  196. global pubsub
  197. global quit_now
  198. global exit_code
  199. logger.logInfo('Watching Redis channel F2B_CHANNEL')
  200. pubsub.subscribe('F2B_CHANNEL')
  201. while not quit_now:
  202. try:
  203. for item in pubsub.listen():
  204. refreshF2bregex()
  205. for rule_id, rule_regex in f2bregex.items():
  206. if item['data'] and item['type'] == 'message':
  207. try:
  208. result = re.search(rule_regex, item['data'])
  209. except re.error:
  210. result = False
  211. if result:
  212. addr = result.group(1)
  213. ip = ipaddress.ip_address(addr)
  214. if ip.is_private or ip.is_loopback:
  215. continue
  216. logger.logWarn('%s matched rule id %s (%s)' % (addr, rule_id, item['data']))
  217. ban(addr)
  218. except Exception as ex:
  219. logger.logWarn('Error reading log line from pubsub: %s' % ex)
  220. pubsub = None
  221. quit_now = True
  222. exit_code = 2
  223. def snat4(snat_target):
  224. global lock
  225. global quit_now
  226. while not quit_now:
  227. time.sleep(10)
  228. with lock:
  229. tables.snat4(snat_target, os.getenv('IPV4_NETWORK', '172.22.1') + '.0/24')
  230. def snat6(snat_target):
  231. global lock
  232. global quit_now
  233. while not quit_now:
  234. time.sleep(10)
  235. with lock:
  236. tables.snat6(snat_target, os.getenv('IPV6_NETWORK', 'fd4d:6169:6c63:6f77::/64'))
  237. def autopurge():
  238. global f2boptions
  239. while not quit_now:
  240. time.sleep(10)
  241. refreshF2boptions()
  242. MAX_ATTEMPTS = int(f2boptions['max_attempts'])
  243. QUEUE_UNBAN = r.hgetall('F2B_QUEUE_UNBAN')
  244. if QUEUE_UNBAN:
  245. for net in QUEUE_UNBAN:
  246. unban(str(net))
  247. for net in bans.copy():
  248. if bans[net]['attempts'] >= MAX_ATTEMPTS:
  249. NET_BAN_TIME = calcNetBanTime(bans[net]['ban_counter'])
  250. TIME_SINCE_LAST_ATTEMPT = time.time() - bans[net]['last_attempt']
  251. if TIME_SINCE_LAST_ATTEMPT > NET_BAN_TIME:
  252. unban(net)
  253. def mailcowChainOrder():
  254. global lock
  255. global quit_now
  256. global exit_code
  257. while not quit_now:
  258. time.sleep(10)
  259. with lock:
  260. quit_now, exit_code = tables.checkIPv4ChainOrder()
  261. if quit_now: return
  262. quit_now, exit_code = tables.checkIPv6ChainOrder()
  263. def calcNetBanTime(ban_counter):
  264. global f2boptions
  265. BAN_TIME = int(f2boptions['ban_time'])
  266. MAX_BAN_TIME = int(f2boptions['max_ban_time'])
  267. BAN_TIME_INCREMENT = bool(f2boptions['ban_time_increment'])
  268. NET_BAN_TIME = BAN_TIME if not BAN_TIME_INCREMENT else BAN_TIME * 2 ** ban_counter
  269. NET_BAN_TIME = max([BAN_TIME, min([NET_BAN_TIME, MAX_BAN_TIME])])
  270. return NET_BAN_TIME
  271. def isIpNetwork(address):
  272. try:
  273. ipaddress.ip_network(address, False)
  274. except ValueError:
  275. return False
  276. return True
  277. def genNetworkList(list):
  278. resolver = dns.resolver.Resolver()
  279. hostnames = []
  280. networks = []
  281. for key in list:
  282. if isIpNetwork(key):
  283. networks.append(key)
  284. else:
  285. hostnames.append(key)
  286. for hostname in hostnames:
  287. hostname_ips = []
  288. for rdtype in ['A', 'AAAA']:
  289. try:
  290. answer = resolver.resolve(qname=hostname, rdtype=rdtype, lifetime=3)
  291. except dns.exception.Timeout:
  292. logger.logInfo('Hostname %s timedout on resolve' % hostname)
  293. break
  294. except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
  295. continue
  296. except dns.exception.DNSException as dnsexception:
  297. logger.logInfo('%s' % dnsexception)
  298. continue
  299. for rdata in answer:
  300. hostname_ips.append(rdata.to_text())
  301. networks.extend(hostname_ips)
  302. return set(networks)
  303. def whitelistUpdate():
  304. global lock
  305. global quit_now
  306. global WHITELIST
  307. while not quit_now:
  308. start_time = time.time()
  309. list = r.hgetall('F2B_WHITELIST')
  310. new_whitelist = []
  311. if list:
  312. new_whitelist = genNetworkList(list)
  313. with lock:
  314. if Counter(new_whitelist) != Counter(WHITELIST):
  315. WHITELIST = new_whitelist
  316. logger.logInfo('Whitelist was changed, it has %s entries' % len(WHITELIST))
  317. time.sleep(60.0 - ((time.time() - start_time) % 60.0))
  318. def blacklistUpdate():
  319. global quit_now
  320. global BLACKLIST
  321. while not quit_now:
  322. start_time = time.time()
  323. list = r.hgetall('F2B_BLACKLIST')
  324. new_blacklist = []
  325. if list:
  326. new_blacklist = genNetworkList(list)
  327. if Counter(new_blacklist) != Counter(BLACKLIST):
  328. addban = set(new_blacklist).difference(BLACKLIST)
  329. delban = set(BLACKLIST).difference(new_blacklist)
  330. BLACKLIST = new_blacklist
  331. logger.logInfo('Blacklist was changed, it has %s entries' % len(BLACKLIST))
  332. if addban:
  333. for net in addban:
  334. permBan(net=net)
  335. if delban:
  336. for net in delban:
  337. permBan(net=net, unban=True)
  338. time.sleep(60.0 - ((time.time() - start_time) % 60.0))
  339. def sigterm_quit(signum, frame):
  340. global clear_before_quit
  341. clear_before_quit = True
  342. sys.exit(exit_code)
  343. def berfore_quit():
  344. if clear_before_quit:
  345. clear()
  346. if pubsub is not None:
  347. pubsub.unsubscribe()
  348. if __name__ == '__main__':
  349. atexit.register(berfore_quit)
  350. signal.signal(signal.SIGTERM, sigterm_quit)
  351. # init Logger
  352. logger = Logger()
  353. # init backend
  354. backend = sys.argv[1]
  355. if backend == "nftables":
  356. logger.logInfo('Using NFTables backend')
  357. tables = NFTables(chain_name, logger)
  358. else:
  359. logger.logInfo('Using IPTables backend')
  360. tables = IPTables(chain_name, logger)
  361. # In case a previous session was killed without cleanup
  362. clear()
  363. # Reinit MAILCOW chain
  364. # Is called before threads start, no locking
  365. logger.logInfo("Initializing mailcow netfilter chain")
  366. tables.initChainIPv4()
  367. tables.initChainIPv6()
  368. if os.getenv("DISABLE_NETFILTER_ISOLATION_RULE").lower() in ("y", "yes"):
  369. logger.logInfo(f"Skipping {chain_name} isolation")
  370. else:
  371. logger.logInfo(f"Setting {chain_name} isolation")
  372. tables.create_mailcow_isolation_rule("br-mailcow", [3306, 6379, 8983, 12345], os.getenv("MAILCOW_REPLICA_IP"))
  373. # connect to redis
  374. while True:
  375. try:
  376. redis_slaveof_ip = os.getenv('REDIS_SLAVEOF_IP', '')
  377. redis_slaveof_port = os.getenv('REDIS_SLAVEOF_PORT', '')
  378. if "".__eq__(redis_slaveof_ip):
  379. r = redis.StrictRedis(host=os.getenv('IPV4_NETWORK', '172.22.1') + '.249', decode_responses=True, port=6379, db=0, password=os.environ['REDISPASS'])
  380. else:
  381. r = redis.StrictRedis(host=redis_slaveof_ip, decode_responses=True, port=redis_slaveof_port, db=0, password=os.environ['REDISPASS'])
  382. r.ping()
  383. pubsub = r.pubsub()
  384. except Exception as ex:
  385. print('%s - trying again in 3 seconds' % (ex))
  386. time.sleep(3)
  387. else:
  388. break
  389. logger.set_redis(r)
  390. # rename fail2ban to netfilter
  391. if r.exists('F2B_LOG'):
  392. r.rename('F2B_LOG', 'NETFILTER_LOG')
  393. # clear bans in redis
  394. r.delete('F2B_ACTIVE_BANS')
  395. r.delete('F2B_PERM_BANS')
  396. refreshF2boptions()
  397. watch_thread = Thread(target=watch)
  398. watch_thread.daemon = True
  399. watch_thread.start()
  400. if os.getenv('SNAT_TO_SOURCE') and os.getenv('SNAT_TO_SOURCE') != 'n':
  401. try:
  402. snat_ip = os.getenv('SNAT_TO_SOURCE')
  403. snat_ipo = ipaddress.ip_address(snat_ip)
  404. if type(snat_ipo) is ipaddress.IPv4Address:
  405. snat4_thread = Thread(target=snat4,args=(snat_ip,))
  406. snat4_thread.daemon = True
  407. snat4_thread.start()
  408. except ValueError:
  409. print(os.getenv('SNAT_TO_SOURCE') + ' is not a valid IPv4 address')
  410. if os.getenv('SNAT6_TO_SOURCE') and os.getenv('SNAT6_TO_SOURCE') != 'n':
  411. try:
  412. snat_ip = os.getenv('SNAT6_TO_SOURCE')
  413. snat_ipo = ipaddress.ip_address(snat_ip)
  414. if type(snat_ipo) is ipaddress.IPv6Address:
  415. snat6_thread = Thread(target=snat6,args=(snat_ip,))
  416. snat6_thread.daemon = True
  417. snat6_thread.start()
  418. except ValueError:
  419. print(os.getenv('SNAT6_TO_SOURCE') + ' is not a valid IPv6 address')
  420. autopurge_thread = Thread(target=autopurge)
  421. autopurge_thread.daemon = True
  422. autopurge_thread.start()
  423. mailcowchainwatch_thread = Thread(target=mailcowChainOrder)
  424. mailcowchainwatch_thread.daemon = True
  425. mailcowchainwatch_thread.start()
  426. blacklistupdate_thread = Thread(target=blacklistUpdate)
  427. blacklistupdate_thread.daemon = True
  428. blacklistupdate_thread.start()
  429. whitelistupdate_thread = Thread(target=whitelistUpdate)
  430. whitelistupdate_thread.daemon = True
  431. whitelistupdate_thread.start()
  432. while not quit_now:
  433. time.sleep(0.5)
  434. sys.exit(exit_code)