Back
Type Name Operations
__pycache__ Open
account_review Open
autosuspend Open
check_software_mods Open
cms_tools Open
domainchecker Open
etc Open
extras Open
failsuspend Open
guds_modules Open
mailers Open
mitigatord Open
mysql Open
nlp_scripts Open
oldrads Open
ops Open
perl Open
python Open
suspended Open
temporary Open
README
account-review
alp.py
autossl_runner.sh
autosusprunner.sh
backup_scan.sh
blockip
check_apache
check_autossl
check_bandwidth
check_boxtrapper
check_cpu
check_crons
check_darkmailer.py
check_dcpumon
check_dns
check_domcount.sh
check_exim
check_hacks
check_imap
check_io
check_lve
check_mailchannels_dns
check_max_children
check_mem
check_misc
check_mysql
check_pacct
check_pop3
check_raid
check_server
check_size
check_software
check_spamd
check_traffic
check_user
check_zoneh
clean_exim.py
clean_moveuser
cms_counter.py
cms_creds
cms_dumpdb
cms_pw
cmspass.py
cpanel-api
cpumon
ctrl_alt_del
dcpumon.pl
disk_cleanup.py
dns-sync
docroot.py
du-tree
envinfo.py
exclude_rbl.py
exclude_sender
extract-vhost
find_warez
findbadscripts
fixwpcron.py
forensic.py
fraudhunter.py
generate_cpmove_tix
hostsfilemods
imap_io
killall911
lastcommcache.sh
legal_lock_down.sh
lil-cpanel
limit_bots
listacct
mail_sources.py
mailscan
mass_arp_fixer.py
mass_mysql_recover.py
megaclisas-status
modify-account
modsec_disable.py
move_generator.py
msp.pl
mysql_dstat
nlp
packandgo
pastebin
postmortem
procscrape
quarantine
quick_post
radsfunctions.sh
reap_fpm_orphans.sh
recent-cp
reclaim_suspensions
remote_dump
rescp.sh
reset_cpanel
reset_email
rotate_ip_addresses.py
rrdtooldisable.sh
rrdtoolenable.sh
sadatarunner.sh
send_customer_str
send_pp_email
server-load
setmaxemails
show-conns
software_report.py
sqltop
strmailer
suspend_domain
suspend_user
temp_apache_fix
unsuspend_user
unsusprunner.sh
update_spf
upgrade-check
vhost_data.py

File Transfer

Upload files to current directory

File Editor: blockip

#!/usr/lib/rads/venv/bin/python3
"""Wrapper script for blocking/unblocking IPs on shared via ipset.

Before an address is blocked it is checked against the local interfaces,
the imh-whcheck script from imh-fail2ban, the cached no-block lists and
/etc/relayhosts. Every block carries an expiry that is stored in a SQLite
database so that ``--clean`` (used by cron) can lift it again.

Exit codes: 0 success, 1 invalid input / ipset failure / unexpected error,
2 block refused by a safety check, 3 missing or invalid ``--time``.
"""
import sys
import subprocess
import syslog
import os
import pwd
import sqlite3
import logging
import requests
import json
from contextlib import closing
from pathlib import Path
from socket import AddressFamily  # pylint:disable=no-name-in-module
from argparse import ArgumentParser
from datetime import datetime, timedelta, timezone
import psutil
from netaddr import AddrFormatError, IPNetwork

IPLIST_PATH = "/etc/rads/iplists"
IPLIST_URL = "https://ipinfo.imhadmin.net/v1/net/lists"
IPLIST_MAX_AGE = 24 * 3600  # seconds a cached no-block list stays fresh
IPLIST_TIMEOUT = 15  # seconds to wait for the no-block list service
DB_PATH = "/etc/rads/blocked_ips.db"
LOG_PATH = "/var/log/blockip.log"
# Fixed-width UTC timestamps: they sort lexicographically, which is what the
# "expires_at <= ?" comparison in get_expired_ips() relies on.
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        logging.FileHandler(LOG_PATH),
    ],
)
logger = logging.getLogger('blockip')


def write_syslog(msg):
    """Track actions via syslog and logging servers"""
    syslog.openlog('blockip')
    syslog.syslog(msg)
    syslog.closelog()


