#!/usr/bin/python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

# Copyright © 2020-2021 Thomas Chevalier

"""Refresh nftables bogon sets from remote bogon lists (IPv4 and IPv6)."""

import os
import sys
import logging
import argparse
import ipaddress
import subprocess
import urllib.error
import urllib.request
from typing import Callable, Optional
from pathlib import Path

# Per-socket-operation timeout for the download, so an unresponsive server
# cannot hang the script forever.
DOWNLOAD_TIMEOUT_SECONDS = 60


def ipv4_network_validator(ip_str: str) -> str:
    """
    Validate an IPv4 network.

    Parameter:
        ip_str: the network to validate (CIDR notation).

    Returns:
        The canonical string form of the network.

    Raises:
        ValueError: if `ip_str` is not a valid IPv4 network.
    """
    return str(ipaddress.IPv4Network(ip_str))


def ipv6_network_validator(ip_str: str) -> str:
    """
    Validate an IPv6 network.

    Parameter:
        ip_str: the network to validate (CIDR notation).

    Returns:
        The canonical string form of the network.

    Raises:
        ValueError: if `ip_str` is not a valid IPv6 network, or if it carries
            a scope id (`%...`).
    """
    network = ipaddress.IPv6Network(ip_str)
    # A scope id is free-form text that `ipaddress` passes through untouched;
    # it has no meaning in a bogon list and would let arbitrary characters
    # reach the generated nft script, so it is refused.
    if getattr(network.network_address, "scope_id", None) is not None:
        raise ValueError(f"{ip_str!r} carries a scope id, which is not allowed")
    return str(network)


class RefreshFailed(Exception):
    """Raised when the remote list could not be downloaded."""


def update_local_file(url: str, path: Path) -> bool:
    """
    Get a file from the URL and write it on the disk if its content differs
    from the corresponding local file.

    Parameters:
        url: the file to get.
        path: the path where to store the file.

    Returns:
        True if the local file has been changed, False otherwise.

    Raises:
        RefreshFailed: if the download fails or the payload is not UTF-8.
    """
    logging.debug(f"Local filename is '{path.resolve()}'")

    logging.debug(f"Downloading {url}")
    try:
        # The context manager closes the response even if read() fails.
        with urllib.request.urlopen(
            url, timeout=DOWNLOAD_TIMEOUT_SECONDS
        ) as response:
            source = response.read().decode("utf-8")
    except (urllib.error.URLError, OSError, UnicodeDecodeError) as err:
        logging.error(f"Download of {url} failed : {err}")
        raise RefreshFailed() from err
    logging.debug(f"Successfuly downloaded {url}")

    # newline="" disables newline translation so that the comparison (and the
    # stored copy) is faithful to what was downloaded.
    local_content = ""
    try:
        with path.open("r", encoding="utf-8", newline="") as file:
            local_content = file.read()
    except FileNotFoundError:
        logging.debug(f"File {path.resolve()} doesn't exist")

    if local_content != source:
        logging.debug("Remote file differs from local one")
        with path.open("w", encoding="utf-8", newline="") as file:
            file.write(source)
        return True

    logging.debug("Remote file is the same as local one")
    return False


