8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2024-12-23 23:43:47 +00:00

on a testé à rennes et ça marche du tonnerre

This commit is contained in:
chapeau 2020-11-28 23:07:36 +01:00
parent df6de3ef2f
commit 048140db20
3 changed files with 119 additions and 41 deletions

View file

@ -1,5 +1,5 @@
# -*- mode: python; coding: utf-8 -*- # -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au rezometz. Il # Re2o est un logiciel d'administration développé initiallement au Rézo Metz. Il
# se veut agnostique au réseau considéré, de manière à être installable en # se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics. # quelques clics.
# #
@ -45,9 +45,21 @@ import subprocess
import logging import logging
import traceback import traceback
import radiusd import radiusd
from requests import HTTPError
import urllib.parse import urllib.parse
api_client = None path = (os.path.dirname(os.path.abspath(__file__)))
config = ConfigParser()
config.read(path+'/config.ini')
api_hostname = config.get('Re2o', 'hostname')
api_password = config.get('Re2o', 'password')
api_username = config.get('Re2o', 'username')
global api_client
api_client = Re2oAPIClient(
api_hostname, api_username, api_password, use_tls=True)
class RadiusdHandler(logging.Handler): class RadiusdHandler(logging.Handler):
@ -61,7 +73,7 @@ class RadiusdHandler(logging.Handler):
rad_sig = radiusd.L_INFO rad_sig = radiusd.L_INFO
else: else:
rad_sig = radiusd.L_DBG rad_sig = radiusd.L_DBG
radiusd.radlog(rad_sig, record.msg.encode("utf-8")) radiusd.radlog(rad_sig, str(record.msg))
# Initialisation d'un logger (pour logguer unifi ) # Initialisation d'un logger (pour logguer unifi )
@ -116,30 +128,22 @@ def instantiate(*_):
"""Usefull for instantiate ldap connexions otherwise, """Usefull for instantiate ldap connexions otherwise,
do nothing""" do nothing"""
logger.info("Instantiation") logger.info("Instantiation")
path = (os.path.dirname(os.path.abspath(__file__)))
config = ConfigParser()
config.read(path+'/config.ini')
api_hostname = config.get('Re2o', 'hostname')
api_password = config.get('Re2o', 'password')
api_username = config.get('Re2o', 'username')
global api_client
api_client = Re2oAPIClient(
api_hostname, api_username, api_password, use_tls=True)
@radius_event @radius_event
def authorize(data): def authorize(data):
# Pour les requetes proxifiees, on split # Pour les requetes proxifiees, on split
nas = data.get("NAS-IP-Address", data.get("NAS-Identifier", None)) nas = data.get("NAS-IP-Address", data.get("NAS-Identifier", None))
user = data.get("User-Name", "").decode("utf-8", errors="replace") username = data.get("User-Name", "")
user = user.split("@", 1)[0] username = username.split("@", 1)[0]
mac = data.get("Calling-Station-Id", "") mac = data.get("Calling-Station-Id", "")
data_from_api = api_client.view( data_from_api = api_client.view(
"radius/authorize/{0}/{1}/{2}".format(nas, user, mac)) "radius/authorize/{0}/{1}/{2}".format(
urllib.parse.quote(nas or "None", safe=""),
urllib.parse.quote(username or "None", safe=""),
urllib.parse.quote(mac or "None", safe="")
))
nas_type = data_from_api["nas"] nas_type = data_from_api["nas"]
user = data_from_api["user"] user = data_from_api["user"]
@ -147,9 +151,9 @@ def authorize(data):
if nas_type and nas_type["port_access_mode"] == "802.1X": if nas_type and nas_type["port_access_mode"] == "802.1X":
result, log, password = check_user_machine_and_register( result, log, password = check_user_machine_and_register(
nas_type, user, user_interface) nas_type, user, user_interface, nas, username, mac)
logger.info(log.encode("utf-8")) logger.info(log.encode("utf-8"))
logger.info(user.encode("utf-8")) logger.info(username.encode("utf-8"))
if not result: if not result:
return radiusd.RLM_MODULE_REJECT return radiusd.RLM_MODULE_REJECT
@ -175,9 +179,9 @@ def post_auth(data):
data_from_api = api_client.view( data_from_api = api_client.view(
"radius/post_auth/{0}/{1}/{2}".format( "radius/post_auth/{0}/{1}/{2}".format(
urllib.parse.quote(nas), urllib.parse.quote(nas or "None", safe=""),
urllib.parse.quote(nas_port), urllib.parse.quote(nas_port or "None", safe=""),
urllib.parse.quote(mac) urllib.parse.quote(mac or "None", safe="")
)) ))
nas_type = data_from_api["nas"] nas_type = data_from_api["nas"]
@ -232,7 +236,7 @@ def post_auth(data):
return radiusd.RLM_MODULE_OK return radiusd.RLM_MODULE_OK
def check_user_machine_and_register(nas_type, user, user_interface): def check_user_machine_and_register(nas_type, user, user_interface, nas_id, username, mac_address):
"""Check if username and mac are registered. Register it if unknown. """Check if username and mac are registered. Register it if unknown.
Return the user ntlm password if everything is ok. Return the user ntlm password if everything is ok.
Used for 802.1X auth""" Used for 802.1X auth"""
@ -250,17 +254,28 @@ def check_user_machine_and_register(nas_type, user, user_interface):
elif not user_interface["active"]: elif not user_interface["active"]:
return (False, "Interface/Machine disabled", "") return (False, "Interface/Machine disabled", "")
elif not user_interface["ipv4"]: elif not user_interface["ipv4"]:
# interface.assign_ipv4() try:
api_client.view(
"radius/assign_ip/{0}".format(
urllib.parse.quote(mac_address or "None", safe="")
))
return (True, "Ok, new ipv4 assignement...", user.get("pwd_ntlm", "")) return (True, "Ok, new ipv4 assignement...", user.get("pwd_ntlm", ""))
except HTTPError as err:
return (False, "Error during ip assignement %s" % err.response.text, "")
else: else:
return (True, "Access ok", user.get("pwd_ntlm", "")) return (True, "Access ok", user.get("pwd_ntlm", ""))
elif nas_type: elif nas_type:
if nas_type["autocapture_mac"]: if nas_type["autocapture_mac"]:
# result, reason = user.autoregister_machine(mac_address, nas_type) try:
# if result: api_client.view(
# return (True, "Access Ok, Registering mac...", user.pwd_ntlm) "radius/autoregister/{0}/{1}/{2}".format(
# else: urllib.parse.quote(nas_id or "None", safe=""),
# return (False, "Error during mac register %s" % reason, "") urllib.parse.quote(username or "None", safe=""),
urllib.parse.quote(mac_address or "None", safe="")
))
return (True, "Access Ok, Registering mac...", user["pwd_ntlm"])
except HTTPError as err:
return (False, "Error during mac register %s" % err.response.text, "")
return (False, "L'auto capture est désactivée", "") return (False, "L'auto capture est désactivée", "")
else: else:
return (False, "Unknown interface/machine", "") return (False, "Unknown interface/machine", "")
@ -402,7 +417,7 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
for user in room_users: for user in room_users:
if not user["is_ban"] and user["state"] == USER_STATE_ACTIVE: if not user["is_ban"] and user["state"] == USER_STATE_ACTIVE:
all_user_ban = False all_user_ban = False
elif user["email_state"] != EMAIL_STATE_UNVERIFIED and (user["is_connected"] or user["is_whitelisted"]): if user["email_state"] != EMAIL_STATE_UNVERIFIED and (user["is_connected"] or user["is_whitelisted"]):
at_least_one_active_user = True at_least_one_active_user = True
if all_user_ban: if all_user_ban:
@ -470,13 +485,25 @@ def decide_vlan_switch(data_from_api, user_mac, nas_port):
if radius_option["radius_general_policy"] == "MACHINE": if radius_option["radius_general_policy"] == "MACHINE":
DECISION_VLAN = user_interface["vlan_id"] DECISION_VLAN = user_interface["vlan_id"]
if not user_interface["ipv4"]: if not user_interface["ipv4"]:
# interface.assign_ipv4() try:
api_client.view(
"radius/assign_ip/{0}".format(
urllib.parse.quote(user_mac or "None", safe="")
))
return ( return (
"Ok, assigning new ipv4" + extra_log, "Ok, assigning new ipv4" + extra_log,
DECISION_VLAN, DECISION_VLAN,
True, True,
attributes, attributes,
) )
except HTTPError as err:
return (
"Error during ip assignement %s" % err.response.text + extra_log,
DECISION_VLAN,
True,
attributes,
)
else: else:
return ( return (
"Interface OK" + extra_log, "Interface OK" + extra_log,

View file

@ -26,4 +26,8 @@ urls_functional_view = [
views.authorize, None), views.authorize, None),
(r"radius/post_auth/(?P<nas_id>[^/]+)/(?P<nas_port>.+)/(?P<user_mac>[0-9a-fA-F\:\-]{17})$", (r"radius/post_auth/(?P<nas_id>[^/]+)/(?P<nas_port>.+)/(?P<user_mac>[0-9a-fA-F\:\-]{17})$",
views.post_auth, None), views.post_auth, None),
(r"radius/autoregister/(?P<nas_id>[^/]+)/(?P<username>.+)/(?P<mac_address>[0-9a-fA-F\:\-]{17})$",
views.autoregister_machine, None),
(r"radius/assign_ip/(?P<mac_address>[0-9a-fA-F\:\-]{17})$",
views.assign_ip, None),
] ]