def log_action(msg, level='info', console_msg=None):
    """Log an action to both syslog and the log file.

    Args:
        msg: text to record; it is prefixed with the invoking user.
        level: name of the ``logging`` method to use ('info', 'error', ...).
            Unknown names fall back to 'info' instead of raising.
        console_msg: if provided, also printed to stdout.
    """
    try:
        user = os.getlogin()
    except OSError:
        # No controlling terminal (e.g. cron): fall back to the process uid
        try:
            user = pwd.getpwuid(os.getuid()).pw_name
        except KeyError:
            user = 'unknown'
    full_msg = f"[{user}] {msg}"
    write_syslog(full_msg)
    getattr(logger, level.lower(), logger.info)(full_msg)
    # Only print to console if console_msg is provided
    if console_msg:
        print(console_msg)


def get_iplists():
    """Return the no-block lists as a dict of provider -> list of CIDRs.

    The lists are cached in IPLIST_PATH and refreshed when the cache is
    missing, empty, unreadable or older than IPLIST_MAX_AGE. A failed
    refresh falls back to the existing cache; only when there is no usable
    cache either is the download error raised to the caller.

    Raises:
        requests.RequestException, ValueError: the list could not be
            downloaded or parsed and no usable cached copy exists.
    """
    list_path = Path(IPLIST_PATH)

    def parse(text):
        data = json.loads(text)
        if not isinstance(data, dict):
            raise ValueError("no-block list is not a JSON object")
        return data

    def fetch_iplist():
        # The timeout keeps a dead service from hanging the script, and the
        # payload is validated *before* it replaces the cache so an error
        # page can never poison it for the next 24 hours.
        response = requests.get(IPLIST_URL, timeout=IPLIST_TIMEOUT)
        response.raise_for_status()
        data = parse(response.text)
        list_path.write_text(response.text, encoding='utf-8')
        return data

    def read_cache():
        return parse(list_path.read_text(encoding='utf-8'))

    try:
        list_stat = list_path.stat()
    except FileNotFoundError:
        return fetch_iplist()
    age = datetime.now().timestamp() - list_stat.st_mtime
    if list_stat.st_size > 0 and age <= IPLIST_MAX_AGE:
        try:
            return read_cache()
        except ValueError:
            pass  # corrupt cache: fall through and download a fresh copy
    try:
        return fetch_iplist()
    except (requests.RequestException, ValueError) as exc:
        try:
            cached = read_cache()
        except (OSError, ValueError):
            raise exc from None
        log_action(
            f"WARNING: unable to refresh {IPLIST_PATH} ({exc}); "
            "using cached copy",
            'warning',
        )
        return cached


def init_db():
    """Initialize SQLite database and create table if it doesn't exist"""
    with closing(sqlite3.connect(DB_PATH)) as conn, conn:
        conn.execute(
            '''CREATE TABLE IF NOT EXISTS blocked_ips
               (ip TEXT PRIMARY KEY,
                reason TEXT,
                blocked_at TEXT,
                expires_at TEXT)'''
        )


def add_ip_to_db(ip, reason, blocked_at, expires_at):
    """Add (or replace) the database entry for @ip"""
    with closing(sqlite3.connect(DB_PATH)) as conn, conn:
        conn.execute(
            '''INSERT OR REPLACE INTO blocked_ips
               (ip, reason, blocked_at, expires_at)
               VALUES (?, ?, ?, ?)''',
            (ip, reason, blocked_at, expires_at),
        )


def remove_ip_from_db(ip):
    """Remove IP entry from database"""
    with closing(sqlite3.connect(DB_PATH)) as conn, conn:
        conn.execute('DELETE FROM blocked_ips WHERE ip = ?', (ip,))


def get_expired_ips():
    """Return the IPs whose expires_at is at or before the current UTC time"""
    now = datetime.now(timezone.utc).strftime(TIME_FORMAT)
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute(
            'SELECT ip FROM blocked_ips WHERE expires_at <= ?', (now,)
        ).fetchall()
    return [row[0] for row in rows]


def remove_expired_ips(expired_ips):
    """Remove the given IPs from the database"""
    with closing(sqlite3.connect(DB_PATH)) as conn, conn:
        conn.executemany(
            'DELETE FROM blocked_ips WHERE ip = ?',
            [(ip,) for ip in expired_ips],
        )


