8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2025-01-11 02:34:28 +00:00
This commit is contained in:
chapeau 2021-05-13 19:29:49 +02:00
parent a323bf7d68
commit d1a1d6613d
3 changed files with 94 additions and 32 deletions

View file

@ -1,5 +1,5 @@
# -*- mode: python; coding: utf-8 -*- # -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au rezometz. Il # Re2o est un logiciel d'administration développé initiallement au Rézo Metz. Il
# se veut agnostique au réseau considéré, de manière à être installable en # se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics. # quelques clics.
# #
@ -71,7 +71,7 @@ class RadiusdHandler(logging.Handler):
rad_sig = radiusd.L_INFO rad_sig = radiusd.L_INFO
else: else:
rad_sig = radiusd.L_DBG rad_sig = radiusd.L_DBG
radiusd.radlog(rad_sig, record.msg.encode("utf-8")) radiusd.radlog(rad_sig, str(record.msg))
# Init for logging # Init for logging
@ -126,18 +126,6 @@ def instantiate(*_):
"""Usefull for instantiate ldap connexions otherwise, """Usefull for instantiate ldap connexions otherwise,
do nothing""" do nothing"""
logger.info("Instantiation") logger.info("Instantiation")
path = (os.path.dirname(os.path.abspath(__file__)))
config = ConfigParser()
config.read(path+'/config.ini')
api_hostname = config.get('Re2o', 'hostname')
api_password = config.get('Re2o', 'password')
api_username = config.get('Re2o', 'username')
global api_client
api_client = Re2oAPIClient(
api_hostname, api_username, api_password, use_tls=True)
@radius_event @radius_event
@ -234,7 +222,7 @@ def post_auth(data):
return radiusd.RLM_MODULE_OK return radiusd.RLM_MODULE_OK
def check_user_machine_and_register(nas_type, user, user_interface): def check_user_machine_and_register(nas_type, user, user_interface, nas_id, username, mac_address):
"""Check if username and mac are registered. Register it if unknown. """Check if username and mac are registered. Register it if unknown.
Return the user ntlm password if everything is ok. Return the user ntlm password if everything is ok.
Used for 802.1X auth""" Used for 802.1X auth"""
@ -252,17 +240,28 @@ def check_user_machine_and_register(nas_type, user, user_interface):
elif not user_interface["active"]: elif not user_interface["active"]:
return (False, "Interface/Machine disabled", "") return (False, "Interface/Machine disabled", "")
elif not user_interface["ipv4"]: elif not user_interface["ipv4"]:
# interface.assign_ipv4() try:
return (True, "Ok, new ipv4 assignement...", user.get("pwd_ntlm", "")) api_client.view(
"radius/assign_ip/{0}".format(
urllib.parse.quote(mac_address or "None", safe="")
))
return (True, "Ok, new ipv4 assignement...", user.get("pwd_ntlm", ""))
except HTTPError as err:
return (False, "Error during ip assignement %s" % err.response.text, "")
else: else:
return (True, "Access ok", user.get("pwd_ntlm", "")) return (True, "Access ok", user.get("pwd_ntlm", ""))
elif nas_type: elif nas_type:
if nas_type["autocapture_mac"]: if nas_type["autocapture_mac"]:
# result, reason = user.autoregister_machine(mac_address, nas_type) try:
# if result: api_client.view(
# return (True, "Access Ok, Registering mac...", user.pwd_ntlm) "radius/autoregister/{0}/{1}/{2}".format(
# else: urllib.parse.quote(nas_id or "None", safe=""),
# return (False, "Error during mac register %s" % reason, "") urllib.parse.quote(username or "None", safe=""),
urllib.parse.quote(mac_address or "None", safe="")
))
return (True, "Access Ok, Registering mac...", user["pwd_ntlm"])
except HTTPError as err:
return (False, "Error during mac register %s" % err.response.text, "")
return (False, "L'auto capture est désactivée", "") return (False, "L'auto capture est désactivée", "")
else: else:
return (False, "Unknown interface/machine", "") return (False, "Unknown interface/machine", "")
@ -423,7 +422,7 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
for user in room_users: for user in room_users:
if not user["is_ban"] and user["state"] == USER_STATE_ACTIVE: if not user["is_ban"] and user["state"] == USER_STATE_ACTIVE:
all_user_ban = False all_user_ban = False
elif user["email_state"] != EMAIL_STATE_UNVERIFIED and (user["is_connected"] or user["is_whitelisted"]): if user["email_state"] != EMAIL_STATE_UNVERIFIED and (user["is_connected"] or user["is_whitelisted"]):
at_least_one_active_user = True at_least_one_active_user = True
if all_user_ban: if all_user_ban:
@ -491,13 +490,25 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
if radius_option["radius_general_policy"] == "MACHINE": if radius_option["radius_general_policy"] == "MACHINE":
DECISION_VLAN = user_interface["vlan_id"] DECISION_VLAN = user_interface["vlan_id"]
if not user_interface["ipv4"]: if not user_interface["ipv4"]:
# interface.assign_ipv4() try:
return ( api_client.view(
"Ok, assigning new ipv4" + extra_log, "radius/assign_ip/{0}".format(
DECISION_VLAN, urllib.parse.quote(user_mac or "None", safe="")
True, ))
attributes, return (
) "Ok, assigning new ipv4" + extra_log,
DECISION_VLAN,
True,
attributes,
)
except HTTPError as err:
return (
"Error during ip assignement %s" % err.response.text + extra_log,
DECISION_VLAN,
True,
attributes,
)
else: else:
return ( return (
"Interface OK" + extra_log, "Interface OK" + extra_log,

View file

@ -26,4 +26,8 @@ urls_functional_view = [
views.authorize, None), views.authorize, None),
(r"radius/post_auth/(?P<nas_id>[^/]+)/(?P<nas_port>.+)/(?P<user_mac>[0-9a-fA-F\:\-]{17})$", (r"radius/post_auth/(?P<nas_id>[^/]+)/(?P<nas_port>.+)/(?P<user_mac>[0-9a-fA-F\:\-]{17})$",
views.post_auth, None), views.post_auth, None),
(r"radius/autoregister/(?P<nas_id>[^/]+)/(?P<username>.+)/(?P<mac_address>[0-9a-fA-F\:\-]{17})$",
views.autoregister_machine, None),
(r"radius/assign_ip/(?P<mac_address>[0-9a-fA-F\:\-]{17})$",
views.assign_ip, None),
] ]