def create_rules_from_file(
    source: Path,
    nft_name: str,
    validator: Callable[[str], str],
    exclude: Optional[list[str]] = None,
) -> str:
    """
    Create nft commands to flush then fill a set with the content of a local
    file. Lines starting with '#' are ignored and it is possible to exclude
    specific lines using the `exclude` list.

    Parameters:
        source: path of the file containing the elements of the set (one per
            line).
        nft_name: the full name of the set for nftables.
        validator: a function that throws a ValueError if the given ip network
            is not valid, otherwise returns a sanitized string representing
            the network.
        exclude: list of the strings to exclude from the set.

    Returns:
        The commands needed to flush the set and then fill it with the
        elements from `source`, or an empty string if `source` does not exist
        or contains no valid element.
    """
    logging.debug(f"Creating rules from local file {source.resolve()}")
    try:
        with source.open("r", encoding="utf-8") as file:
            data = file.read()
    except FileNotFoundError:
        logging.warning(
            "There is no file to read from (maybe because fetching failed and --force-update is set ?)"
        )
        return ""

    excluded = frozenset(exclude or ())
    elements = []
    logging.debug(f"Parsing {source} content")
    for raw_line in data.splitlines():
        # Tolerate surrounding whitespace and stray carriage returns.
        line = raw_line.strip()
        if not line or line.startswith("#") or line in excluded:
            continue

        # Verify that the line is a valid CIDR address. Only the validator's
        # sanitized output is ever written to the nft script, never the raw
        # line: this tries to prevent nft commands injection.
        try:
            sanitized = validator(line)
        except ValueError as err:
            logging.error(f"The following line isn't valid ({err}) : {line}.")
            continue

        if sanitized in excluded:
            continue
        elements.append(sanitized)
    logging.debug(f"Parsing {source} content is finished")

    element_count = len(elements)
    logging.info(
        f"There {'is' if element_count <= 1 else 'are'} {element_count} elements in set '{nft_name}'"
    )

    if not elements:
        # An "add element ... { }" statement without any element is expected
        # to be rejected by nft, and flushing alone would silently empty the
        # set; leave the set untouched instead.
        logging.error(
            f"No valid element found in {source}, set '{nft_name}' is left unchanged"
        )
        return ""

    parts = [f"flush set {nft_name}\n", f"add element {nft_name} {{\n"]
    parts.extend(f"{element},\n" for element in elements)
    parts.append("}\n")
    return "".join(parts)


def update_rules_if_needed(
    url: str,
    nft_name: str,
    working_directory: Path,
    validator: Callable[[str], str],
    exclude: Optional[list[str]] = None,
    force_update: bool = False,
) -> str:
    """
    Refresh the bogon list and create nftables rules if needed.

    Parameters:
        url: the url pointing to a bogon list (CIDR, one per line).
        nft_name: the full name of the set for nftables.
        working_directory: the directory in which to store files.
        validator: a function that throws a ValueError if the given ip network
            is not valid, otherwise returns a sanitized string representing
            the network.
        exclude: a list of CIDR to exclude from the bogon list (for example
            private addresses).
        force_update: create the rules from the local copy even if the
            download failed or the list did not change.

    Returns:
        The commands needed to flush the set and then fill it, or an empty
        string if there is nothing to do.

    Exits the program with status 1 if the download fails and `force_update`
    is not set.
    """
    logging.debug(
        f"Refreshing the rule for set '{nft_name}', using '{url}', excluding lines from {exclude}.{'Forcing update' if force_update else ''}"
    )

    # The local copy is named after the last path component of the URL.
    local_filename = url.split("/")[-1]
    local_path = working_directory / local_filename
    logging.debug(f"Storing remote list into {local_filename}")

    try:
        update = update_local_file(url, local_path)
    except RefreshFailed:
        if not force_update:
            logging.critical("Could not refresh the bogon list using remote source")
            sys.exit(1)
        logging.warning(
            "Refreshing the bogon list failed but continuing because --force-update is set"
        )
        update = True

    if not update and force_update:
        logging.warning(
            "The bogon list was already up-to-date but --force-update is set"
        )
        update = True

    if not update:
        return ""

    return create_rules_from_file(
        source=local_path,
        nft_name=nft_name,
        validator=validator,
        exclude=exclude,
    )


def str_to_logging_level(log_str: str) -> int:
    """
    Convert a verbosity name into a `logging` level.

    Parameter:
        log_str: one of "debug", "info", "warning", "error", "critical".

    Returns:
        The matching `logging` level, or `logging.INFO` for an unknown name.
    """
    str_map = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }
    return str_map.get(log_str, logging.INFO)


