You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
341 lines
11 KiB
341 lines
11 KiB
#!/usr/bin/python3
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
# Copyright © 2020-2021 Thomas Chevalier <contact@tchevalier.fr>
|
|
|
|
import os
|
|
import sys
|
|
import logging
|
|
import argparse
|
|
import ipaddress
|
|
import subprocess
|
|
import urllib.request
|
|
|
|
from typing import Callable, Optional
|
|
from pathlib import Path
|
|
|
|
|
|
def ipv4_network_validator(ip_str: str) -> str:
    """
    Check that a string is an IPv4 network and return its canonical form.

    Parameter:
        ip_str: the text to check, e.g. "192.0.2.0/24".

    Returns:
        The string representation of the parsed network.

    Raises:
        ValueError: if `ip_str` cannot be parsed as an IPv4 network
            (the errors raised by `ipaddress` derive from ValueError).
    """
    network = ipaddress.IPv4Network(ip_str)
    return str(network)
|
|
|
|
|
|
def ipv6_network_validator(ip_str: str) -> str:
    """
    Check that a string is an IPv6 network and return its canonical form.

    Scoped addresses ("fe80::1%eth0") are rejected: the result is meant to be
    written verbatim into an nft script, and the text following '%' is not
    limited to characters that belong in an address.

    Parameter:
        ip_str: the text to check, e.g. "2001:db8::/32".

    Returns:
        The string representation of the parsed network.

    Raises:
        ValueError: if `ip_str` contains a scope id or cannot be parsed as an
            IPv6 network (the errors raised by `ipaddress` derive from ValueError).
    """
    # `ipaddress` accepts a scope id since Python 3.9 and keeps it in str(),
    # so it has to be filtered out before parsing.
    if "%" in ip_str:
        raise ValueError(f"{ip_str!r} contains a scope id, which is not allowed")
    return str(ipaddress.IPv6Network(ip_str))
|
|
|
|
|
|
class RefreshFailed(Exception):
    """Signal that the remote list could not be downloaded."""
|
|
|
|
|
|
def update_local_file(url: str, path: Path) -> bool:
    """
    Download `url` and store it at `path` if its content differs from the local copy.

    Parameters:
        url: the file to get.
        path: the path where to store the file.

    Returns:
        True if the local file has been changed, False otherwise.

    Raises:
        RefreshFailed: if the download fails or the payload is not valid UTF-8.
    """
    logging.debug(f"Local filename is '{path.resolve()}'")

    logging.debug(f"Downloading {url}")
    try:
        # The context manager closes the connection even when read() fails.
        with urllib.request.urlopen(url) as response:
            source = response.read().decode("utf-8")
    except (OSError, UnicodeDecodeError) as err:
        # URLError and HTTPError are OSError subclasses; catching OSError also
        # covers socket-level failures raised while the body is being read.
        logging.error(f"Download of {url} failed : {err}")
        raise RefreshFailed() from err

    logging.debug(f"Successfuly downloaded {url}")

    # The local copy is read and written as UTF-8 with newline translation
    # disabled, so it round-trips exactly what was downloaded. Otherwise a
    # remote file using CRLF line endings would always compare as "changed".
    local_content = ""
    try:
        with path.open("r", encoding="utf-8", newline="") as file:
            local_content = file.read()
    except FileNotFoundError:
        logging.debug(f"File {path.resolve()} doesn't exist")

    if local_content == source:
        logging.debug("Remote file is the same as local one")
        return False

    logging.debug("Remote file differs from local one")
    with path.open("w", encoding="utf-8", newline="") as file:
        file.write(source)
    return True