View file

@ -22,6 +22,8 @@
from rest_framework.decorators import api_view from rest_framework.decorators import api_view
from rest_framework.response import Response from rest_framework.response import Response
from django.db.models import Q from django.db.models import Q
from django.http import HttpResponse
from django.forms import ValidationError
from . import serializers from . import serializers
from machines.models import Domain, IpList, Interface, Nas from machines.models import Domain, IpList, Interface, Nas
@ -42,6 +44,16 @@ class AuthorizeResponse:
@api_view(['GET']) @api_view(['GET'])
def authorize(request, nas_id, username, mac_address): def authorize(request, nas_id, username, mac_address):
"""Return objects the radius need for the Authorize step
Parameters:
nas_id (string): NAS name or ipv4
username (string): username of the user who is trying to connect
mac_address (string): mac address of the device which is trying to connect
Return:
AuthorizeResponse: contains all the informations
"""
nas_interface = Interface.objects.filter( nas_interface = Interface.objects.filter(
Q(domain=Domain.objects.filter(name=nas_id)) Q(domain=Domain.objects.filter(name=nas_id))
@ -106,8 +118,10 @@ def post_auth(request, nas_id, nas_port, user_mac):
) )
# get port # get port
port_number = nas_port.split(".")[0].split("/")[-1][-2:] port = None
port = Port.objects.filter(switch=switch, port=port_number).first() if nas_port and nas_port != "None":
port_number = nas_port.split(".")[0].split("/")[-1][-2:]
port = Port.objects.filter(switch=switch, port=port_number).first()
port_profile = None port_profile = None
if port: if port:
@ -139,3 +153,36 @@ def post_auth(request, nas_id, nas_port, user_mac):
PostAuthResponse(nas_type, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE)) PostAuthResponse(nas_type, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE))
return Response(data=serialized.data) return Response(data=serialized.data)
@api_view(['GET'])
def autoregister_machine(request, nas_id, username, mac_address):
nas_interface = Interface.objects.filter(
Q(domain=Domain.objects.filter(name=nas_id))
| Q(ipv4=IpList.objects.filter(ipv4=nas_id))
).first()
nas_type = None
if nas_interface:
nas_type = Nas.objects.filter(
nas_type=nas_interface.machine_type).first()
user = User.objects.filter(pseudo__iexact=username).first()
result, reason = user.autoregister_machine(mac_address, nas_type)
if result:
return Response(data=reason)
return Response(reason, status=400)
@api_view(['GET'])
def assign_ip(request, mac_address):
interface = (
Interface.objects.filter(mac_address=mac_address)
.first()
)
try:
interface.assign_ipv4()
return Response()
except ValidationError as err:
return Response(err.message, status=400)