def parse_cli():
    """
    Parse CLI arguments
    """
    parser = ArgumentParser(description="ipset wrapper for IMH shared firewall")
    mgroup = parser.add_mutually_exclusive_group(required=True)
    mgroup.add_argument(
        '--block',
        '-d',
        action='store',
        nargs=2,
        metavar=("IP/CIDR", "REASON"),
        help="IP Address or network to block + reason/comment",
    )
    mgroup.add_argument(
        '--unblock',
        '-u',
        action='store',
        metavar="IP/CIDR",
        help="IP Address or network to unblock",
    )
    parser.add_argument(
        '--time',
        '-t',
        type=str,
        help="Required block duration (e.g., 7d, 4h, 30m)",
    )
    mgroup.add_argument(
        '--clean',
        action='store_true',
        help="Unblock IPs whose expiration time has passed (used by cron)",
    )
    parser.add_argument(
        "--skip-relayhosts",
        "-s",
        action="store_true",
        help="skip checking /etc/relayhosts when blocking an IP",
    )
    return parser.parse_args()


def parse_time_duration(duration):
    """Parse a duration such as '7d', '4h' or '30m' into a timedelta.

    Raises:
        ValueError: empty input, unknown unit, non-numeric or non-positive
            amount.
    """
    units = {'m': 'minutes', 'h': 'hours', 'd': 'days'}
    text = str(duration).strip().lower()
    unit = text[-1:]  # slice, not index: '' must not raise IndexError
    if unit not in units:
        raise ValueError("Invalid time unit. Use m/h/d.")
    try:
        value = int(text[:-1])
    except ValueError:
        raise ValueError(
            f"Invalid block duration {duration!r}. "
            "Use a number followed by m/h/d (e.g., 7d, 4h, 30m)."
        ) from None
    if value <= 0:
        # A zero or negative duration would expire before it took effect
        raise ValueError("Block duration must be greater than zero.")
    return timedelta(**{units[unit]: value})


def check_ignore(ip):
    """
    Check if @ip is safe block, or should be ignored
    This uses the imh-whcheck script from imh-fail2ban
    returns True if we should ignore, otherwise False
    """
    cmd = ['/etc/fail2ban/filter.d/ignorecommands/imh-whcheck', ip]
    try:
        return subprocess.call(cmd) == 0
    except OSError as exc:
        print(exc, file=sys.stderr)
        return False


def check_ipinfo(ip):
    """
    Check IP list for IPs that shouldn't be blocked
    returns the name of the matching provider list, otherwise None
    """
    for provider, iplist in get_iplists().items():
        if any(ip in IPNetwork(cidr) for cidr in iplist):
            return provider
    return None


def check_relayhosts(addr: str):
    """Check /etc/relayhosts for matches.

    Blank lines, '#' comments and entries that are not a valid IP/CIDR are
    skipped rather than aborting the whole run. Returns False when the file
    cannot be read.
    """
    try:
        with open('/etc/relayhosts', encoding='utf-8') as f:
            for raw_line in f:
                entry = raw_line.strip()
                if not entry or entry.startswith('#'):
                    continue
                try:
                    if addr in IPNetwork(entry):
                        return True
                except (AddrFormatError, ValueError):
                    continue
    except OSError as exc:
        print(exc, file=sys.stderr)
        return False
    return False


def check_local(ip):
    """
    Check if @ip is one of this server's IPv4 addresses

    Every address of every interface is inspected, and the comparison is
    string-to-string: comparing a str against IPNetwork objects is never
    equal, which made this check always return False.
    """
    local_ips = {
        addr.address
        for addrs in psutil.net_if_addrs().values()
        for addr in addrs
        if addr.family == AddressFamily.AF_INET
    }
    return str(ip) in local_ips


def ipset(cmd: str, ip, setname='blacklist', comment=''):
    """
    Run ipset with specified @cmd for @ip on @setname

    @comment is only used in the log entry; it is not passed to ipset.
    returns True if ipset succeeded, otherwise False (failure is logged)
    """
    try:
        ret = subprocess.run(
            # -exist: re-adding an existing entry (to extend its expiry) or
            # deleting one that is already gone is not treated as an error
            ['/sbin/ipset', '-exist', cmd, setname, str(ip)],
            encoding='utf-8',
            stderr=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            check=False,
        )
    except (OSError, subprocess.SubprocessError) as e:
        log_action(f"ERROR: Failed to execute ipset: {str(e)}", 'error')
        return False
    if ret.returncode != 0:
        log_action(
            f"FAILED: {setname} {cmd} {ip}: {ret.stderr.strip()}", 'error'
        )
        return False
    log_msg = f"{setname} {cmd} {ip}"
    if comment:
        log_msg += f" - {comment}"
    log_action(log_msg)
    return True