|
|
|
|
|
|
def create_rules_from_file(
    source: Path,
    nft_name: str,
    validator: Callable[[str], str],
    exclude: Optional[list[str]] = None,
) -> str:
    """
    Create nft commands to flush then fill a set with the content of a local file.

    Blank lines and lines starting with '#' are ignored, surrounding whitespace
    is stripped, and it is possible to exclude specific entries using the
    `exclude` list. Entries rejected by `validator` are logged and skipped.

    Parameters:
        source: path of the file containing the elements of the set (one per line).
        nft_name: the full name of the set for nftables.
        validator: a function that raises a ValueError if the given ip network is not valid,
            otherwise returns a sanitized string representing the network.
        exclude: list of the strings to exclude from the set. An entry is dropped when either
            the line as written or its sanitized form is in this list.

    Returns:
        The commands needed to flush the set and then fill it with the elements from `source`.
        Only the flush command is returned when no valid element is found, and an empty
        string when `source` does not exist.
    """
    logging.debug(f"Creating rules from local file {source.resolve()}")

    try:
        with source.open("r", encoding="utf-8") as file:
            data = file.read()
    except FileNotFoundError:
        logging.warning(
            "There is no file to read from (maybe because fetching failed and --force-update is set ?)"
        )
        return ""

    excluded = frozenset(exclude or ())
    elements = []

    logging.debug(f"Parsing {source} content")
    for raw_line in data.splitlines():
        line = raw_line.strip()
        # Not empty, not a comment, nor in the exclude list
        if not line or line.startswith("#") or line in excluded:
            continue

        # Further check, verify that the line is a valid CIDR addr.
        # This tries to prevent nft commands injection, which is why the
        # validator's sanitized output is emitted rather than the raw line.
        try:
            network = validator(line)
        except ValueError as err:
            logging.error(f"The following line isn't valid ({err}) : {line}.")
            continue

        # The file may spell an excluded network differently (e.g. with a netmask)
        if network in excluded:
            continue
        elements.append(network)

    logging.debug(f"Parsing {source} content is finished")

    element_count = len(elements)
    logging.info(
        f"There {'is' if element_count == 1 else 'are'} {element_count} "
        f"element{'' if element_count == 1 else 's'} in set '{nft_name}'"
    )

    rules = [f"flush set {nft_name}\n"]
    if elements:
        rules.append(f"add element {nft_name} {{\n")
        rules.extend(f"{element},\n" for element in elements)
        rules.append("}\n")
    else:
        # nft does not accept an `add element` statement with an empty element
        # list, so only the flush is emitted.
        logging.warning(
            f"No valid element found in {source}, set '{nft_name}' will only be flushed"
        )
    return "".join(rules)
|
|
|
|
|
|
def update_rules_if_needed(
    url: str,
    nft_name: str,
    working_directory: Path,
    validator: Callable[[str], str],
    exclude: Optional[list[str]] = None,
    force_update: bool = False,
) -> str:
    """
    Refresh the bogon list and create nftables rules if needed.

    The list is stored in `working_directory` under the last path component of `url`.
    If the download fails and `force_update` is not set, the program exits with status 1.

    Parameters:
        url: the url pointing to a bogon list (CIDR, one per line).
        nft_name: the full name of the set for nftables.
        working_directory: the directory in which to store files.
        validator: a function that raises a ValueError if the given ip network is not valid,
            otherwise returns a sanitized string representing the network.
        exclude: a list of CIDR to exclude from the bogon list (for example private addresses).
        force_update: create the rules from the local copy even if the download failed
            or the list did not change.

    Returns:
        The commands needed to flush the set and then fill it with the elements of the
        local copy of the list, or an empty string when there is nothing to update.
    """
    logging.debug(
        f"Refreshing the rule for set '{nft_name}', using '{url}', excluding lines from {exclude}."
        f"{' Forcing update' if force_update else ''}"
    )

    local_filename = url.split("/")[-1]
    logging.debug(f"Storing remote list into {local_filename}")
    local_path = working_directory / local_filename

    try:
        changed = update_local_file(url, local_path)
    except RefreshFailed:
        if not force_update:
            logging.critical("Could not refresh the bogon list using remote source")
            sys.exit(1)
        # Fall through: the rules are rebuilt from whatever local copy exists.
        logging.warning(
            "Refreshing the bogon list failed but continuing because --force-update is set"
        )
    else:
        if not changed:
            if not force_update:
                return ""
            logging.warning(
                "The bogon list was already up-to-date but --force-update is set"
            )

    return create_rules_from_file(
        source=local_path,
        nft_name=nft_name,
        validator=validator,
        exclude=exclude,
    )
|
|
|
|
|
|
def str_to_logging_level(log_str: str) -> int:
    """
    Translate a verbosity name into the matching `logging` level.

    Parameter:
        log_str: one of "debug", "info", "warning", "error" or "critical" (lowercase).

    Returns:
        The corresponding `logging` constant, or `logging.INFO` for any other string.
    """
    known_levels = ("debug", "info", "warning", "error", "critical")
    if log_str in known_levels:
        # The logging module exposes each level as an upper-case constant
        return getattr(logging, log_str.upper())
    return logging.INFO
