Update firewall rules to filter invalid IP addresses.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

341 lines
11 KiB

#!/usr/bin/python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Copyright © 2020-2021 Thomas Chevalier <contact@tchevalier.fr>
import os
import sys
import logging
import argparse
import ipaddress
import subprocess
import urllib.request
from typing import Callable, Optional
from pathlib import Path
def ipv4_network_validator(ip_str: str) -> str:
    """
    Validate an IPv4 network given in CIDR (or address/netmask) notation.
    Parameter:
        ip_str: the textual network to check.
    Returns:
        The network rendered as "address/prefixlen".
    Raises:
        ValueError: if `ip_str` is not a valid IPv4 network (this includes
            networks with host bits set).
    """
    network = ipaddress.IPv4Network(ip_str)
    return network.with_prefixlen
def ipv6_network_validator(ip_str: str) -> str:
    """
    Validate an IPv6 network given in CIDR notation.
    Parameter:
        ip_str: the textual network to check.
    Returns:
        The network rendered as "address/prefixlen" in canonical (compressed,
        lowercase) form.
    Raises:
        ValueError: if `ip_str` is not a valid IPv6 network, or if it carries
            a scope/zone identifier.
    """
    # Since Python 3.9 the ipaddress module accepts "addr%scope_id" where the
    # scope id may be almost any text, and echoes it back in str(). A bogon
    # list never needs scoped addresses, and letting free-form text through
    # would defeat the purpose of validating lines before handing them to nft.
    if "%" in ip_str:
        raise ValueError(f"scoped IPv6 networks are not allowed: {ip_str!r}")
    return str(ipaddress.IPv6Network(ip_str))
class RefreshFailed(Exception):
    """Raised when the remote list could not be downloaded."""
def update_local_file(url: str, path: Path) -> bool:
    """
    Get a file from the URL and write it on the disk if its content differs from the corresponding local file.
    The comparison is done on raw bytes so that line-ending translation can
    never make an unchanged remote file look modified.
    Parameter:
        url: the file to get.
        path: the path where to store the file
    Returns:
        True if the local file has been changed, False otherwise.
    Raises:
        RefreshFailed: if the download fails (network error, timeout) or the
            payload is not valid UTF-8.
    """
    logging.debug(f"Local filename is '{path.resolve()}'")
    logging.debug(f"Downloading {url}")
    try:
        # The context manager closes the connection; the timeout keeps an
        # unresponsive server from blocking the program forever.
        with urllib.request.urlopen(url, timeout=60) as response:
            remote_bytes = response.read()
        # The list is later read back as text: refuse anything undecodable.
        remote_bytes.decode("utf-8")
    except (OSError, UnicodeDecodeError) as err:
        # URLError, HTTPError and socket timeouts are all OSError subclasses.
        logging.error(f"Download of {url} failed : {err}")
        raise RefreshFailed() from err
    logging.debug(f"Successfuly downloaded {url}")

    try:
        local_bytes = path.read_bytes()
    except FileNotFoundError:
        logging.debug(f"File {path.resolve()} doesn't exist")
        # A missing file is treated like an empty one.
        local_bytes = b""

    if local_bytes == remote_bytes:
        logging.debug("Remote file is the same as local one")
        return False

    logging.debug("Remote file differs from local one")
    path.write_bytes(remote_bytes)
    return True