def unblock_expired_ips():
    """Unblock IPs whose expiration time has passed.

    Only IPs that ipset actually released are removed from the database;
    the rest stay recorded so the next run retries them.
    """
    cleaned = []
    for ip in get_expired_ips():
        if not ipset('del', ip, comment='Cleaned Expired IP'):
            print(f"FAILED to unblock IP {ip} (expired)", file=sys.stderr)
            continue
        log_action(f"Cleaned Expired IP {ip}")
        print(f"Unblocked IP {ip} (expired)")
        cleaned.append(ip)
    remove_expired_ips(cleaned)


def _fail(error_msg, code):
    """Log @error_msg, show it to the operator and exit with @code"""
    log_action(error_msg, 'error', console_msg=error_msg)
    sys.exit(code)


def _block_refusal(tip, skip_relayhosts):
    """Return the reason @tip must not be blocked, or None if it is safe"""
    # NOTE(review): only the address part of a CIDR is checked, not every
    # host in the range - confirm that this is intended.
    addr = str(tip.ip)
    if check_local(addr):
        return f"ERROR: Unable to block local IP {tip}"
    if check_ignore(addr):
        return f"ERROR: Unable to block IP {tip}, found in fail2ban whitelist"
    range_name = check_ipinfo(addr)
    if range_name:
        return (
            f"ERROR: Unable to block IP {tip}, found in no-block list "
            f"({range_name})."
        )
    if not skip_relayhosts and check_relayhosts(addr):
        return (
            f"ERROR: Unable to block IP {tip}, found in /etc/relayhosts. "
            "Add -s/--skip-relayhosts to skip this check."
        )
    return None


def _main():
    """Entry point"""
    try:
        args = parse_cli()

        # Validate --time up front so nothing is touched on bad input
        duration = None
        if args.block:
            if not args.time:
                _fail("ERROR: --time is required when using --block", 3)
            try:
                duration = parse_time_duration(args.time)
            except ValueError as e:
                _fail(f"ERROR: {str(e)}", 3)

        # Ensure log directory exists
        os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
        # Initialize database
        init_db()

        if args.clean:
            log_action("Starting cleanup of expired IPs")
            unblock_expired_ips()
            log_action("Finished cleanup of expired IPs")
            return  # Exit after cron task finishes

        ip_list = args.block if args.block else [args.unblock]
        try:
            tip = IPNetwork(ip_list[0])
        except (AddrFormatError, ValueError) as e:
            _fail(f"ERROR: {str(e)}", 1)

        if tip.size > 1024:
            _fail("ERROR: Using networks larger than /22 is not allowed!", 2)

        if args.block:
            # The whitelist checks guard blocking only; applying them to
            # --unblock made it impossible to lift a block on such an IP.
            refusal = _block_refusal(tip, args.skip_relayhosts)
            if refusal:
                _fail(refusal, 2)

            comment = ip_list[1]
            now = datetime.now(timezone.utc)
            blocked_at = now.strftime(TIME_FORMAT)
            try:
                expiry_iso = (now + duration).strftime(TIME_FORMAT)
            except OverflowError:
                _fail("ERROR: Block duration is too large", 3)
            expiry_msg = f" [Block expires at {expiry_iso}]"

            # Record the block only once ipset has really applied it
            if not ipset('add', tip, comment=comment):
                error_msg = f"ERROR: Failed to block IP {tip}, see {LOG_PATH}"
                print(error_msg)  # ipset() already logged the details
                sys.exit(1)
            add_ip_to_db(str(tip), comment, blocked_at, expiry_iso)
            log_action(f"Added IP {tip} to database with expiry {expiry_iso}")
            log_action(
                f"Blocked IP {tip} ({comment}){expiry_msg}",
                console_msg=f"SUCCESS: Blocked IP {tip} "
                f"({comment}){expiry_msg}",
            )
        elif args.unblock:
            if not ipset('del', tip):
                error_msg = f"ERROR: Failed to unblock IP {tip}, see {LOG_PATH}"
                print(error_msg)  # ipset() already logged the details
                sys.exit(1)
            remove_ip_from_db(str(tip))
            log_action(
                f"Unblocked IP {tip}",
                console_msg=f"SUCCESS: Unblocked IP {tip}",
            )
    except Exception as e:  # pylint:disable=broad-except
        # Top-level boundary: record the failure and tell the operator too
        error_msg = f"CRITICAL ERROR: {str(e)}"
        log_action(error_msg, 'critical', console_msg=error_msg)
        sys.exit(1)


if __name__ == '__main__':
    _main()