8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2024-12-23 15:33:45 +00:00
re2o/users/models.py
Hugo Levy-Falk 697d1ef7aa feat: Move LDAP to an optional app.
The Entire LDAP infrastructures now relies on signals rather than direct function calls and is in its own app. This means it can be deactivated, but also that we can easily plug new services in addition to LDAP, such as OAuth.

Closes issue #270
2021-01-11 21:53:49 +01:00

2793 lines
92 KiB
Python
Executable file

# -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au Rézo Metz.
# Il se veut agnostique au réseau considéré, de manière à être installable
# en quelques clics.
#
# Copyright © 2017-2020 Gabriel Détraz
# Copyright © 2017-2020 Lara Kermarec
# Copyright © 2017-2020 Augustin Lemesle
# Copyright © 2017-2020 Hugo Levy--Falk
# Copyright © 2017-2020 Jean-Romain Garnier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
The database models for the 'users' app of re2o.
The goal is to keep the main actions here, i.e. the 'clean' and 'save'
function are higly reposnsible for the changes, checking the coherence of the
data and the good behaviour in general for not breaking the database.
For further details on each of those models, see the documentation details for
each.
Here are defined the following django models :
* Users : Adherent and Club (which inherit from Base User Abstract of django).
* Whitelists
* Bans
* Schools (teaching structures)
* Rights (Groups and ListRight)
* ServiceUser (for ldap connexions)
"""
from __future__ import unicode_literals
import re
import uuid
import datetime
import sys
from django.db import models
from django.db.models import Q
from django import forms
from django.forms import ValidationError
from django.db.models.signals import post_save, post_delete, m2m_changed
from django.dispatch import receiver
from django.utils.functional import cached_property
from django.template import loader
from django.core.urlresolvers import reverse
from django.db import transaction
from django.utils import timezone
from datetime import timedelta
from django.contrib.auth.models import (
AbstractBaseUser,
BaseUserManager,
PermissionsMixin,
Group,
)
from django.core.validators import RegexValidator
import traceback
from django.utils.translation import ugettext_lazy as _
from django.core.files.uploadedfile import InMemoryUploadedFile
from reversion import revisions as reversion
from re2o.settings import LDAP, GID_RANGES, UID_RANGES
from re2o.field_permissions import FieldPermissionModelMixin
from re2o.mixins import AclMixin, RevMixin
from re2o.base import smtp_check
from re2o.mail_utils import send_mail
from cotisations.models import Cotisation, Facture, Paiement, Vente
from machines.models import Domain, Interface, Machine, regen
from preferences.models import GeneralOption, AssoOption, OptionalUser
from preferences.models import OptionalMachine, MailMessageOption
from users import signals
from PIL import Image
from io import BytesIO
import sys
# General utilities
def linux_user_check(login):
"""Check if a login comply with unix base login policy
Parameters:
login (string): Login to check
Returns:
boolean: True if login comply with policy
"""
UNIX_LOGIN_PATTERN = re.compile("^[a-z][a-z0-9-]*[$]?$")
return UNIX_LOGIN_PATTERN.match(login)
def linux_user_validator(login):
"""Check if a login comply with unix base login policy, returns
a standard Django ValidationError if login is not correct
Parameters:
login (string): Login to check
Returns:
ValidationError if login comply with policy
"""
if not linux_user_check(login):
raise forms.ValidationError(
_("The username \"%(label)s\" contains forbidden characters."),
params={"label": login},
)
def get_fresh_user_uid():
"""Return a fresh unused uid.
Returns:
uid (int): The fresh uid available
"""
uids = list(range(int(min(UID_RANGES["users"])), int(max(UID_RANGES["users"]))))
try:
used_uids = list(User.objects.values_list("uid_number", flat=True))
except:
used_uids = []
free_uids = [id for id in uids if id not in used_uids]
return min(free_uids)
def get_fresh_gid():
"""Return a fresh unused gid.
Returns:
uid (int): The fresh gid available
"""
gids = list(range(int(min(GID_RANGES["posix"])), int(max(GID_RANGES["posix"]))))
used_gids = list(ListRight.objects.values_list("gid", flat=True))
free_gids = [id for id in gids if id not in used_gids]
return min(free_gids)
class UserManager(BaseUserManager):
"""User manager basique de django"""
def _create_user(self, pseudo, surname, email, password=None, su=False):
if not pseudo:
raise ValueError(_("Users must have an username."))
if not linux_user_check(pseudo):
raise ValueError(_("Username should only contain [a-z0-9-]."))
user = Adherent(
pseudo=pseudo,
surname=surname,
name=surname,
email=self.normalize_email(email),
)
user.set_password(password)
user.confirm_mail()
if su:
user.is_superuser = True
user.save(using=self._db)
return user
def create_user(self, pseudo, surname, email, password=None):
"""
Creates and saves a User with the given pseudo, name, surname, email,
and password.
"""
return self._create_user(pseudo, surname, email, password, False)
def create_superuser(self, pseudo, surname, email, password):
"""
Creates and saves a superuser with the given pseudo, name, surname,
email, and password.
"""
return self._create_user(pseudo, surname, email, password, True)
class User(
RevMixin, FieldPermissionModelMixin, AbstractBaseUser, PermissionsMixin, AclMixin
):
"""Base re2o User model
Attributes:
surname: surname of the user
pseudo: login of the user
email: The main email of the user
local_email_redirect: Option for redirection of all emails to the main email
local_email_enabled: If True, enable a local email account
school: Optional field, the school of the user
shell: User shell linux
comment: Optionnal comment field
pwd_ntlm: Hash password in ntlm for freeradius
state: State of the user, can be active, not yet active, etc (see below)
email_state: State of the main email (if confirmed or not)
registered: Date of initial creation
telephone: Phone number
uid_number: Linux uid of this user
legacy_uid: Optionnal legacy user id
shortcuts_enabled : Option for js shortcuts
email_change_date: Date of the last email change
"""
STATE_ACTIVE = 0
STATE_DISABLED = 1
STATE_ARCHIVE = 2
STATE_NOT_YET_ACTIVE = 3
STATE_FULL_ARCHIVE = 4
STATES = (
(0, _("Active")),
(1, _("Disabled")),
(2, _("Archived")),
(3, _("Not yet active")),
(4, _("Fully archived")),
)
EMAIL_STATE_VERIFIED = 0
EMAIL_STATE_UNVERIFIED = 1
EMAIL_STATE_PENDING = 2
EMAIL_STATES = (
(0, _("Confirmed")),
(1, _("Not confirmed")),
(2, _("Waiting for email confirmation")),
)
surname = models.CharField(max_length=255)
pseudo = models.CharField(
max_length=32,
unique=True,
help_text=_("Must only contain letters, numerals or dashes."),
validators=[linux_user_validator],
)
email = models.EmailField(
blank=True,
default="",
help_text=_("External email address allowing us to contact you."),
)
local_email_redirect = models.BooleanField(
default=False,
help_text=_(
"Enable redirection of the local email messages to the"
" main email address."
),
)
local_email_enabled = models.BooleanField(
default=False,
help_text=_("Enable the local email account.")
)
school = models.ForeignKey(
"School",
on_delete=models.PROTECT,
null=True,
blank=True,
help_text=_("Education institute.")
)
shell = models.ForeignKey(
"ListShell",
on_delete=models.PROTECT,
null=True,
blank=True,
help_text=_("Unix shell.")
)
comment = models.CharField(
help_text=_("Comment, school year."), max_length=255, blank=True
)
pwd_ntlm = models.CharField(max_length=255)
state = models.IntegerField(
choices=STATES,
default=STATE_NOT_YET_ACTIVE,
help_text=_("Account state.")
)
email_state = models.IntegerField(choices=EMAIL_STATES, default=EMAIL_STATE_PENDING)
registered = models.DateTimeField(auto_now_add=True)
telephone = models.CharField(max_length=15, blank=True, null=True)
uid_number = models.PositiveIntegerField(default=get_fresh_user_uid, unique=True)
legacy_uid = models.PositiveIntegerField(
unique=True,
blank=True,
null=True,
help_text=_("Optionnal legacy uid, for import and transition purpose")
)
shortcuts_enabled = models.BooleanField(
verbose_name=_("enable shortcuts on Re2o website"), default=True
)
email_change_date = models.DateTimeField(auto_now_add=True)
theme = models.CharField(max_length=255, default="default.css")
USERNAME_FIELD = "pseudo"
REQUIRED_FIELDS = ["surname", "email"]
objects = UserManager()
request = None
class Meta:
permissions = (
("change_user_password", _("Can change the password of a user")),
("change_user_state", _("Can edit the state of a user")),
("change_user_force", _("Can force the move")),
("change_user_shell", _("Can edit the shell of a user")),
("change_user_pseudo", _("Can edit the pseudo of a user")),
(
"change_user_groups",
_("Can edit the groups of rights of a user (critical permission)"),
),
("change_all_users", _("Can edit all users, including those with rights")),
("view_user", _("Can view a user object")),
)
verbose_name = _("user (member or club)")
verbose_name_plural = _("users (members or clubs)")
###### Shortcuts and methods for user instance ######
@cached_property
def name(self):
"""Shortcuts, returns name attribute if the user is linked with
an adherent instance.
Parameters:
self (user instance): user to return infos
Returns:
name (string): Name value if available
"""
if self.is_class_adherent:
return self.adherent.name
else:
return ""
@cached_property
def room(self):
"""Shortcuts, returns room attribute; unique for adherent
and multiple (queryset) for club.
Parameters:
self (user instance): user to return infos
Returns:
room (room instance): Room instance
"""
if self.is_class_adherent:
return self.adherent.room
elif self.is_class_club:
return self.club.room
else:
raise NotImplementedError(_("Unknown type."))
@cached_property
def get_mail_addresses(self):
"""Shortcuts, returns all local email address queryset only if local_email
global option is enabled.
Parameters:
self (user instance): user to return infos
Returns:
emailaddresse_set (queryset): All Email address of the local account
"""
if self.local_email_enabled:
return self.emailaddress_set.all()
return None
@cached_property
def get_mail(self):
"""Shortcuts, returns the email address to use to contact the instance user self.
Depends on if local_email account has been activated, otherwise returns self.email.
Parameters:
self (user instance): user to return infos
Returns:
email (string): The correct email to use
"""
if (
not OptionalUser.get_cached_value("local_email_accounts_enabled")
or not self.local_email_enabled
or self.local_email_redirect
):
return str(self.email)
else:
return str(self.emailaddress_set.get(local_part=self.pseudo.lower()))
@cached_property
def class_type(self):
"""Shortcuts, returns the class string "Adherent" of "Club", related with the
self instance.
Parameters:
self (user instance): user to return infos
Returns:
class (string): The class "Adherent" or "Club"
"""
if hasattr(self, "adherent"):
return "Adherent"
elif hasattr(self, "club"):
return "Club"
else:
raise NotImplementedError(_("Unknown type."))
@cached_property
def class_display(self):
"""Shortcuts, returns the pretty string "Member" of "Club", related with the
self instance.
Parameters:
self (user instance): user to return infos
Returns:
class (string): "Member" or "Club"
"""
if hasattr(self, "adherent"):
return _("Member")
elif hasattr(self, "club"):
return _("Club")
else:
raise NotImplementedError(_("Unknown type."))
@cached_property
def gid_number(self):
"""Shortcuts, returns the main and default gid for users,
from settings file
Parameters:
self (user instance): user to return infos
Returns:
gid (int): Default gid number
"""
return int(LDAP["user_gid"])
@cached_property
def gid(self):
"""Shortcuts, returns the main and default gid for users,
from settings file
Parameters:
self (user instance): user to return infos
Returns:
gid (int): Default gid number
"""
return LDAP["user_gid"]
@cached_property
def is_class_club(self):
"""Shortcuts, returns if the instance related with user is
a club.
Parameters:
self (user instance): user to return infos
Returns:
boolean : Returns true if this user is a club
"""
return hasattr(self, "club")
@cached_property
def is_class_adherent(self):
"""Shortcuts, returns if the instance related with user is
an adherent.
Parameters:
self (user instance): user to return infos
Returns:
boolean : Returns true if this user is an adherent
"""
return hasattr(self, "adherent")
@property
def is_active(self):
"""Shortcuts, used by django for allowing connection from this user.
Returns True if this user has state active, or not yet active,
or if preferences allows connection for archived users.
Parameters:
self (user instance): user to return infos
Returns:
boolean : Returns true if this user is allow to connect.
"""
allow_archived = OptionalUser.get_cached_value("allow_archived_connexion")
return (
self.state == self.STATE_ACTIVE
or self.state == self.STATE_NOT_YET_ACTIVE
or (
allow_archived
and self.state in (self.STATE_ARCHIVE, self.STATE_FULL_ARCHIVE)
)
)
@property
def is_staff(self):
"""Shortcuts, used by django for admin pannel access, shortcuts to
is_admin.
Parameters:
self (user instance): user to return infos
Returns:
boolean : Returns true if this user is_staff.
"""
return self.is_admin
@property
def is_admin(self):
"""Shortcuts, used by django for admin pannel access. Test if user
instance is_superuser or member of admin group.
Parameters:
self (user instance): user to return infos
Returns:
boolean : Returns true if this user is allow to access to admin pannel.
"""
admin, _ = Group.objects.get_or_create(name="admin")
return self.is_superuser or admin in self.groups.all()
def get_full_name(self):
"""Shortcuts, returns pretty full name to display both in case of user
is a club or an adherent.
Parameters:
self (user instance): user to return infos
Returns:
full_name (string) : Returns full name, name + surname.
"""
name = self.name
if name:
return "%s %s" % (name, self.surname)
else:
return self.surname
def get_short_name(self):
"""Shortcuts, returns short name to display both in case of user is
a club or an adherent.
Parameters:
self (user instance): user to return infos
Returns:
surname (string) : Returns surname.
"""
return self.surname
@property
def get_shell(self):
"""Shortcuts, returns linux user shell to use for this user if
provided, otherwise the default shell defined in preferences.
Parameters:
self (user instance): user to return infos
Returns:
shell (linux shell) : Returns linux shell.
"""
return self.shell or OptionalUser.get_cached_value("shell_default")
@cached_property
def home_directory(self):
"""Shortcuts, returns linux user home directory to use.
Parameters:
self (user instance): user to return infos
Returns:
home dir (string) : Returns home directory.
"""
return "/home/" + self.pseudo
@cached_property
def get_shadow_expire(self):
"""Shortcuts, returns the shadow expire value : 0 if this account is
disabled or if the email has not been verified to block the account
access.
Parameters:
self (user instance): user to return infos
Returns:
shadow_expire (int) : Shadow expire value.
"""
if self.state == self.STATE_DISABLED or self.email_state == self.EMAIL_STATE_UNVERIFIED:
return str(0)
else:
return None
@cached_property
def solde(self):
"""Shortcuts, calculate and returns the balance for this user, as a
dynamic balance beetween debiti (-) and credit (+) "Vente" objects
flaged as balance operations.
Parameters:
self (user instance): user to return infos
Returns:
solde (float) : The balance of the user.
"""
solde_objects = Paiement.objects.filter(is_balance=True)
somme_debit = (
Vente.objects.filter(
facture__in=Facture.objects.filter(
user=self, paiement__in=solde_objects, valid=True
)
).aggregate(
total=models.Sum(
models.F("prix") * models.F("number"),
output_field=models.DecimalField(),
)
)[
"total"
]
or 0
)
somme_credit = (
Vente.objects.filter(
facture__in=Facture.objects.filter(user=self, valid=True), name="solde"
).aggregate(
total=models.Sum(
models.F("prix") * models.F("number"),
output_field=models.DecimalField(),
)
)[
"total"
]
or 0
)
return somme_credit - somme_debit
@cached_property
def email_address(self):
"""Shortcuts, returns all the email addresses (queryset) associated
with the local account, if the account has been activated,
otherwise return a none queryset.
Parameters:
self (user instance): user to return infos
Returns:
email_address (django queryset) : Returns a queryset containing
EMailAddress of this user.
"""
if (
OptionalUser.get_cached_value("local_email_accounts_enabled")
and self.local_email_enabled
):
return self.emailaddress_set.all()
return EMailAddress.objects.none()
def end_adhesion(self):
"""Methods, calculate and returns the end of membership value date of
this user with aggregation of Cotisation objects linked to user
instance.
Parameters:
self (user instance): user to return infos
Returns:
end_adhesion (date) : Date of the end of the membership.
"""
date_max = (
Cotisation.objects.filter(
vente__in=Vente.objects.filter(
facture__in=Facture.objects.filter(user=self).exclude(valid=False)
)
)
.aggregate(models.Max("date_end_memb"))["date_end_memb__max"]
)
return date_max
def end_connexion(self):
"""Methods, calculate and returns the end of connection subscription value date
of this user with aggregation of Cotisation objects linked to user instance.
Parameters:
self (user instance): user to return infos
Returns:
end_adhesion (date) : Date of the end of the connection subscription.
"""
date_max = (
Cotisation.objects.filter(
vente__in=Vente.objects.filter(
facture__in=Facture.objects.filter(user=self).exclude(valid=False)
)
)
.aggregate(models.Max("date_end_con"))["date_end_con__max"]
)
return date_max
def is_adherent(self):
"""Methods, calculate and returns if the user has a valid membership by testing
if end_adherent is after now or not.
Parameters:
self (user instance): user to return infos
Returns:
is_adherent (boolean) : True is user has a valid membership.
"""
end = self.end_adhesion()
if not end:
return False
elif end < timezone.now():
return False
else:
return True
# it looks wrong, we should check if there is a cotisation where
# were date_start_memb < timezone.now() < date_end_memb,
# in case the user purshased a cotisation starting in the futur
# somehow
def is_connected(self):
"""Methods, calculate and returns if the user has a valid membership AND a
valid connection subscription by testing if end_connexion is after now or not.
If true, returns is_adherent() method value.
Parameters:
self (user instance): user to return infos
Returns:
is_connected (boolean) : True is user has a valid membership and a valid connexion.
"""
end = self.end_connexion()
if not end:
return False
elif end < timezone.now():
return False
else:
return self.is_adherent()
# it looks wrong, we should check if there is a cotisation where
# were date_start_con < timezone.now() < date_end_con,
# in case the user purshased a cotisation starting in the futur
# somehow
def end_ban(self):
"""Methods, calculate and returns the end of a ban value date
of this user with aggregation of ban objects linked to user instance.
Parameters:
self (user instance): user to return infos
Returns:
end_ban (date) : Date of the end of the bans objects.
"""
date_max = Ban.objects.filter(user=self).aggregate(models.Max("date_end"))[
"date_end__max"
]
return date_max
def end_whitelist(self):
"""Methods, calculate and returns the end of a whitelist value date
of this user with aggregation of whitelists objects linked to user instance.
Parameters:
self (user instance): user to return infos
Returns:
end_whitelist (date) : Date of the end of the whitelists objects.
"""
date_max = Whitelist.objects.filter(user=self).aggregate(
models.Max("date_end")
)["date_end__max"]
return date_max
def is_ban(self):
"""Methods, calculate and returns if the user is banned by testing
if end_ban is after now or not.
parameters:
self (user instance): user to return infos
returns:
is_ban (boolean) : true if user is under a ban sanction decision.
"""
end = self.end_ban()
if not end:
return False
elif end < timezone.now():
return False
else:
return True
def is_whitelisted(self):
"""Methods, calculate and returns if the user has a whitelist free connection
if end_whitelist is after now or not.
parameters:
self (user instance): user to return infos
returns:
is_whitelisted (boolean) : true if user has a whitelist connection.
"""
end = self.end_whitelist()
if not end:
return False
elif end < timezone.now():
return False
else:
return True
def has_access(self):
"""Methods, returns if the user has an internet access.
Return True if user is active and has a verified email, is not under a ban
decision and has a valid membership and connection or a whitelist.
parameters:
self (user instance): user to return infos
returns:
has_access (boolean) : true if user has an internet connection.
"""
return (
self.state == User.STATE_ACTIVE
and self.email_state != User.EMAIL_STATE_UNVERIFIED
and not self.is_ban()
and (self.is_connected() or self.is_whitelisted())
) or self == AssoOption.get_cached_value("utilisateur_asso")
def end_access(self):
"""Methods, returns the date of the end of the connection for this user,
as the maximum date beetween connection (membership objects) and whitelists.
parameters:
self (user instance): user to return infos
returns:
end_access (datetime) : Returns the date of the end_access connection.
"""
if not self.end_connexion():
if not self.end_whitelist():
return None
else:
return self.end_whitelist()
else:
if not self.end_whitelist():
return self.end_connexion()
else:
return max(self.end_connexion(), self.end_whitelist())
@classmethod
def users_interfaces(cls, users, active=True, all_interfaces=False):
"""Class method, returns all interfaces related/belonging to users
contained in query_sert "users".
Parameters:
users (list of users queryset): users which interfaces
have to be returned
active (boolean): If true, filter on interfaces
all_interfaces (boolean): If true, returns all interfaces
returns:
interfaces (queryset): Queryset of interfaces instances
"""
if all_interfaces:
return Interface.objects.filter(
machine__in=Machine.objects.filter(user__in=users)
).select_related("domain__extension")
else:
return Interface.objects.filter(
machine__in=Machine.objects.filter(user__in=users, active=active)
).select_related("domain__extension")
def user_interfaces(self, active=True, all_interfaces=False):
"""Method, returns all interfaces related/belonging to an user.
Parameters:
self (user instance): user which interfaces
have to be returned
active (boolean): If true, filter on interfaces
all_interfaces (boolean): If true, returns all interfaces
returns:
interfaces (queryset): Queryset of interfaces instances
"""
return self.users_interfaces(
[self], active=active, all_interfaces=all_interfaces
)
###### Methods and user edition functions, modify user attributes ######
def set_active(self):
"""Method, make this user active. Called in post-saved of subscription,
set the state value active if state is not_yet_active, with
a valid membership.
Also make an archived user fully active.
Parameters:
self (user instance): user to set active
"""
if self.state == self.STATE_NOT_YET_ACTIVE:
# Look for ventes with non 0 subscription duration in the invoices set
not_zero = self.facture_set.filter(valid=True).exclude(Q(vente__duration_membership=0)).exists()
days_not_zero = self.facture_set.filter(valid=True).exclude(Q(vente__duration_days_membership=0)).exists()
if(not_zero or days_not_zero\
or OptionalUser.get_cached_value("all_users_active")):
self.state = self.STATE_ACTIVE
self.save()
if self.state == self.STATE_ARCHIVE or self.state == self.STATE_FULL_ARCHIVE:
self.unarchive()
self.state = self.STATE_ACTIVE
self.save()
def set_password(self, password):
"""Method, overload the basic set_password inherited from django BaseUser.
Called when setting a new password, to set the classic django password
hashed, and also the NTLM hashed pwd_ntlm password.
Parameters:
self (user instance): user to set password
password (string): new password (cleatext) to set.
"""
from re2o.login import hashNT
super().set_password(password)
self.pwd_ntlm = hashNT(password)
return
def confirm_mail(self):
"""Method, set the email_state to VERIFIED when the email has been verified.
Parameters:
self (user instance): user to set password
"""
self.email_state = self.EMAIL_STATE_VERIFIED
def assign_ips(self):
"""Method, assigns ipv4 to all interfaces related to a user.
Parameters:
self (user instance): user which interfaces have to be assigned
"""
interfaces = self.user_interfaces()
with transaction.atomic(), reversion.create_revision():
Interface.mass_assign_ipv4(interfaces)
reversion.set_comment("IPv4 assignment")
def unassign_ips(self):
"""Method, unassigns and remove ipv4 to all interfaces related to a user.
(set ipv4 field to null)
Parameters:
self (user instance): user which interfaces have to be assigned
"""
interfaces = self.user_interfaces()
with transaction.atomic(), reversion.create_revision():
Interface.mass_unassign_ipv4(interfaces)
reversion.set_comment("IPv4 unassignment")
@classmethod
def mass_unassign_ips(cls, users_list):
"""Class method, unassigns and remove ipv4 to all interfaces related
to a list of users.
Parameters:
users_list (list of users or queryset): users which interfaces
have to be unassigned
"""
interfaces = cls.users_interfaces(users_list)
with transaction.atomic(), reversion.create_revision():
Interface.mass_unassign_ipv4(interfaces)
reversion.set_comment("IPv4 assignment")
def disable_email(self):
"""Method, disable email account and email redirection for
an user.
Parameters:
self (user instance): user to disabled email.
"""
self.local_email_enabled = False
self.local_email_redirect = False
@classmethod
def mass_disable_email(cls, queryset_users):
"""Class method, disable email accounts and email redirection for
a list of users (or queryset).
Parameters:
users_list (list of users or queryset): users which email
account to disable.
"""
queryset_users.update(local_email_enabled=False)
queryset_users.update(local_email_redirect=False)
def delete_data(self):
"""Method, delete non mandatory data, delete machine,
and disable email accounts for a list of users (or queryset).
Called during full archive process.
Parameters:
self (user instance): user to delete data.
"""
self.disable_email()
self.machine_set.all().delete()
@classmethod
def mass_delete_data(cls, queryset_users):
"""Class method, delete non mandatory data, delete machine
and disable email accounts for a list of users (or queryset).
Called during full archive process.
Parameters:
users_list (list of users or queryset): users to perform
delete data.
"""
cls.mass_disable_email(queryset_users)
Machine.mass_delete(Machine.objects.filter(user__in=queryset_users))
signals.remove_mass.send(sender=cls, queryset=queryset_users)
def archive(self):
"""Method, archive user by unassigning ips.
Parameters:
self (user instance): user to archive.
"""
self.unassign_ips()
@classmethod
def mass_archive(cls, users_list):
"""Class method, mass archive a queryset of users.
Called during archive process, unassign ip and set to
archive state.
Parameters:
users_list (list of users queryset): users to perform
mass archive.
"""
# Force eval of queryset
bool(users_list)
users_list = users_list.all()
cls.mass_unassign_ips(users_list)
users_list.update(state=User.STATE_ARCHIVE)
def full_archive(self):
"""Method, full archive an user by unassigning ips, deleting data
and authentication deletion.
Parameters:
self (user instance): user to full archive.
"""
self.archive()
self.delete_data()
signals.remove.send(sender=User, instance=self)
@classmethod
def mass_full_archive(cls, users_list):
"""Class method, mass full archive a queryset of users.
Called during full archive process, unassign ip, delete
non mandatory data and set to full archive state.
Parameters:
users_list (list of users queryset): users to perform
mass full archive.
"""
# Force eval of queryset
bool(users_list)
users_list = users_list.all()
cls.mass_unassign_ips(users_list)
cls.mass_delete_data(users_list)
users_list.update(state=User.STATE_FULL_ARCHIVE)
def unarchive(self):
"""Method, unarchive an user by assigning ips, and recreating
authentication user associated.
Parameters:
self (user instance): user to unarchive.
"""
self.assign_ips()
signals.synchronise.send(sender=self.__class__, instance=self)
def state_sync(self):
"""Master Method, call unarchive, full_archive or archive method
on an user when state is changed, based on previous state.
Parameters:
self (user instance): user to sync state.
"""
if (
self.__original_state != self.STATE_ACTIVE
and self.state == self.STATE_ACTIVE
):
self.unarchive()
elif (
self.__original_state != self.STATE_ARCHIVE
and self.state == self.STATE_ARCHIVE
):
self.archive()
elif (
self.__original_state != self.STATE_FULL_ARCHIVE
and self.state == self.STATE_FULL_ARCHIVE
):
self.full_archive()
###### Send mail functions ######
def notif_inscription(self, request=None):
"""Method/function, send an email 'welcome' to user instance, after
successfull register.
Parameters:
self (user instance): user to send the welcome email
request (optional request): Specify request
Returns:
email: Welcome email after user register
"""
template = loader.get_template("users/email_welcome")
mailmessageoptions, _created = MailMessageOption.objects.get_or_create()
context = {
"nom": self.get_full_name(),
"asso_name": AssoOption.get_cached_value("name"),
"asso_email": AssoOption.get_cached_value("contact"),
"welcome_mail_fr": mailmessageoptions.welcome_mail_fr,
"welcome_mail_en": mailmessageoptions.welcome_mail_en,
"pseudo": self.pseudo,
}
send_mail(
request,
"Bienvenue au %(name)s / Welcome to %(name)s"
% {"name": AssoOption.get_cached_value("name")},
"",
GeneralOption.get_cached_value("email_from"),
[self.email],
html_message=template.render(context),
)
def reset_passwd_mail(self, request):
"""Method/function, makes a Request class instance, and send
an email to user instance for password change in case of initial
password set or forget password form.
Parameters:
self (user instance): user to send the welcome email
request: Specify request, mandatory to build the reset link
Returns:
email: Reset password email for user instance
"""
req = Request()
req.type = Request.PASSWD
req.user = self
req.save()
template = loader.get_template("users/email_passwd_request")
context = {
"name": req.user.get_full_name(),
"asso": AssoOption.get_cached_value("name"),
"asso_mail": AssoOption.get_cached_value("contact"),
"site_name": GeneralOption.get_cached_value("site_name"),
"url": request.build_absolute_uri(
reverse("users:process", kwargs={"token": req.token})
),
"expire_in": str(GeneralOption.get_cached_value("req_expire_hrs")),
}
send_mail(
request,
"Changement de mot de passe de %(name)s / Password change for "
"%(name)s" % {"name": AssoOption.get_cached_value("name")},
template.render(context),
GeneralOption.get_cached_value("email_from"),
[req.user.email],
fail_silently=False,
)
def send_confirm_email_if_necessary(self, request):
"""Method/function, check if a confirmation by email is needed,
and trigger send.
* If the user changed email, it needs to be confirmed
* If they're not fully archived, send a confirmation email
Parameters:
self (user instance): user to send the confirmation email
request: Specify request, mandatory to build the reset link
Returns:
boolean: True if a confirmation of the mail is needed
"""
# Only update the state if the email changed
if self.__original_email == self.email:
return False
# If the user was previously in the PENDING or UNVERIFIED state,
# we can't update email_change_date otherwise it would push back
# their due date
# However, if the user is in the VERIFIED state, we reset the date
if self.email_state == self.EMAIL_STATE_VERIFIED:
self.email_change_date = timezone.now()
# Remember that the user needs to confirm their email address again
self.email_state = self.EMAIL_STATE_PENDING
self.save()
# Fully archived users shouldn't get an email, so stop here
if self.state == self.STATE_FULL_ARCHIVE:
return False
# Send the email
self.confirm_email_address_mail(request)
return True
def trigger_email_changed_state(self, request):
"""Method/function, update the value of the last email change,
and call and send the confirm email link.
Function called only after a manual of email_state by an admin.
Parameters:
self (user instance): user to send the confirmation email
request: Specify request, mandatory to build the reset link
Returns:
boolean: True if a confirmation of the mail is needed
"""
if self.email_state == self.EMAIL_STATE_VERIFIED:
return False
self.email_change_date = timezone.now()
self.save()
self.confirm_email_address_mail(request)
return True
def confirm_email_before_date(self):
"""Method/function, calculate the maximum date for confirmation
of the new email address
Parameters:
self (user instance): user to calculate maximum date
for confirmation
Returns:
date: Date of the maximum time to perform email confirmation
"""
if self.email_state == self.EMAIL_STATE_VERIFIED:
return None
days = OptionalUser.get_cached_value("disable_emailnotyetconfirmed")
return self.email_change_date + timedelta(days=days)
def confirm_email_address_mail(self, request):
"""Method/function, makes a Request class instance, and send
an email to user instance to confirm a new email address.
* If the user changed email, it needs to be confirmed
* If they're not fully archived, send a confirmation email
Parameters:
self (user instance): user to send the confirmation email
request: Specify request, mandatory to build the reset link
Returns:
email: An email with a link to confirm the new email address
"""
# Delete all older requests for this user, that aren't for this email
filter = Q(user=self) & Q(type=Request.EMAIL) & ~Q(email=self.email)
Request.objects.filter(filter).delete()
# Create the request and send the email
req = Request()
req.type = Request.EMAIL
req.user = self
req.email = self.email
req.save()
template = loader.get_template("users/email_confirmation_request")
context = {
"name": req.user.get_full_name(),
"asso": AssoOption.get_cached_value("name"),
"asso_mail": AssoOption.get_cached_value("contact"),
"site_name": GeneralOption.get_cached_value("site_name"),
"url": request.build_absolute_uri(
reverse("users:process", kwargs={"token": req.token})
),
"expire_in": str(GeneralOption.get_cached_value("req_expire_hrs")),
"confirm_before_fr": self.confirm_email_before_date().strftime("%d/%m/%Y"),
"confirm_before_en": self.confirm_email_before_date().strftime("%Y-%m-%d"),
}
send_mail(
request,
"Confirmation du mail de %(name)s / Email confirmation for "
"%(name)s" % {"name": AssoOption.get_cached_value("name")},
template.render(context),
GeneralOption.get_cached_value("email_from"),
[req.user.email],
fail_silently=False,
)
return
def autoregister_machine(self, mac_address, nas_type, request=None):
"""Function, register a new interface on the user instance account.
Called automaticaly mainly by freeradius python backend, for autoregister.
Parameters:
self (user instance): user to register new interface
mac_address (string): New mac address to add on the new interface
nas_type (Django Nas object instance): The nas object calling
request: Optional django request
Returns:
interface (Interface instance): The new interface registered
"""
allowed, _message, _rights = Machine.can_create(self, self.id)
if not allowed:
return False, _("Maximum number of registered machines reached.")
if not nas_type:
return False, _("Re2o doesn't know wich machine type to assign.")
machine_type_cible = nas_type.machine_type
try:
machine_parent = Machine()
machine_parent.user = self
interface_cible = Interface()
interface_cible.mac_address = mac_address
interface_cible.machine_type = machine_type_cible
interface_cible.clean()
machine_parent.clean()
domain = Domain()
domain.name = self.get_next_domain_name()
domain.interface_parent = interface_cible
domain.clean()
machine_parent.save()
interface_cible.machine = machine_parent
interface_cible.save()
domain.interface_parent = interface_cible
domain.clean()
domain.save()
self.notif_auto_newmachine(interface_cible)
except Exception as error:
return False, traceback.format_exc()
return interface_cible, _("OK")
def notif_auto_newmachine(self, interface):
"""Function/method, send an email to notify the new interface
registered on user instance account.
Parameters:
self (user instance): user to notify new registration
interface (interface instance): new interface registered
Returns:
boolean: True if a confirmation of the mail is needed
"""
template = loader.get_template("users/email_auto_newmachine")
context = {
"nom": self.get_full_name(),
"mac_address": interface.mac_address,
"asso_name": AssoOption.get_cached_value("name"),
"interface_name": interface.domain,
"asso_email": AssoOption.get_cached_value("contact"),
"pseudo": self.pseudo,
}
send_mail(
None,
"Ajout automatique d'une machine / New machine autoregistered",
"",
GeneralOption.get_cached_value("email_from"),
[self.email],
html_message=template.render(context),
)
return
def notif_disable(self, request=None):
"""Function/method, send an email to notify that the account is disabled
in case of unconfirmed email address.
Parameters:
self (user instance): user to notif disabled decision
request (django request): request to build email
Returns:
email: Notification email
"""
template = loader.get_template("users/email_disable_notif")
context = {
"name": self.get_full_name(),
"asso_name": AssoOption.get_cached_value("name"),
"asso_email": AssoOption.get_cached_value("contact"),
"site_name": GeneralOption.get_cached_value("site_name"),
}
send_mail(
request,
"Suspension automatique / Automatic suspension",
template.render(context),
GeneralOption.get_cached_value("email_from"),
[self.email],
fail_silently=False,
)
return
def get_next_domain_name(self):
"""Function/method, provide a unique name for a new interface.
Parameters:
self (user instance): user to get a new domain name
Returns:
domain name (string): String of new domain name
"""
def simple_pseudo():
"""Renvoie le pseudo sans underscore (compat dns)"""
return self.pseudo.replace("_", "-").lower()
def composed_pseudo(name):
"""Renvoie le resultat de simplepseudo et rajoute le nom"""
return simple_pseudo() + str(name)
num = 0
while Domain.objects.filter(name=composed_pseudo(num)):
num += 1
return composed_pseudo(num)
def can_edit(self, user_request, *_args, **_kwargs):
"""Check if a user can edit a user object.
:param self: The user which is to be edited.
:param user_request: The user who requests to edit self.
:return: a message and a boolean which is True if self is a club and
user_request one of its member, or if user_request is self, or if
user_request has the 'cableur' right.
"""
if self.state in (self.STATE_ARCHIVE, self.STATE_FULL_ARCHIVE):
warning_message = _("This user is archived.")
else:
warning_message = None
if self.is_class_club and user_request.is_class_adherent:
if (
self == user_request
or user_request.has_perm("users.change_user")
or user_request.adherent in self.club.administrators.all()
):
return True, warning_message, None
else:
return (
False,
_("You don't have the right to edit this club."),
("users.change_user",),
)
else:
if self == user_request:
return True, warning_message, None
elif user_request.has_perm("users.change_all_users"):
return True, warning_message, None
elif user_request.has_perm("users.change_user"):
if self.groups.filter(listright__critical=True):
return (
False,
_("User with critical rights, can't be edited."),
("users.change_all_users",),
)
elif self == AssoOption.get_cached_value("utilisateur_asso"):
return (
False,
_(
"Impossible to edit the organisation's"
" user without the \"change_all_users\" right."
),
("users.change_all_users",),
)
else:
return True, warning_message, None
elif user_request.has_perm("users.change_all_users"):
return True, warning_message, None
else:
return (
False,
_("You don't have the right to edit another user."),
("users.change_user", "users.change_all_users"),
)
def can_change_password(self, user_request, *_args, **_kwargs):
"""Check if a user can change a user's password
:param self: The user which is to be edited
:param user_request: The user who request to edit self
:returns: a message and a boolean which is True if self is a club
and user_request one of it's admins, or if user_request is self,
or if user_request has the right to change other's password
"""
if self.is_class_club and user_request.is_class_adherent:
if (
self == user_request
or user_request.has_perm("users.change_user_password")
or user_request.adherent in self.club.administrators.all()
):
return True, None, None
else:
return (
False,
_("You don't have the right to edit this club."),
("users.change_user_password",),
)
else:
if self == user_request or user_request.has_perm(
"users.change_user_groups"
):
# Peut éditer les groupes d'un user,
# c'est un privilège élevé, True
return True, None, None
elif user_request.has_perm("users.change_user") and not self.groups.all():
return True, None, None
else:
return (
False,
_("You don't have the right to edit another user."),
("users.change_user_groups", "users.change_user"),
)
def check_selfpasswd(self, user_request, *_args, **_kwargs):
""" Returns (True, None, None) if user_request is self, else returns
(False, None, None)
"""
return user_request == self, None, None
def can_change_room(self, user_request, *_args, **_kwargs):
""" Check if a user can change a room
:param user_request: The user who request
:returns: a message and a boolean which is True if the user has
the right to change a state
"""
if not (
(
self.pk == user_request.pk
and OptionalUser.get_cached_value("self_room_policy") != OptionalUser.DISABLED
)
or user_request.has_perm("users.change_user")
):
return (
False,
_("You don't have the right to change the room."),
("users.change_user",),
)
else:
return True, None, None
@staticmethod
def can_change_state(user_request, *_args, **_kwargs):
""" Check if a user can change a state
:param user_request: The user who request
:returns: a message and a boolean which is True if the user has
the right to change a state
"""
can = user_request.has_perm("users.change_user_state")
return (
can,
_("You don't have the right to change the state.") if not can else None,
("users.change_user_state",),
)
def can_change_shell(self, user_request, *_args, **_kwargs):
""" Check if a user can change a shell
:param user_request: The user who request
:returns: a message and a boolean which is True if the user has
the right to change a shell
"""
if not (
(
self.pk == user_request.pk
and OptionalUser.get_cached_value("self_change_shell")
)
or user_request.has_perm("users.change_user_shell")
):
return (
False,
_("You don't have the right to change the shell."),
("users.change_user_shell",),
)
else:
return True, None, None
def can_change_pseudo(self, user_request, *_args, **_kwargs):
""" Check if a user can change a pseudo
:param user_request: The user who request
:returns: a message and a boolean which is True if the user has
the right to change a shell
"""
if not (
(
self.pk == user_request.pk
and OptionalUser.get_cached_value("self_change_pseudo")
)
or user_request.has_perm("users.change_user_pseudo")
or not self.pk
):
return (
False,
_("You don't have the right to change the pseudo."),
("users.change_user_pseudo",),
)
else:
return True, None, None
@staticmethod
def can_change_local_email_redirect(user_request, *_args, **_kwargs):
""" Check if a user can change local_email_redirect.
:param user_request: The user who request
:returns: a message and a boolean which is True if the user has
the right to change a redirection
"""
can = OptionalUser.get_cached_value("local_email_accounts_enabled")
return (
can,
_("Local email accounts must be enabled.") if not can else None,
None,
)
@staticmethod
def can_change_local_email_enabled(user_request, *_args, **_kwargs):
""" Check if a user can change internal address.
:param user_request: The user who request
:returns: a message and a boolean which is True if the user has
the right to change internal address
"""
can = OptionalUser.get_cached_value("local_email_accounts_enabled")
return (
can,
_("Local email accounts must be enabled.") if not can else None,
None,
)
@staticmethod
def can_change_force(user_request, *_args, **_kwargs):
""" Check if a user can change a force
:param user_request: The user who request
:returns: a message and a boolean which is True if the user has
the right to change a force
"""
can = user_request.has_perm("users.change_user_force")
return (
can,
_("You don't have the right to force the move.") if not can else None,
("users.change_user_force",),
)
@staticmethod
def can_change_groups(user_request, *_args, **_kwargs):
""" Check if a user can change a group
:param user_request: The user who request
:returns: a message and a boolean which is True if the user has
the right to change a group
"""
can = user_request.has_perm("users.change_user_groups")
return (
can,
_("You don't have the right to edit the user's groups of rights.")
if not can
else None,
("users.change_user_groups"),
)
@staticmethod
def can_change_is_superuser(user_request, *_args, **_kwargs):
""" Check if an user can change a is_superuser flag
:param user_request: The user who request
:returns: a message and a boolean which is True if permission is granted.
"""
can = user_request.is_superuser
return (
can,
_("\"superuser\" right required to edit the superuser flag.")
if not can
else None,
[],
)
def can_view(self, user_request, *_args, **_kwargs):
"""Check if an user can view an user object.
:param self: The targeted user.
:param user_request: The user who ask for viewing the target.
:return: A boolean telling if the acces is granted and an explanation
text
"""
if self.is_class_club and user_request.is_class_adherent:
if (
self == user_request
or user_request.has_perm("users.view_user")
or user_request.adherent in self.club.administrators.all()
or user_request.adherent in self.club.members.all()
):
return True, None, None
else:
return (
False,
_("You don't have the right to view this club."),
("users.view_user",),
)
else:
if self == user_request or user_request.has_perm("users.view_user"):
return True, None, None
else:
return (
False,
_("You don't have the right to view another user."),
("users.view_user",),
)
@staticmethod
def can_view_all(user_request, *_args, **_kwargs):
"""Check if an user can access to the list of every user objects
:param user_request: The user who wants to view the list.
:return: True if the user can view the list and an explanation
message.
"""
can = user_request.has_perm("users.view_user")
return (
can,
_("You don't have the right to view the list of users.")
if not can
else None,
("users.view_user",),
)
def can_delete(self, user_request, *_args, **_kwargs):
"""Check if an user can delete an user object.
:param self: The user who is to be deleted.
:param user_request: The user who requests deletion.
:return: True if user_request has the right 'bureau', and a
message.
"""
can = user_request.has_perm("users.delete_user")
return (
can,
_("You don't have the right to delete this user.") if not can else None,
("users.delete_user",),
)
def __init__(self, *args, **kwargs):
super(User, self).__init__(*args, **kwargs)
self.field_permissions = {
"shell": self.can_change_shell,
"pseudo": self.can_change_pseudo,
"force": self.can_change_force,
"selfpasswd": self.check_selfpasswd,
"local_email_redirect": self.can_change_local_email_redirect,
"local_email_enabled": self.can_change_local_email_enabled,
"room": self.can_change_room,
}
self.__original_state = self.state
self.__original_email = self.email
def clean_pseudo(self, *args, **kwargs):
"""Method, clean the pseudo value. The pseudo must be unique, but also
it must not already be used an an email address, so a check is performed.
Parameters:
self (user instance): user to clean pseudo value.
Returns:
Django ValidationError: if the pseudo value can not be used.
"""
if EMailAddress.objects.filter(local_part=self.pseudo.lower()).exclude(
user_id=self.id
):
raise ValidationError(_("This username is already used."))
def clean_email(self, *args, **kwargs):
"""Method, clean the email value.
Validate that:
* An email value has been provided; email field can't be nullified.
(the user must be reachable by email)
* The provided email is not a local email to avoid loops
* Set the email as lower.
Parameters:
self (user instance): user to clean email value.
Returns:
Django ValidationError: if the email value can not be used.
"""
is_created = not self.pk
if not self.email and (self.__original_email or is_created):
raise forms.ValidationError(
_("Email field cannot be empty.")
)
self.email = self.email.lower()
if OptionalUser.get_cached_value("local_email_domain") in self.email:
raise forms.ValidationError(
_("You can't use a {} address as an external contact address.").format(
OptionalUser.get_cached_value("local_email_domain")
)
)
def clean(self, *args, **kwargs):
"""Method, general clean for User model.
Clean pseudo and clean email.
Parameters:
self (user instance): user to clean.
"""
super(User, self).clean(*args, **kwargs)
self.clean_pseudo(*args, **kwargs)
self.clean_email(*args, **kwargs)
def __str__(self):
return self.pseudo
@property
def theme_name(self):
"""Return the theme without the extension
Returns:
str: name of theme
"""
return self.theme.split(".")[0]
class Adherent(User):
"""Base re2o Adherent model, inherit from User. Add other attributes.
Attributes:
name: name of the user
room: room of the user
gpg_fingerprint: The gpgfp of the user
"""
name = models.CharField(max_length=255)
room = models.OneToOneField(
"topologie.Room", on_delete=models.PROTECT, blank=True, null=True
)
gpg_fingerprint = models.CharField(max_length=49, blank=True, null=True)
class Meta(User.Meta):
verbose_name = _("member")
verbose_name_plural = _("members")
def format_gpgfp(self):
"""Method, format the gpgfp value, with blocks of 4 characters,
as AAAA BBBB instead of AAAABBBB.
Parameters:
self (user instance): user to clean gpgfp value.
"""
self.gpg_fingerprint = " ".join(
[
self.gpg_fingerprint[i : i + 4]
for i in range(0, len(self.gpg_fingerprint), 4)
]
)
def validate_gpgfp(self):
"""Method, clean the gpgfp value, validate if the raw entry is a valid gpg fp.
Parameters:
self (user instance): user to clean gpgfp check.
Returns:
Django ValidationError: if the gpgfp value is invalid.
"""
if self.gpg_fingerprint:
gpg_fingerprint = self.gpg_fingerprint.replace(" ", "").upper()
if not re.match("^[0-9A-F]{40}$", gpg_fingerprint):
raise ValidationError(
_("A GPG fingerprint must contain 40 hexadecimal characters.")
)
self.gpg_fingerprint = gpg_fingerprint
@classmethod
def get_instance(cls, object_id, *_args, **_kwargs):
"""Try to find an instance of `Adherent` with the given id.
:param object_id: The id of the adherent we are looking for.
:return: An adherent.
"""
return cls.objects.get(pk=adherentid)
@staticmethod
def can_create(user_request, *_args, **_kwargs):
"""Check if an user can create an user object.
:param user_request: The user who wants to create a user object.
:return: a message and a boolean which is True if the user can create
a user or if the `options.all_can_create` is set.
"""
if not user_request.is_authenticated:
if not OptionalUser.get_cached_value(
"self_adhesion"
):
return False, _("Self registration is disabled."), None
else:
return True, None, None
else:
if OptionalUser.get_cached_value("all_can_create_adherent"):
return True, None, None
else:
can = user_request.has_perm("users.add_user")
return (
can,
_("You don't have the right to create a user.")
if not can
else None,
("users.add_user",),
)
@classmethod
def can_list(cls, user_request, *_args, **_kwargs):
"""Users can list adherent only if they are :
- Members of view acl,
- Club administrator.
:param user_request: The user who wants to view the list.
:return: True if the user can view the list and an explanation
message.
"""
can, _message, _group = Club.can_view_all(user_request)
if user_request.has_perm("users.view_user") or can:
return (
True,
None,
None,
cls.objects.all()
)
else:
return (
True,
_("You don't have the right to list all adherents."),
("users.view_user",),
cls.objects.none(),
)
def clean(self, *args, **kwargs):
"""Method, clean and validate the gpgfp value.
Parameters:
self (user instance): user to perform clean.
"""
super(Adherent, self).clean(*args, **kwargs)
if self.gpg_fingerprint:
self.validate_gpgfp()
self.format_gpgfp()
class Club(User):
""" A class representing a club (it is considered as a user
with special informations)
Attributes:
administrators: administrators of the club
members: members of the club
room: room(s) of the club
mailing: Boolean, activate mailing list for this club.
"""
room = models.ForeignKey(
"topologie.Room", on_delete=models.PROTECT, blank=True, null=True
)
administrators = models.ManyToManyField(
blank=True, to="users.Adherent", related_name="club_administrator"
)
members = models.ManyToManyField(
blank=True, to="users.Adherent", related_name="club_members"
)
mailing = models.BooleanField(default=False)
class Meta(User.Meta):
verbose_name = _("club")
verbose_name_plural = _("clubs")
@staticmethod
def can_create(user_request, *_args, **_kwargs):
"""Check if an user can create an user object.
:param user_request: The user who wants to create a user object.
:return: a message and a boolean which is True if the user can create
an user or if the `options.all_can_create` is set.
"""
if not user_request.is_authenticated:
return False, _("You must be authenticated."), None
else:
if OptionalUser.get_cached_value("all_can_create_club"):
return True, None, None
else:
can = user_request.has_perm("users.add_user")
return (
can,
_("You don't have the right to create a club.")
if not can
else None,
("users.add_user",),
)
@staticmethod
def can_view_all(user_request, *_args, **_kwargs):
"""Check if an user can access to the list of every user objects
:param user_request: The user who wants to view the list.
:return: True if the user can view the list and an explanation
message.
"""
if user_request.has_perm("users.view_user"):
return True, None, None
if (
hasattr(user_request, "is_class_adherent")
and user_request.is_class_adherent
):
if (
user_request.adherent.club_administrator.all()
or user_request.adherent.club_members.all()
):
return True, None, None
return (
False,
_("You don't have the right to view the list of users."),
("users.view_user",),
)
@classmethod
def get_instance(cls, object_id, *_args, **_kwargs):
"""Try to find an instance of `Club` with the given id.
:param object_id: The id of the adherent we are looking for.
:return: A club.
"""
return cls.objects.get(pk=object_id)
@receiver(post_save, sender=Adherent)
@receiver(post_save, sender=Club)
@receiver(post_save, sender=User)
def user_post_save(**kwargs):
"""Django signal, post save operations on Adherent, Club and User.
Sync pseudo, sync authentication, create mailalias and send welcome email if needed
(new user)
"""
is_created = kwargs["created"]
user = kwargs["instance"]
EMailAddress.objects.get_or_create(local_part=user.pseudo.lower(), user=user)
if is_created:
user.notif_inscription(user.request)
user.set_active()
user.state_sync()
signals.synchronise.send(sender=User, instance=user, base=True, access_refresh=True, mac_refresh=False, group_refresh=True
)
regen("mailing")
@receiver(m2m_changed, sender=User.groups.through)
def user_group_relation_changed(**kwargs):
"""Django signal, used for User Groups change (related models).
Sync authentication, with calling group_refresh.
"""
action = kwargs["action"]
if action in ("post_add", "post_remove", "post_clear"):
user = kwargs["instance"]
signals.synchronise.send(sender=User, instance=user, base=False, access_refresh=False, mac_refresh=False, group_refresh=True
)
@receiver(post_delete, sender=Adherent)
@receiver(post_delete, sender=Club)
@receiver(post_delete, sender=User)
def user_post_delete(**kwargs):
"""Django signal, post delete operations on Adherent, Club and User.
Delete user in authentication.
"""
user = kwargs["instance"]
signals.remove.send(sender=User, instance=user)
regen("mailing")
class ServiceUser(RevMixin, AclMixin, AbstractBaseUser):
"""A class representing a serviceuser (it is considered as a user
with special informations).
The serviceuser is a special user used with special access to authentication tree. It is
its only usefullness, and service user can't connect to re2o.
Each service connected to authentication for auth (ex dokuwiki, owncloud, etc) should
have a different service user with special acl (readonly, auth) and password.
Attributes:
pseudo: login of the serviceuser
access_group: acl for this serviceuser
comment: Comment for this serviceuser.
"""
readonly = "readonly"
ACCESS = (("auth", "auth"), ("readonly", "readonly"), ("usermgmt", "usermgmt"))
pseudo = models.CharField(
max_length=32,
unique=True,
help_text=_("Must only contain letters, numerals or dashes."),
validators=[linux_user_validator],
)
access_group = models.CharField(choices=ACCESS, default=readonly, max_length=32)
comment = models.CharField(help_text=_("Comment."), max_length=255, blank=True)
USERNAME_FIELD = "pseudo"
objects = UserManager()
class Meta:
permissions = (("view_serviceuser", _("Can view a service user object")),)
verbose_name = _("service user")
verbose_name_plural = _("service users")
def get_full_name(self):
"""Shortcuts, return a pretty name for the serviceuser.
Parameters:
self (ServiceUser instance): serviceuser to return infos.
"""
return _("Service user <{name}>").format(name=self.pseudo)
def get_short_name(self):
"""Shortcuts, return the shortname (pseudo) of the serviceuser.
Parameters:
self (ServiceUser instance): serviceuser to return infos.
"""
return self.pseudo
def __str__(self):
return self.pseudo
@receiver(post_save, sender=ServiceUser)
def service_user_post_save(**kwargs):
"""Django signal, post save operations on ServiceUser.
Sync or create serviceuser in authentication.
"""
service_user = kwargs["instance"]
signals.synchronise.send(sender=ServiceUser, instance=service_user)
@receiver(post_delete, sender=ServiceUser)
def service_user_post_delete(**kwargs):
"""Django signal, post delete operations on ServiceUser.
Delete service user in authentication.
"""
service_user = kwargs["instance"]
signals.remove.send(sender=ServiceUser, instance=service_user)
class School(RevMixin, AclMixin, models.Model):
"""A class representing a school; which users are linked.
Attributes:
name: name of the school
"""
name = models.CharField(max_length=255)
class Meta:
permissions = (("view_school", _("Can view a school object")),)
verbose_name = _("school")
verbose_name_plural = _("schools")
@classmethod
def can_list(cls, user_request, *_args, **_kwargs):
"""All users can list schools
:param user_request: The user who wants to view the list.
:return: True if the user can view the list and an explanation
message.
"""
return (
True,
None,
None,
cls.objects.all()
)
def __str__(self):
return self.name
class ListRight(RevMixin, AclMixin, Group):
""" A class representing a listright, inherit from basic django Group object.
Each listrights/groups gathers several users, and can have individuals django
rights, like can_view, can_edit, etc.
Moreover, a ListRight is also a standard unix group, usefull for creating linux
unix groups for servers access or re2o single rights, or both.
Gid is used as a primary key, and can't be changed.
Attributes:
name: Inherited from Group, name of the ListRight
gid: Group id unix
critical: Boolean, if True the Group can't be changed without special acl
details: Details and description of the group
"""
unix_name = models.CharField(
max_length=255,
unique=True,
validators=[
RegexValidator(
"^[a-z]+$",
message=(_("UNIX group names can only contain lower case letters.")),
)
],
)
gid = models.PositiveIntegerField(unique=True, null=True)
critical = models.BooleanField(default=False)
details = models.CharField(help_text=_("Description."), max_length=255, blank=True)
class Meta:
permissions = (("view_listright", _("Can view a group of rights object")),)
verbose_name = _("group of rights")
verbose_name_plural = _("groups of rights")
def __str__(self):
return self.name
@receiver(post_save, sender=ListRight)
def listright_post_save(**kwargs):
"""Django signal, post save operations on ListRight/Group objects.
Sync or create group in authentication.
"""
right = kwargs["instance"]
signals.synchronise.send(sender=ListRight, instance=right)
@receiver(post_delete, sender=ListRight)
def listright_post_delete(**kwargs):
"""Django signal, post delete operations on ListRight/Group objects.
Delete group in authentication.
"""
right = kwargs["instance"]
signals.remove.send(sender=ListRight, instance=right)
class ListShell(RevMixin, AclMixin, models.Model):
"""A class representing a shell; which users are linked.
A standard linux user shell. (zsh, bash, etc)
Attributes:
shell: name of the shell
"""
shell = models.CharField(max_length=255, unique=True)
class Meta:
permissions = (("view_listshell", _("Can view a shell object")),)
verbose_name = _("shell")
verbose_name_plural = _("shells")
def get_pretty_name(self):
"""Method, returns a pretty name for a shell like "bash" or "zsh".
Parameters:
self (listshell): Shell to return a pretty name.
Returns:
pretty_name (string): Return a pretty name string for this shell.
"""
return self.shell.split("/")[-1]
@classmethod
def can_list(cls, user_request, *_args, **_kwargs):
"""All users can list shells
:param user_request: The user who wants to view the list.
:return: True if the user can view the list and an explanation
message.
"""
return (
True,
None,
None,
cls.objects.all()
)
def __str__(self):
return self.shell
class Ban(RevMixin, AclMixin, models.Model):
""" A class representing a ban, which cuts internet access,
as a sanction.
Attributes:
user: related user for this whitelist
raison: reason of this ban, can be null
date_start: Date of the start of the ban
date_end: Date of the end of the ban
state: Has no effect now, would specify this kind of ban
(hard, soft)
"""
STATE_HARD = 0
STATE_SOFT = 1
STATE_BRIDAGE = 2
STATES = (
(0, _("HARD (no access)")),
(1, _("SOFT (local access only)")),
(2, _("RESTRICTED (speed limitation)")),
)
user = models.ForeignKey("User", on_delete=models.PROTECT)
raison = models.CharField(max_length=255)
date_start = models.DateTimeField(auto_now_add=True)
date_end = models.DateTimeField()
state = models.IntegerField(choices=STATES, default=STATE_HARD)
request = None
class Meta:
permissions = (("view_ban", _("Can view a ban object")),)
verbose_name = _("ban")
verbose_name_plural = _("bans")
def notif_ban(self, request=None):
"""Function/method, send an email to notify that a ban has been
decided and internet access disabled.
Parameters:
self (ban instance): ban to notif disabled decision
request (django request): request to build email
Returns:
email: Notification email
"""
template = loader.get_template("users/email_ban_notif")
context = {
"name": self.user.get_full_name(),
"raison": self.raison,
"date_end": self.date_end,
"asso_name": AssoOption.get_cached_value("name"),
}
send_mail(
request,
"Déconnexion disciplinaire / Disciplinary disconnection",
template.render(context),
GeneralOption.get_cached_value("email_from"),
[self.user.email],
fail_silently=False,
)
return
def is_active(self):
"""Method, return if the ban is active now or not.
Parameters:
self (ban): Ban to test if is active.
Returns:
is_active (boolean): Return True if the ban is active.
"""
return self.date_end > timezone.now()
def can_view(self, user_request, *_args, **_kwargs):
"""Check if an user can view a Ban object.
:param self: The targeted object.
:param user_request: The user who ask for viewing the target.
:return: A boolean telling if the acces is granted and an explanation
text
"""
if not user_request.has_perm("users.view_ban") and self.user != user_request:
return (
False,
_("You don't have the right to view other bans than yours."),
("users.view_ban",),
)
else:
return True, None, None
def __str__(self):
return str(self.user) + " " + str(self.raison)
@receiver(post_save, sender=Ban)
def ban_post_save(**kwargs):
"""Django signal, post save operations on Ban objects.
Sync user's access state in authentication, call email notification if needed.
"""
ban = kwargs["instance"]
is_created = kwargs["created"]
user = ban.user
signals.synchronise.send(sender=User, instance=user, base=False, access_refresh=True, mac_refresh=False)
regen("mailing")
if is_created:
ban.notif_ban(ban.request)
regen("dhcp")
regen("mac_ip_list")
if user.has_access():
regen("dhcp")
regen("mac_ip_list")
@receiver(post_delete, sender=Ban)
def ban_post_delete(**kwargs):
"""Django signal, post delete operations on Ban objects.
Sync user's access state in authentication.
"""
user = kwargs["instance"].user
signals.synchronise.send(sender=User, instance=user, base=False, access_refresh=True, mac_refresh=False)
regen("mailing")
regen("dhcp")
regen("mac_ip_list")
class Whitelist(RevMixin, AclMixin, models.Model):
""" A class representing a whitelist, which gives a free internet
access to a user for special reason.
Is overrided by a ban object.
Attributes:
user: related user for this whitelist
raison: reason of this whitelist, can be null
date_start: Date of the start of the whitelist
date_end: Date of the end of the whitelist
"""
user = models.ForeignKey("User", on_delete=models.PROTECT)
raison = models.CharField(max_length=255)
date_start = models.DateTimeField(auto_now_add=True)
date_end = models.DateTimeField()
class Meta:
permissions = (("view_whitelist", _("Can view a whitelist object")),)
verbose_name = _("whitelist (free of charge access)")
verbose_name_plural = _("whitelists (free of charge access)")
def is_active(self):
"""Method, returns if the whitelist is active now or not.
Parameters:
self (whitelist): Whitelist to test if is active.
Returns:
is_active (boolean): Return True if the whistelist is active.
"""
return self.date_end > timezone.now()
def can_view(self, user_request, *_args, **_kwargs):
"""Check if an user can view a Whitelist object.
:param self: The targeted object.
:param user_request: The user who ask for viewing the target.
:return: A boolean telling if the acces is granted and an explanation
text
"""
if (
not user_request.has_perm("users.view_whitelist")
and self.user != user_request
):
return (
False,
_("You don't have the right to view other whitelists than yours."),
("users.view_whitelist",),
)
else:
return True, None, None
def __str__(self):
return str(self.user) + " " + str(self.raison)
@receiver(post_save, sender=Whitelist)
def whitelist_post_save(**kwargs):
"""Django signal, post save operations on Whitelist objects.
Sync user's access state in authentication.
"""
whitelist = kwargs["instance"]
user = whitelist.user
signals.synchronise.send(sender=User, instance=user, base=False, access_refresh=True, mac_refresh=False)
is_created = kwargs["created"]
regen("mailing")
if is_created:
regen("dhcp")
regen("mac_ip_list")
if user.has_access():
regen("dhcp")
regen("mac_ip_list")
@receiver(post_delete, sender=Whitelist)
def whitelist_post_delete(**kwargs):
"""Django signal, post delete operations on Whitelist objects.
Sync user's access state in authentication.
"""
user = kwargs["instance"].user
signals.synchronise.send(sender=User, instance=user, base=False, access_refresh=True, mac_refresh=False)
regen("mailing")
regen("dhcp")
regen("mac_ip_list")
class Request(models.Model):
""" A class representing for user's request of reset password by email, or
confirm a new email address, with a link.
Attributes:
type: type of request (password, or confirm email address)
token: single-user token for this request
user: related user for this request
email: If needed, related email to send the request and the link
created_at: Date at the request was created
expires_at: The request will be invalid after the expires_at date
"""
PASSWD = "PW"
EMAIL = "EM"
TYPE_CHOICES = ((PASSWD, _("Password")), (EMAIL, _("Email address")))
type = models.CharField(max_length=2, choices=TYPE_CHOICES)
token = models.CharField(max_length=32)
user = models.ForeignKey("User", on_delete=models.CASCADE)
email = models.EmailField(blank=True, null=True)
created_at = models.DateTimeField(auto_now_add=True, editable=False)
expires_at = models.DateTimeField()
def save(self):
if not self.expires_at:
self.expires_at = timezone.now() + datetime.timedelta(
hours=GeneralOption.get_cached_value("req_expire_hrs")
)
if not self.token:
self.token = str(uuid.uuid4()).replace("-", "") # remove hyphens
super(Request, self).save()
class EMailAddress(RevMixin, AclMixin, models.Model):
""" A class representing an EMailAddress, for local emailaccounts
support. Each emailaddress belongs to a user.
Attributes:
user: parent user address for this email
local_part: local extension of the email
"""
user = models.ForeignKey(
User, on_delete=models.CASCADE, help_text=_("User of the local email account.")
)
local_part = models.CharField(
unique=True, max_length=128, help_text=_("Local part of the email address.")
)
class Meta:
permissions = (
("view_emailaddress", _("Can view a local email account object")),
)
verbose_name = _("local email account")
verbose_name_plural = _("local email accounts")
def __str__(self):
return str(self.local_part) + OptionalUser.get_cached_value(
"local_email_domain"
)
@cached_property
def complete_email_address(self):
"""Shortcuts, returns a complete mailaddress from localpart and emaildomain
specified in preferences.
Parameters:
self (emailaddress): emailaddress.
Returns:
emailaddress (string): Complete valid emailaddress
"""
return str(self.local_part) + OptionalUser.get_cached_value(
"local_email_domain"
)
@staticmethod
def can_create(user_request, userid, *_args, **_kwargs):
"""Check if a user can create a `EMailAddress` object.
Args:
user_request: The user who wants to create the object.
userid: The id of the user to whom the account is to be created
Returns:
a message and a boolean which is True if the user can create
a local email account.
"""
if user_request.has_perm("users.add_emailaddress"):
return True, None, None
if not OptionalUser.get_cached_value("local_email_accounts_enabled"):
return (False, _("The local email accounts are not enabled."), None)
if int(user_request.id) != int(userid):
return (
False,
_(
"You don't have the right to add a local email"
" account to another user."
),
("users.add_emailaddress",),
)
elif user_request.email_address.count() >= OptionalUser.get_cached_value(
"max_email_address"
):
return (
False,
_("You reached the limit of {} local email accounts.").format(
OptionalUser.get_cached_value("max_email_address")
),
None,
)
return True, None, None
def can_view(self, user_request, *_args, **_kwargs):
"""Check if a user can view the local email account
Args:
user_request: The user who wants to view the object.
Returns:
a message and a boolean which is True if the user can see
the local email account.
"""
if user_request.has_perm("users.view_emailaddress"):
return True, None, None
if not OptionalUser.get_cached_value("local_email_accounts_enabled"):
return (False, _("The local email accounts are not enabled."), None)
if user_request == self.user:
return True, None, None
return (
False,
_(
"You don't have the right to view another user's local"
" email account."
),
("users.view_emailaddress",),
)
def can_delete(self, user_request, *_args, **_kwargs):
"""Check if a user can delete the alias
Args:
user_request: The user who wants to delete the object.
Returns:
a message and a boolean which is True if the user can delete
the local email account.
"""
if self.local_part == self.user.pseudo.lower():
return (
False,
_(
"You can't delete a local email account whose"
" local part is the same as the username."
),
None,
)
if user_request.has_perm("users.delete_emailaddress"):
return True, None, None
if not OptionalUser.get_cached_value("local_email_accounts_enabled"):
return False, _("The local email accounts are not enabled."), None
if user_request == self.user:
return True, None, None
return (
False,
_(
"You don't have the right to delete another user's"
" local email account."
),
("users.delete_emailaddress",),
)
def can_edit(self, user_request, *_args, **_kwargs):
"""Check if a user can edit the alias
Args:
user_request: The user who wants to edit the object.
Returns:
a message and a boolean which is True if the user can edit
the local email account.
"""
if self.local_part == self.user.pseudo.lower():
return (
False,
_(
"You can't edit a local email account whose local"
" part is the same as the username."
),
None,
)
if user_request.has_perm("users.change_emailaddress"):
return True, None, None
if not OptionalUser.get_cached_value("local_email_accounts_enabled"):
return False, _("The local email accounts are not enabled."), None
if user_request == self.user:
return True, None, None
return (
False,
_(
"You don't have the right to edit another user's local"
" email account."
),
("users.change_emailaddress",),
)
def clean(self, *args, **kwargs):
"""Method, general clean for EMailAddres model.
Clean email local_part field, checking if it is available by calling
the smtp..
Parameters:
self (emailaddress): emailaddress local_part to clean.
Returns:
Django ValidationError, if the localpart does not comply with the policy.
"""
self.local_part = self.local_part.lower()
if "@" in self.local_part or "+" in self.local_part:
raise ValidationError(_("The local part must not contain @ or +."))
result, reason = smtp_check(self.local_part)
if result:
raise ValidationError(reason)
super(EMailAddress, self).clean(*args, **kwargs)