# -*- mode: python; coding: utf-8 -*- # Re2o est un logiciel d'administration développé initiallement au rezometz. Il # se veut agnostique au réseau considéré, de manière à être installable en # quelques clics. # # Copyright © 2016-2018 Gabriel Détraz # Copyright © 2017 Goulven Kermarec # Copyright © 2017 Augustin Lemesle # Copyright © 2018 Charlie Jacomme # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """machines.models The models definitions for the Machines app """ from __future__ import unicode_literals from datetime import timedelta import re from ipaddress import IPv6Address from itertools import chain from netaddr import mac_bare, EUI, IPSet, IPRange, IPNetwork, IPAddress import hashlib import base64 from django.db import models from django.db.models import Q from django.db.models.signals import post_save, post_delete from django.dispatch import receiver from django.forms import ValidationError from django.utils.functional import cached_property from django.utils import timezone from django.core.validators import MaxValueValidator, MinValueValidator from django.utils.translation import ugettext_lazy as _ from macaddress.fields import MACAddressField from re2o.field_permissions import FieldPermissionModelMixin from re2o.mixins import AclMixin, RevMixin import users.models import preferences.models class Machine(RevMixin, FieldPermissionModelMixin, models.Model): """ Class définissant une machine, object parent user, objets fils interfaces""" user = models.ForeignKey('users.User', on_delete=models.PROTECT) name = models.CharField( max_length=255, help_text=_("Optional"), blank=True, null=True ) active = models.BooleanField(default=True) class Meta: permissions = ( ("view_machine", _("Can view a machine object")), ("change_machine_user", _("Can change the user of a machine")), ) verbose_name = _("machine") verbose_name_plural = _("machines") @classmethod def get_instance(cls, machineid, *_args, **_kwargs): """Get the Machine instance with machineid. :param userid: The id :return: The user """ return cls.objects.get(pk=machineid) def linked_objects(self): """Return linked objects : machine and domain. Usefull in history display""" return chain( self.interface_set.all(), Domain.objects.filter( interface_parent__in=self.interface_set.all() ) ) @staticmethod def can_change_user(user_request, *_args, **_kwargs): """Checks if an user is allowed to change the user who owns a Machine. Args: user_request: The user requesting to change owner. Returns: A tuple with a boolean stating if edition is allowed and an explanation message. """ return (user_request.has_perm('machines.change_machine_user'), _("You don't have the right to change the machine's user.")) @staticmethod def can_view_all(user_request, *_args, **_kwargs): """Vérifie qu'on peut bien afficher l'ensemble des machines, droit particulier correspondant :param user_request: instance user qui fait l'edition :return: True ou False avec la raison de l'échec le cas échéant""" if not user_request.has_perm('machines.view_machine'): return False, _("You don't have the right to view all the" " machines.") return True, None @staticmethod def can_create(user_request, userid, *_args, **_kwargs): """Vérifie qu'un user qui fait la requète peut bien créer la machine et n'a pas atteint son quota, et crée bien une machine à lui :param user_request: Utilisateur qui fait la requête :param userid: id de l'user dont on va créer une machine :return: soit True, soit False avec la raison de l'échec""" try: user = users.models.User.objects.get(pk=userid) except users.models.User.DoesNotExist: return False, _("Nonexistent user.") max_lambdauser_interfaces = (preferences.models.OptionalMachine .get_cached_value( 'max_lambdauser_interfaces' )) if not user_request.has_perm('machines.add_machine'): if not (preferences.models.OptionalMachine .get_cached_value('create_machine')): return False, (_("You don't have the right to add a machine.")) if user != user_request: return False, (_("You don't have the right to add a machine" " to another user.")) if user.user_interfaces().count() >= max_lambdauser_interfaces: return False, (_("You reached the maximum number of interfaces" " that you are allowed to create yourself" " (%s)." % max_lambdauser_interfaces)) return True, None def can_edit(self, user_request, *args, **kwargs): """Vérifie qu'on peut bien éditer cette instance particulière (soit machine de soi, soit droit particulier :param self: instance machine à éditer :param user_request: instance user qui fait l'edition :return: True ou False avec la raison le cas échéant""" if self.user != user_request: if (not user_request.has_perm('machines.change_interface') or not self.user.can_edit( self.user, user_request, *args, **kwargs )[0]): return False, (_("You don't have the right to edit a machine" " of another user.")) return True, None def can_delete(self, user_request, *args, **kwargs): """Vérifie qu'on peut bien supprimer cette instance particulière (soit machine de soi, soit droit particulier :param self: instance machine à supprimer :param user_request: instance user qui fait l'edition :return: True ou False avec la raison de l'échec le cas échéant""" if self.user != user_request: if (not user_request.has_perm('machines.change_interface') or not self.user.can_edit( self.user, user_request, *args, **kwargs )[0]): return False, _("You don't have the right to delete a machine" " of another user.") return True, None def can_view(self, user_request, *_args, **_kwargs): """Vérifie qu'on peut bien voir cette instance particulière (soit machine de soi, soit droit particulier :param self: instance machine à éditer :param user_request: instance user qui fait l'edition :return: True ou False avec la raison de l'échec le cas échéant""" if (not user_request.has_perm('machines.view_machine') and self.user != user_request): return False, _("You don't have the right to view other machines" " than yours.") return True, None @cached_property def short_name(self): """Par defaut, renvoie le nom de la première interface de cette machine""" return str(self.interface_set.first().domain.name) @cached_property def complete_name(self): """Par defaut, renvoie le nom de la première interface de cette machine""" return str(self.interface_set.first()) @cached_property def all_short_names(self): """Renvoie de manière unique, le nom des interfaces de cette machine""" return Domain.objects.filter( interface_parent__machine=self ).values_list('name', flat=True).distinct() @cached_property def all_complete_names(self): """Renvoie tous les tls complets de la machine""" return [str(domain) for domain in Domain.objects.filter( Q(cname__interface_parent__machine=self) | Q(interface_parent__machine=self) )] def __init__(self, *args, **kwargs): super(Machine, self).__init__(*args, **kwargs) self.field_permissions = { 'user': self.can_change_user, } def __str__(self): return str(self.user) + ' - ' + str(self.id) + ' - ' + str(self.name) class MachineType(RevMixin, AclMixin, models.Model): """ Type de machine, relié à un type d'ip, affecté aux interfaces""" type = models.CharField(max_length=255) ip_type = models.ForeignKey( 'IpType', on_delete=models.PROTECT, blank=True, null=True ) class Meta: permissions = ( ("view_machinetype", _("Can view a machine type object")), ("use_all_machinetype", _("Can use all machine types")), ) verbose_name = _("machine type") verbose_name_plural = _("machine types") def all_interfaces(self): """ Renvoie toutes les interfaces (cartes réseaux) de type machinetype""" return Interface.objects.filter(type=self) @staticmethod def can_use_all(user_request, *_args, **_kwargs): """Check if an user can use every MachineType. Args: user_request: The user requesting edition. Returns: A tuple with a boolean stating if user can acces and an explanation message is acces is not allowed. """ if not user_request.has_perm('machines.use_all_machinetype'): return False, (_("You don't have the right to use all machine" " types.")) return True, None def __str__(self): return self.type class IpType(RevMixin, AclMixin, models.Model): """ Type d'ip, définissant un range d'ip, affecté aux machine types""" type = models.CharField(max_length=255) extension = models.ForeignKey('Extension', on_delete=models.PROTECT) need_infra = models.BooleanField(default=False) domaine_ip_start = models.GenericIPAddressField(protocol='IPv4') domaine_ip_stop = models.GenericIPAddressField(protocol='IPv4') domaine_ip_network = models.GenericIPAddressField( protocol='IPv4', null=True, blank=True, help_text=_("Network containing the domain's IPv4 range (optional)") ) domaine_ip_netmask = models.IntegerField( default=24, validators=[ MaxValueValidator(31), MinValueValidator(8) ], help_text=_("Netmask for the domain's IPv4 range") ) reverse_v4 = models.BooleanField( default=False, help_text=_("Enable reverse DNS for IPv4"), ) prefix_v6 = models.GenericIPAddressField( protocol='IPv6', null=True, blank=True ) prefix_v6_length = models.IntegerField( default=64, validators=[ MaxValueValidator(128), MinValueValidator(0) ] ) reverse_v6 = models.BooleanField( default=False, help_text=_("Enable reverse DNS for IPv6"), ) vlan = models.ForeignKey( 'Vlan', on_delete=models.PROTECT, blank=True, null=True ) ouverture_ports = models.ForeignKey( 'OuverturePortList', blank=True, null=True ) class Meta: permissions = ( ("view_iptype", _("Can view an IP type object")), ("use_all_iptype", _("Can use all IP types")), ) verbose_name = _("IP type") verbose_name_plural = ("IP types") @cached_property def ip_range(self): """ Renvoie un objet IPRange à partir de l'objet IpType""" return IPRange(self.domaine_ip_start, end=self.domaine_ip_stop) @cached_property def ip_set(self): """ Renvoie une IPSet à partir de l'iptype""" return IPSet(self.ip_range) @cached_property def ip_set_as_str(self): """ Renvoie une liste des ip en string""" return [str(x) for x in self.ip_set] @cached_property def ip_set_cidrs_as_str(self): """Renvoie la liste des cidrs du range en str""" return [str(ip_range) for ip_range in self.ip_set.iter_cidrs()] @cached_property def ip_set_full_info(self): """Iter sur les range cidr, et renvoie network, broacast , etc""" return [ { 'network': str(ip_set.network), 'netmask': str(ip_set.netmask), 'netmask_cidr': str(ip_set.prefixlen), 'broadcast': str(ip_set.broadcast), 'vlan': str(self.vlan), 'vlan_id': self.vlan.vlan_id } for ip_set in self.ip_set.iter_cidrs() ] @cached_property def ip6_set_full_info(self): if self.prefix_v6: return { 'network' : str(self.prefix_v6), 'netmask' : 'ffff:ffff:ffff:ffff::', 'netmask_cidr' : str(self.prefix_v6_length), 'vlan': str(self.vlan), 'vlan_id': self.vlan.vlan_id } else: return None @cached_property def ip_network(self): """Renvoie le network parent du range start-stop, si spécifié Différent de ip_set_cidrs ou iP_set, car lui est supérieur ou égal""" if self.domaine_ip_network: return IPNetwork(str(self.domaine_ip_network) + '/' + str(self.domaine_ip_netmask)) return None @cached_property def ip_net_full_info(self): """Renvoie les infos du network contenant du range""" return { 'network' : str(self.ip_network.network), 'netmask' : str(self.ip_network.netmask), 'broadcast' : str(self.ip_network.broadcast), 'netmask_cidr' : str(self.ip_network.prefixlen), } @cached_property def complete_prefixv6(self): """Return the complete prefix v6 as cidr""" return str(self.prefix_v6) + "/" + str(self.prefix_v6_length) def ip_objects(self): """ Renvoie tous les objets ipv4 relié à ce type""" return IpList.objects.filter(ip_type=self) def free_ip(self): """ Renvoie toutes les ip libres associées au type donné (self)""" return IpList.objects.filter( interface__isnull=True ).filter(ip_type=self) def gen_ip_range(self): """ Cree les IpList associées au type self. Parcours pédestrement et crée les ip une par une. Si elles existent déjà, met à jour le type associé à l'ip""" # Creation du range d'ip dans les objets iplist ip_obj = [IpList(ip_type=self, ipv4=str(ip)) for ip in self.ip_range] listes_ip = IpList.objects.filter( ipv4__in=[str(ip) for ip in self.ip_range] ) # Si il n'y a pas d'ip, on les crée if not listes_ip: IpList.objects.bulk_create(ip_obj) # Sinon on update l'ip_type else: listes_ip.update(ip_type=self) return def del_ip_range(self): """ Methode dépréciée, IpList est en mode cascade et supprimé automatiquement""" if Interface.objects.filter(ipv4__in=self.ip_objects()): raise ValidationError(_("One or several IP addresses from the" " range are affected, impossible to delete" " the range.")) for ip in self.ip_objects(): ip.delete() def check_replace_prefixv6(self): """Remplace les prefixv6 des interfaces liées à ce type d'ip""" if not self.prefix_v6: return else: for ipv6 in Ipv6List.objects.filter( interface__in=Interface.objects.filter( type__in=MachineType.objects.filter(ip_type=self) ) ): ipv6.check_and_replace_prefix(prefix=self.prefix_v6) def get_associated_ptr_records(self): from re2o.utils import all_active_assigned_interfaces if self.reverse_v4: return (all_active_assigned_interfaces() .filter(type__ip_type=self) .filter(ipv4__isnull=False)) else: return None def get_associated_ptr_v6_records(self): from re2o.utils import all_active_interfaces if self.reverse_v6: return (all_active_interfaces(full=True) .filter(type__ip_type=self)) else: return None def clean(self): """ Nettoyage. Vérifie : - Que ip_stop est après ip_start - Qu'on ne crée pas plus gros qu'un /16 - Que le range crée ne recoupe pas un range existant - Formate l'ipv6 donnée en /64""" if IPAddress(self.domaine_ip_start) > IPAddress(self.domaine_ip_stop): raise ValidationError(_("Range end must be after range start...")) # On ne crée pas plus grand qu'un /16 if self.ip_range.size > 65536: raise ValidationError(_("The range is too large, you can't create" " a larger one than a /16.")) # On check que les / ne se recoupent pas for element in IpType.objects.all().exclude(pk=self.pk): if not self.ip_set.isdisjoint(element.ip_set): raise ValidationError(_("The specified range is not disjoint" " from existing ranges.")) # On formate le prefix v6 if self.prefix_v6: self.prefix_v6 = str(IPNetwork(self.prefix_v6 + '/64').network) # On vérifie qu'un domaine network/netmask contiens bien le domaine ip start-stop if self.domaine_ip_network: if not self.domaine_ip_start in self.ip_network or not self.domaine_ip_stop in self.ip_network: raise ValidationError(_("If you specify a domain network or" " netmask, it must contain the" " domain's IP range.")) return def save(self, *args, **kwargs): self.clean() super(IpType, self).save(*args, **kwargs) @staticmethod def can_use_all(user_request, *_args, **_kwargs): """Superdroit qui permet d'utiliser toutes les extensions sans restrictions :param user_request: instance user qui fait l'edition :return: True ou False avec la raison de l'échec le cas échéant""" return user_request.has_perm('machines.use_all_iptype'), None def __str__(self): return self.type class Vlan(RevMixin, AclMixin, models.Model): """ Un vlan : vlan_id et nom On limite le vlan id entre 0 et 4096, comme défini par la norme""" vlan_id = models.PositiveIntegerField(validators=[MaxValueValidator(4095)]) name = models.CharField(max_length=256) comment = models.CharField(max_length=256, blank=True) #Réglages supplémentaires arp_protect = models.BooleanField(default=False) dhcp_snooping = models.BooleanField(default=False) dhcpv6_snooping = models.BooleanField(default=False) igmp = models.BooleanField( default=False, help_text="Gestion multicast v4" ) mld = models.BooleanField( default=False, help_text="Gestion multicast v6" ) class Meta: permissions = ( ("view_vlan", _("Can view a VLAN object")), ) verbose_name = _("VLAN") verbose_name_plural = _("VLANs") def __str__(self): return self.name class Nas(RevMixin, AclMixin, models.Model): """ Les nas. Associé à un machine_type. Permet aussi de régler le port_access_mode (802.1X ou mac-address) pour le radius. Champ autocapture de la mac à true ou false""" default_mode = '802.1X' AUTH = ( ('802.1X', '802.1X'), ('Mac-address', 'Mac-address'), ) name = models.CharField(max_length=255, unique=True) nas_type = models.ForeignKey( 'MachineType', on_delete=models.PROTECT, related_name='nas_type' ) machine_type = models.ForeignKey( 'MachineType', on_delete=models.PROTECT, related_name='machinetype_on_nas' ) port_access_mode = models.CharField( choices=AUTH, default=default_mode, max_length=32 ) autocapture_mac = models.BooleanField(default=False) class Meta: permissions = ( ("view_nas", _("Can view a NAS device object")), ) verbose_name = _("NAS device") verbose_name_plural = _("NAS devices") def __str__(self): return self.name class SOA(RevMixin, AclMixin, models.Model): """ Un enregistrement SOA associé à une extension Les valeurs par défault viennent des recommandations RIPE : https://www.ripe.net/publications/docs/ripe-203 """ name = models.CharField(max_length=255) mail = models.EmailField( help_text=_("Contact email address for the zone") ) refresh = models.PositiveIntegerField( default=86400, # 24 hours help_text=_("Seconds before the secondary DNS have to ask the primary" " DNS serial to detect a modification") ) retry = models.PositiveIntegerField( default=7200, # 2 hours help_text=_("Seconds before the secondary DNS ask the serial again in" " case of a primary DNS timeout") ) expire = models.PositiveIntegerField( default=3600000, # 1000 hours help_text=_("Seconds before the secondary DNS stop answering requests" " in case of primary DNS timeout") ) ttl = models.PositiveIntegerField( default=172800, # 2 days help_text=_("Time to Live") ) class Meta: permissions = ( ("view_soa", _("Can view an SOA record object")), ) verbose_name = _("SOA record") verbose_name_plural = _("SOA records") def __str__(self): return str(self.name) @cached_property def dns_soa_param(self): """ Renvoie la partie de l'enregistrement SOA correspondant aux champs : ; refresh ; retry ; expire ; TTL """ return ( ' {refresh}; refresh\n' ' {retry}; retry\n' ' {expire}; expire\n' ' {ttl}; TTL' ).format( refresh=str(self.refresh).ljust(12), retry=str(self.retry).ljust(12), expire=str(self.expire).ljust(12), ttl=str(self.ttl).ljust(12) ) @cached_property def dns_soa_mail(self): """ Renvoie le mail dans l'enregistrement SOA """ mail_fields = str(self.mail).split('@') return mail_fields[0].replace('.', '\\.') + '.' + mail_fields[1] + '.' @classmethod def new_default_soa(cls): """ Fonction pour créer un SOA par défaut, utile pour les nouvelles extensions . /!\ Ne jamais supprimer ou renommer cette fonction car elle est utilisée dans les migrations de la BDD. """ return cls.objects.get_or_create( name=_("SOA to edit"), mail="postmaser@example.com" )[0].pk class Extension(RevMixin, AclMixin, models.Model): """ Extension dns type example.org. Précise si tout le monde peut l'utiliser, associé à un origin (ip d'origine)""" name = models.CharField( max_length=255, unique=True, help_text=_("Zone name, must begin with a dot (.example.org)") ) need_infra = models.BooleanField(default=False) origin = models.ForeignKey( 'IpList', on_delete=models.PROTECT, blank=True, null=True, help_text=_("A record associated with the zone") ) origin_v6 = models.GenericIPAddressField( protocol='IPv6', null=True, blank=True, help_text=_("AAAA record associated with the zone") ) soa = models.ForeignKey( 'SOA', on_delete=models.CASCADE ) class Meta: permissions = ( ("view_extension", _("Can view an extension object")), ("use_all_extension", _("Can use all extensions")), ) verbose_name = _("DNS extension") verbose_name_plural = _("DNS extensions") @cached_property def dns_entry(self): """ Une entrée DNS A et AAAA sur origin (zone self)""" entry = "" if self.origin: entry += "@ IN A " + str(self.origin) if self.origin_v6: if entry: entry += "\n" entry += "@ IN AAAA " + str(self.origin_v6) return entry def get_associated_sshfp_records(self): from re2o.utils import all_active_assigned_interfaces return (all_active_assigned_interfaces() .filter(type__ip_type__extension=self) .filter(machine__id__in=SshFp.objects.values('machine'))) def get_associated_a_records(self): from re2o.utils import all_active_assigned_interfaces return (all_active_assigned_interfaces() .filter(type__ip_type__extension=self) .filter(ipv4__isnull=False)) def get_associated_aaaa_records(self): from re2o.utils import all_active_interfaces return (all_active_interfaces(full=True) .filter(type__ip_type__extension=self)) def get_associated_cname_records(self): from re2o.utils import all_active_assigned_interfaces return (Domain.objects .filter(extension=self) .filter(cname__interface_parent__in=all_active_assigned_interfaces()) .prefetch_related('cname')) @staticmethod def can_use_all(user_request, *_args, **_kwargs): """Superdroit qui permet d'utiliser toutes les extensions sans restrictions :param user_request: instance user qui fait l'edition :return: True ou False avec la raison de l'échec le cas échéant""" return user_request.has_perm('machines.use_all_extension'), None def __str__(self): return self.name def clean(self, *args, **kwargs): if self.name and self.name[0] != '.': raise ValidationError(_("An extension must begin with a dot.")) super(Extension, self).clean(*args, **kwargs) class Mx(RevMixin, AclMixin, models.Model): """ Entrées des MX. Enregistre la zone (extension) associée et la priorité Todo : pouvoir associer un MX à une interface """ zone = models.ForeignKey('Extension', on_delete=models.PROTECT) priority = models.PositiveIntegerField() name = models.ForeignKey('Domain', on_delete=models.PROTECT) class Meta: permissions = ( ("view_mx", _("Can view an MX record object")), ) verbose_name = _("MX record") verbose_name_plural = _("MX records") @cached_property def dns_entry(self): """Renvoie l'entrée DNS complète pour un MX à mettre dans les fichiers de zones""" return "@ IN MX {prior} {name}".format( prior=str(self.priority).ljust(3), name=str(self.name) ) def __str__(self): return str(self.zone) + ' ' + str(self.priority) + ' ' + str(self.name) class Ns(RevMixin, AclMixin, models.Model): """Liste des enregistrements name servers par zone considéérée""" zone = models.ForeignKey('Extension', on_delete=models.PROTECT) ns = models.ForeignKey('Domain', on_delete=models.PROTECT) class Meta: permissions = ( ("view_ns", _("Can view an NS record object")), ) verbose_name = _("NS record") verbose_name_plural = _("NS records") @cached_property def dns_entry(self): """Renvoie un enregistrement NS complet pour les filezones""" return "@ IN NS " + str(self.ns) def __str__(self): return str(self.zone) + ' ' + str(self.ns) class Txt(RevMixin, AclMixin, models.Model): """ Un enregistrement TXT associé à une extension""" zone = models.ForeignKey('Extension', on_delete=models.PROTECT) field1 = models.CharField(max_length=255) field2 = models.TextField(max_length=2047) class Meta: permissions = ( ("view_txt", _("Can view a TXT record object")), ) verbose_name = _("TXT record") verbose_name_plural = _("TXT records") def __str__(self): return str(self.zone) + " : " + str(self.field1) + " " +\ str(self.field2) @cached_property def dns_entry(self): """Renvoie l'enregistrement TXT complet pour le fichier de zone""" return str(self.field1).ljust(15) + " IN TXT " + str(self.field2) class DName(RevMixin, AclMixin, models.Model): """A DNAME entry for the DNS.""" zone = models.ForeignKey('Extension', on_delete=models.PROTECT) alias = models.CharField(max_length=255) class Meta: permissions = ( ("view_dname", _("Can view a DNAME record object")), ) verbose_name = _("DNAME record") verbose_name_plural = _("DNAME records") def __str__(self): return str(self.zone) + " : " + str(self.alias) @cached_property def dns_entry(self): """Returns the DNAME record for the DNS zone file.""" return str(self.alias).ljust(15) + " IN DNAME " + str(self.zone) class Srv(RevMixin, AclMixin, models.Model): """ A SRV record """ TCP = 'TCP' UDP = 'UDP' service = models.CharField(max_length=31) protocole = models.CharField( max_length=3, choices=( (TCP, 'TCP'), (UDP, 'UDP'), ), default=TCP, ) extension = models.ForeignKey('Extension', on_delete=models.PROTECT) ttl = models.PositiveIntegerField( default=172800, # 2 days help_text=_("Time to Live") ) priority = models.PositiveIntegerField( default=0, validators=[MaxValueValidator(65535)], help_text=_("Priority of the target server (positive integer value," " the lower it is, the more the server will be used if" " available)") ) weight = models.PositiveIntegerField( default=0, validators=[MaxValueValidator(65535)], help_text=_("Relative weight for records with the same priority" " (integer value between 0 and 65535)") ) port = models.PositiveIntegerField( validators=[MaxValueValidator(65535)], help_text=_("TCP/UDP port") ) target = models.ForeignKey( 'Domain', on_delete=models.PROTECT, help_text=_("Target server") ) class Meta: permissions = ( ("view_srv", _("Can view an SRV record object")), ) verbose_name = _("SRV record") verbose_name_plural = _("SRV records") def __str__(self): return str(self.service) + ' ' + str(self.protocole) + ' ' +\ str(self.extension) + ' ' + str(self.priority) +\ ' ' + str(self.weight) + str(self.port) + str(self.target) @cached_property def dns_entry(self): """Renvoie l'enregistrement SRV complet pour le fichier de zone""" return str(self.service) + '._' + str(self.protocole).lower() +\ str(self.extension) + '. ' + str(self.ttl) + ' IN SRV ' +\ str(self.priority) + ' ' + str(self.weight) + ' ' +\ str(self.port) + ' ' + str(self.target) + '.' class SshFp(RevMixin, AclMixin, models.Model): """A fingerprint of an SSH public key""" ALGO = ( ("ssh-rsa", "ssh-rsa"), ("ssh-ed25519", "ssh-ed25519"), ("ecdsa-sha2-nistp256", "ecdsa-sha2-nistp256"), ("ecdsa-sha2-nistp384", "ecdsa-sha2-nistp384"), ("ecdsa-sha2-nistp521", "ecdsa-sha2-nistp521"), ) machine = models.ForeignKey('Machine', on_delete=models.CASCADE) pub_key_entry = models.TextField( help_text="SSH public key", max_length=2048 ) algo = models.CharField( choices=ALGO, max_length=32 ) comment = models.CharField( help_text="Comment", max_length=255, null=True, blank=True ) @cached_property def algo_id(self): """Return the id of the algorithm for this key""" if "ecdsa" in self.algo: return 3 elif "rsa" in self.algo: return 1 else: return 2 @cached_property def hash(self): """Return the hashess for the pub key with correct id cf RFC, 1 is sha1 , 2 sha256""" return { "1" : hashlib.sha1(base64.b64decode(self.pub_key_entry)).hexdigest(), "2" : hashlib.sha256(base64.b64decode(self.pub_key_entry)).hexdigest(), } class Meta: permissions = ( ("view_sshfp", _("Can view an SSHFP record object")), ) verbose_name = _("SSHFP record") verbose_name_plural = _("SSHFP records") def can_view(self, user_request, *_args, **_kwargs): return self.machine.can_view(user_request, *_args, **_kwargs) def can_edit(self, user_request, *args, **kwargs): return self.machine.can_edit(user_request, *args, **kwargs) def can_delete(self, user_request, *args, **kwargs): return self.machine.can_delete(user_request, *args, **kwargs) def __str__(self): return str(self.algo) + ' ' + str(self.comment) class Interface(RevMixin, AclMixin, FieldPermissionModelMixin, models.Model): """ Une interface. Objet clef de l'application machine : - une address mac unique. Possibilité de la rendre unique avec le typemachine - une onetoone vers IpList pour attribution ipv4 - le type parent associé au range ip et à l'extension - un objet domain associé contenant son nom - la liste des ports oiuvert""" ipv4 = models.OneToOneField( 'IpList', on_delete=models.PROTECT, blank=True, null=True ) mac_address = MACAddressField(integer=False) machine = models.ForeignKey('Machine', on_delete=models.CASCADE) type = models.ForeignKey('MachineType', on_delete=models.PROTECT) details = models.CharField(max_length=255, blank=True) port_lists = models.ManyToManyField('OuverturePortList', blank=True) class Meta: permissions = ( ("view_interface", _("Can view an interface object")), ("change_interface_machine", _("Can change the owner of an interface")), ) verbose_name = _("interface") verbose_name_plural = _("interfaces") @cached_property def is_active(self): """ Renvoie si une interface doit avoir accès ou non """ machine = self.machine user = self.machine.user return machine.active and user.has_access() @cached_property def ipv6_slaac(self): """ Renvoie un objet type ipv6 à partir du prefix associé à l'iptype parent""" if self.type.ip_type.prefix_v6: return EUI(self.mac_address).ipv6( IPNetwork(self.type.ip_type.prefix_v6).network ) else: return None @cached_property def gen_ipv6_dhcpv6(self): """Cree une ip, à assigner avec dhcpv6 sur une machine""" prefix_v6 = self.type.ip_type.prefix_v6.encode().decode('utf-8') if not prefix_v6: return None return IPv6Address( IPv6Address(prefix_v6).exploded[:20] + IPv6Address(self.id).exploded[20:] ) def sync_ipv6_dhcpv6(self): """Affecte une ipv6 dhcpv6 calculée à partir de l'id de la machine""" ipv6_dhcpv6 = self.gen_ipv6_dhcpv6 if not ipv6_dhcpv6: return ipv6 = Ipv6List.objects.filter(ipv6=str(ipv6_dhcpv6)).first() if not ipv6: ipv6 = Ipv6List(ipv6=str(ipv6_dhcpv6)) ipv6.interface = self ipv6.save() return def sync_ipv6_slaac(self): """Cree, mets à jour et supprime si il y a lieu l'ipv6 slaac associée à la machine Sans prefixe ipv6, on return Si l'ip slaac n'est pas celle qu'elle devrait être, on maj""" ipv6_slaac = self.ipv6_slaac if not ipv6_slaac: return ipv6_object = (Ipv6List.objects .filter(interface=self, slaac_ip=True) .first()) if not ipv6_object: ipv6_object = Ipv6List(interface=self, slaac_ip=True) if ipv6_object.ipv6 != str(ipv6_slaac): ipv6_object.ipv6 = str(ipv6_slaac) ipv6_object.save() def sync_ipv6(self): """Cree et met à jour l'ensemble des ipv6 en fonction du mode choisi""" if (preferences.models.OptionalMachine .get_cached_value('ipv6_mode') == 'SLAAC'): self.sync_ipv6_slaac() elif (preferences.models.OptionalMachine .get_cached_value('ipv6_mode') == 'DHCPV6'): self.sync_ipv6_dhcpv6() else: return def ipv6(self): """ Renvoie le queryset de la liste des ipv6 On renvoie l'ipv6 slaac que si le mode slaac est activé (et non dhcpv6)""" if (preferences.models.OptionalMachine .get_cached_value('ipv6_mode') == 'SLAAC'): return self.ipv6list.all() elif (preferences.models.OptionalMachine .get_cached_value('ipv6_mode') == 'DHCPV6'): return self.ipv6list.filter(slaac_ip=False) else: return None def mac_bare(self): """ Formatage de la mac type mac_bare""" return str(EUI(self.mac_address, dialect=mac_bare)).lower() def filter_macaddress(self): """ Tente un formatage mac_bare, si échoue, lève une erreur de validation""" try: self.mac_address = str(EUI(self.mac_address)) except: raise ValidationError(_("The given MAC address is invalid.")) def clean(self, *args, **kwargs): """ Formate l'addresse mac en mac_bare (fonction filter_mac) et assigne une ipv4 dans le bon range si inexistante ou incohérente""" # If type was an invalid value, django won't create an attribute type # but try clean() as we may be able to create it from another value # so even if the error as yet been detected at this point, django # continues because the error might not prevent us from creating the # instance. # But in our case, it's impossible to create a type value so we raise # the error. if not hasattr(self, 'type'): raise ValidationError(_("The selected IP type is invalid.")) self.filter_macaddress() self.mac_address = str(EUI(self.mac_address)) or None if not self.ipv4 or self.type.ip_type != self.ipv4.ip_type: self.assign_ipv4() super(Interface, self).clean(*args, **kwargs) def assign_ipv4(self): """ Assigne une ip à l'interface """ free_ips = self.type.ip_type.free_ip() if free_ips: self.ipv4 = free_ips[0] else: raise ValidationError(_("There is no IP address available in the" " slash.")) return def unassign_ipv4(self): """ Sans commentaire, désassigne une ipv4""" self.ipv4 = None def update_type(self): """ Lorsque le machinetype est changé de type d'ip, on réassigne""" self.clean() self.save() def save(self, *args, **kwargs): self.filter_macaddress() # On verifie la cohérence en forçant l'extension par la méthode if self.ipv4: if self.type.ip_type != self.ipv4.ip_type: raise ValidationError(_("The IPv4 address and the machine type" " don't match.")) super(Interface, self).save(*args, **kwargs) @staticmethod def can_create(user_request, machineid, *_args, **_kwargs): """Verifie que l'user a les bons droits infra pour créer une interface, ou bien que la machine appartient bien à l'user :param macineid: Id de la machine parente de l'interface :param user_request: instance utilisateur qui fait la requête :return: soit True, soit False avec la raison de l'échec""" try: machine = Machine.objects.get(pk=machineid) except Machine.DoesNotExist: return False, _("Nonexistent machine.") if not user_request.has_perm('machines.add_interface'): if not (preferences.models.OptionalMachine .get_cached_value('create_machine')): return False, _("You can't add a machine.") max_lambdauser_interfaces = (preferences.models.OptionalMachine .get_cached_value( 'max_lambdauser_interfaces' )) if machine.user != user_request: return False, _("You don't have the right to add an interface" " to a machine of another user.") if (machine.user.user_interfaces().count() >= max_lambdauser_interfaces): return False, (_("You reached the maximum number of interfaces" " that you are allowed to create yourself" " (%s)." % max_lambdauser_interfaces)) return True, None @staticmethod def can_change_machine(user_request, *_args, **_kwargs): """Check if a user can change the machine associated with an Interface object """ return (user_request.has_perm('machines.change_interface_machine'), _("Permission required to edit the machine.")) def can_edit(self, user_request, *args, **kwargs): """Verifie que l'user a les bons droits infra pour editer cette instance interface, ou qu'elle lui appartient :param self: Instance interface à editer :param user_request: Utilisateur qui fait la requête :return: soit True, soit False avec la raison de l'échec""" if self.machine.user != user_request: if (not user_request.has_perm('machines.change_interface') or not self.machine.user.can_edit( user_request, *args, **kwargs )[0]): return False, _("You don't have the right to edit a machine of" " another user.") return True, None def can_delete(self, user_request, *args, **kwargs): """Verifie que l'user a les bons droits delete object pour del cette instance interface, ou qu'elle lui appartient :param self: Instance interface à del :param user_request: Utilisateur qui fait la requête :return: soit True, soit False avec la raison de l'échec""" if self.machine.user != user_request: if (not user_request.has_perm('machines.change_interface') or not self.machine.user.can_edit( user_request, *args, **kwargs )[0]): return False, _("You don't have the right to edit a machine of" " another user.") return True, None def can_view(self, user_request, *_args, **_kwargs): """Vérifie qu'on peut bien voir cette instance particulière avec droit view objet ou qu'elle appartient à l'user :param self: instance interface à voir :param user_request: instance user qui fait l'edition :return: True ou False avec la raison de l'échec le cas échéant""" if (not user_request.has_perm('machines.view_interface') and self.machine.user != user_request): return False, _("You don't have the right to view machines other" " than yours.") return True, None def __init__(self, *args, **kwargs): super(Interface, self).__init__(*args, **kwargs) self.field_permissions = { 'machine': self.can_change_machine, } def __str__(self): try: domain = self.domain except: domain = None return str(domain) def has_private_ip(self): """ True si l'ip associée est privée""" if self.ipv4: return IPAddress(str(self.ipv4)).is_private() else: return False def may_have_port_open(self): """ True si l'interface a une ip et une ip publique. Permet de ne pas exporter des ouvertures sur des ip privées (useless)""" return self.ipv4 and not self.has_private_ip() class Ipv6List(RevMixin, AclMixin, FieldPermissionModelMixin, models.Model): """ A list of IPv6 """ ipv6 = models.GenericIPAddressField( protocol='IPv6', ) interface = models.ForeignKey( 'Interface', on_delete=models.CASCADE, related_name='ipv6list' ) slaac_ip = models.BooleanField(default=False) class Meta: permissions = ( ("view_ipv6list", _("Can view an IPv6 addresses list object")), ("change_ipv6list_slaac_ip", _("Can change the SLAAC value of an" " IPv6 addresses list")), ) verbose_name = _("IPv6 addresses list") verbose_name_plural = _("IPv6 addresses lists") @staticmethod def can_create(user_request, interfaceid, *_args, **_kwargs): """Verifie que l'user a les bons droits infra pour créer une ipv6, ou possède l'interface associée :param interfaceid: Id de l'interface associée à cet objet domain :param user_request: instance utilisateur qui fait la requête :return: soit True, soit False avec la raison de l'échec""" try: interface = Interface.objects.get(pk=interfaceid) except Interface.DoesNotExist: return False, _("Nonexistent interface.") if not user_request.has_perm('machines.add_ipv6list'): if interface.machine.user != user_request: return False, _("You don't have the right to add an alias to a" " machine of another user.") return True, None @staticmethod def can_change_slaac_ip(user_request, *_args, **_kwargs): """ Check if a user can change the slaac value """ return (user_request.has_perm('machines.change_ipv6list_slaac_ip'), _("Permission required to change the SLAAC value of an IPv6" " address")) def can_edit(self, user_request, *args, **kwargs): """Verifie que l'user a les bons droits infra pour editer cette instance interface, ou qu'elle lui appartient :param self: Instance interface à editer :param user_request: Utilisateur qui fait la requête :return: soit True, soit False avec la raison de l'échec""" if self.interface.machine.user != user_request: if (not user_request.has_perm('machines.change_ipv6list') or not self.interface.machine.user.can_edit( user_request, *args, **kwargs )[0]): return False, _("You don't have the right to edit a machine of" " another user.") return True, None def can_delete(self, user_request, *args, **kwargs): """Verifie que l'user a les bons droits delete object pour del cette instance interface, ou qu'elle lui appartient :param self: Instance interface à del :param user_request: Utilisateur qui fait la requête :return: soit True, soit False avec la raison de l'échec""" if self.interface.machine.user != user_request: if (not user_request.has_perm('machines.change_ipv6list') or not self.interface.machine.user.can_edit( user_request, *args, **kwargs )[0]): return False, _("You don't have the right to edit a machine of" " another user.") return True, None def can_view(self, user_request, *_args, **_kwargs): """Vérifie qu'on peut bien voir cette instance particulière avec droit view objet ou qu'elle appartient à l'user :param self: instance interface à voir :param user_request: instance user qui fait l'edition :return: True ou False avec la raison de l'échec le cas échéant""" if (not user_request.has_perm('machines.view_ipv6list') and self.interface.machine.user != user_request): return False, _("You don't have the right to view machines other" " than yours.") return True, None def __init__(self, *args, **kwargs): super(Ipv6List, self).__init__(*args, **kwargs) self.field_permissions = { 'slaac_ip': self.can_change_slaac_ip, } def check_and_replace_prefix(self, prefix=None): """Si le prefixe v6 est incorrect, on maj l'ipv6""" prefix_v6 = prefix or self.interface.type.ip_type.prefix_v6.encode().decode('utf-8') if not prefix_v6: return if (IPv6Address(self.ipv6.encode().decode('utf-8')).exploded[:20] != IPv6Address(prefix_v6).exploded[:20]): self.ipv6 = IPv6Address( IPv6Address(prefix_v6).exploded[:20] + IPv6Address(self.ipv6.encode().decode('utf-8')).exploded[20:] ) self.save() def clean(self, *args, **kwargs): if self.slaac_ip and (Ipv6List.objects .filter(interface=self.interface, slaac_ip=True) .exclude(id=self.id)): raise ValidationError(_("A SLAAC IP address is already registered.")) prefix_v6 = self.interface.type.ip_type.prefix_v6.encode().decode('utf-8') if prefix_v6: if (IPv6Address(self.ipv6.encode().decode('utf-8')).exploded[:20] != IPv6Address(prefix_v6).exploded[:20]): raise ValidationError(_("The v6 prefix is incorrect and" " doesn't match the type associated" " with the machine.")) super(Ipv6List, self).clean(*args, **kwargs) def save(self, *args, **kwargs): """Force à avoir appellé clean avant""" self.full_clean() super(Ipv6List, self).save(*args, **kwargs) def __str__(self): return str(self.ipv6) class Domain(RevMixin, AclMixin, models.Model): """ Objet domain. Enregistrement A et CNAME en même temps : permet de stocker les alias et les nom de machines, suivant si interface_parent ou cname sont remplis""" interface_parent = models.OneToOneField( 'Interface', on_delete=models.CASCADE, blank=True, null=True ) name = models.CharField( help_text=_("Mandatory and unique, must not contain dots."), max_length=255 ) extension = models.ForeignKey('Extension', on_delete=models.PROTECT) cname = models.ForeignKey( 'self', null=True, blank=True, related_name='related_domain' ) class Meta: unique_together = (("name", "extension"),) permissions = ( ("view_domain", _("Can view a domain object")), ) verbose_name = _("domain") verbose_name_plural = _("domains") def get_extension(self): """ Retourne l'extension de l'interface parente si c'est un A Retourne l'extension propre si c'est un cname, renvoie None sinon""" if self.interface_parent: return self.interface_parent.type.ip_type.extension elif hasattr(self, 'extension'): return self.extension else: return None def clean(self): """ Validation : - l'objet est bien soit A soit CNAME - le cname est pas pointé sur lui-même - le nom contient bien les caractères autorisés par la norme dns et moins de 63 caractères au total - le couple nom/extension est bien unique""" if self.get_extension(): self.extension = self.get_extension() if self.interface_parent and self.cname: raise ValidationError(_("You can't create a both A and CNAME" " record.")) if self.cname == self: raise ValidationError(_("You can't create a CNAME record pointing" " to itself.")) HOSTNAME_LABEL_PATTERN = re.compile( r"(?!-)[A-Z\d-]+(? 63: raise ValidationError(_("The domain name %s is too long (over 63" " characters).") % dns) if not HOSTNAME_LABEL_PATTERN.match(dns): raise ValidationError(_("The domain name %s contains forbidden" " characters.") % dns) self.validate_unique() super(Domain, self).clean() @cached_property def dns_entry(self): """ Une entrée DNS""" if self.cname: return "{name} IN CNAME {cname}.".format( name=str(self.name).ljust(15), cname=str(self.cname) ) def save(self, *args, **kwargs): """ Empèche le save sans extension valide. Force à avoir appellé clean avant""" if not self.get_extension(): raise ValidationError(_("Invalid extension.")) self.full_clean() super(Domain, self).save(*args, **kwargs) @cached_property def get_source_interface(self): """Renvoie l'interface source : - l'interface reliée si c'est un A - si c'est un cname, suit le cname jusqu'à atteindre le A et renvoie l'interface parente Fonction récursive""" if self.interface_parent: return self.interface_parent else: return self.cname.get_parent_interface() @staticmethod def can_create(user_request, interfaceid, *_args, **_kwargs): """Verifie que l'user a les bons droits infra pour créer un domain, ou possède l'interface associée :param interfaceid: Id de l'interface associée à cet objet domain :param user_request: instance utilisateur qui fait la requête :return: soit True, soit False avec la raison de l'échec""" try: interface = Interface.objects.get(pk=interfaceid) except Interface.DoesNotExist: return False, _("Nonexistent interface.") if not user_request.has_perm('machines.add_domain'): max_lambdauser_aliases = (preferences.models.OptionalMachine .get_cached_value( 'max_lambdauser_aliases' )) if interface.machine.user != user_request: return False, _("You don't have the right to add an alias to a" " machine of another user.") if Domain.objects.filter( cname__in=Domain.objects.filter( interface_parent__in=(interface.machine.user .user_interfaces()) ) ).count() >= max_lambdauser_aliases: return False, _("You reached the maximum number of alias that" " you are allowed to create yourself (%s). " % max_lambdauser_aliases) return True, None def can_edit(self, user_request, *_args, **_kwargs): """Verifie que l'user a les bons droits pour editer cette instance domain :param self: Instance domain à editer :param user_request: Utilisateur qui fait la requête :return: soit True, soit False avec la raison de l'échec""" if (not user_request.has_perm('machines.change_domain') and self.get_source_interface.machine.user != user_request): return False, _("You don't have the right to edit an alias of a" " machine of another user.") return True, None def can_delete(self, user_request, *_args, **_kwargs): """Verifie que l'user a les bons droits delete object pour del cette instance domain, ou qu'elle lui appartient :param self: Instance domain à del :param user_request: Utilisateur qui fait la requête :return: soit True, soit False avec la raison de l'échec""" if (not user_request.has_perm('machines.delete_domain') and self.get_source_interface.machine.user != user_request): return False, _("You don't have the right to delete an alias of a" " machine of another user.") return True, None def can_view(self, user_request, *_args, **_kwargs): """Vérifie qu'on peut bien voir cette instance particulière avec droit view objet ou qu'elle appartient à l'user :param self: instance domain à voir :param user_request: instance user qui fait l'edition :return: True ou False avec la raison de l'échec le cas échéant""" if (not user_request.has_perm('machines.view_domain') and self.get_source_interface.machine.user != user_request): return False, _("You don't have the right to view machines other" " than yours.") return True, None def __str__(self): return str(self.name) + str(self.extension) class IpList(RevMixin, AclMixin, models.Model): """ A list of IPv4 """ ipv4 = models.GenericIPAddressField(protocol='IPv4', unique=True) ip_type = models.ForeignKey('IpType', on_delete=models.CASCADE) class Meta: permissions = ( ("view_iplist", _("Can view an IPv4 addresses list object")), ) verbose_name = _("IPv4 addresses list") verbose_name_plural = _("IPv4 addresses lists") @cached_property def need_infra(self): """ Permet de savoir si un user basique peut assigner cette ip ou non""" return self.ip_type.need_infra def clean(self): """ Erreur si l'ip_type est incorrect""" if not str(self.ipv4) in self.ip_type.ip_set_as_str: raise ValidationError(_("The IPv4 address and the range of the IP" " type don't match.")) return def save(self, *args, **kwargs): self.clean() super(IpList, self).save(*args, **kwargs) def __str__(self): return self.ipv4 class Role(RevMixin, AclMixin, models.Model): """Define the role of a machine. Allow automated generation of the server configuration. """ ROLE = ( ('dhcp-server', _("DHCP server")), ('switch-conf-server', _("Switches configuration server")), ('dns-recursif-server', _("Recursive DNS server")), ('ntp-server', _("NTP server")), ('radius-server', _("RADIUS server")), ('log-server', _("Log server")), ('ldap-master-server', _("LDAP master server")), ('ldap-backup-server', _("LDAP backup server")), ('smtp-server', _("SMTP server")), ('postgresql-server', _("postgreSQL server")), ('mysql-server', _("mySQL server")), ('sql-client', _("SQL client")), ('gateway', _("Gateway")), ) role_type = models.CharField(max_length=255, unique=True) servers = models.ManyToManyField('Interface') specific_role = models.CharField( choices=ROLE, null=True, blank=True, max_length=32, ) class Meta: permissions = ( ("view_role", _("Can view a role object")), ) verbose_name = _("server role") verbose_name_plural = _("server roles") @classmethod def get_instance(cls, roleid, *_args, **_kwargs): """Get the Role instance with roleid. Args: roleid: The id Returns: The role. """ return cls.objects.get(pk=roleid) @classmethod def interface_for_roletype(cls, roletype): """Return interfaces for a roletype""" return Interface.objects.filter( role=cls.objects.filter(specific_role=roletype) ) @classmethod def all_interfaces_for_roletype(cls, roletype): """Return all interfaces for a roletype""" return Interface.objects.filter( machine__interface__role=cls.objects.filter(specific_role=roletype) ) @classmethod def get_instance(cls, roleid, *_args, **_kwargs): """Get the Machine instance with machineid. :param userid: The id :return: The user """ return cls.objects.get(pk=roleid) @classmethod def interface_for_roletype(cls, roletype): """Return interfaces for a roletype""" return Interface.objects.filter(role=cls.objects.filter(specific_role=roletype)) def save(self, *args, **kwargs): super(Role, self).save(*args, **kwargs) def __str__(self): return str(self.role_type) class Service(RevMixin, AclMixin, models.Model): """ Definition d'un service (dhcp, dns, etc)""" service_type = models.CharField(max_length=255, blank=True, unique=True) min_time_regen = models.DurationField( default=timedelta(minutes=1), help_text=_("Minimal time before regeneration of the service.") ) regular_time_regen = models.DurationField( default=timedelta(hours=1), help_text=_("Maximal time before regeneration of the service.") ) servers = models.ManyToManyField('Interface', through='Service_link') class Meta: permissions = ( ("view_service", _("Can view a service object")), ) verbose_name = _("service to generate (DHCP, DNS, ...)") verbose_name_plural = _("services to generate (DHCP, DNS, ...)") def ask_regen(self): """ Marque à True la demande de régénération pour un service x """ Service_link.objects.filter(service=self).exclude(asked_regen=True)\ .update(asked_regen=True) return def process_link(self, servers): """ Django ne peut créer lui meme les relations manytomany avec table intermediaire explicite""" for serv in servers.exclude( pk__in=Interface.objects.filter(service=self) ): link = Service_link(service=self, server=serv) link.save() Service_link.objects.filter(service=self).exclude(server__in=servers)\ .delete() return def save(self, *args, **kwargs): super(Service, self).save(*args, **kwargs) def __str__(self): return str(self.service_type) def regen(service): """ Fonction externe pour régérération d'un service, prend un objet service en arg""" obj = Service.objects.filter(service_type=service) if obj: obj[0].ask_regen() return class Service_link(RevMixin, AclMixin, models.Model): """ Definition du lien entre serveurs et services""" service = models.ForeignKey('Service', on_delete=models.CASCADE) server = models.ForeignKey('Interface', on_delete=models.CASCADE) last_regen = models.DateTimeField(auto_now_add=True) asked_regen = models.BooleanField(default=False) class Meta: permissions = ( ("view_service_link", _("Can view a service server link object")), ) verbose_name = _("link between service and server") verbose_name_plural = _("links between service and server") def done_regen(self): """ Appellé lorsqu'un serveur a regénéré son service""" self.last_regen = timezone.now() self.asked_regen = False self.save() @property def need_regen(self): """ Décide si le temps minimal écoulé est suffisant pour provoquer une régénération de service""" return bool( (self.asked_regen and ( self.last_regen + self.service.min_time_regen ) < timezone.now() ) or ( self.last_regen + self.service.regular_time_regen ) < timezone.now() ) @need_regen.setter def need_regen(self, value): """ Force to set the need_regen value. True means a regen is asked and False means a regen has been done. :param value: (bool) The value to set to """ if not value: self.last_regen = timezone.now() self.asked_regen = value self.save() def __str__(self): return str(self.server) + " " + str(self.service) class OuverturePortList(RevMixin, AclMixin, models.Model): """Liste des ports ouverts sur une interface.""" name = models.CharField( help_text=_("Name of the ports configuration"), max_length=255 ) class Meta: permissions = ( ("view_ouvertureportlist", _("Can view a ports opening list" " object")), ) verbose_name = _("ports opening list") verbose_name_plural = _("ports opening lists") def can_delete(self, user_request, *_args, **_kwargs): """Verifie que l'user a les bons droits bureau pour delete cette instance ouvertureportlist :param self: Instance ouvertureportlist à delete :param user_request: Utilisateur qui fait la requête :return: soit True, soit False avec la raison de l'échec""" if not user_request.has_perm('machines.delete_ouvertureportlist'): return False, _("You don't have the right to delete a ports" " opening list.") if self.interface_set.all(): return False, _("This ports opening list is used.") return True, None def __str__(self): return self.name def tcp_ports_in(self): """Renvoie la liste des ports ouverts en TCP IN pour ce profil""" return self.ouvertureport_set.filter( protocole=OuverturePort.TCP, io=OuverturePort.IN ) def udp_ports_in(self): """Renvoie la liste des ports ouverts en UDP IN pour ce profil""" return self.ouvertureport_set.filter( protocole=OuverturePort.UDP, io=OuverturePort.IN ) def tcp_ports_out(self): """Renvoie la liste des ports ouverts en TCP OUT pour ce profil""" return self.ouvertureport_set.filter( protocole=OuverturePort.TCP, io=OuverturePort.OUT ) def udp_ports_out(self): """Renvoie la liste des ports ouverts en UDP OUT pour ce profil""" return self.ouvertureport_set.filter( protocole=OuverturePort.UDP, io=OuverturePort.OUT ) class OuverturePort(RevMixin, AclMixin, models.Model): """ Représente un simple port ou une plage de ports. Les ports de la plage sont compris entre begin et en inclus. Si begin == end alors on ne représente qu'un seul port. On limite les ports entre 0 et 65535, tels que défini par la RFC """ TCP = 'T' UDP = 'U' IN = 'I' OUT = 'O' begin = models.PositiveIntegerField(validators=[MaxValueValidator(65535)]) end = models.PositiveIntegerField(validators=[MaxValueValidator(65535)]) port_list = models.ForeignKey( 'OuverturePortList', on_delete=models.CASCADE ) protocole = models.CharField( max_length=1, choices=( (TCP, 'TCP'), (UDP, 'UDP'), ), default=TCP, ) io = models.CharField( max_length=1, choices=( (IN, 'IN'), (OUT, 'OUT'), ), default=OUT, ) class Meta: verbose_name = _("ports opening") verbose_name = _("ports openings") def __str__(self): if self.begin == self.end: return str(self.begin) return ':'.join([str(self.begin), str(self.end)]) def show_port(self): """Formatage plus joli, alias pour str""" return str(self) @receiver(post_save, sender=Machine) def machine_post_save(**kwargs): """Synchronisation ldap et régen parefeu/dhcp lors de la modification d'une machine""" user = kwargs['instance'].user user.ldap_sync(base=False, access_refresh=False, mac_refresh=True) regen('dhcp') regen('mac_ip_list') @receiver(post_delete, sender=Machine) def machine_post_delete(**kwargs): """Synchronisation ldap et régen parefeu/dhcp lors de la suppression d'une machine""" machine = kwargs['instance'] user = machine.user user.ldap_sync(base=False, access_refresh=False, mac_refresh=True) regen('dhcp') regen('mac_ip_list') @receiver(post_save, sender=Interface) def interface_post_save(**kwargs): """Synchronisation ldap et régen parefeu/dhcp lors de la modification d'une interface""" interface = kwargs['instance'] interface.sync_ipv6() user = interface.machine.user user.ldap_sync(base=False, access_refresh=False, mac_refresh=True) # Regen services regen('dhcp') regen('mac_ip_list') @receiver(post_delete, sender=Interface) def interface_post_delete(**kwargs): """Synchronisation ldap et régen parefeu/dhcp lors de la suppression d'une interface""" interface = kwargs['instance'] user = interface.machine.user user.ldap_sync(base=False, access_refresh=False, mac_refresh=True) @receiver(post_save, sender=IpType) def iptype_post_save(**kwargs): """Generation des objets ip après modification d'un range ip""" iptype = kwargs['instance'] iptype.gen_ip_range() iptype.check_replace_prefixv6() @receiver(post_save, sender=MachineType) def machinetype_post_save(**kwargs): """Mise à jour des interfaces lorsque changement d'attribution d'une machinetype (changement iptype parent)""" machinetype = kwargs['instance'] for interface in machinetype.all_interfaces(): interface.update_type() @receiver(post_save, sender=Domain) def domain_post_save(**_kwargs): """Regeneration dns après modification d'un domain object""" regen('dns') @receiver(post_delete, sender=Domain) def domain_post_delete(**_kwargs): """Regeneration dns après suppression d'un domain object""" regen('dns') @receiver(post_save, sender=Extension) def extension_post_save(**_kwargs): """Regeneration dns après modification d'une extension""" regen('dns') @receiver(post_delete, sender=Extension) def extension_post_selete(**_kwargs): """Regeneration dns après suppression d'une extension""" regen('dns') @receiver(post_save, sender=SOA) def soa_post_save(**_kwargs): """Regeneration dns après modification d'un SOA""" regen('dns') @receiver(post_delete, sender=SOA) def soa_post_delete(**_kwargs): """Regeneration dns après suppresson d'un SOA""" regen('dns') @receiver(post_save, sender=Mx) def mx_post_save(**_kwargs): """Regeneration dns après modification d'un MX""" regen('dns') @receiver(post_delete, sender=Mx) def mx_post_delete(**_kwargs): """Regeneration dns après suppresson d'un MX""" regen('dns') @receiver(post_save, sender=Ns) def ns_post_save(**_kwargs): """Regeneration dns après modification d'un NS""" regen('dns') @receiver(post_delete, sender=Ns) def ns_post_delete(**_kwargs): """Regeneration dns après modification d'un NS""" regen('dns') @receiver(post_save, sender=Txt) def text_post_save(**_kwargs): """Regeneration dns après modification d'un TXT""" regen('dns') @receiver(post_delete, sender=Txt) def text_post_delete(**_kwargs): """Regeneration dns après modification d'un TX""" regen('dns') @receiver(post_save, sender=DName) def dname_post_save(**_kwargs): """Updates the DNS regen after modification of a DName object.""" regen('dns') @receiver(post_delete, sender=DName) def dname_post_delete(**_kwargs): """Updates the DNS regen after deletion of a DName object.""" regen('dns') @receiver(post_save, sender=Srv) def srv_post_save(**_kwargs): """Regeneration dns après modification d'un SRV""" regen('dns') @receiver(post_delete, sender=Srv) def srv_post_delete(**_kwargs): """Regeneration dns après modification d'un SRV""" regen('dns')