8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2024-11-06 01:46:27 +00:00

separate radius from re2o repo

This commit is contained in:
chapeau 2021-05-13 19:33:56 +02:00
parent 989410509d
commit f70b97677f

View file

@ -7,7 +7,6 @@
# Copyright © 2017 Gabriel Détraz # Copyright © 2017 Gabriel Détraz
# Copyright © 2017 Lara Kermarec # Copyright © 2017 Lara Kermarec
# Copyright © 2017 Augustin Lemesle # Copyright © 2017 Augustin Lemesle
# Copyright © 2020 Corentin Canebier
# #
# This program is free software; you can redistribute it and/or modify # This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by # it under the terms of the GNU General Public License as published by
@ -35,19 +34,12 @@ https://github.com/FreeRADIUS/freeradius-server/blob/master/src/modules/rlm_pyth
Inspired by Daniel Stan in Crans Inspired by Daniel Stan in Crans
""" """
from configparser import ConfigParser
from re2oapi import Re2oAPIClient
import sys
from pathlib import Path
import subprocess
import logging
import os import os
import sys import sys
import logging
import traceback import traceback
import radiusd # Magic module freeradius (radiusd.py is dummy) import radiusd # Magic module freeradius (radiusd.py is dummy)
from django.core.wsgi import get_wsgi_application from django.core.wsgi import get_wsgi_application
from django.db.models import Q from django.db.models import Q
@ -62,11 +54,14 @@ os.chdir(proj_path)
# This is so models get loaded. # This is so models get loaded.
application = get_wsgi_application() application = get_wsgi_application()
from machines.models import Domain, Interface, IpList, Nas from machines.models import Interface, IpList, Nas, Domain
from preferences.models import RadiusOption
from topologie.models import Port, Switch from topologie.models import Port, Switch
from users.models import User from users.models import User
from preferences.models import RadiusOption
# Logging
class RadiusdHandler(logging.Handler): class RadiusdHandler(logging.Handler):
"""Logs handler for freeradius""" """Logs handler for freeradius"""
@ -112,15 +107,12 @@ def radius_event(fun):
# Ex: Calling-Station-Id: "une_adresse_mac" # Ex: Calling-Station-Id: "une_adresse_mac"
data[key] = value.replace('"', "") data[key] = value.replace('"', "")
try: try:
# TODO s'assurer ici que les tuples renvoy s sont bien des
# (str,str) : rlm_python ne dig re PAS les unicodes
return fun(data) return fun(data)
except Exception as err: except Exception as err:
exc_type, exc_instance, exc_traceback = sys.exc_info() exc_type, exc_instance, exc_traceback = sys.exc_info()
formatted_traceback = "".join(traceback.format_tb(exc_traceback)) formatted_traceback = "".join(traceback.format_tb(exc_traceback))
logger.error("Failed %r on data %r" % (err, auth_data)) logger.error("Failed %r on data %r" % (err, auth_data))
logger.error("Function %r, Traceback : %r" % logger.error("Function %r, Traceback : %r" % (fun, formatted_traceback))
(fun, formatted_traceback))
return radiusd.RLM_MODULE_FAIL return radiusd.RLM_MODULE_FAIL
return new_f return new_f
@ -128,30 +120,10 @@ def radius_event(fun):
@radius_event @radius_event
def instantiate(*_): def instantiate(*_):
"""Instantiate api connection """Usefull for instantiate ldap connexions otherwise,
""" do nothing"""
logger.info("Instantiation") logger.info("Instantiation")
path = Path(__file__).resolve(strict=True).parent
config = ConfigParser()
config.read(path / 'config.ini')
api_hostname = config.get('Re2o', 'hostname')
api_password = config.get('Re2o', 'password')
api_username = config.get('Re2o', 'username')
def get_api_client():
"""Gets a Re2o, or tries to initialize one"""
if get_api_client.client is None:
get_api_client.client = Re2oAPIClient(
api_hostname, api_username, api_password, use_tls=True)
return get_api_client.client
get_api_client.client = None
global api_client
api_client = get_api_client
@radius_event @radius_event
def authorize(data): def authorize(data):
@ -161,26 +133,19 @@ def authorize(data):
- It the nas is known AND nas auth is enabled with mac address, returns - It the nas is known AND nas auth is enabled with mac address, returns
accept here""" accept here"""
# For proxified request, split # For proxified request, split
username = username.split("@", 1)[0] nas = data.get("NAS-IP-Address", data.get("NAS-Identifier", None))
nas_instance = find_nas_from_request(nas)
# For none proxified requests
nas_type = None
if nas_instance:
nas_type = Nas.objects.filter(nas_type=nas_instance.machine_type).first()
if not nas_type or nas_type.port_access_mode == "802.1X":
user = data.get("User-Name", "")
user = user.split("@", 1)[0]
mac = data.get("Calling-Station-Id", "") mac = data.get("Calling-Station-Id", "")
result, log, password = check_user_machine_and_register(nas_type, user, mac)
# Get all required objects from API logger.info(str(log))
data_from_api = api_client().view( logger.info(str(user))
"radius/authorize/{0}/{1}/{2}".format(
urllib.parse.quote(nas or "None", safe=""),
urllib.parse.quote(username or "None", safe=""),
urllib.parse.quote(mac or "None", safe="")
))
nas_type = data_from_api["nas"]
user = data_from_api["user"]
user_interface = data_from_api["user_interface"]
if not nas_type or nas_type and nas_type["port_access_mode"] == "802.1X":
result, log, password = check_user_machine_and_register(
nas_type, user, user_interface, nas, username, mac)
logger.info(log.encode("utf-8"))
logger.info(username.encode("utf-8"))
if not result: if not result:
return radiusd.RLM_MODULE_REJECT return radiusd.RLM_MODULE_REJECT
@ -197,48 +162,54 @@ def authorize(data):
@radius_event @radius_event
def post_auth(data): def post_auth(data):
"""Function called after the user is authenticated""" """ Function called after the user is authenticated
"""
nas = data.get("NAS-IP-Address", data.get("NAS-Identifier", None)) nas = data.get("NAS-IP-Address", data.get("NAS-Identifier", None))
nas_port = data.get("NAS-Port-Id", data.get("NAS-Port", None)) nas_instance = find_nas_from_request(nas)
mac = data.get("Calling-Station-Id", None) # All non proxified requests
if not nas_instance:
# Get all required objects from API
data_from_api = api_client().view(
"radius/post_auth/{0}/{1}/{2}".format(
urllib.parse.quote(nas or "None", safe=""),
urllib.parse.quote(nas_port or "None", safe=""),
urllib.parse.quote(mac or "None", safe="")
))
nas_type = data_from_api["nas"]
port = data_from_api["port"]
switch = data_from_api["switch"]
# If proxified request
if not nas_type:
logger.info("Proxified request, nas unknown") logger.info("Proxified request, nas unknown")
return radiusd.RLM_MODULE_OK return radiusd.RLM_MODULE_OK
nas_type = Nas.objects.filter(nas_type=nas_instance.machine_type).first()
if not nas_type:
logger.info("This kind of nas is not registered in the database!")
return radiusd.RLM_MODULE_OK
# If the request is from a switch (wired connection) mac = data.get("Calling-Station-Id", None)
if switch:
# For logging
sw_name = switch["name"] or "?"
room = port["room"] or "Unknown room" if port else "Unknown port"
out = decide_vlan_switch(data_from_api, mac, nas_port) # Switchs and access point can have several interfaces
reason, vlan_id, decision, attributes = out nas_machine = nas_instance.machine
# If it is a switchs
if hasattr(nas_machine, "switch"):
port = data.get("NAS-Port-Id", data.get("NAS-Port", None))
# If the switch is part of a stack, calling ip is different from calling switch.
instance_stack = nas_machine.switch.stack
if instance_stack:
# If it is a stack, we select the correct switch in the stack
id_stack_member = port.split("-")[1].split("/")[0]
nas_machine = (
Switch.objects.filter(stack=instance_stack)
.filter(stack_member_id=id_stack_member)
.prefetch_related("interface_set__domain__extension")
.first()
)
# Find the port number from freeradius, works both with HP, Cisco
# and juniper output
port = port.split(".")[0].split("/")[-1][-2:]
out = decide_vlan_switch(nas_machine, nas_type, port, mac)
sw_name, room, reason, vlan_id, decision, attributes = out
if decision: if decision:
log_message = "(wired) %s -> %s [%s%s]" % ( log_message = "(wired) %s -> %s [%s%s]" % (
sw_name + ":" + nas_port + "/" + str(room), sw_name + ":" + port + "/" + str(room),
mac, mac,
vlan_id, vlan_id,
(reason and ": " + reason), (reason and ": " + reason),
) )
logger.info(log_message) logger.info(log_message)
# Apply vlan from decide_vlan_switch # Wired connexion
return ( return (
radiusd.RLM_MODULE_UPDATED, radiusd.RLM_MODULE_UPDATED,
( (
@ -250,8 +221,8 @@ def post_auth(data):
(), (),
) )
else: else:
log_message = "(wired) %s -> %s [Reject %s]" % ( log_message = "(fil) %s -> %s [Reject %s]" % (
sw_name + ":" + nas_port + "/" + str(room), sw_name + ":" + port + "/" + str(room),
mac, mac,
(reason and ": " + reason), (reason and ": " + reason),
) )
@ -259,86 +230,81 @@ def post_auth(data):
return (radiusd.RLM_MODULE_REJECT, tuple(attributes), ()) return (radiusd.RLM_MODULE_REJECT, tuple(attributes), ())
# Else it is from wifi
else: else:
return radiusd.RLM_MODULE_OK return radiusd.RLM_MODULE_OK
def check_user_machine_and_register(nas_type, user, user_interface, nas_id, username, mac_address): # TODO : remove this function
@radius_event
def dummy_fun(_):
"""Do nothing, successfully. """
return radiusd.RLM_MODULE_OK
def detach(_=None):
"""Detatch the auth"""
print("*** goodbye from auth.py ***")
return radiusd.RLM_MODULE_OK
def find_nas_from_request(nas_id):
""" Get the nas object from its ID """
nas = (
Interface.objects.filter(
Q(domain=Domain.objects.filter(name=nas_id))
| Q(ipv4=IpList.objects.filter(ipv4=nas_id))
)
.select_related("machine_type")
.select_related("machine__switch__stack")
)
return nas.first()
def check_user_machine_and_register(nas_type, username, mac_address):
"""Check if username and mac are registered. Register it if unknown. """Check if username and mac are registered. Register it if unknown.
Return the user ntlm password if everything is ok. Return the user ntlm password if everything is ok.
Used for 802.1X auth Used for 802.1X auth"""
""" interface = Interface.objects.filter(mac_address=mac_address).first()
user = User.objects.filter(pseudo__iexact=username).first()
if not user: if not user:
# No username provided
return (False, "User unknown", "") return (False, "User unknown", "")
if not user.has_access():
if not user["access"]:
return (False, "Invalid connexion (non-contributing user)", "") return (False, "Invalid connexion (non-contributing user)", "")
if interface:
if user_interface: if interface.machine.user != user:
if user_interface["user_pk"] != user["pk"]:
return ( return (
False, False,
"Mac address registered on another user account", "Mac address registered on another user account",
"", "",
) )
elif not interface.is_active:
elif not user_interface["active"]:
return (False, "Interface/Machine disabled", "") return (False, "Interface/Machine disabled", "")
elif not interface.ipv4:
elif not user_interface["ipv4"]: interface.assign_ipv4()
# Try to autoassign ip return (True, "Ok, new ipv4 assignement...", user.pwd_ntlm)
try:
api_client().view(
"radius/assign_ip/{0}".format(
urllib.parse.quote(mac_address or "None", safe="")
))
return (True, "Ok, new ipv4 assignement...", user.get("pwd_ntlm", ""))
except HTTPError as err:
return (False, "Error during ip assignement %s" % err.response.text, "")
else: else:
return (True, "Access ok", user.get("pwd_ntlm", "")) return (True, "Access ok", user.pwd_ntlm)
elif nas_type: elif nas_type:
# The interface is not yet registred, try to autoregister if enabled if nas_type.autocapture_mac:
if nas_type["autocapture_mac"]: result, reason = user.autoregister_machine(mac_address, nas_type)
try: if result:
api_client().view( return (True, "Access Ok, Registering mac...", user.pwd_ntlm)
"radius/autoregister/{0}/{1}/{2}".format( else:
urllib.parse.quote(nas_id or "None", safe=""), return (False, "Error during mac register %s" % reason, "")
urllib.parse.quote(username or "None", safe=""),
urllib.parse.quote(mac_address or "None", safe="")
))
return (True, "Access Ok, Registering mac...", user["pwd_ntlm"])
except HTTPError as err:
return (False, "Error during mac register %s" % err.response.text, "")
return (False, "Autoregistering is disabled", "")
else: else:
return (False, "Unknown interface/machine", "") return (False, "Unknown interface/machine", "")
else: else:
return (False, "Unknown interface/machine", "") return (False, "Unknown interface/machine", "")
def set_radius_attributes_values(attributes, values): def decide_vlan_switch(nas_machine, nas_type, port_number, mac_address):
"""Set values of parameters in radius attributes"""
return (
(str(attribute["attribute"]), str(attribute["value"] % values))
for attribute in attributes
)
def decide_vlan_switch(data_from_api, user_mac, nas_port):
"""Function for selecting vlan for a switch with wired mac auth radius. """Function for selecting vlan for a switch with wired mac auth radius.
Two modes exist : in strict mode, a registered user cannot connect with Several modes are available :
their machines in a non-registered user room
Sequentially :
- all modes: - all modes:
- unknown NAS : VLAN_OK, - unknown NAS : VLAN_OK,
- unknown port : Decision set in Re2o RadiusOption - unknown port : Decision set in Re2o RadiusOption
- No radius on this port : VLAN_OK - No radius on this port : VLAN_OK
- force : replace VLAN_OK with vlan provided by the database - force : returns vlan provided by the database
- mode strict: - mode strict:
- no room : Decision set in Re2o RadiusOption, - no room : Decision set in Re2o RadiusOption,
- no user in this room : Reject, - no user in this room : Reject,
@ -354,35 +320,21 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
- user contributing : VLAN_OK (can assign ipv4 if needed) - user contributing : VLAN_OK (can assign ipv4 if needed)
- unknown interface : - unknown interface :
- register mac disabled : Decision set in Re2o RadiusOption - register mac disabled : Decision set in Re2o RadiusOption
- register mac enabled : redirect to webauth (not implemented) - register mac enabled : redirect to webauth
Returns: Returns:
tuple with : tuple with :
- Switch name (str)
- Room (str)
- Reason of the decision (str) - Reason of the decision (str)
- vlan_id (int) - vlan_id (int)
- decision (bool) - decision (bool)
- Other Attributs (attribut:str, value:str) - Other Attributs (attribut:str, operator:str, value:str)
""" """
# Get values from api
nas_type = data_from_api["nas"]
room_users = data_from_api["room_users"]
port = data_from_api["port"]
port_profile = data_from_api["port_profile"]
switch = data_from_api["switch"]
user_interface = data_from_api["user_interface"]
radius_option = data_from_api["radius_option"]
EMAIL_STATE_UNVERIFIED = data_from_api["EMAIL_STATE_UNVERIFIED"]
RADIUS_OPTION_REJECT = data_from_api["RADIUS_OPTION_REJECT"]
USER_STATE_ACTIVE = data_from_api["USER_STATE_ACTIVE"]
# Values which can be used as parameters in radius attributes
attributes_kwargs = { attributes_kwargs = {
"client_mac": str(user_mac), "client_mac": str(mac_address),
# magic split "switch_port": str(port_number),
"switch_port": str(nas_port.split(".")[0].split("/")[-1][-2:]),
"switch_ip": str(switch["ipv4"])
} }
# Get port from switch and port number
extra_log = "" extra_log = ""
# If NAS is unknown, go to default vlan # If NAS is unknown, go to default vlan
if not nas_machine: if not nas_machine:
@ -401,36 +353,41 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
attributes_kwargs["switch_ip"] = str(switch.ipv4) attributes_kwargs["switch_ip"] = str(switch.ipv4)
port = Port.objects.filter(switch=switch, port=port_number).first() port = Port.objects.filter(switch=switch, port=port_number).first()
# If the port is unknown, do as in RadiusOption # If the port is unknwon, go to default vlan
if not port or not port_profile: # We don't have enought information to make a better decision
if not port:
return ( return (
sw_name,
"Unknown port", "Unknown port",
radius_option["unknown_port_vlan"] and radius_option["unknown_port_vlan"]["vlan_id"] or None, "PUnknown port",
radius_option["unknown_port"] != RADIUS_OPTION_REJECT, getattr(
set_radius_attributes_values( RadiusOption.get_cached_value("unknown_port_vlan"), "vlan_id", None
radius_option["unknown_port_attributes"], attributes_kwargs), ),
RadiusOption.get_cached_value("unknown_port") != RadiusOption.REJECT,
RadiusOption.get_attributes("unknown_port_attributes", attributes_kwargs),
) )
# Retrieve port profile # Retrieve port profile
port_profile = port.get_port_profile port_profile = port.get_port_profile
# If a vlan is precised in port config, we use it # If a vlan is precised in port config, we use it
if port_profile["vlan_untagged"]: if port_profile.vlan_untagged:
DECISION_VLAN = int(port_profile["vlan_untagged"]["vlan_id"]) DECISION_VLAN = int(port_profile.vlan_untagged.vlan_id)
extra_log = "Force sur vlan %s" % str(DECISION_VLAN) extra_log = "Force sur vlan " + str(DECISION_VLAN)
attributes = () attributes = ()
else: else:
DECISION_VLAN = radius_option["vlan_decision_ok"]["vlan_id"] DECISION_VLAN = RadiusOption.get_cached_value("vlan_decision_ok").vlan_id
attributes = set_radius_attributes_values( attributes = RadiusOption.get_attributes("ok_attributes", attributes_kwargs)
radius_option["ok_attributes"], attributes_kwargs)
# If the port is disabled in re2o, REJECT # If the port is disabled in re2o, REJECT
if not port["state"]: if not port.state:
return ("Port disabled", None, False, ()) return (sw_name, port.room, "Port disabled", None, False, ())
# If radius is disabled, decision is OK # If radius is disabled, decision is OK
if port_profile["radius_type"] == "NO": if port_profile.radius_type == "NO":
return ( return (
sw_name,
"",
"No Radius auth enabled on this port" + extra_log, "No Radius auth enabled on this port" + extra_log,
DECISION_VLAN, DECISION_VLAN,
True, True,
@ -439,8 +396,11 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
# If 802.1X is enabled, people has been previously accepted. # If 802.1X is enabled, people has been previously accepted.
# Go to the decision vlan # Go to the decision vlan
if (nas_type["port_access_mode"], port_profile["radius_type"]) == ("802.1X", "802.1X"): if (nas_type.port_access_mode, port_profile.radius_type) == ("802.1X", "802.1X"):
room = port.room or "Room unknown"
return ( return (
sw_name,
room,
"Accept authentication 802.1X", "Accept authentication 802.1X",
DECISION_VLAN, DECISION_VLAN,
True, True,
@ -449,121 +409,177 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
# Otherwise, we are in mac radius. # Otherwise, we are in mac radius.
# If strict mode is enabled, we check every user related with this port. If # If strict mode is enabled, we check every user related with this port. If
# all users and clubs are disabled, we reject to prevent from sharing or # one user or more is not enabled, we reject to prevent from sharing or
# spoofing mac. # spoofing mac.
if port_profile["radius_mode"] == "STRICT": if port_profile.radius_mode == "STRICT":
if not port["room"]: room = port.room
if not room:
return ( return (
sw_name,
"Unknown",
"Unkwown room", "Unkwown room",
radius_option["unknown_room_vlan"] and radius_option["unknown_room_vlan"]["vlan_id"] or None, getattr(
radius_option["unknown_room"] != RADIUS_OPTION_REJECT, RadiusOption.get_cached_value("unknown_room_vlan"), "vlan_id", None
set_radius_attributes_values( ),
radius_option["unknown_room_attributes"], attributes_kwargs), RadiusOption.get_cached_value("unknown_room") != RadiusOption.REJECT,
RadiusOption.get_attributes(
"unknown_room_attributes", attributes_kwargs
),
) )
if not room_users: room_user = User.objects.filter(
Q(club__room=port.room) | Q(adherent__room=port.room)
)
if not room_user:
return ( return (
sw_name,
room,
"Non-contributing room", "Non-contributing room",
radius_option["non_member_vlan"] and radius_option["non_member_vlan"]["vlan_id"] or None, getattr(
radius_option["non_member"] != RADIUS_OPTION_REJECT, RadiusOption.get_cached_value("non_member_vlan"), "vlan_id", None
set_radius_attributes_values( ),
radius_option["non_member_attributes"], attributes_kwargs), RadiusOption.get_cached_value("non_member") != RadiusOption.REJECT,
RadiusOption.get_attributes("non_member_attributes", attributes_kwargs),
) )
for user in room_user:
all_user_ban = True if user.is_ban() or user.state != User.STATE_ACTIVE:
at_least_one_active_user = False
for user in room_users:
if not user["is_ban"] and user["state"] == USER_STATE_ACTIVE:
all_user_ban = False
if user["email_state"] != EMAIL_STATE_UNVERIFIED and (user["is_connected"] or user["is_whitelisted"]):
at_least_one_active_user = True
if all_user_ban:
return ( return (
sw_name,
room,
"User is banned or disabled", "User is banned or disabled",
radius_option["banned_vlan"] and radius_option["banned_vlan"]["vlan_id"] or None, getattr(
radius_option["banned"] != RADIUS_OPTION_REJECT, RadiusOption.get_cached_value("banned_vlan"), "vlan_id", None
set_radius_attributes_values( ),
radius_option["banned_attributes"], attributes_kwargs), RadiusOption.get_cached_value("banned") != RadiusOption.REJECT,
RadiusOption.get_attributes("banned_attributes", attributes_kwargs),
) )
if not at_least_one_active_user: elif user.email_state == User.EMAIL_STATE_UNVERIFIED:
return ( return (
"Non-contributing member or unconfirmed mail", sw_name,
radius_option["non_member_vlan"] and radius_option["non_member_vlan"]["vlan_id"] or None, room,
radius_option["non_member"] != RADIUS_OPTION_REJECT, "User is suspended (mail has not been confirmed)",
set_radius_attributes_values( getattr(
radius_option["non_member_attributes"], attributes_kwargs), RadiusOption.get_cached_value("non_member_vlan"),
"vlan_id",
None,
),
RadiusOption.get_cached_value("non_member") != RadiusOption.REJECT,
RadiusOption.get_attributes(
"non_member_attributes", attributes_kwargs
),
)
elif not (user.is_connected() or user.is_whitelisted()):
return (
sw_name,
room,
"Non-contributing member",
getattr(
RadiusOption.get_cached_value("non_member_vlan"),
"vlan_id",
None,
),
RadiusOption.get_cached_value("non_member") != RadiusOption.REJECT,
RadiusOption.get_attributes(
"non_member_attributes", attributes_kwargs
),
) )
# else: user OK, so we check MAC now # else: user OK, so we check MAC now
# If we are authenticating with mac, we look for the interfaces and its mac address
if port_profile.radius_mode == "COMMON" or port_profile.radius_mode == "STRICT":
# Mac auth
interface = (
Interface.objects.filter(mac_address=mac_address)
.select_related("machine__user")
.select_related("ipv4")
.first()
)
# If mac is unknown, # If mac is unknown,
if not user_interface: if not interface:
room = port.room
# We try to register mac, if autocapture is enabled # We try to register mac, if autocapture is enabled
# Final decision depend on RADIUSOption set in re2o # Final decision depend on RADIUSOption set in re2o
# Something is not implemented here... if nas_type.autocapture_mac:
if nas_type["autocapture_mac"]:
return ( return (
sw_name,
room,
"Unknown mac/interface", "Unknown mac/interface",
radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None, getattr(
radius_option["unknown_machine"] != RADIUS_OPTION_REJECT, RadiusOption.get_cached_value("unknown_machine_vlan"),
set_radius_attributes_values( "vlan_id",
radius_option["unknown_machine_attributes"], attributes_kwargs), None,
),
RadiusOption.get_cached_value("unknown_machine")
!= RadiusOption.REJECT,
RadiusOption.get_attributes(
"unknown_machine_attributes", attributes_kwargs
),
) )
# Otherwise, if autocapture mac is not enabled, # Otherwise, if autocapture mac is not enabled,
else: else:
return ( return (
sw_name,
"",
"Unknown mac/interface", "Unknown mac/interface",
radius_option["unknown_machine_vlan"] and radius_option["unknown_machine_vlan"]["vlan_id"] or None, getattr(
radius_option["unknown_machine"] != RADIUS_OPTION_REJECT, RadiusOption.get_cached_value("unknown_machine_vlan"),
set_radius_attributes_values( "vlan_id",
radius_option["unknown_machine_attributes"], attributes_kwargs), None,
),
RadiusOption.get_cached_value("unknown_machine")
!= RadiusOption.REJECT,
RadiusOption.get_attributes(
"unknown_machine_attributes", attributes_kwargs
),
) )
# Mac/Interface is found, check if related user is contributing and ok # Mac/Interface is found, check if related user is contributing and ok
# If needed, set ipv4 to it # If needed, set ipv4 to it
else: else:
if user_interface["is_ban"]: room = port.room
if interface.machine.user.is_ban():
return ( return (
sw_name,
room,
"Banned user", "Banned user",
radius_option["banned_vlan"] and radius_option["banned_vlan"]["vlan_id"] or None, getattr(
radius_option["banned"] != RADIUS_OPTION_REJECT, RadiusOption.get_cached_value("banned_vlan"), "vlan_id", None
set_radius_attributes_values( ),
radius_option["banned_attributes"], attributes_kwargs), RadiusOption.get_cached_value("banned") != RadiusOption.REJECT,
RadiusOption.get_attributes("banned_attributes", attributes_kwargs),
) )
if not user_interface["active"]: if not interface.is_active:
return ( return (
sw_name,
room,
"Disabled interface / non-contributing member", "Disabled interface / non-contributing member",
radius_option["non_member_vlan"] and radius_option["non_member_vlan"]["vlan_id"] or None, getattr(
radius_option["non_member"] != RADIUS_OPTION_REJECT, RadiusOption.get_cached_value("non_member_vlan"),
set_radius_attributes_values( "vlan_id",
radius_option["non_member_attributes"], attributes_kwargs), None,
),
RadiusOption.get_cached_value("non_member") != RadiusOption.REJECT,
RadiusOption.get_attributes(
"non_member_attributes", attributes_kwargs
),
) )
# If settings is set to related interface vlan policy based on interface type: # If settings is set to related interface vlan policy based on interface type:
if radius_option["radius_general_policy"] == "MACHINE": if RadiusOption.get_cached_value("radius_general_policy") == "MACHINE":
DECISION_VLAN = user_interface["vlan_id"] DECISION_VLAN = interface.machine_type.ip_type.vlan.vlan_id
if not user_interface["ipv4"]: if not interface.ipv4:
try: interface.assign_ipv4()
api_client().view(
"radius/assign_ip/{0}".format(
urllib.parse.quote(user_mac or "None", safe="")
))
return ( return (
sw_name,
room,
"Ok, assigning new ipv4" + extra_log, "Ok, assigning new ipv4" + extra_log,
DECISION_VLAN, DECISION_VLAN,
True, True,
attributes, attributes,
) )
except HTTPError as err:
return (
"Error during ip assignement %s" % err.response.text + extra_log,
DECISION_VLAN,
True,
attributes,
)
else: else:
return ( return (
sw_name,
room,
"Interface OK" + extra_log, "Interface OK" + extra_log,
DECISION_VLAN, DECISION_VLAN,
True, True,