def create_rules_from_file(
    source: Path,
    nft_name: str,
    validator: Callable[[str], str],
    exclude: Optional[list[str]] = None,
) -> str:
    """
    Create nft commands to flush then fill a set with the content of a local file.
    Blank lines and lines starting with '#' are ignored, surrounding whitespace is
    stripped, and it is possible to exclude specific networks using the `exclude` list.
    Parameters:
        source: path of the file containing the elements of the set (one per line).
        nft_name: the full name of the set for nftables.
        validator: a function that throws a ValueError if the given ip network is not valid, otherwise returns a sanitized string representing the network.
        exclude: list of the strings to exclude from the set. An entry matches
            either the line as written or its sanitized form.
    Returns:
        The commands needed to flush the set and then fill it with the element from `source`,
        or an empty string if `source` does not exist.
    """
    logging.debug(f"Creating rules from local file {source.resolve()}")
    try:
        with source.open("r") as file:
            data = file.read()
    except FileNotFoundError:
        logging.warning(
            "There is no file to read from (maybe because fetching failed and --force-update is set ?)"
        )
        return ""

    excluded = frozenset(exclude or ())
    elements = []
    logging.debug(f"Parsing {source} content")
    for raw_line in data.splitlines():
        line = raw_line.strip()
        # Skip blanks, comments and explicitly excluded entries
        if not line or line.startswith("#") or line in excluded:
            continue
        # Verify that the line is a valid CIDR addr and keep only the
        # validator's sanitized rendering: the raw text never reaches nft.
        # This tries to prevent nft commands injection
        try:
            network = validator(line)
        except ValueError as err:
            logging.error(f"The following line isn't valid ({err}) : {line}.")
            continue
        # The same network may be spelled differently from the exclude entry
        if network in excluded:
            continue
        elements.append(network)
    logging.debug(f"Parsing {source} content is finished")

    element_count = len(elements)
    logging.info(
        f"There {'is' if element_count <= 1 else 'are'} {element_count} elements in set '{nft_name}'"
    )
    body = "".join(f"{element},\n" for element in elements)
    return f"flush set {nft_name}\nadd element {nft_name} {{\n{body}}}\n"
def update_rules_if_needed(
    url: str,
    nft_name: str,
    working_directory: Path,
    validator: Callable[[str], str],
    exclude: Optional[list[str]] = None,
    force_update: bool = False,
) -> str:
    """
    Refresh the bogon list and create nftables rules if needed.
    Parameters:
        url: the url pointing to a bogon list (CIDR, one per line).
        nft_name: the full name of the set for nftables.
        working_directory: the directory in which to store files
        validator: a function that throws a ValueError if the given ip network is not valid, otherwise returns a sanitized string representing the network.
        exclude: a list of CIDR to exclude from the bogon list (for example private addresses).
        force_update: when True, create the rules even if the download failed
            or the local copy was already up-to-date.
    Returns:
        The commands needed to flush the set and then fill it with the
        elements of the downloaded list, or an empty string if there is
        nothing to update.
    Exits:
        With status 1 if the download fails and `force_update` is not set.
    """
    logging.debug(
        f"Refreshing the rule for set '{nft_name}', using '{url}', excluding lines from {exclude}.{' Forcing update' if force_update else ''}"
    )
    # The local copy is named after the last path component of the URL
    local_filename = url.split("/")[-1]
    local_path = working_directory / local_filename
    logging.debug(f"Storing remote list into {local_filename}")

    try:
        update = update_local_file(url, local_path)
    except RefreshFailed:
        if not force_update:
            logging.critical("Could not refresh the bogon list using remote source")
            sys.exit(1)
        logging.warning(
            "Refreshing the bogon list failed but continuing because --force-update is set"
        )
        update = True
    else:
        if not update and force_update:
            logging.warning(
                "The bogon list was already up-to-date but --force-update is set"
            )
            update = True

    if not update:
        return ""
    return create_rules_from_file(
        source=local_path,
        nft_name=nft_name,
        validator=validator,
        exclude=exclude,
    )
def str_to_logging_level(log_str: str) -> int:
    """
    Convert a verbosity name into the matching `logging` level.
    The lookup ignores case and surrounding whitespace.
    Parameter:
        log_str: one of "debug", "info", "warning", "error" or "critical".
    Returns:
        The corresponding logging level, or logging.INFO for an unknown name.
    """
    levels = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }
    return levels.get(log_str.strip().lower(), logging.INFO)
def get_args() -> argparse.Namespace:
    """
    Build the command line parser and parse the program arguments.
    Returns:
        The parsed arguments: `verbosity`, `force_update`, `no_sudo`,
        `no_ipv4`, `no_ipv6` and `working_directory`.
    """
    description = (
        "Update bogon list stored in nftables using the bogon list form team-cymru "
        "(see https://team-cymru.com/community-services/bogon-reference/), for IPv4 and IPv6."
    )
    epilog = (
        "Copyright © 2021 Thomas Chevalier."
        "License GPLv3+: GNU GPL version 3 or later <https://gnu.org/licenses/gpl.html>."
    )
    cli = argparse.ArgumentParser(description=description, epilog=epilog)

    cli.add_argument(
        "--verbosity",
        choices=["debug", "info", "warning", "error", "critical"],
        default="info",
        help="increase the verbosity of the program (default to %(default)s)",
    )

    # Plain on/off switches, declared in the order they appear in --help
    switches = (
        ("--force-update", "update nftables sets even if there is no new data"),
        ("--no-sudo", "don't use sudo to execute `nft`"),
        ("--no-ipv4", "don't update ipv4 set"),
        ("--no-ipv6", "don't update ipv6 set"),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, action="store_true", help=help_text)

    cli.add_argument(
        "--working-directory",
        default="/var/cache/updateBogons/",
        help="change working directory (default to %(default)s)",
    )
    return cli.parse_args()
