server.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. #!/usr/bin/env python3
  2. import re
  3. import os
  4. import time
  5. import atexit
  6. import signal
  7. import ipaddress
  8. from collections import Counter
  9. from random import randint
  10. from threading import Thread
  11. from threading import Lock
  12. import redis
  13. import json
  14. import iptc
  15. import dns.resolver
  16. import dns.exception
  17. while True:
  18. try:
  19. redis_slaveof_ip = os.getenv('REDIS_SLAVEOF_IP', '')
  20. redis_slaveof_port = os.getenv('REDIS_SLAVEOF_PORT', '')
  21. if "".__eq__(redis_slaveof_ip):
  22. r = redis.StrictRedis(host=os.getenv('IPV4_NETWORK', '172.22.1') + '.249', decode_responses=True, port=6379, db=0)
  23. else:
  24. r = redis.StrictRedis(host=redis_slaveof_ip, decode_responses=True, port=redis_slaveof_port, db=0)
  25. r.ping()
  26. except Exception as ex:
  27. print('%s - trying again in 3 seconds' % (ex))
  28. time.sleep(3)
  29. else:
  30. break
  31. pubsub = r.pubsub()
  32. WHITELIST = []
  33. BLACKLIST= []
  34. bans = {}
  35. quit_now = False
  36. lock = Lock()
  37. def log(priority, message):
  38. tolog = {}
  39. tolog['time'] = int(round(time.time()))
  40. tolog['priority'] = priority
  41. tolog['message'] = message
  42. r.lpush('NETFILTER_LOG', json.dumps(tolog, ensure_ascii=False))
  43. print(message)
  44. def logWarn(message):
  45. log('warn', message)
  46. def logCrit(message):
  47. log('crit', message)
  48. def logInfo(message):
  49. log('info', message)
  50. def refreshF2boptions():
  51. global f2boptions
  52. global quit_now
  53. if not r.get('F2B_OPTIONS'):
  54. f2boptions = {}
  55. f2boptions['ban_time'] = int
  56. f2boptions['max_attempts'] = int
  57. f2boptions['retry_window'] = int
  58. f2boptions['netban_ipv4'] = int
  59. f2boptions['netban_ipv6'] = int
  60. f2boptions['ban_time'] = r.get('F2B_BAN_TIME') or 1800
  61. f2boptions['max_attempts'] = r.get('F2B_MAX_ATTEMPTS') or 10
  62. f2boptions['retry_window'] = r.get('F2B_RETRY_WINDOW') or 600
  63. f2boptions['netban_ipv4'] = r.get('F2B_NETBAN_IPV4') or 32
  64. f2boptions['netban_ipv6'] = r.get('F2B_NETBAN_IPV6') or 128
  65. r.set('F2B_OPTIONS', json.dumps(f2boptions, ensure_ascii=False))
  66. else:
  67. try:
  68. f2boptions = {}
  69. f2boptions = json.loads(r.get('F2B_OPTIONS'))
  70. except ValueError:
  71. print('Error loading F2B options: F2B_OPTIONS is not json')
  72. quit_now = True
  73. def refreshF2bregex():
  74. global f2bregex
  75. global quit_now
  76. if not r.get('F2B_REGEX'):
  77. f2bregex = {}
  78. f2bregex[1] = 'warning: .*\[([0-9a-f\.:]+)\]: SASL .+ authentication failed'
  79. f2bregex[2] = '-login: Disconnected \(auth failed, .+\): user=.*, method=.+, rip=([0-9a-f\.:]+),'
  80. f2bregex[3] = '-login: Aborted login \(tried to use disallowed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+'
  81. f2bregex[4] = 'SOGo.+ Login from \'([0-9a-f\.:]+)\' for user .+ might not have worked'
  82. f2bregex[5] = 'mailcow UI: Invalid password for .+ by ([0-9a-f\.:]+)'
  83. f2bregex[6] = '([0-9a-f\.:]+) \"GET \/SOGo\/.* HTTP.+\" 403 .+'
  84. f2bregex[7] = 'Rspamd UI: Invalid password by ([0-9a-f\.:]+)'
  85. f2bregex[8] = '-login: Aborted login \(auth failed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+'
  86. r.set('F2B_REGEX', json.dumps(f2bregex, ensure_ascii=False))
  87. else:
  88. try:
  89. f2bregex = {}
  90. f2bregex = json.loads(r.get('F2B_REGEX'))
  91. except ValueError:
  92. print('Error loading F2B options: F2B_REGEX is not json')
  93. quit_now = True
  94. if r.exists('F2B_LOG'):
  95. r.rename('F2B_LOG', 'NETFILTER_LOG')
  96. def mailcowChainOrder():
  97. global lock
  98. global quit_now
  99. while not quit_now:
  100. time.sleep(10)
  101. with lock:
  102. filter4_table = iptc.Table(iptc.Table.FILTER)
  103. filter6_table = iptc.Table6(iptc.Table6.FILTER)
  104. filter4_table.refresh()
  105. filter6_table.refresh()
  106. for f in [filter4_table, filter6_table]:
  107. forward_chain = iptc.Chain(f, 'FORWARD')
  108. input_chain = iptc.Chain(f, 'INPUT')
  109. for chain in [forward_chain, input_chain]:
  110. target_found = False
  111. for position, item in enumerate(chain.rules):
  112. if item.target.name == 'MAILCOW':
  113. target_found = True
  114. if position > 2:
  115. logCrit('Error in %s chain order: MAILCOW on position %d, restarting container' % (chain.name, position))
  116. quit_now = True
  117. if not target_found:
  118. logCrit('Error in %s chain: MAILCOW target not found, restarting container' % (chain.name))
  119. quit_now = True
  120. def ban(address):
  121. global lock
  122. refreshF2boptions()
  123. BAN_TIME = int(f2boptions['ban_time'])
  124. MAX_ATTEMPTS = int(f2boptions['max_attempts'])
  125. RETRY_WINDOW = int(f2boptions['retry_window'])
  126. NETBAN_IPV4 = '/' + str(f2boptions['netban_ipv4'])
  127. NETBAN_IPV6 = '/' + str(f2boptions['netban_ipv6'])
  128. ip = ipaddress.ip_address(address)
  129. if type(ip) is ipaddress.IPv6Address and ip.ipv4_mapped:
  130. ip = ip.ipv4_mapped
  131. address = str(ip)
  132. if ip.is_private or ip.is_loopback:
  133. return
  134. self_network = ipaddress.ip_network(address)
  135. with lock:
  136. temp_whitelist = set(WHITELIST)
  137. if temp_whitelist:
  138. for wl_key in temp_whitelist:
  139. wl_net = ipaddress.ip_network(wl_key, False)
  140. if wl_net.overlaps(self_network):
  141. logInfo('Address %s is whitelisted by rule %s' % (self_network, wl_net))
  142. return
  143. net = ipaddress.ip_network((address + (NETBAN_IPV4 if type(ip) is ipaddress.IPv4Address else NETBAN_IPV6)), strict=False)
  144. net = str(net)
  145. if not net in bans or time.time() - bans[net]['last_attempt'] > RETRY_WINDOW:
  146. bans[net] = { 'attempts': 0 }
  147. active_window = RETRY_WINDOW
  148. else:
  149. active_window = time.time() - bans[net]['last_attempt']
  150. bans[net]['attempts'] += 1
  151. bans[net]['last_attempt'] = time.time()
  152. active_window = time.time() - bans[net]['last_attempt']
  153. if bans[net]['attempts'] >= MAX_ATTEMPTS:
  154. cur_time = int(round(time.time()))
  155. logCrit('Banning %s for %d minutes' % (net, BAN_TIME / 60))
  156. if type(ip) is ipaddress.IPv4Address:
  157. with lock:
  158. chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW')
  159. rule = iptc.Rule()
  160. rule.src = net
  161. target = iptc.Target(rule, "REJECT")
  162. rule.target = target
  163. if rule not in chain.rules:
  164. chain.insert_rule(rule)
  165. else:
  166. with lock:
  167. chain = iptc.Chain(iptc.Table6(iptc.Table6.FILTER), 'MAILCOW')
  168. rule = iptc.Rule6()
  169. rule.src = net
  170. target = iptc.Target(rule, "REJECT")
  171. rule.target = target
  172. if rule not in chain.rules:
  173. chain.insert_rule(rule)
  174. r.hset('F2B_ACTIVE_BANS', '%s' % net, cur_time + BAN_TIME)
  175. else:
  176. logWarn('%d more attempts in the next %d seconds until %s is banned' % (MAX_ATTEMPTS - bans[net]['attempts'], RETRY_WINDOW, net))
  177. def unban(net):
  178. global lock
  179. if not net in bans:
  180. logInfo('%s is not banned, skipping unban and deleting from queue (if any)' % net)
  181. r.hdel('F2B_QUEUE_UNBAN', '%s' % net)
  182. return
  183. logInfo('Unbanning %s' % net)
  184. if type(ipaddress.ip_network(net)) is ipaddress.IPv4Network:
  185. with lock:
  186. chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW')
  187. rule = iptc.Rule()
  188. rule.src = net
  189. target = iptc.Target(rule, "REJECT")
  190. rule.target = target
  191. if rule in chain.rules:
  192. chain.delete_rule(rule)
  193. else:
  194. with lock:
  195. chain = iptc.Chain(iptc.Table6(iptc.Table6.FILTER), 'MAILCOW')
  196. rule = iptc.Rule6()
  197. rule.src = net
  198. target = iptc.Target(rule, "REJECT")
  199. rule.target = target
  200. if rule in chain.rules:
  201. chain.delete_rule(rule)
  202. r.hdel('F2B_ACTIVE_BANS', '%s' % net)
  203. r.hdel('F2B_QUEUE_UNBAN', '%s' % net)
  204. if net in bans:
  205. del bans[net]
  206. def permBan(net, unban=False):
  207. global lock
  208. if type(ipaddress.ip_network(net, strict=False)) is ipaddress.IPv4Network:
  209. with lock:
  210. chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW')
  211. rule = iptc.Rule()
  212. rule.src = net
  213. target = iptc.Target(rule, "REJECT")
  214. rule.target = target
  215. if rule not in chain.rules and not unban:
  216. logCrit('Add host/network %s to blacklist' % net)
  217. chain.insert_rule(rule)
  218. r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time())))
  219. elif rule in chain.rules and unban:
  220. logCrit('Remove host/network %s from blacklist' % net)
  221. chain.delete_rule(rule)
  222. r.hdel('F2B_PERM_BANS', '%s' % net)
  223. else:
  224. with lock:
  225. chain = iptc.Chain(iptc.Table6(iptc.Table6.FILTER), 'MAILCOW')
  226. rule = iptc.Rule6()
  227. rule.src = net
  228. target = iptc.Target(rule, "REJECT")
  229. rule.target = target
  230. if rule not in chain.rules and not unban:
  231. logCrit('Add host/network %s to blacklist' % net)
  232. chain.insert_rule(rule)
  233. r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time())))
  234. elif rule in chain.rules and unban:
  235. logCrit('Remove host/network %s from blacklist' % net)
  236. chain.delete_rule(rule)
  237. r.hdel('F2B_PERM_BANS', '%s' % net)
  238. def quit(signum, frame):
  239. global quit_now
  240. quit_now = True
  241. def clear():
  242. global lock
  243. logInfo('Clearing all bans')
  244. for net in bans.copy():
  245. unban(net)
  246. with lock:
  247. filter4_table = iptc.Table(iptc.Table.FILTER)
  248. filter6_table = iptc.Table6(iptc.Table6.FILTER)
  249. for filter_table in [filter4_table, filter6_table]:
  250. filter_table.autocommit = False
  251. forward_chain = iptc.Chain(filter_table, "FORWARD")
  252. input_chain = iptc.Chain(filter_table, "INPUT")
  253. mailcow_chain = iptc.Chain(filter_table, "MAILCOW")
  254. if mailcow_chain in filter_table.chains:
  255. for rule in mailcow_chain.rules:
  256. mailcow_chain.delete_rule(rule)
  257. for rule in forward_chain.rules:
  258. if rule.target.name == 'MAILCOW':
  259. forward_chain.delete_rule(rule)
  260. for rule in input_chain.rules:
  261. if rule.target.name == 'MAILCOW':
  262. input_chain.delete_rule(rule)
  263. filter_table.delete_chain("MAILCOW")
  264. filter_table.commit()
  265. filter_table.refresh()
  266. filter_table.autocommit = True
  267. r.delete('F2B_ACTIVE_BANS')
  268. r.delete('F2B_PERM_BANS')
  269. pubsub.unsubscribe()
  270. def watch():
  271. logInfo('Watching Redis channel F2B_CHANNEL')
  272. pubsub.subscribe('F2B_CHANNEL')
  273. while not quit_now:
  274. for item in pubsub.listen():
  275. try:
  276. refreshF2bregex()
  277. for rule_id, rule_regex in f2bregex.items():
  278. if item['data'] and item['type'] == 'message':
  279. try:
  280. result = re.search(rule_regex, item['data'])
  281. except re.error:
  282. result = False
  283. if result:
  284. addr = result.group(1)
  285. ip = ipaddress.ip_address(addr)
  286. if ip.is_private or ip.is_loopback:
  287. continue
  288. logWarn('%s matched rule id %s (%s)' % (addr, rule_id, item['data']))
  289. ban(addr)
  290. except Exception as ex:
  291. logWarn('Could not read logline from pubsub, skipping...')
  292. continue
  293. def snat4(snat_target):
  294. global lock
  295. global quit_now
  296. def get_snat4_rule():
  297. rule = iptc.Rule()
  298. rule.src = os.getenv('IPV4_NETWORK', '172.22.1') + '.0/24'
  299. rule.dst = '!' + rule.src
  300. target = rule.create_target("SNAT")
  301. target.to_source = snat_target
  302. return rule
  303. while not quit_now:
  304. time.sleep(10)
  305. with lock:
  306. try:
  307. table = iptc.Table('nat')
  308. table.refresh()
  309. chain = iptc.Chain(table, 'POSTROUTING')
  310. table.autocommit = False
  311. if get_snat4_rule() not in chain.rules:
  312. logCrit('Added POSTROUTING rule for source network %s to SNAT target %s' % (get_snat4_rule().src, snat_target))
  313. chain.insert_rule(get_snat4_rule())
  314. table.commit()
  315. else:
  316. for position, item in enumerate(chain.rules):
  317. if item == get_snat4_rule():
  318. if position != 0:
  319. chain.delete_rule(get_snat4_rule())
  320. table.commit()
  321. table.autocommit = True
  322. except:
  323. print('Error running SNAT4, retrying...')
  324. def snat6(snat_target):
  325. global lock
  326. global quit_now
  327. def get_snat6_rule():
  328. rule = iptc.Rule6()
  329. rule.src = os.getenv('IPV6_NETWORK', 'fd4d:6169:6c63:6f77::/64')
  330. rule.dst = '!' + rule.src
  331. target = rule.create_target("SNAT")
  332. target.to_source = snat_target
  333. return rule
  334. while not quit_now:
  335. time.sleep(10)
  336. with lock:
  337. try:
  338. table = iptc.Table6('nat')
  339. table.refresh()
  340. chain = iptc.Chain(table, 'POSTROUTING')
  341. table.autocommit = False
  342. if get_snat6_rule() not in chain.rules:
  343. logInfo('Added POSTROUTING rule for source network %s to SNAT target %s' % (get_snat6_rule().src, snat_target))
  344. chain.insert_rule(get_snat6_rule())
  345. table.commit()
  346. else:
  347. for position, item in enumerate(chain.rules):
  348. if item == get_snat6_rule():
  349. if position != 0:
  350. chain.delete_rule(get_snat6_rule())
  351. table.commit()
  352. table.autocommit = True
  353. except:
  354. print('Error running SNAT6, retrying...')
  355. def autopurge():
  356. while not quit_now:
  357. time.sleep(10)
  358. refreshF2boptions()
  359. BAN_TIME = int(f2boptions['ban_time'])
  360. MAX_ATTEMPTS = int(f2boptions['max_attempts'])
  361. QUEUE_UNBAN = r.hgetall('F2B_QUEUE_UNBAN')
  362. if QUEUE_UNBAN:
  363. for net in QUEUE_UNBAN:
  364. unban(str(net))
  365. for net in bans.copy():
  366. if bans[net]['attempts'] >= MAX_ATTEMPTS:
  367. if time.time() - bans[net]['last_attempt'] > BAN_TIME:
  368. unban(net)
  369. def isIpNetwork(address):
  370. try:
  371. ipaddress.ip_network(address, False)
  372. except ValueError:
  373. return False
  374. return True
  375. def genNetworkList(list):
  376. resolver = dns.resolver.Resolver()
  377. hostnames = []
  378. networks = []
  379. for key in list:
  380. if isIpNetwork(key):
  381. networks.append(key)
  382. else:
  383. hostnames.append(key)
  384. for hostname in hostnames:
  385. hostname_ips = []
  386. for rdtype in ['A', 'AAAA']:
  387. try:
  388. answer = resolver.resolve(qname=hostname, rdtype=rdtype, lifetime=3)
  389. except dns.exception.Timeout:
  390. logInfo('Hostname %s timedout on resolve' % hostname)
  391. break
  392. except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
  393. continue
  394. except dns.exception.DNSException as dnsexception:
  395. logInfo('%s' % dnsexception)
  396. continue
  397. for rdata in answer:
  398. hostname_ips.append(rdata.to_text())
  399. networks.extend(hostname_ips)
  400. return set(networks)
  401. def whitelistUpdate():
  402. global lock
  403. global quit_now
  404. global WHITELIST
  405. while not quit_now:
  406. start_time = time.time()
  407. list = r.hgetall('F2B_WHITELIST')
  408. new_whitelist = []
  409. if list:
  410. new_whitelist = genNetworkList(list)
  411. with lock:
  412. if Counter(new_whitelist) != Counter(WHITELIST):
  413. WHITELIST = new_whitelist
  414. logInfo('Whitelist was changed, it has %s entries' % len(WHITELIST))
  415. time.sleep(60.0 - ((time.time() - start_time) % 60.0))
  416. def blacklistUpdate():
  417. global quit_now
  418. global BLACKLIST
  419. while not quit_now:
  420. start_time = time.time()
  421. list = r.hgetall('F2B_BLACKLIST')
  422. new_blacklist = []
  423. if list:
  424. new_blacklist = genNetworkList(list)
  425. if Counter(new_blacklist) != Counter(BLACKLIST):
  426. addban = set(new_blacklist).difference(BLACKLIST)
  427. delban = set(BLACKLIST).difference(new_blacklist)
  428. BLACKLIST = new_blacklist
  429. logInfo('Blacklist was changed, it has %s entries' % len(BLACKLIST))
  430. if addban:
  431. for net in addban:
  432. permBan(net=net)
  433. if delban:
  434. for net in delban:
  435. permBan(net=net, unban=True)
  436. time.sleep(60.0 - ((time.time() - start_time) % 60.0))
  437. def initChain():
  438. # Is called before threads start, no locking
  439. print("Initializing mailcow netfilter chain")
  440. # IPv4
  441. if not iptc.Chain(iptc.Table(iptc.Table.FILTER), "MAILCOW") in iptc.Table(iptc.Table.FILTER).chains:
  442. iptc.Table(iptc.Table.FILTER).create_chain("MAILCOW")
  443. for c in ['FORWARD', 'INPUT']:
  444. chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), c)
  445. rule = iptc.Rule()
  446. rule.src = '0.0.0.0/0'
  447. rule.dst = '0.0.0.0/0'
  448. target = iptc.Target(rule, "MAILCOW")
  449. rule.target = target
  450. if rule not in chain.rules:
  451. chain.insert_rule(rule)
  452. # IPv6
  453. if not iptc.Chain(iptc.Table6(iptc.Table6.FILTER), "MAILCOW") in iptc.Table6(iptc.Table6.FILTER).chains:
  454. iptc.Table6(iptc.Table6.FILTER).create_chain("MAILCOW")
  455. for c in ['FORWARD', 'INPUT']:
  456. chain = iptc.Chain(iptc.Table6(iptc.Table6.FILTER), c)
  457. rule = iptc.Rule6()
  458. rule.src = '::/0'
  459. rule.dst = '::/0'
  460. target = iptc.Target(rule, "MAILCOW")
  461. rule.target = target
  462. if rule not in chain.rules:
  463. chain.insert_rule(rule)
  464. if __name__ == '__main__':
  465. # In case a previous session was killed without cleanup
  466. clear()
  467. # Reinit MAILCOW chain
  468. initChain()
  469. watch_thread = Thread(target=watch)
  470. watch_thread.daemon = True
  471. watch_thread.start()
  472. if os.getenv('SNAT_TO_SOURCE') and os.getenv('SNAT_TO_SOURCE') != 'n':
  473. try:
  474. snat_ip = os.getenv('SNAT_TO_SOURCE')
  475. snat_ipo = ipaddress.ip_address(snat_ip)
  476. if type(snat_ipo) is ipaddress.IPv4Address:
  477. snat4_thread = Thread(target=snat4,args=(snat_ip,))
  478. snat4_thread.daemon = True
  479. snat4_thread.start()
  480. except ValueError:
  481. print(os.getenv('SNAT_TO_SOURCE') + ' is not a valid IPv4 address')
  482. if os.getenv('SNAT6_TO_SOURCE') and os.getenv('SNAT6_TO_SOURCE') != 'n':
  483. try:
  484. snat_ip = os.getenv('SNAT6_TO_SOURCE')
  485. snat_ipo = ipaddress.ip_address(snat_ip)
  486. if type(snat_ipo) is ipaddress.IPv6Address:
  487. snat6_thread = Thread(target=snat6,args=(snat_ip,))
  488. snat6_thread.daemon = True
  489. snat6_thread.start()
  490. except ValueError:
  491. print(os.getenv('SNAT6_TO_SOURCE') + ' is not a valid IPv6 address')
  492. autopurge_thread = Thread(target=autopurge)
  493. autopurge_thread.daemon = True
  494. autopurge_thread.start()
  495. mailcowchainwatch_thread = Thread(target=mailcowChainOrder)
  496. mailcowchainwatch_thread.daemon = True
  497. mailcowchainwatch_thread.start()
  498. blacklistupdate_thread = Thread(target=blacklistUpdate)
  499. blacklistupdate_thread.daemon = True
  500. blacklistupdate_thread.start()
  501. whitelistupdate_thread = Thread(target=whitelistUpdate)
  502. whitelistupdate_thread.daemon = True
  503. whitelistupdate_thread.start()
  504. signal.signal(signal.SIGTERM, quit)
  505. atexit.register(clear)
  506. while not quit_now:
  507. time.sleep(0.5)