View file

@ -22,6 +22,8 @@
from rest_framework.decorators import api_view from rest_framework.decorators import api_view
from rest_framework.response import Response from rest_framework.response import Response
from django.db.models import Q from django.db.models import Q
from django.http import HttpResponse
from django.forms import ValidationError
from . import serializers from . import serializers
from machines.models import Domain, IpList, Interface, Nas from machines.models import Domain, IpList, Interface, Nas
@ -42,6 +44,16 @@ class AuthorizeResponse:
@api_view(['GET']) @api_view(['GET'])
def authorize(request, nas_id, username, mac_address): def authorize(request, nas_id, username, mac_address):
"""Return objects the radius need for the Authorize step
Parameters:
nas_id (string): NAS name or ipv4
username (string): username of the user who is trying to connect
mac_address (string): mac address of the device which is trying to connect
Return:
AuthorizeResponse: contains all the informations
"""
nas_interface = Interface.objects.filter( nas_interface = Interface.objects.filter(
Q(domain=Domain.objects.filter(name=nas_id)) Q(domain=Domain.objects.filter(name=nas_id))
@ -106,6 +118,8 @@ def post_auth(request, nas_id, nas_port, user_mac):
) )
# get port # get port
port = None
if nas_port and nas_port != "None":
port_number = nas_port.split(".")[0].split("/")[-1][-2:] port_number = nas_port.split(".")[0].split("/")[-1][-2:]
port = Port.objects.filter(switch=switch, port=port_number).first() port = Port.objects.filter(switch=switch, port=port_number).first()
@ -139,3 +153,36 @@ def post_auth(request, nas_id, nas_port, user_mac):
PostAuthResponse(nas_type, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE)) PostAuthResponse(nas_type, room_users, port, port_profile, switch, user_interface, radius_option, EMAIL_STATE_UNVERIFIED, RADIUS_OPTION_REJECT, USER_STATE_ACTIVE))
return Response(data=serialized.data) return Response(data=serialized.data)
@api_view(['GET'])
def autoregister_machine(request, nas_id, username, mac_address):
nas_interface = Interface.objects.filter(
Q(domain=Domain.objects.filter(name=nas_id))
| Q(ipv4=IpList.objects.filter(ipv4=nas_id))
).first()
nas_type = None
if nas_interface:
nas_type = Nas.objects.filter(
nas_type=nas_interface.machine_type).first()
user = User.objects.filter(pseudo__iexact=username).first()
result, reason = user.autoregister_machine(mac_address, nas_type)
if result:
return Response(data=reason)
return Response(reason, status=400)
@api_view(['GET'])
def assign_ip(request, mac_address):
interface = (
Interface.objects.filter(mac_address=mac_address)
.first()
)
try:
interface.assign_ipv4()
return Response()
except ValidationError as err:
return Response(err.message, status=400)