def _ensure_working_directory(directory: str) -> None:
    """Create `directory` if it is missing; exit with status 1 if that is not possible."""
    if os.path.exists(directory):
        if not os.path.isdir(directory):
            logging.critical(
                f"The working directory '{directory}' exists and is not a directory."
            )
            sys.exit(1)
        return

    logging.debug(f"Creating working directory '{directory}'")
    try:
        os.makedirs(directory, 0o755)
    except OSError as err:
        # PermissionError, read-only filesystem, a parent that is a file...
        logging.critical(f"Create working directory '{directory}' FAILED : {err}")
        sys.exit(1)
    logging.debug(f"Successfuly created working directory '{directory}'")


def _apply_rules(rules: str, working_directory: str, use_sudo: bool) -> None:
    """Write `rules` to a temporary file, load it with `nft -f`, and always remove the file."""
    rules_filename = os.path.join(working_directory, "rules.nft")
    command = ["nft", "-f", rules_filename]
    if use_sudo:
        command.insert(0, "sudo")
    command_str = " ".join(command)

    try:
        logging.debug(f"Writing rules in {rules_filename}")
        with open(rules_filename, "w") as rules_file:
            rules_file.write(rules)

        logging.debug(f"Executing `{command_str}`")
        try:
            subprocess.run(command, capture_output=True, check=True)
        except subprocess.CalledProcessError as err:
            error_string = err.stderr.decode("utf-8", errors="replace")
            if error_string:
                error_string = ": " + error_string
            logging.critical(
                f"Executing `{command_str}` FAILED (ret {err.returncode}){error_string}"
            )
            sys.exit(1)
        except OSError as err:
            # The executable (sudo or nft) is missing or cannot be started
            logging.critical(f"Executing `{command_str}` FAILED : {err}")
            sys.exit(1)
        logging.info("Nftables has now registred the new rules")
    finally:
        # Runs on success and on sys.exit() alike, so the rules file never
        # lingers in the working directory after a failure.
        logging.debug(f"Removing {rules_filename}")
        try:
            os.remove(rules_filename)
        except FileNotFoundError:
            pass


def main() -> None:
    """
    Entry point: refresh the IPv4 and IPv6 bogon lists and load the resulting
    rules into nftables when there is something new (or when forced).
    Exits with status 1 if the working directory cannot be used, a list cannot
    be refreshed, or `nft` fails.
    """
    args = get_args()
    logging.basicConfig(
        format="%(asctime)s [%(levelname)s]: %(message)s",
        level=str_to_logging_level(args.verbosity),
    )

    # Create working directory if it does not exist
    _ensure_working_directory(args.working_directory)
    working_directory = Path(args.working_directory)

    # Create new rules for IPv4 and IPv6, if needed
    rules = ""
    if not args.no_ipv4:
        logging.debug("Refreshing rules for IPv4")
        rules += update_rules_if_needed(
            url="https://www.team-cymru.org/Services/Bogons/fullbogons-ipv4.txt",
            nft_name="netdev ddos_mitigation bogon_ipv4",
            working_directory=working_directory,
            validator=ipv4_network_validator,
            exclude=["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"],
            force_update=args.force_update,
        )
    if not args.no_ipv6:
        logging.debug("Refreshing rules for IPv6")
        rules += update_rules_if_needed(
            url="https://www.team-cymru.org/Services/Bogons/fullbogons-ipv6.txt",
            nft_name="netdev ddos_mitigation bogon_ipv6",
            working_directory=working_directory,
            validator=ipv6_network_validator,
            force_update=args.force_update,
        )

    # Commit those new rules
    if not rules:
        logging.info("There is nothing to do")
        return
    logging.info("New rules are available")
    logging.debug(f"Rules: {rules}")
    _apply_rules(rules, args.working_directory, use_sudo=not args.no_sudo)


if __name__ == "__main__":
    main()