# -*- mode: python; coding: utf-8 -*- # Re2o un logiciel d'administration développé initiallement au rezometz. Il # se veut agnostique au réseau considéré, de manière à être installable en # quelques clics. # # Copyright © 2017 Gabriel Détraz # Copyright © 2017 Lara Kermarec # Copyright © 2017 Augustin Lemesle # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Reglages généraux, machines, utilisateurs, mail, general pour l'application. """ from __future__ import unicode_literals import os from django.utils.functional import cached_property from django.utils import timezone from django.db import models from django.db.models.signals import post_save from django.dispatch import receiver from django.core.cache import cache from django.forms import ValidationError from django.utils.translation import ugettext_lazy as _ import machines.models from re2o.mixins import AclMixin, RevMixin from re2o.aes_field import AESEncryptedField from datetime import timedelta class PreferencesModel(models.Model): """ Base object for the Preferences objects Defines methods to handle the cache of the settings (they should not change a lot) """ @classmethod def set_in_cache(cls): """ Save the preferences in a server-side cache """ instance, _created = cls.objects.get_or_create() cache.set(cls().__class__.__name__.lower(), instance, None) return instance @classmethod def get_cached_value(cls, key): """ Get the preferences from the server-side cache """ instance = cache.get(cls().__class__.__name__.lower()) if instance is None: instance = cls.set_in_cache() return getattr(instance, key) class Meta: abstract = True class OptionalUser(AclMixin, PreferencesModel): """Options pour l'user : obligation ou nom du telephone, activation ou non du solde, autorisation du negatif, fingerprint etc""" is_tel_mandatory = models.BooleanField(default=True) gpg_fingerprint = models.BooleanField(default=True) all_can_create_club = models.BooleanField( default=False, help_text=_("Users can create a club.") ) all_can_create_adherent = models.BooleanField( default=False, help_text=_("Users can create a member.") ) shell_default = models.OneToOneField( "users.ListShell", on_delete=models.PROTECT, blank=True, null=True ) self_change_shell = models.BooleanField( default=False, help_text=_("Users can edit their shell.") ) self_change_room = models.BooleanField( default=False, help_text=_("Users can edit their room.") ) local_email_accounts_enabled = models.BooleanField( default=False, help_text=_("Enable local email accounts for users.") ) local_email_domain = models.CharField( max_length=32, default="@example.org", help_text=_("Domain to use for local email accounts"), ) max_email_address = models.IntegerField( default=15, help_text=_("Maximum number of local email addresses for a standard" " user."), ) delete_notyetactive = models.IntegerField( default=15, help_text=_( "Not yet active users will be deleted after this number of" " days." ), ) self_adhesion = models.BooleanField( default=False, help_text=_("A new user can create their account on Re2o.") ) all_users_active = models.BooleanField( default=False, help_text=_( "If True, all new created and connected users are active." " If False, only when a valid registration has been paid." ), ) allow_archived_connexion = models.BooleanField( default=False, help_text=_("If True, archived users are allowed to connect.") ) class Meta: permissions = (("view_optionaluser", _("Can view the user options")),) verbose_name = _("user options") def clean(self): """Clean model: Check the mail_extension """ if self.local_email_domain[0] != "@": raise ValidationError(_("Email domain must begin with @")) @receiver(post_save, sender=OptionalUser) def optionaluser_post_save(**kwargs): """Ecriture dans le cache""" user_pref = kwargs["instance"] user_pref.set_in_cache() class OptionalMachine(AclMixin, PreferencesModel): """Options pour les machines : maximum de machines ou d'alias par user sans droit, activation de l'ipv6""" SLAAC = "SLAAC" DHCPV6 = "DHCPV6" DISABLED = "DISABLED" CHOICE_IPV6 = ( (SLAAC, _("Autoconfiguration by RA")), (DHCPV6, _("IP addresses assigning by DHCPv6")), (DISABLED, _("Disabled")), ) password_machine = models.BooleanField(default=False) max_lambdauser_interfaces = models.IntegerField(default=10) max_lambdauser_aliases = models.IntegerField(default=10) ipv6_mode = models.CharField(max_length=32, choices=CHOICE_IPV6, default="DISABLED") create_machine = models.BooleanField(default=True) default_dns_ttl = models.PositiveIntegerField( verbose_name=_("Default Time To Live (TTL) for CNAME, A and AAA records."), default=172800, # 2 days ) @cached_property def ipv6(self): """ Check if the IPv6 option is activated """ return not self.get_cached_value("ipv6_mode") == "DISABLED" class Meta: permissions = (("view_optionalmachine", _("Can view the machine options")),) verbose_name = _("machine options") @receiver(post_save, sender=OptionalMachine) def optionalmachine_post_save(**kwargs): """Synchronisation ipv6 et ecriture dans le cache""" machine_pref = kwargs["instance"] machine_pref.set_in_cache() if machine_pref.ipv6_mode != "DISABLED": for interface in machines.models.Interface.objects.all(): interface.sync_ipv6() class OptionalTopologie(AclMixin, PreferencesModel): """Reglages pour la topologie : mode d'accès radius, vlan où placer les machines en accept ou reject""" MACHINE = "MACHINE" DEFINED = "DEFINED" CHOICE_RADIUS = ( (MACHINE, _("On the IP range's VLAN of the machine")), (DEFINED, _("Preset in 'VLAN for machines accepted by RADIUS'")), ) CHOICE_PROVISION = (("sftp", "sftp"), ("tftp", "tftp")) switchs_web_management = models.BooleanField( default=False, help_text=_("Web management, activated in case of automatic provision"), ) switchs_web_management_ssl = models.BooleanField( default=False, help_text=_( "SSL web management, make sure that a certificate is" " installed on the switch" ), ) switchs_rest_management = models.BooleanField( default=False, help_text=_("REST management, activated in case of automatic provision"), ) switchs_ip_type = models.OneToOneField( "machines.IpType", on_delete=models.PROTECT, blank=True, null=True, help_text=_("IP range for the management of switches"), ) switchs_provision = models.CharField( max_length=32, choices=CHOICE_PROVISION, default="tftp", help_text=_("Provision of configuration mode for switches"), ) sftp_login = models.CharField( max_length=32, null=True, blank=True, help_text=_("SFTP login for switches") ) sftp_pass = AESEncryptedField( max_length=63, null=True, blank=True, help_text=_("SFTP password") ) @cached_property def provisioned_switchs(self): """Liste des switches provisionnés""" from topologie.models import Switch return Switch.objects.filter(automatic_provision=True).order_by( "interface__domain__name" ) @cached_property def switchs_management_interface(self): """Return the ip of the interface that the switch have to contact to get it's config""" if self.switchs_ip_type: from machines.models import Role, Interface return ( Interface.objects.filter( machine__interface__in=Role.interface_for_roletype( "switch-conf-server" ) ) .filter(machine_type__ip_type=self.switchs_ip_type) .first() ) else: return None @cached_property def switchs_management_interface_ip(self): """Same, but return the ipv4""" if not self.switchs_management_interface: return None return self.switchs_management_interface.ipv4 @cached_property def switchs_management_sftp_creds(self): """Credentials des switchs pour provion sftp""" if self.sftp_login and self.sftp_pass: return {"login": self.sftp_login, "pass": self.sftp_pass} else: return None @cached_property def switchs_management_utils(self): """Used for switch_conf, return a list of ip on vlans""" from machines.models import Role, Ipv6List, Interface def return_ips_dict(interfaces): return { "ipv4": [str(interface.ipv4) for interface in interfaces], "ipv6": Ipv6List.objects.filter(interface__in=interfaces).values_list( "ipv6", flat=True ), } ntp_servers = Role.all_interfaces_for_roletype("ntp-server").filter( machine_type__ip_type=self.switchs_ip_type ) log_servers = Role.all_interfaces_for_roletype("log-server").filter( machine_type__ip_type=self.switchs_ip_type ) radius_servers = Role.all_interfaces_for_roletype("radius-server").filter( machine_type__ip_type=self.switchs_ip_type ) dhcp_servers = Role.all_interfaces_for_roletype("dhcp-server") dns_recursive_servers = Role.all_interfaces_for_roletype( "dns-recursive-server" ).filter(machine_type__ip_type=self.switchs_ip_type) subnet = None subnet6 = None if self.switchs_ip_type: subnet = ( self.switchs_ip_type.ip_net_full_info or self.switchs_ip_type.ip_set_full_info[0] ) subnet6 = self.switchs_ip_type.ip6_set_full_info return { "ntp_servers": return_ips_dict(ntp_servers), "log_servers": return_ips_dict(log_servers), "radius_servers": return_ips_dict(radius_servers), "dhcp_servers": return_ips_dict(dhcp_servers), "dns_recursive_servers": return_ips_dict(dns_recursive_servers), "subnet": subnet, "subnet6": subnet6, } @cached_property def provision_switchs_enabled(self): """Return true if all settings are ok : switchs on automatic provision, ip_type""" return bool( self.provisioned_switchs and self.switchs_ip_type and SwitchManagementCred.objects.filter(default_switch=True).exists() and self.switchs_management_interface_ip and bool( self.switchs_provision != "sftp" or self.switchs_management_sftp_creds ) ) class Meta: permissions = (("view_optionaltopologie", _("Can view the topology options")),) verbose_name = _("topology options") @receiver(post_save, sender=OptionalTopologie) def optionaltopologie_post_save(**kwargs): """Ecriture dans le cache""" topologie_pref = kwargs["instance"] topologie_pref.set_in_cache() class RadiusKey(AclMixin, models.Model): """Class of a radius key""" radius_key = AESEncryptedField(max_length=255, help_text=_("RADIUS key")) comment = models.CharField( max_length=255, null=True, blank=True, help_text=_("Comment for this key") ) default_switch = models.BooleanField( default=False, help_text=_("Default key for switches") ) class Meta: permissions = (("view_radiuskey", _("Can view a RADIUS key object")),) verbose_name = _("RADIUS key") verbose_name_plural = _("RADIUS keys") def clean(self): """Clean model: Check default switch is unique """ if RadiusKey.objects.filter(default_switch=True).count() > 1: raise ValidationError(_("Default radiuskey for switchs already exist")) def __str__(self): return _("RADIUS key ") + str(self.id) + " " + str(self.comment) class SwitchManagementCred(AclMixin, models.Model): """Class of a management creds of a switch, for rest management""" management_id = models.CharField(max_length=63, help_text=_("Switch login")) management_pass = AESEncryptedField(max_length=63, help_text=_("Password")) default_switch = models.BooleanField( default=True, unique=True, help_text=_("Default credentials for switches") ) class Meta: permissions = ( ( "view_switchmanagementcred", _("Can view a switch management" " credentials object"), ), ) verbose_name = _("switch management credentials") def __str__(self): return _("Switch login ") + str(self.management_id) class Reminder(AclMixin, models.Model): """Options pour les mails de notification de fin d'adhésion. Days: liste des nombres de jours pour lesquells un mail est envoyé optionalMessage: message additionel pour le mail """ days = models.IntegerField( default=7, unique=True, help_text=_("Delay between the email and the membership's end"), ) message = models.TextField( default="", null=True, blank=True, help_text=_("Message displayed specifically for this reminder"), ) class Meta: permissions = (("view_reminder", _("Can view a reminder object")),) verbose_name = _("reminder") verbose_name_plural = _("reminders") def users_to_remind(self): from re2o.utils import all_has_access date = timezone.now().replace(minute=0, hour=0) futur_date = date + timedelta(days=self.days) users = all_has_access(futur_date).exclude( pk__in=all_has_access(futur_date + timedelta(days=1)) ) return users class GeneralOption(AclMixin, PreferencesModel): """Options générales : nombre de resultats par page, nom du site, temps où les liens sont valides""" general_message_fr = models.TextField( default="", blank=True, help_text=_( "General message displayed on the French version of the" " website (e.g. in case of maintenance)" ), ) general_message_en = models.TextField( default="", blank=True, help_text=_( "General message displayed on the English version of the" " website (e.g. in case of maintenance)" ), ) search_display_page = models.IntegerField(default=15) pagination_number = models.IntegerField(default=25) pagination_large_number = models.IntegerField(default=8) req_expire_hrs = models.IntegerField(default=48) site_name = models.CharField(max_length=32, default="Re2o") email_from = models.EmailField(default="www-data@example.com") main_site_url = models.URLField(max_length=255, default="http://re2o.example.org") GTU_sum_up = models.TextField(default="", blank=True) GTU = models.FileField(upload_to="", default="", null=True, blank=True) class Meta: permissions = (("view_generaloption", _("Can view the general options")),) verbose_name = _("general options") @receiver(post_save, sender=GeneralOption) def generaloption_post_save(**kwargs): """Ecriture dans le cache""" general_pref = kwargs["instance"] general_pref.set_in_cache() class Service(AclMixin, models.Model): """Liste des services affichés sur la page d'accueil : url, description, image et nom""" name = models.CharField(max_length=32) url = models.URLField() description = models.TextField() image = models.ImageField(upload_to="logo", blank=True) class Meta: permissions = (("view_service", _("Can view the service options")),) verbose_name = _("service") verbose_name_plural = _("services") def __str__(self): return str(self.name) class MailContact(AclMixin, models.Model): """Contact email adress with a commentary.""" address = models.EmailField( default="contact@example.org", help_text=_("Contact email address") ) commentary = models.CharField( blank=True, null=True, help_text=_("Description of the associated email address."), max_length=256, ) @cached_property def get_name(self): return self.address.split("@")[0] class Meta: permissions = ( ("view_mailcontact", _("Can view a contact email address object")), ) verbose_name = _("contact email address") verbose_name_plural = _("contact email addresses") def __str__(self): return self.address class Mandate(RevMixin, AclMixin, models.Model): class Meta: verbose_name = _("Mandate") verbose_name_plural = _("Mandates") permissions = (("view_mandate", _("Can view a mandate")),) president = models.ForeignKey( "users.User", on_delete=models.SET_NULL, null=True, blank=True, verbose_name=_("President of the association"), help_text=_("Displayed on subscription vouchers"), ) start_date = models.DateTimeField(verbose_name=_("start date")) end_date = models.DateTimeField(verbose_name=_("end date"), blank=True, null=True) @classmethod def get_mandate(cls, date=timezone.now): """"Find the mandate taking place at the given date.""" if callable(date): date = date() mandate = ( cls.objects.exclude(end_date__lte=date).order_by("start_date").first() or cls.objects.order_by("start_date").last() ) if not mandate: raise cls.DoesNotExist( "No mandate have been created. Please go to the preferences page to create one." ) return mandate def is_over(self): return self.end_date is None def __str__(self): return str(self.president) + " " + str(self.start_date.year) class AssoOption(AclMixin, PreferencesModel): """Options générales de l'asso : siret, addresse, nom, etc""" name = models.CharField( default=_("Networking organisation school Something"), max_length=256 ) siret = models.CharField(default="00000000000000", max_length=32) adresse1 = models.CharField(default=_("Threadneedle Street"), max_length=128) adresse2 = models.CharField(default=_("London EC2R 8AH"), max_length=128) contact = models.EmailField(default="contact@example.org") telephone = models.CharField(max_length=15, default="0000000000") pseudo = models.CharField(default=_("Organisation"), max_length=32) utilisateur_asso = models.OneToOneField( "users.User", on_delete=models.PROTECT, blank=True, null=True ) description = models.TextField(null=True, blank=True) class Meta: permissions = (("view_assooption", _("Can view the organisation options")),) verbose_name = _("organisation options") @receiver(post_save, sender=AssoOption) def assooption_post_save(**kwargs): """Ecriture dans le cache""" asso_pref = kwargs["instance"] asso_pref.set_in_cache() class HomeOption(AclMixin, PreferencesModel): """Settings of the home page (facebook/twitter etc)""" facebook_url = models.URLField(null=True, blank=True) twitter_url = models.URLField(null=True, blank=True) twitter_account_name = models.CharField(max_length=32, null=True, blank=True) class Meta: permissions = (("view_homeoption", _("Can view the homepage options")),) verbose_name = _("homepage options") @receiver(post_save, sender=HomeOption) def homeoption_post_save(**kwargs): """Ecriture dans le cache""" home_pref = kwargs["instance"] home_pref.set_in_cache() class MailMessageOption(AclMixin, models.Model): """Reglages, mail de bienvenue et autre""" welcome_mail_fr = models.TextField( default="", blank=True, help_text=_("Welcome email in French") ) welcome_mail_en = models.TextField( default="", blank=True, help_text=_("Welcome email in English") ) class Meta: permissions = ( ("view_mailmessageoption", _("Can view the email message" " options")), ) verbose_name = _("email message options") class RadiusAttribute(RevMixin, AclMixin, models.Model): class Meta: verbose_name = _("RADIUS attribute") verbose_name_plural = _("RADIUS attributes") attribute = models.CharField( max_length=255, verbose_name=_("Attribute"), help_text=_("See http://freeradius.org/rfc/attributes.html"), ) value = models.CharField(max_length=255, verbose_name=_("Value")) comment = models.TextField( verbose_name=_("Comment"), help_text=_("Use this field to document this attribute."), blank=True, default="", ) def __str__(self): return " ".join([self.attribute, "=", self.value]) class RadiusOption(AclMixin, PreferencesModel): class Meta: verbose_name = _("RADIUS policy") verbose_name_plural = _("RADIUS policies") MACHINE = "MACHINE" DEFINED = "DEFINED" CHOICE_RADIUS = ( (MACHINE, _("On the IP range's VLAN of the machine")), (DEFINED, _("Preset in 'VLAN for machines accepted by RADIUS'")), ) REJECT = "REJECT" SET_VLAN = "SET_VLAN" CHOICE_POLICY = ( (REJECT, _("Reject the machine")), (SET_VLAN, _("Place the machine on the VLAN")), ) radius_general_policy = models.CharField( max_length=32, choices=CHOICE_RADIUS, default="DEFINED" ) unknown_machine = models.CharField( max_length=32, choices=CHOICE_POLICY, default=REJECT, verbose_name=_("Policy for unknown machines"), ) unknown_machine_vlan = models.ForeignKey( "machines.Vlan", on_delete=models.PROTECT, related_name="unknown_machine_vlan", blank=True, null=True, verbose_name=_("Unknown machines VLAN"), help_text=_("VLAN for unknown machines if not rejected"), ) unknown_machine_attributes = models.ManyToManyField( RadiusAttribute, related_name="unknown_machine_attribute", blank=True, verbose_name=_("Unknown machines attributes."), help_text=_("Answer attributes for unknown machines."), ) unknown_port = models.CharField( max_length=32, choices=CHOICE_POLICY, default=REJECT, verbose_name=_("Policy for unknown ports"), ) unknown_port_vlan = models.ForeignKey( "machines.Vlan", on_delete=models.PROTECT, related_name="unknown_port_vlan", blank=True, null=True, verbose_name=_("Unknown ports VLAN"), help_text=_("VLAN for unknown ports if not rejected"), ) unknown_port_attributes = models.ManyToManyField( RadiusAttribute, related_name="unknown_port_attribute", blank=True, verbose_name=_("Unknown ports attributes."), help_text=_("Answer attributes for unknown ports."), ) unknown_room = models.CharField( max_length=32, choices=CHOICE_POLICY, default=REJECT, verbose_name=_( "Policy for machines connecting from unregistered rooms" " (relevant on ports with STRICT RADIUS mode)" ), ) unknown_room_vlan = models.ForeignKey( "machines.Vlan", related_name="unknown_room_vlan", on_delete=models.PROTECT, blank=True, null=True, verbose_name=_("Unknown rooms VLAN"), help_text=_("VLAN for unknown rooms if not rejected"), ) unknown_room_attributes = models.ManyToManyField( RadiusAttribute, related_name="unknown_room_attribute", blank=True, verbose_name=_("Unknown rooms attributes."), help_text=_("Answer attributes for unknown rooms."), ) non_member = models.CharField( max_length=32, choices=CHOICE_POLICY, default=REJECT, verbose_name=_("Policy for non members"), ) non_member_vlan = models.ForeignKey( "machines.Vlan", related_name="non_member_vlan", on_delete=models.PROTECT, blank=True, null=True, verbose_name=_("Non members VLAN"), help_text=_("VLAN for non members if not rejected"), ) non_member_attributes = models.ManyToManyField( RadiusAttribute, related_name="non_member_attribute", blank=True, verbose_name=_("Non member attributes."), help_text=_("Answer attributes for non members."), ) banned = models.CharField( max_length=32, choices=CHOICE_POLICY, default=REJECT, verbose_name=_("Policy for banned users"), ) banned_vlan = models.ForeignKey( "machines.Vlan", related_name="banned_vlan", on_delete=models.PROTECT, blank=True, null=True, verbose_name=_("Banned users VLAN"), help_text=_("VLAN for banned users if not rejected"), ) banned_attributes = models.ManyToManyField( RadiusAttribute, related_name="banned_attribute", blank=True, verbose_name=_("Banned attributes."), help_text=_("Answer attributes for banned users."), ) vlan_decision_ok = models.OneToOneField( "machines.Vlan", on_delete=models.PROTECT, related_name="vlan_ok_option", blank=True, null=True, ) ok_attributes = models.ManyToManyField( RadiusAttribute, related_name="ok_attribute", blank=True, verbose_name=_("Accepted users attributes."), help_text=_("Answer attributes for accepted users."), ) @classmethod def get_attributes(cls, name, attribute_kwargs={}): return ( (str(attribute.attribute), str(attribute.value % attribute_kwargs)) for attribute in cls.get_cached_value(name).all() ) def default_invoice(): tpl, _ = DocumentTemplate.objects.get_or_create( name="Re2o default invoice", template="templates/default_invoice.tex" ) return tpl.id def default_voucher(): tpl, _ = DocumentTemplate.objects.get_or_create( name="Re2o default voucher", template="templates/default_voucher.tex" ) return tpl.id class CotisationsOption(AclMixin, PreferencesModel): class Meta: verbose_name = _("cotisations options") invoice_template = models.OneToOneField( "preferences.DocumentTemplate", verbose_name=_("Template for invoices"), related_name="invoice_template", on_delete=models.PROTECT, default=default_invoice, ) voucher_template = models.OneToOneField( "preferences.DocumentTemplate", verbose_name=_("Template for subscription voucher"), related_name="voucher_template", on_delete=models.PROTECT, default=default_voucher, ) send_voucher_mail = models.BooleanField( verbose_name=_("Send voucher by email when the invoice is controlled."), help_text=_( "Be carefull, if no mandate is defined on the preferences page, errors will be triggered when generating vouchers." ), default=False, ) class DocumentTemplate(RevMixin, AclMixin, models.Model): """Represent a template in order to create documents such as invoice or subscription voucher. """ template = models.FileField(upload_to="templates/", verbose_name=_("template")) name = models.CharField(max_length=125, verbose_name=_("name"), unique=True) class Meta: verbose_name = _("document template") verbose_name_plural = _("document templates") def __str__(self): return str(self.name) @receiver(models.signals.post_delete, sender=DocumentTemplate) def auto_delete_file_on_delete(sender, instance, **kwargs): """ Deletes file from filesystem when corresponding `DocumentTemplate` object is deleted. """ if instance.template: if os.path.isfile(instance.template.path): os.remove(instance.template.path) @receiver(models.signals.pre_save, sender=DocumentTemplate) def auto_delete_file_on_change(sender, instance, **kwargs): """ Deletes old file from filesystem when corresponding `DocumentTemplate` object is updated with new file. """ if not instance.pk: return False try: old_file = DocumentTemplate.objects.get(pk=instance.pk).template except DocumentTemplate.DoesNotExist: return False new_file = instance.template if not old_file == new_file: if os.path.isfile(old_file.path): os.remove(old_file.path)