8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2024-11-05 17:36:27 +00:00
This commit is contained in:
chapeau 2021-05-13 19:28:56 +02:00
parent 8ba9fa2fa0
commit 01e7822d95
4 changed files with 211 additions and 263 deletions

View file

@ -1,5 +1,5 @@
# -*- mode: python; coding: utf-8 -*- # -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au Rézo Metz. Il # Re2o est un logiciel d'administration développé initiallement au rezometz. Il
# se veut agnostique au réseau considéré, de manière à être installable en # se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics. # quelques clics.
# #
@ -7,6 +7,7 @@
# Copyright © 2017 Gabriel Détraz # Copyright © 2017 Gabriel Détraz
# Copyright © 2017 Lara Kermarec # Copyright © 2017 Lara Kermarec
# Copyright © 2017 Augustin Lemesle # Copyright © 2017 Augustin Lemesle
# Copyright © 2020 Corentin Canebier
# #
# This program is free software; you can redistribute it and/or modify # This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by # it under the terms of the GNU General Public License as published by
@ -59,20 +60,18 @@ from preferences.models import RadiusOption
from topologie.models import Port, Switch from topologie.models import Port, Switch
from users.models import User from users.models import User
# Logging
class RadiusdHandler(logging.Handler): class RadiusdHandler(logging.Handler):
"""Logs handler for freeradius""" """Handler de logs pour freeradius"""
def emit(self, record): def emit(self, record):
"""Log message processing, level are converted""" """Process un message de log, en convertissant les niveaux"""
if record.levelno >= logging.WARN: if record.levelno >= logging.WARN:
rad_sig = radiusd.L_ERR rad_sig = radiusd.L_ERR
elif record.levelno >= logging.INFO: elif record.levelno >= logging.INFO:
rad_sig = radiusd.L_INFO rad_sig = radiusd.L_INFO
else: else:
rad_sig = radiusd.L_DBG rad_sig = radiusd.L_DBG
radiusd.radlog(rad_sig, str(record.msg)) radiusd.radlog(rad_sig, record.msg.encode("utf-8"))
# Init for logging # Init for logging
@ -85,15 +84,17 @@ logger.addHandler(handler)
def radius_event(fun): def radius_event(fun):
"""Decorator for freeradius fonction with radius. """Décorateur pour les fonctions d'interfaces avec radius.
This function take a unique argument which is a list of tuples (key, value) Une telle fonction prend un uniquement argument, qui est une liste de
and return a tuple of 3 values which are: tuples (clé, valeur) et renvoie un triplet dont les composantes sont :
* return code (see radiusd.RLM_MODULE_* ) * le code de retour (voir radiusd.RLM_MODULE_* )
* a tuple of 2 elements for response value (access ok , etc) * un tuple de couples (clé, valeur) pour les valeurs de réponse (accès ok
* a tuple of 2 elements for internal value to update (password for example) et autres trucs du genre)
* un tuple de couples (clé, valeur) pour les valeurs internes à mettre à
jour (mot de passe par exemple)
Here, we convert the list of tuples into a dictionnary. On se contente avec ce décorateur (pour l'instant) de convertir la liste de
""" tuples en entrée en un dictionnaire."""
def new_f(auth_data): def new_f(auth_data):
""" The function transforming the tuples as dict """ """ The function transforming the tuples as dict """
@ -106,12 +107,15 @@ def radius_event(fun):
# Ex: Calling-Station-Id: "une_adresse_mac" # Ex: Calling-Station-Id: "une_adresse_mac"
data[key] = value.replace('"', "") data[key] = value.replace('"', "")
try: try:
# TODO s'assurer ici que les tuples renvoy s sont bien des
# (str,str) : rlm_python ne dig re PAS les unicodes
return fun(data) return fun(data)
except Exception as err: except Exception as err:
exc_type, exc_instance, exc_traceback = sys.exc_info() exc_type, exc_instance, exc_traceback = sys.exc_info()
formatted_traceback = "".join(traceback.format_tb(exc_traceback)) formatted_traceback = "".join(traceback.format_tb(exc_traceback))
logger.error("Failed %r on data %r" % (err, auth_data)) logger.error("Failed %r on data %r" % (err, auth_data))
logger.error("Function %r, Traceback : %r" % (fun, formatted_traceback)) logger.error("Function %r, Traceback : %r" %
(fun, formatted_traceback))
return radiusd.RLM_MODULE_FAIL return radiusd.RLM_MODULE_FAIL
return new_f return new_f
@ -122,6 +126,18 @@ def instantiate(*_):
"""Usefull for instantiate ldap connexions otherwise, """Usefull for instantiate ldap connexions otherwise,
do nothing""" do nothing"""
logger.info("Instantiation") logger.info("Instantiation")
path = (os.path.dirname(os.path.abspath(__file__)))
config = ConfigParser()
config.read(path+'/config.ini')
api_hostname = config.get('Re2o', 'hostname')
api_password = config.get('Re2o', 'password')
api_username = config.get('Re2o', 'username')
global api_client
api_client = Re2oAPIClient(
api_hostname, api_username, api_password, use_tls=True)
@radius_event @radius_event
@ -171,32 +187,18 @@ def post_auth(data):
return radiusd.RLM_MODULE_OK return radiusd.RLM_MODULE_OK
nas_type = Nas.objects.filter(nas_type=nas_instance.machine_type).first() nas_type = Nas.objects.filter(nas_type=nas_instance.machine_type).first()
if not nas_type: if not nas_type:
logger.info("This kind of nas is not registered in the database!") logger.info("Proxified request, nas unknown")
return radiusd.RLM_MODULE_OK return radiusd.RLM_MODULE_OK
mac = data.get("Calling-Station-Id", None) # If it is a switch
if switch:
sw_name = switch["name"] or "?"
room = "Unknown port"
if port:
room = port.room or "Unknown room"
# Switchs and access point can have several interfaces out = decide_vlan_switch(data_from_api, mac, nas_port)
nas_machine = nas_instance.machine reason, vlan_id, decision, attributes = out
# If it is a switchs
if hasattr(nas_machine, "switch"):
port = data.get("NAS-Port-Id", data.get("NAS-Port", None))
# If the switch is part of a stack, calling ip is different from calling switch.
instance_stack = nas_machine.switch.stack
if instance_stack:
# If it is a stack, we select the correct switch in the stack
id_stack_member = port.split("-")[1].split("/")[0]
nas_machine = (
Switch.objects.filter(stack=instance_stack)
.filter(stack_member_id=id_stack_member)
.prefetch_related("interface_set__domain__extension")
.first()
)
# Find the port number from freeradius, works both with HP, Cisco
# and juniper output
port = port.split(".")[0].split("/")[-1][-2:]
out = decide_vlan_switch(nas_machine, nas_type, port, mac)
sw_name, room, reason, vlan_id, decision, attributes = out
if decision: if decision:
log_message = "(wired) %s -> %s [%s%s]" % ( log_message = "(wired) %s -> %s [%s%s]" % (
@ -232,70 +234,50 @@ def post_auth(data):
return radiusd.RLM_MODULE_OK return radiusd.RLM_MODULE_OK
# TODO : remove this function def check_user_machine_and_register(nas_type, user, user_interface):
@radius_event
def dummy_fun(_):
"""Do nothing, successfully. """
return radiusd.RLM_MODULE_OK
def detach(_=None):
"""Detatch the auth"""
print("*** goodbye from auth.py ***")
return radiusd.RLM_MODULE_OK
def find_nas_from_request(nas_id):
""" Get the nas object from its ID """
nas = (
Interface.objects.filter(
Q(domain=Domain.objects.filter(name=nas_id))
| Q(ipv4=IpList.objects.filter(ipv4=nas_id))
)
.select_related("machine_type")
.select_related("machine__switch__stack")
)
return nas.first()
def check_user_machine_and_register(nas_type, username, mac_address):
"""Check if username and mac are registered. Register it if unknown. """Check if username and mac are registered. Register it if unknown.
Return the user ntlm password if everything is ok. Return the user ntlm password if everything is ok.
Used for 802.1X auth""" Used for 802.1X auth"""
interface = Interface.objects.filter(mac_address=mac_address).first()
user = User.objects.filter(pseudo__iexact=username).first()
if not user: if not user:
return (False, "User unknown", "") return (False, "User unknown", "")
if not user.has_access(): if not user["access"]:
return (False, "Invalid connexion (non-contributing user)", "") return (False, "Invalid connexion (non-contributing user)", "")
if interface: if user_interface:
if interface.machine.user != user: if user_interface["user_pk"] != user["pk"]:
return ( return (
False, False,
"Mac address registered on another user account", "Mac address registered on another user account",
"", "",
) )
elif not interface.is_active: elif not user_interface["active"]:
return (False, "Interface/Machine disabled", "") return (False, "Interface/Machine disabled", "")
elif not interface.ipv4: elif not user_interface["ipv4"]:
interface.assign_ipv4() # interface.assign_ipv4()
return (True, "Ok, new ipv4 assignement...", user.pwd_ntlm) return (True, "Ok, new ipv4 assignement...", user.get("pwd_ntlm", ""))
else: else:
return (True, "Access ok", user.pwd_ntlm) return (True, "Access ok", user.get("pwd_ntlm", ""))
elif nas_type: elif nas_type:
if nas_type.autocapture_mac: if nas_type["autocapture_mac"]:
result, reason = user.autoregister_machine(mac_address, nas_type) # result, reason = user.autoregister_machine(mac_address, nas_type)
if result: # if result:
return (True, "Access Ok, Registering mac...", user.pwd_ntlm) # return (True, "Access Ok, Registering mac...", user.pwd_ntlm)
else: # else:
return (False, "Error during mac register %s" % reason, "") # return (False, "Error during mac register %s" % reason, "")
return (False, "L'auto capture est désactivée", "")
else: else:
return (False, "Unknown interface/machine", "") return (False, "Unknown interface/machine", "")
else: else:
return (False, "Unknown interface/machine", "") return (False, "Unknown interface/machine", "")
def decide_vlan_switch(nas_machine, nas_type, port_number, mac_address): def set_radius_attributes_values(attributes, values):
return (
(str(attribute.attribute), str(attribute.value % values))
for attribute in attributes
)
def decide_vlan_switch(data_from_api, user_mac, nas_port):
"""Function for selecting vlan for a switch with wired mac auth radius. """Function for selecting vlan for a switch with wired mac auth radius.
Several modes are available : Several modes are available :
- all modes: - all modes:
@ -328,10 +310,24 @@ def decide_vlan_switch(nas_machine, nas_type, port_number, mac_address):
- decision (bool) - decision (bool)
- Other Attributs (attribut:str, operator:str, value:str) - Other Attributs (attribut:str, operator:str, value:str)
""" """
nas_type = data_from_api["nas"]
room_users = data_from_api["room_users"]
port = data_from_api["port"]
port_profile = data_from_api["port_profile"]
switch = data_from_api["switch"]
user_interface = data_from_api["user_interface"]
radius_option = data_from_api["radius_option"]
EMAIL_STATE_UNVERIFIED = data_from_api["EMAIL_STATE_UNVERIFIED"]
RADIUS_OPTION_REJECT = data_from_api["RADIUS_OPTION_REJECT"]
USER_STATE_ACTIVE = data_from_api["USER_STATE_ACTIVE"]
attributes_kwargs = { attributes_kwargs = {
"client_mac": str(mac_address), "client_mac": str(user_mac),
"switch_port": str(port_number), "switch_port": str(nas_port.split(".")[0].split("/")[-1][-2:]),
"switch_ip": str(switch.ipv4)
} }
# Get port from switch and port number # Get port from switch and port number
extra_log = "" extra_log = ""
# If NAS is unknown, go to default vlan # If NAS is unknown, go to default vlan
@ -353,39 +349,35 @@ def decide_vlan_switch(nas_machine, nas_type, port_number, mac_address):
# If the port is unknwon, go to default vlan # If the port is unknwon, go to default vlan
# We don't have enought information to make a better decision # We don't have enought information to make a better decision
if not port: if not port or not port_profile:
return ( return (
sw_name,
"Unknown port", "Unknown port",
"PUnknown port", radius_option["unknown_port_vlan"] and radius_option["unknown_port_vlan"]["vlan_id"] or None,
getattr( radius_option["unknown_port"] != RADIUS_OPTION_REJECT,
RadiusOption.get_cached_value("unknown_port_vlan"), "vlan_id", None set_radius_attributes_values(
), radius_option["unknown_port_attributes"], attributes_kwargs),
RadiusOption.get_cached_value("unknown_port") != RadiusOption.REJECT,
RadiusOption.get_attributes("unknown_port_attributes", attributes_kwargs),
) )
# Retrieve port profile # Retrieve port profile
port_profile = port.get_port_profile port_profile = port.get_port_profile
# If a vlan is precised in port config, we use it # If a vlan is precised in port config, we use it
if port_profile.vlan_untagged: if port_profile["vlan_untagged"]:
DECISION_VLAN = int(port_profile.vlan_untagged.vlan_id) DECISION_VLAN = int(port_profile["vlan_untagged"]["vlan_id"])
extra_log = "Force sur vlan " + str(DECISION_VLAN) extra_log = "Force sur vlan " + str(DECISION_VLAN)
attributes = () attributes = ()
else: else:
DECISION_VLAN = RadiusOption.get_cached_value("vlan_decision_ok").vlan_id DECISION_VLAN = radius_option["vlan_decision_ok"]["vlan_id"]
attributes = RadiusOption.get_attributes("ok_attributes", attributes_kwargs) attributes = set_radius_attributes_values(
radius_option["ok_attributes"], attributes_kwargs)
# If the port is disabled in re2o, REJECT # If the port is disabled in re2o, REJECT
if not port.state: if not port["state"]:
return (sw_name, port.room, "Port disabled", None, False, ()) return ("Port disabled", None, False, ())
# If radius is disabled, decision is OK # If radius is disabled, decision is OK
if port_profile.radius_type == "NO": if port_profile["radius_type"] == "NO":
return ( return (
sw_name,
"",
"No Radius auth enabled on this port" + extra_log, "No Radius auth enabled on this port" + extra_log,
DECISION_VLAN, DECISION_VLAN,
True, True,
@ -394,11 +386,8 @@ def decide_vlan_switch(nas_machine, nas_type, port_number, mac_address):
# If 802.1X is enabled, people has been previously accepted. # If 802.1X is enabled, people has been previously accepted.
# Go to the decision vlan # Go to the decision vlan
if (nas_type.port_access_mode, port_profile.radius_type) == ("802.1X", "802.1X"): if (nas_type["port_access_mode"], port_profile["radius_type"]) == ("802.1X", "802.1X"):
room = port.room or "Room unknown"
return ( return (
sw_name,
room,
"Accept authentication 802.1X", "Accept authentication 802.1X",
DECISION_VLAN, DECISION_VLAN,
True, True,
@ -409,166 +398,101 @@ def decide_vlan_switch(nas_machine, nas_type, port_number, mac_address):
# If strict mode is enabled, we check every user related with this port. If # If strict mode is enabled, we check every user related with this port. If
# one user or more is not enabled, we reject to prevent from sharing or # one user or more is not enabled, we reject to prevent from sharing or
# spoofing mac. # spoofing mac.
if port_profile.radius_mode == "STRICT": if port_profile["radius_mode"] == "STRICT":
room = port.room if not port["room"]:
if not room:
return ( return (
sw_name,
"Unknown",
"Unkwown room", "Unkwown room",
getattr( radius_option["unknown_room_vlan"] and radius_option["unknown_room_vlan"]["vlan_id"] or None,
RadiusOption.get_cached_value("unknown_room_vlan"), "vlan_id", None radius_option["unknown_room"] != RADIUS_OPTION_REJECT,
), set_radius_attributes_values(
RadiusOption.get_cached_value("unknown_room") != RadiusOption.REJECT, radius_option["unknown_room_attributes"], attributes_kwargs),
RadiusOption.get_attributes(
"unknown_room_attributes", attributes_kwargs
),
) )
room_user = User.objects.filter( if not room_users:
Q(club__room=port.room) | Q(adherent__room=port.room)
)
if not room_user:
return ( return (
sw_name,
room,
"Non-contributing room", "Non-contributing room",
getattr( radius_option["non_member_vlan"] and radius_option["non_member_vlan"]["vlan_id"] or None,
RadiusOption.get_cached_value("non_member_vlan"), "vlan_id", None radius_option["non_member"] != RADIUS_OPTION_REJECT,
), set_radius_attributes_values(
RadiusOption.get_cached_value("non_member") != RadiusOption.REJECT, radius_option["non_member_attributes"], attributes_kwargs),
RadiusOption.get_attributes("non_member_attributes", attributes_kwargs), )
all_user_ban = True
at_least_one_active_user = False
for user in room_users:
if not user["is_ban"] and user["state"] == USER_STATE_ACTIVE:
all_user_ban = False
elif user["email_state"] != EMAIL_STATE_UNVERIFIED and (user["is_connected"] or user["is_whitelisted"]):
at_least_one_active_user = True
if all_user_ban:
return (
"User is banned or disabled",
radius_option["banned_vlan"] and radius_option["banned_vlan"]["vlan_id"] or None,
radius_option["banned"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["banned_attributes"], attributes_kwargs),
)
if not at_least_one_active_user:
return (
"Non-contributing member or unconfirmed mail",
radius_option["non_member_vlan"] and radius_option["non_member_vlan"]["vlan_id"] or None,
radius_option["non_member"] != RADIUS_OPTION_REJECT,
set_radius_attributes_values(
radius_option["non_member_attributes"], attributes_kwargs),
) )
for user in room_user:
if user.is_ban() or user.state != User.STATE_ACTIVE:
return (
sw_name,
room,
"User is banned or disabled",
getattr(
RadiusOption.get_cached_value("banned_vlan"), "vlan_id", None
),
RadiusOption.get_cached_value("banned") != RadiusOption.REJECT,
RadiusOption.get_attributes("banned_attributes", attributes_kwargs),
)
elif user.email_state == User.EMAIL_STATE_UNVERIFIED:
return (
sw_name,
room,
"User is suspended (mail has not been confirmed)",
getattr(
RadiusOption.get_cached_value("non_member_vlan"),
"vlan_id",
None,
),
RadiusOption.get_cached_value("non_member") != RadiusOption.REJECT,
RadiusOption.get_attributes(
"non_member_attributes", attributes_kwargs
),
)
elif not (user.is_connected() or user.is_whitelisted()):
return (
sw_name,
room,
"Non-contributing member",
getattr(
RadiusOption.get_cached_value("non_member_vlan"),
"vlan_id",
None,
),
RadiusOption.get_cached_value("non_member") != RadiusOption.REJECT,
RadiusOption.get_attributes(
"non_member_attributes", attributes_kwargs
),
)
# else: user OK, so we check MAC now # else: user OK, so we check MAC now
# If we are authenticating with mac, we look for the interfaces and its mac address # If we are authenticating with mac, we look for the interfaces and its mac address
if port_profile.radius_mode == "COMMON" or port_profile.radius_mode == "STRICT": if port_profile.radius_mode == "COMMON" or port_profile.radius_mode == "STRICT":
# Mac auth
interface = (
Interface.objects.filter(mac_address=mac_address)
.select_related("machine__user")
.select_related("ipv4")
.first()
)
# If mac is unknown, # If mac is unknown,
if not interface: if not user_interface:
room = port.room
# We try to register mac, if autocapture is enabled # We try to register mac, if autocapture is enabled
# Final decision depend on RADIUSOption set in re2o # Final decision depend on RADIUSOption set in re2o
if nas_type.autocapture_mac: if nas_type["autocapture_mac"]:
return ( return (
sw_name,
room,
"Unknown mac/interface", "Unknown mac/interface",
getattr( radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None,
RadiusOption.get_cached_value("unknown_machine_vlan"), radius_option["unknown_machine"] != RADIUS_OPTION_REJECT,
"vlan_id", set_radius_attributes_values(
None, radius_option["unknown_machine_attributes"], attributes_kwargs),
),
RadiusOption.get_cached_value("unknown_machine")
!= RadiusOption.REJECT,
RadiusOption.get_attributes(
"unknown_machine_attributes", attributes_kwargs
),
) )
# Otherwise, if autocapture mac is not enabled, # Otherwise, if autocapture mac is not enabled,
else: else:
return ( return (
sw_name,
"",
"Unknown mac/interface", "Unknown mac/interface",
getattr( radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None,
RadiusOption.get_cached_value("unknown_machine_vlan"), radius_option["unknown_machine"] != RADIUS_OPTION_REJECT,
"vlan_id", set_radius_attributes_values(
None, radius_option["unknown_machine_attributes"], attributes_kwargs),
),
RadiusOption.get_cached_value("unknown_machine")
!= RadiusOption.REJECT,
RadiusOption.get_attributes(
"unknown_machine_attributes", attributes_kwargs
),
) )
# Mac/Interface is found, check if related user is contributing and ok # Mac/Interface is found, check if related user is contributing and ok
# If needed, set ipv4 to it # If needed, set ipv4 to it
else: else:
room = port.room if user_interface["is_ban"]:
if interface.machine.user.is_ban():
return ( return (
sw_name,
room,
"Banned user", "Banned user",
getattr( radius_option["banned_vlan"] and radius_option["banned_vlan"]["vlan_id"] or None,
RadiusOption.get_cached_value("banned_vlan"), "vlan_id", None radius_option["banned"] != RADIUS_OPTION_REJECT,
), set_radius_attributes_values(
RadiusOption.get_cached_value("banned") != RadiusOption.REJECT, radius_option["banned_attributes"], attributes_kwargs),
RadiusOption.get_attributes("banned_attributes", attributes_kwargs),
) )
if not interface.is_active: if not user_interface["active"]:
return ( return (
sw_name,
room,
"Disabled interface / non-contributing member", "Disabled interface / non-contributing member",
getattr( radius_option["non_member_vlan"] and radius_option["non_member_vlan"]["vlan_id"] or None,
RadiusOption.get_cached_value("non_member_vlan"), radius_option["non_member"] != RADIUS_OPTION_REJECT,
"vlan_id", set_radius_attributes_values(
None, radius_option["non_member_attributes"], attributes_kwargs),
),
RadiusOption.get_cached_value("non_member") != RadiusOption.REJECT,
RadiusOption.get_attributes(
"non_member_attributes", attributes_kwargs
),
) )
# If settings is set to related interface vlan policy based on interface type: # If settings is set to related interface vlan policy based on interface type:
if RadiusOption.get_cached_value("radius_general_policy") == "MACHINE": if radius_option["radius_general_policy"] == "MACHINE":
DECISION_VLAN = interface.machine_type.ip_type.vlan.vlan_id DECISION_VLAN = user_interface["vlan_id"]
if not interface.ipv4: if not user_interface["ipv4"]:
interface.assign_ipv4() # interface.assign_ipv4()
return ( return (
sw_name,
room,
"Ok, assigning new ipv4" + extra_log, "Ok, assigning new ipv4" + extra_log,
DECISION_VLAN, DECISION_VLAN,
True, True,
@ -576,8 +500,6 @@ def decide_vlan_switch(nas_machine, nas_type, port_number, mac_address):
) )
else: else:
return ( return (
sw_name,
room,
"Interface OK" + extra_log, "Interface OK" + extra_log,
DECISION_VLAN, DECISION_VLAN,
True, True,

View file

@ -1,3 +1,24 @@
# -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au Rézo Metz. Il
# se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics.
#
# Copyright © 2020 Corentin Canebier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from rest_framework import serializers from rest_framework import serializers
import machines.models as machines import machines.models as machines
@ -6,13 +27,15 @@ from api.serializers import NamespacedHMSerializer
from rest_framework.serializers import Serializer from rest_framework.serializers import Serializer
class Ipv4Serializer(Serializer):
ipv4 = serializers.CharField()
class InterfaceSerializer(Serializer): class InterfaceSerializer(Serializer):
mac_address = serializers.CharField() mac_address = serializers.CharField()
ipv4 = serializers.CharField(source="ipv4.ipv4") ipv4 = Ipv4Serializer()
active = serializers.BooleanField(source="is_active") active = serializers.BooleanField(source="is_active")
user_pk = serializers.CharField(source="machine.user.pk") user_pk = serializers.CharField(source="machine.user.pk")
# machine_type_pk = serializers.CharField(source="machine_type.pk")
# switch_stack = serializers.CharField(source="machine.switch.stack")
machine_short_name = serializers.CharField(source="machine.short_name") machine_short_name = serializers.CharField(source="machine.short_name")
is_ban = serializers.BooleanField(source="machine.user.is_ban") is_ban = serializers.BooleanField(source="machine.user.is_ban")
vlan_id = serializers.IntegerField( vlan_id = serializers.IntegerField(
@ -96,3 +119,4 @@ class PostAuthResponseSerializer(Serializer):
radius_option = RadiusOptionSerializer() radius_option = RadiusOptionSerializer()
EMAIL_STATE_UNVERIFIED = serializers.IntegerField() EMAIL_STATE_UNVERIFIED = serializers.IntegerField()
RADIUS_OPTION_REJECT = serializers.CharField() RADIUS_OPTION_REJECT = serializers.CharField()
USER_STATE_ACTIVE = serializers.CharField()

View file

@ -1,5 +1,5 @@
# -*- mode: python; coding: utf-8 -*- # -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au rezometz. Il # Re2o est un logiciel d'administration développé initiallement au Rézo Metz. Il
# se veut agnostique au réseau considéré, de manière à être installable en # se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics. # quelques clics.
# #
@ -21,15 +21,9 @@
from . import views from . import views
urls_view = [
# (r"radius/nas-interface-from-id/(?P<nas_id>.+)$", views.nas_from_id_view),
# (r"radius/nas-from-machine-type/(?P<machine_type>.+)$", views.NasFromMachineTypeView),
# (r"radius/user-from-username/(?P<username>.+)$", views.UserFromUsernameView),
# (r"radius/interface-from-mac-address/(?P<mac_address>.+)$", views.InterfaceFromMacAddressView),
]
urls_functional_view = [ urls_functional_view = [
(r"radius/authorize/(?P<nas_id>[^/]+)/(?P<username>.+)/(?P<mac_address>[0-9a-fA-F:\-]{17})$", (r"radius/authorize/(?P<nas_id>[^/]+)/(?P<username>.+)/(?P<mac_address>[0-9a-fA-F\:\-]{17})$",
views.authorize, None), views.authorize, None),
(r"radius/post_auth/(?P<nas_id>[^/]+)/(?P<nas_port>.+)/(?P<user_mac>[0-9a-fA-F:\-]{17})$", (r"radius/post_auth/(?P<nas_id>[^/]+)/(?P<nas_port>.+)/(?P<user_mac>[0-9a-fA-F\:\-]{17})$",
views.post_auth, None), views.post_auth, None),
] ]

View file

@ -1,5 +1,5 @@
# -*- mode: python; coding: utf-8 -*- # -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au rezometz. Il # Re2o est un logiciel d'administration développé initiallement au Rézo Metz. Il
# se veut agnostique au réseau considéré, de manière à être installable en # se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics. # quelques clics.
# #
@ -62,7 +62,7 @@ def authorize(request, nas_id, username, mac_address):
class PostAuthResponse: class PostAuthResponse:
def __init__(self, nas, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT): def __init__(self, nas, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE):
self.nas = nas self.nas = nas
self.room_users = room_users self.room_users = room_users
self.port = port self.port = port
@ -72,6 +72,7 @@ class PostAuthResponse:
self.radius_option = radius_option self.radius_option = radius_option
self.EMAIL_STATE_UNVERIFIED = EMAIL_STATE_UNVERIFIED self.EMAIL_STATE_UNVERIFIED = EMAIL_STATE_UNVERIFIED
self.RADIUS_OPTION_REJECT = RADIUS_OPTION_REJECT self.RADIUS_OPTION_REJECT = RADIUS_OPTION_REJECT
self.USER_STATE_ACTIVE = USER_STATE_ACTIVE
def can_view(self, user): def can_view(self, user):
return [True] return [True]
@ -84,29 +85,33 @@ def post_auth(request, nas_id, nas_port, user_mac):
Q(domain=Domain.objects.filter(name=nas_id)) Q(domain=Domain.objects.filter(name=nas_id))
| Q(ipv4=IpList.objects.filter(ipv4=nas_id)) | Q(ipv4=IpList.objects.filter(ipv4=nas_id))
).first() ).first()
print(nas_id)
nas_type = None nas_type = None
if nas_interface: if nas_interface:
nas_type = Nas.objects.filter( nas_type = Nas.objects.filter(
nas_type=nas_interface.machine_type).first() nas_type=nas_interface.machine_type).first()
# get switch # get switch
switch = Switch.objects.filter(machine_ptr=nas_interface.machine).first() switch = None
if hasattr(nas_interface.machine, "switch"): if nas_interface:
stack = nas_interface.machine.switch.stack switch = Switch.objects.filter(
if stack: machine_ptr=nas_interface.machine).first()
id_stack_member = nas_port.split("-")[1].split("/")[0] if hasattr(nas_interface.machine, "switch"):
switch = ( stack = nas_interface.machine.switch.stack
Switch.objects.filter(stack=stack) if stack:
.filter(stack_member_id=id_stack_member) id_stack_member = nas_port.split("-")[1].split("/")[0]
.first() switch = (
) Switch.objects.filter(stack=stack)
.filter(stack_member_id=id_stack_member)
.first()
)
# get port # get port
port_number = nas_port.split(".")[0].split("/")[-1][-2:] port_number = nas_port.split(".")[0].split("/")[-1][-2:]
port = Port.objects.filter(switch=switch, port=port_number).first() port = Port.objects.filter(switch=switch, port=port_number).first()
port_profile = port.get_port_profile port_profile = None
if port:
port_profile = port.get_port_profile
# get user_interface # get user_interface
user_interface = ( user_interface = (
@ -117,9 +122,11 @@ def post_auth(request, nas_id, nas_port, user_mac):
) )
# get room users # get room users
room_users = User.objects.filter( room_users = []
Q(club__room=port.room) | Q(adherent__room=port.room) if port:
) room_users = User.objects.filter(
Q(club__room=port.room) | Q(adherent__room=port.room)
)
# get radius options # get radius options
radius_option = RadiusOption.objects.first() radius_option = RadiusOption.objects.first()
@ -127,7 +134,8 @@ def post_auth(request, nas_id, nas_port, user_mac):
EMAIL_STATE_UNVERIFIED = User.EMAIL_STATE_UNVERIFIED EMAIL_STATE_UNVERIFIED = User.EMAIL_STATE_UNVERIFIED
RADIUS_OPTION_REJECT = RadiusOption.REJECT RADIUS_OPTION_REJECT = RadiusOption.REJECT
USER_STATE_ACTIVE = User.STATE_ACTIVE
serialized = serializers.PostAuthResponseSerializer( serialized = serializers.PostAuthResponseSerializer(
PostAuthResponse(nas_type, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT)) PostAuthResponse(nas_type, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE))
return Response(data=serialized.data) return Response(data=serialized.data)