def get_args() -> argparse.Namespace:
    """
    Parse the command line.

    Returns:
        The parsed arguments (verbosity, force_update, no_sudo, no_ipv4,
        no_ipv6, working_directory).
    """
    parser = argparse.ArgumentParser(
        description="Update bogon list stored in nftables using the bogon list form team-cymru "
        "(see https://team-cymru.com/community-services/bogon-reference/), for IPv4 and IPv6.",
        epilog="Copyright © 2021 Thomas Chevalier."
        "License GPLv3+: GNU GPL version 3 or later .",
    )
    parser.add_argument(
        "--verbosity",
        dest="verbosity",
        action="store",
        choices=["debug", "info", "warning", "error", "critical"],
        default="info",
        help="increase the verbosity of the program (default to %(default)s)",
    )
    parser.add_argument(
        "--force-update",
        dest="force_update",
        action="store_true",
        help="update nftables sets even if there is no new data",
    )
    parser.add_argument(
        "--no-sudo",
        dest="no_sudo",
        action="store_true",
        help="don't use sudo to execute `nft`",
    )
    parser.add_argument(
        "--no-ipv4",
        dest="no_ipv4",
        action="store_true",
        help="don't update ipv4 set",
    )
    parser.add_argument(
        "--no-ipv6",
        dest="no_ipv6",
        action="store_true",
        help="don't update ipv6 set",
    )
    parser.add_argument(
        "--working-directory",
        dest="working_directory",
        action="store",
        default="/var/cache/updateBogons/",
        help="change working directory (default to %(default)s)",
    )
    return parser.parse_args()


def main() -> None:
    """
    Entry point: refresh the bogon lists, then load the resulting rules into
    nftables with `nft -f` (through sudo unless --no-sudo is given).
    """
    args = get_args()

    logging.basicConfig(
        format="%(asctime)s [%(levelname)s]: %(message)s",
        level=str_to_logging_level(args.verbosity),
    )

    # Create working directory if it does not exist
    if os.path.exists(args.working_directory):
        if not os.path.isdir(args.working_directory):
            logging.critical(
                f"The working directory '{args.working_directory}' exists and is not a directory."
            )
            sys.exit(1)
    else:
        logging.debug(f"Creating working directory '{args.working_directory}'")
        try:
            os.makedirs(args.working_directory, 0o755)
        except OSError as err:
            # Any OS-level failure (permissions, read-only fs, ...) is fatal.
            logging.critical(
                f"Create working directory '{args.working_directory}' FAILED : {err}"
            )
            sys.exit(1)
        logging.debug(
            f"Successfuly created working directory '{args.working_directory}'"
        )

    # Create new rules for IPv4 and IPv6, if needed
    rules = ""
    if not args.no_ipv4:
        logging.debug("Refreshing rules for IPv4")
        rules += update_rules_if_needed(
            url="https://www.team-cymru.org/Services/Bogons/fullbogons-ipv4.txt",
            nft_name="netdev ddos_mitigation bogon_ipv4",
            working_directory=Path(args.working_directory),
            validator=ipv4_network_validator,
            exclude=["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"],
            force_update=args.force_update,
        )

    if not args.no_ipv6:
        logging.debug("Refreshing rules for IPv6")
        rules += update_rules_if_needed(
            url="https://www.team-cymru.org/Services/Bogons/fullbogons-ipv6.txt",
            nft_name="netdev ddos_mitigation bogon_ipv6",
            working_directory=Path(args.working_directory),
            validator=ipv6_network_validator,
            force_update=args.force_update,
        )

    # Commit those new rules
    if not rules:
        logging.info("There is nothing to do")
        return

    logging.info("New rules are available")
    logging.debug(f"Rules: {rules}")

    rules_filename = os.path.join(args.working_directory, "rules.nft")
    logging.debug(f"Writing rules in {rules_filename}")
    with open(rules_filename, "w", encoding="utf-8") as rules_file:
        rules_file.write(rules)

    command = ["sudo", "nft", "-f", rules_filename]
    if args.no_sudo:
        command = command[1:]
    command_str = " ".join(command)

    logging.debug(f"Executing `{command_str}`")
    try:
        subprocess.run(command, capture_output=True, check=True)
    except subprocess.CalledProcessError as err:
        # NOTE(review): the cached lists were already updated on disk at this
        # point, so the next run will see "no change" and will not retry
        # unless --force-update is given. The rules file is left in place on
        # failure.
        error_string = err.stderr.decode("utf-8", errors="replace")
        if len(error_string) > 0:
            error_string = ": " + error_string
        logging.critical(
            f"Executing `{command_str}` FAILED (ret {err.returncode}){error_string}"
        )
        sys.exit(1)
    logging.info("Nftables has now registred the new rules")

    logging.debug(f"Removing {rules_filename}")
    os.remove(rules_filename)


if __name__ == "__main__":
    main()