|
|
|
|
|
|
def get_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """
    Parse the command line options of the program.

    Parameter:
        argv: the arguments to parse; when None, argparse reads them from `sys.argv`.

    Returns:
        The namespace holding `verbosity`, `force_update`, `no_sudo`, `no_ipv4`,
        `no_ipv6` and `working_directory`.
    """
    parser = argparse.ArgumentParser(
        description="Update bogon list stored in nftables using the bogon list from team-cymru "
        "(see https://team-cymru.com/community-services/bogon-reference/), for IPv4 and IPv6.",
        epilog="Copyright © 2021 Thomas Chevalier. "
        "License GPLv3+: GNU GPL version 3 or later <https://gnu.org/licenses/gpl.html>.",
    )
    parser.add_argument(
        "--verbosity",
        dest="verbosity",
        action="store",
        choices=["debug", "info", "warning", "error", "critical"],
        default="info",
        help="increase the verbosity of the program (default to %(default)s)",
    )

    # Boolean switches; argparse derives the attribute name from the flag
    # ("--force-update" is stored as `force_update`).
    switches = (
        ("--force-update", "update nftables sets even if there is no new data"),
        ("--no-sudo", "don't use sudo to execute `nft`"),
        ("--no-ipv4", "don't update ipv4 set"),
        ("--no-ipv6", "don't update ipv6 set"),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)

    parser.add_argument(
        "--working-directory",
        dest="working_directory",
        action="store",
        default="/var/cache/updateBogons/",
        help="change working directory (default to %(default)s)",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def _ensure_working_directory(directory: str) -> None:
    """Create `directory` if it does not exist; exit with status 1 if it is unusable."""
    if os.path.exists(directory):
        if not os.path.isdir(directory):
            logging.critical(
                f"The working directory '{directory}' exists and is not a directory."
            )
            sys.exit(1)
        return

    logging.debug(f"Creating working directory '{directory}'")
    try:
        # exist_ok tolerates the directory being created between the check and this call
        os.makedirs(directory, 0o755, exist_ok=True)
    except OSError as err:
        # PermissionError is the common case, but any OS-level failure is fatal here
        logging.critical(f"Create working directory '{directory}' FAILED : {err}")
        sys.exit(1)
    logging.debug(f"Successfuly created working directory '{directory}'")


def _apply_rules(rules: str, working_directory: str, use_sudo: bool) -> None:
    """Write `rules` to a file in `working_directory` and load it with `nft -f`; exit with status 1 on failure."""
    rules_filename = os.path.join(working_directory, "rules.nft")
    logging.debug(f"Writing rules in {rules_filename}")
    with open(rules_filename, "w", encoding="utf-8") as rules_file:
        rules_file.write(rules)

    command = ["nft", "-f", rules_filename]
    if use_sudo:
        command.insert(0, "sudo")

    command_str = " ".join(command)
    logging.debug(f"Executing `{command_str}`")

    # On failure the program exits before the cleanup below, so the rules file
    # stays on disk.
    try:
        subprocess.run(command, capture_output=True, check=True)
    except subprocess.CalledProcessError as err:
        # errors="replace" keeps undecodable output from raising while reporting the failure
        error_string = err.stderr.decode("utf-8", errors="replace")
        if error_string:
            error_string = ": " + error_string
        logging.critical(
            f"Executing `{command_str}` FAILED (ret {err.returncode}){error_string}"
        )
        sys.exit(1)
    except OSError as err:
        # Raised when the executable itself cannot be started (e.g. not installed)
        logging.critical(f"Executing `{command_str}` FAILED : {err}")
        sys.exit(1)
    logging.info("Nftables has now registred the new rules")

    logging.debug(f"Removing {rules_filename}")
    os.remove(rules_filename)


def main() -> None:
    """
    Entry point: refresh the IPv4 and IPv6 bogon lists and load the resulting rules into nftables.

    Exits with status 1 if the working directory cannot be used, if a list cannot be
    refreshed (unless --force-update is set) or if `nft` cannot be executed successfully.
    """
    args = get_args()

    logging.basicConfig(
        format="%(asctime)s [%(levelname)s]: %(message)s",
        level=str_to_logging_level(args.verbosity),
    )

    # Create working directory if it does not exist
    _ensure_working_directory(args.working_directory)
    working_directory = Path(args.working_directory)

    # Create new rules for IPv4 and IPv6, if needed
    rules = ""
    if not args.no_ipv4:
        logging.debug("Refreshing rules for IPv4")
        rules += update_rules_if_needed(
            url="https://www.team-cymru.org/Services/Bogons/fullbogons-ipv4.txt",
            nft_name="netdev ddos_mitigation bogon_ipv4",
            working_directory=working_directory,
            validator=ipv4_network_validator,
            exclude=["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"],
            force_update=args.force_update,
        )

    if not args.no_ipv6:
        logging.debug("Refreshing rules for IPv6")
        rules += update_rules_if_needed(
            url="https://www.team-cymru.org/Services/Bogons/fullbogons-ipv6.txt",
            nft_name="netdev ddos_mitigation bogon_ipv6",
            working_directory=working_directory,
            validator=ipv6_network_validator,
            force_update=args.force_update,
        )

    if not rules:
        logging.info("There is nothing to do")
        return

    # Commit those new rules
    logging.info("New rules are available")
    logging.debug(f"Rules: {rules}")
    _apply_rules(rules, args.working_directory, use_sudo=not args.no_sudo)
|
|
|
|
|
|
# Run the updater only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
|
|
|