From 9d7a7fa1f556252ae3c42038bf3e769a94ddc9eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Kervella?= Date: Fri, 13 Apr 2018 20:11:55 +0000 Subject: [PATCH] Pep8 compliance on freeradius --- freeradius_utils/auth.py | 228 +++++++++++++++++++++++++++------------ 1 file changed, 162 insertions(+), 66 deletions(-) diff --git a/freeradius_utils/auth.py b/freeradius_utils/auth.py index 5d3195da..b25fa003 100644 --- a/freeradius_utils/auth.py +++ b/freeradius_utils/auth.py @@ -1,4 +1,4 @@ -# ⁻*- mode: python; coding: utf-8 -*- +# -*- mode: python; coding: utf-8 -*- # Re2o est un logiciel d'administration développé initiallement au rezometz. Il # se veut agnostique au réseau considéré, de manière à être installable en # quelques clics. @@ -37,11 +37,20 @@ Inspiré du travail de Daniel Stan au Crans import logging import netaddr -import radiusd # Module magique freeradius (radiusd.py is dummy) +import radiusd # Module magique freeradius (radiusd.py is dummy) import binascii import hashlib -import os, sys +import os +import sys +from django.core.wsgi import get_wsgi_application +import argparse + +from django.db.models import Q +from machines.models import Interface, IpList, Nas, Domain +from topologie.models import Room, Port, Switch +from users.models import User +from preferences.models import OptionalTopologie proj_path = "/var/www/re2o/" # This is so Django knows where to find stuff. @@ -52,28 +61,17 @@ sys.path.append(proj_path) os.chdir(proj_path) # This is so models get loaded. -from django.core.wsgi import get_wsgi_application application = get_wsgi_application() -import argparse - -from django.db.models import Q -from machines.models import Interface, IpList, Nas, Domain -from topologie.models import Room, Port, Switch -from users.models import User -from preferences.models import OptionalTopologie - options, created = OptionalTopologie.objects.get_or_create() VLAN_NOK = options.vlan_decision_nok.vlan_id VLAN_OK = options.vlan_decision_ok.vlan_id - #: Serveur radius de test (pas la prod) TEST_SERVER = bool(os.getenv('DBG_FREERADIUS', False)) -## -*- Logging -*- - +# Logging class RadiusdHandler(logging.Handler): """Handler de logs pour freeradius""" @@ -87,6 +85,7 @@ class RadiusdHandler(logging.Handler): rad_sig = radiusd.L_DBG radiusd.radlog(rad_sig, record.msg) + # Initialisation d'un logger (pour logguer unifié) logger = logging.getLogger('auth.py') logger.setLevel(logging.DEBUG) @@ -95,10 +94,11 @@ handler = RadiusdHandler() handler.setFormatter(formatter) logger.addHandler(handler) + def radius_event(fun): """Décorateur pour les fonctions d'interfaces avec radius. - Une telle fonction prend un uniquement argument, qui est une liste de tuples - (clé, valeur) et renvoie un triplet dont les composantes sont : + Une telle fonction prend un uniquement argument, qui est une liste de + tuples (clé, valeur) et renvoie un triplet dont les composantes sont : * le code de retour (voir radiusd.RLM_MODULE_* ) * un tuple de couples (clé, valeur) pour les valeurs de réponse (accès ok et autres trucs du genre) @@ -118,8 +118,8 @@ def radius_event(fun): # Ex: Calling-Station-Id: "une_adresse_mac" data[key] = value.replace('"', '') try: - # TODO s'assurer ici que les tuples renvoyés sont bien des (str,str) - # rlm_python ne digère PAS les unicodes + # TODO s'assurer ici que les tuples renvoyés sont bien des + # (str,str) : rlm_python ne digère PAS les unicodes return fun(data) except Exception as err: logger.error('Failed %r on data %r' % (err, auth_data)) @@ -128,7 +128,6 @@ def radius_event(fun): return new_f - @radius_event def instantiate(*_): """Utile pour initialiser les connexions ldap une première fois (otherwise, @@ -137,12 +136,15 @@ def instantiate(*_): if TEST_SERVER: logger.info(u'DBG_FREERADIUS is enabled') + @radius_event def authorize(data): """On test si on connait le calling nas: - - si le nas est inconnue, on suppose que c'est une requète 802.1X, on la traite + - si le nas est inconnue, on suppose que c'est une requète 802.1X, on la + traite - si le nas est connu, on applique 802.1X si le mode est activé - - si le nas est connu et si il s'agit d'un nas auth par mac, on repond accept en authorize + - si le nas est connu et si il s'agit d'un nas auth par mac, on repond + accept en authorize """ # Pour les requetes proxifiees, on split nas = data.get('NAS-IP-Address', data.get('NAS-Identifier', None)) @@ -155,28 +157,35 @@ def authorize(data): user = data.get('User-Name', '').decode('utf-8', errors='replace') user = user.split('@', 1)[0] mac = data.get('Calling-Station-Id', '') - result, log, password = check_user_machine_and_register(nas_type, user, mac) + result, log, password = check_user_machine_and_register( + nas_type, + user, + mac + ) logger.info(log.encode('utf-8')) logger.info(user.encode('utf-8')) if not result: return radiusd.RLM_MODULE_REJECT else: - return (radiusd.RLM_MODULE_UPDATED, - (), - ( - (str("NT-Password"), str(password)), - ), + return ( + radiusd.RLM_MODULE_UPDATED, + (), + ( + (str("NT-Password"), str(password)), + ), ) else: - return (radiusd.RLM_MODULE_UPDATED, - (), - ( - ("Auth-Type", "Accept"), - ), + return ( + radiusd.RLM_MODULE_UPDATED, + (), + ( + ("Auth-Type", "Accept"), + ), ) + @radius_event def post_auth(data): nas = data.get('NAS-IP-Address', data.get('NAS-Identifier', None)) @@ -187,61 +196,87 @@ def post_auth(data): return radiusd.RLM_MODULE_OK nas_type = Nas.objects.filter(nas_type=nas_instance.type).first() if not nas_type: - logger.info(u"Type de nas non enregistré dans la bdd!".encode('utf-8')) + logger.info( + u"Type de nas non enregistré dans la bdd!".encode('utf-8') + ) return radiusd.RLM_MODULE_OK mac = data.get('Calling-Station-Id', None) - # Switch et bornes héritent de machine et peuvent avoir plusieurs interfaces filles + # Switch et bornes héritent de machine et peuvent avoir plusieurs + # interfaces filles nas_machine = nas_instance.machine # Si il s'agit d'un switch if hasattr(nas_machine, 'switch'): port = data.get('NAS-Port-Id', data.get('NAS-Port', None)) - #Pour les infrastructures possédant des switchs Juniper : - #On vérifie si le switch fait partie d'un stack Juniper + # Pour les infrastructures possédant des switchs Juniper : + # On vérifie si le switch fait partie d'un stack Juniper instance_stack = nas_machine.switch.stack if instance_stack: # Si c'est le cas, on resélectionne le bon switch dans la stack id_stack_member = port.split("-")[1].split('/')[0] - nas_machine = Switch.objects.filter(stack=instance_stack).filter(stack_member_id=id_stack_member).prefetch_related('interface_set__domain__extension').first() - # On récupère le numéro du port sur l'output de freeradius. La ligne suivante fonctionne pour cisco, HP et Juniper + nas_machine = (Switch.objects + .filter(stack=instance_stack) + .filter(stack_member_id=id_stack_member) + .prefetch_related( + 'interface_set__domain__extension' + ) + .first()) + # On récupère le numéro du port sur l'output de freeradius. + # La ligne suivante fonctionne pour cisco, HP et Juniper port = port.split(".")[0].split('/')[-1][-2:] out = decide_vlan_and_register_switch(nas_machine, nas_type, port, mac) sw_name, room, reason, vlan_id = out - log_message = '(fil) %s -> %s [%s%s]' % \ - (sw_name + u":" + port + u"/" + unicode(room), mac, vlan_id, (reason and u': ' + reason).encode('utf-8')) + log_message = '(fil) %s -> %s [%s%s]' % ( + sw_name + u":" + port + u"/" + unicode(room), + mac, + vlan_id, + (reason and u': ' + reason).encode('utf-8') + ) logger.info(log_message) # Filaire - return (radiusd.RLM_MODULE_UPDATED, + return ( + radiusd.RLM_MODULE_UPDATED, ( ("Tunnel-Type", "VLAN"), ("Tunnel-Medium-Type", "IEEE-802"), ("Tunnel-Private-Group-Id", '%d' % int(vlan_id)), ), () - ) + ) else: return radiusd.RLM_MODULE_OK + @radius_event def dummy_fun(_): """Do nothing, successfully. (C'est pour avoir un truc à mettre)""" return radiusd.RLM_MODULE_OK + def detach(_=None): """Appelé lors du déchargement du module (enfin, normalement)""" print "*** goodbye from auth.py ***" return radiusd.RLM_MODULE_OK + def find_nas_from_request(nas_id): - nas = Interface.objects.filter(Q(domain=Domain.objects.filter(name=nas_id)) | Q(ipv4=IpList.objects.filter(ipv4=nas_id))).select_related('type').select_related('machine__switch__stack') + nas = (Interface.objects + .filter( + Q(domain=Domain.objects.filter(name=nas_id)) | + Q(ipv4=IpList.objects.filter(ipv4=nas_id)) + ) + .select_related('type') + .select_related('machine__switch__stack')) return nas.first() + def check_user_machine_and_register(nas_type, username, mac_address): - """ Verifie le username et la mac renseignee. L'enregistre si elle est inconnue. + """Verifie le username et la mac renseignee. L'enregistre si elle est + inconnue. Renvoie le mot de passe ntlm de l'user si tout est ok Utilise pour les authentifications en 802.1X""" interface = Interface.objects.filter(mac_address=mac_address).first() @@ -252,7 +287,10 @@ def check_user_machine_and_register(nas_type, username, mac_address): return (False, u"Adhérent non cotisant", '') if interface: if interface.machine.user != user: - return (False, u"Machine enregistrée sur le compte d'un autre user...", '') + return (False, + u"Machine enregistrée sur le compte d'un autre " + "user...", + '') elif not interface.is_active: return (False, u"Machine desactivée", '') elif not interface.ipv4: @@ -264,7 +302,9 @@ def check_user_machine_and_register(nas_type, username, mac_address): if nas_type.autocapture_mac: result, reason = user.autoregister_machine(mac_address, nas_type) if result: - return (True, u'Access Ok, Capture de la mac...', user.pwd_ntlm) + return (True, + u'Access Ok, Capture de la mac...', + user.pwd_ntlm) else: return (False, u'Erreur dans le register mac %s' % reason, '') else: @@ -273,8 +313,10 @@ def check_user_machine_and_register(nas_type, username, mac_address): return (False, u"Machine inconnue", '') -def decide_vlan_and_register_switch(nas_machine, nas_type, port_number, mac_address): - """Fonction de placement vlan pour un switch en radius filaire auth par mac. +def decide_vlan_and_register_switch(nas_machine, nas_type, port_number, + mac_address): + """Fonction de placement vlan pour un switch en radius filaire auth par + mac. Plusieurs modes : - nas inconnu, port inconnu : on place sur le vlan par defaut VLAN_OK - pas de radius sur le port : VLAN_OK @@ -292,7 +334,8 @@ def decide_vlan_and_register_switch(nas_machine, nas_type, port_number, mac_addr - interface inconnue : - register mac désactivé : VLAN_NOK - register mac activé : - - dans la chambre associé au port, pas d'user ou non à jour : VLAN_NOK + - dans la chambre associé au port, pas d'user ou non à + jour : VLAN_NOK - user à jour, autocapture de la mac et VLAN_OK """ # Get port from switch and port number @@ -303,8 +346,13 @@ def decide_vlan_and_register_switch(nas_machine, nas_type, port_number, mac_addr sw_name = str(nas_machine) - port = Port.objects.filter(switch=Switch.objects.filter(machine_ptr=nas_machine), port=port_number).first() - #Si le port est inconnu, on place sur le vlan defaut + port = (Port.objects + .filter( + switch=Switch.objects.filter(machine_ptr=nas_machine), + port=port_number + ) + .first()) + # Si le port est inconnu, on place sur le vlan defaut if not port: return (sw_name, "Chambre inconnue", u'Port inconnu', VLAN_OK) @@ -316,7 +364,10 @@ def decide_vlan_and_register_switch(nas_machine, nas_type, port_number, mac_addr DECISION_VLAN = VLAN_OK if port.radius == 'NO': - return (sw_name, "", u"Pas d'authentification sur ce port" + extra_log, DECISION_VLAN) + return (sw_name, + "", + u"Pas d'authentification sur ce port" + extra_log, + DECISION_VLAN) if port.radius == 'BLOQ': return (sw_name, port.room, u'Port desactive', VLAN_NOK) @@ -326,7 +377,9 @@ def decide_vlan_and_register_switch(nas_machine, nas_type, port_number, mac_addr if not room: return (sw_name, "Inconnue", u'Chambre inconnue', VLAN_NOK) - room_user = User.objects.filter(Q(club__room=port.room) | Q(adherent__room=port.room)) + room_user = User.objects.filter( + Q(club__room=port.room) | Q(adherent__room=port.room) + ) if not room_user: return (sw_name, room, u'Chambre non cotisante', VLAN_NOK) for user in room_user: @@ -336,35 +389,78 @@ def decide_vlan_and_register_switch(nas_machine, nas_type, port_number, mac_addr if port.radius == 'COMMON' or port.radius == 'STRICT': # Authentification par mac - interface = Interface.objects.filter(mac_address=mac_address).select_related('machine__user').select_related('ipv4').first() + interface = (Interface.objects + .filter(mac_address=mac_address) + .select_related('machine__user') + .select_related('ipv4') + .first()) if not interface: room = port.room # On essaye de register la mac if not nas_type.autocapture_mac: return (sw_name, "", u'Machine inconnue', VLAN_NOK) elif not room: - return (sw_name, "Inconnue", u'Chambre et machine inconnues', VLAN_NOK) + return (sw_name, + "Inconnue", + u'Chambre et machine inconnues', + VLAN_NOK) else: if not room_user: - room_user = User.objects.filter(Q(club__room=port.room) | Q(adherent__room=port.room)) + room_user = User.objects.filter( + Q(club__room=port.room) | Q(adherent__room=port.room) + ) if not room_user: - return (sw_name, room, u'Machine et propriétaire de la chambre inconnus', VLAN_NOK) + return (sw_name, + room, + u'Machine et propriétaire de la chambre ' + 'inconnus', + VLAN_NOK) elif room_user.count() > 1: - return (sw_name, room, u'Machine inconnue, il y a au moins 2 users dans la chambre/local -> ajout de mac automatique impossible', VLAN_NOK) + return (sw_name, + room, + u'Machine inconnue, il y a au moins 2 users ' + 'dans la chambre/local -> ajout de mac ' + 'automatique impossible', + VLAN_NOK) elif not room_user.first().has_access(): - return (sw_name, room, u'Machine inconnue et adhérent non cotisant', VLAN_NOK) + return (sw_name, + room, + u'Machine inconnue et adhérent non cotisant', + VLAN_NOK) else: - result, reason = room_user.first().autoregister_machine(mac_address, nas_type) + result, reason = (room_user + .first() + .autoregister_machine( + mac_address, + nas_type + )) if result: - return (sw_name, room, u'Access Ok, Capture de la mac...' + extra_log, DECISION_VLAN) + return (sw_name, + room, + u'Access Ok, Capture de la mac: ' + extra_log, + DECISION_VLAN) else: - return (sw_name, room, u'Erreur dans le register mac %s' % reason + unicode(mac_address), VLAN_NOK) + return (sw_name, + room, + u'Erreur dans le register mac %s' % ( + reason + unicode(mac_address) + ), + VLAN_NOK) else: room = port.room if not interface.is_active: - return (sw_name, room, u'Machine non active / adherent non cotisant', VLAN_NOK) + return (sw_name, + room, + u'Machine non active / adherent non cotisant', + VLAN_NOK) elif not interface.ipv4: interface.assign_ipv4() - return (sw_name, room, u"Ok, Reassignation de l'ipv4" + extra_log, DECISION_VLAN) + return (sw_name, + room, + u"Ok, Reassignation de l'ipv4" + extra_log, + DECISION_VLAN) else: - return (sw_name, room, u'Machine OK' + extra_log, DECISION_VLAN) + return (sw_name, + room, + u'Machine OK' + extra_log, + DECISION_VLAN)