8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2024-11-16 00:13:12 +00:00
re2o/machines/models.py
2019-11-25 15:25:52 +01:00

2219 lines
79 KiB
Python

# -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au rezometz. Il
# se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics.
#
# Copyright © 2016-2018 Gabriel Détraz
# Copyright © 2017 Lara Kermarec
# Copyright © 2017 Augustin Lemesle
# Copyright © 2018 Charlie Jacomme
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""machines.models
The models definitions for the Machines app
"""
from __future__ import unicode_literals
import base64
import hashlib
import re
from datetime import timedelta
from ipaddress import IPv6Address
from itertools import chain
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db import models
from django.db.models import Q
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.forms import ValidationError
from django.utils import timezone
from django.db import transaction
from reversion import revisions as reversion
from django.utils.functional import cached_property
from django.utils.translation import ugettext_lazy as _
from macaddress.fields import MACAddressField, default_dialect
from netaddr import (
mac_bare,
EUI,
NotRegisteredError,
IPSet,
IPRange,
IPNetwork,
IPAddress,
)
import preferences.models
import users.models
from re2o.field_permissions import FieldPermissionModelMixin
from re2o.mixins import AclMixin, RevMixin
class Machine(RevMixin, FieldPermissionModelMixin, models.Model):
""" Class définissant une machine, object parent user, objets fils
interfaces"""
user = models.ForeignKey("users.User", on_delete=models.CASCADE)
name = models.CharField(
max_length=255, help_text=_("Optional."), blank=True, null=True
)
active = models.BooleanField(default=True)
class Meta:
permissions = (
("view_machine", _("Can view a machine object")),
("change_machine_user", _("Can change the user of a machine")),
)
verbose_name = _("machine")
verbose_name_plural = _("machines")
@classmethod
def get_instance(cls, machineid, *_args, **_kwargs):
"""Get the Machine instance with machineid.
:param userid: The id
:return: The user
"""
return cls.objects.get(pk=machineid)
def linked_objects(self):
"""Return linked objects : machine and domain.
Usefull in history display"""
return chain(
self.interface_set.all(),
Domain.objects.filter(interface_parent__in=self.interface_set.all()),
)
@staticmethod
def can_change_user(user_request, *_args, **_kwargs):
"""Checks if an user is allowed to change the user who owns a
Machine.
Args:
user_request: The user requesting to change owner.
Returns:
A tuple with a boolean stating if edition is allowed and an
explanation message.
"""
can = user_request.has_perm("machines.change_machine_user")
return (
can,
_("You don't have the right to change the machine's user.")
if not can
else None,
("machines.change_machine_user",),
)
@staticmethod
def can_view_all(user_request, *_args, **_kwargs):
"""Vérifie qu'on peut bien afficher l'ensemble des machines,
droit particulier correspondant
:param user_request: instance user qui fait l'edition
:return: True ou False avec la raison de l'échec le cas échéant"""
if not user_request.has_perm("machines.view_machine"):
return (
False,
_("You don't have the right to view all the machines."),
("machines.view_machine",),
)
return True, None, None
@staticmethod
def can_create(user_request, userid, *_args, **_kwargs):
"""Vérifie qu'un user qui fait la requète peut bien créer la machine
et n'a pas atteint son quota, et crée bien une machine à lui
:param user_request: Utilisateur qui fait la requête
:param userid: id de l'user dont on va créer une machine
:return: soit True, soit False avec la raison de l'échec"""
try:
user = users.models.User.objects.get(pk=userid)
except users.models.User.DoesNotExist:
return False, _("Nonexistent user."), None
max_lambdauser_interfaces = preferences.models.OptionalMachine.get_cached_value(
"max_lambdauser_interfaces"
)
if not user_request.has_perm("machines.add_machine"):
if not (
preferences.models.OptionalMachine.get_cached_value("create_machine")
):
return (
False,
_("You don't have the right to add a machine."),
("machines.add_machine",),
)
if user != user_request:
return (
False,
_("You don't have the right to add a machine to another"
" user."),
("machines.add_machine",),
)
if user.user_interfaces().count() >= max_lambdauser_interfaces:
return (
False,
_(
"You reached the maximum number of interfaces"
" that you are allowed to create yourself"
" (%s)." % max_lambdauser_interfaces
),
None,
)
return True, None, None
def can_edit(self, user_request, *args, **kwargs):
"""Vérifie qu'on peut bien éditer cette instance particulière (soit
machine de soi, soit droit particulier
:param self: instance machine à éditer
:param user_request: instance user qui fait l'edition
:return: True ou False avec la raison le cas échéant"""
if self.user != user_request:
can_user, _message, permissions = self.user.can_edit(
self.user, user_request, *args, **kwargs
)
if not (user_request.has_perm("machines.change_interface") and can_user):
return (
False,
_("You don't have the right to edit a machine of another"
" user."),
("machines.change_interface",) + permissions,
)
return True, None, None
def can_delete(self, user_request, *args, **kwargs):
"""Vérifie qu'on peut bien supprimer cette instance particulière (soit
machine de soi, soit droit particulier
:param self: instance machine à supprimer
:param user_request: instance user qui fait l'edition
:return: True ou False avec la raison de l'échec le cas échéant"""
if self.user != user_request:
can_user, _message, permissions = self.user.can_edit(
self.user, user_request, *args, **kwargs
)
if not (user_request.has_perm("machines.change_interface") and can_user):
return (
False,
_(
"You don't have the right to delete a machine"
" of another user."
),
("machines.change_interface",) + permissions,
)
return True, None, None
def can_view(self, user_request, *_args, **_kwargs):
"""Vérifie qu'on peut bien voir cette instance particulière (soit
machine de soi, soit droit particulier
:param self: instance machine à éditer
:param user_request: instance user qui fait l'edition
:return: True ou False avec la raison de l'échec le cas échéant"""
if (
not user_request.has_perm("machines.view_machine")
and self.user != user_request
):
return (
False,
_("You don't have the right to view other machines than"
" yours."),
("machines.view_machine",),
)
return True, None, None
@cached_property
def short_name(self):
"""Par defaut, renvoie le nom de la première interface
de cette machine"""
interfaces_set = self.interface_set.first()
if interfaces_set:
return str(interfaces_set.domain.name)
else:
return _("No name")
@cached_property
def complete_name(self):
"""Par defaut, renvoie le nom de la première interface
de cette machine"""
return str(self.interface_set.first())
@cached_property
def all_short_names(self):
"""Renvoie de manière unique, le nom des interfaces de cette
machine"""
return (
Domain.objects.filter(interface_parent__machine=self)
.values_list("name", flat=True)
.distinct()
)
@cached_property
def get_name(self):
"""Return a name : user provided name or first interface name"""
return self.name or self.short_name
@classmethod
def mass_delete(cls, machine_queryset):
"""Mass delete for machine queryset"""
from topologie.models import AccessPoint
Domain.objects.filter(
cname__interface_parent__machine__in=machine_queryset
)._raw_delete(machine_queryset.db)
Domain.objects.filter(
interface_parent__machine__in=machine_queryset
)._raw_delete(machine_queryset.db)
Ipv6List.objects.filter(interface__machine__in=machine_queryset)._raw_delete(
machine_queryset.db
)
Interface.objects.filter(machine__in=machine_queryset).filter(
port_lists__isnull=False
).delete()
Interface.objects.filter(machine__in=machine_queryset)._raw_delete(
machine_queryset.db
)
AccessPoint.objects.filter(machine_ptr__in=machine_queryset)._raw_delete(
machine_queryset.db
)
machine_queryset._raw_delete(machine_queryset.db)
@cached_property
def all_complete_names(self):
"""Renvoie tous les tls complets de la machine"""
return [
str(domain)
for domain in Domain.objects.filter(
Q(cname__interface_parent__machine=self)
| Q(interface_parent__machine=self)
)
]
def __init__(self, *args, **kwargs):
super(Machine, self).__init__(*args, **kwargs)
self.field_permissions = {"user": self.can_change_user}
def __str__(self):
return str(self.user) + " - " + str(self.id) + " - " + str(self.name)
class MachineType(RevMixin, AclMixin, models.Model):
""" Type de machine, relié à un type d'ip, affecté aux interfaces"""
name = models.CharField(max_length=255)
ip_type = models.ForeignKey(
"IpType", on_delete=models.PROTECT, blank=True, null=True
)
class Meta:
permissions = (
("view_machinetype", _("Can view a machine type object")),
("use_all_machinetype", _("Can use all machine types")),
)
verbose_name = _("machine type")
verbose_name_plural = _("machine types")
def all_interfaces(self):
""" Renvoie toutes les interfaces (cartes réseaux) de type
machinetype"""
return Interface.objects.filter(machine_type=self)
@staticmethod
def can_use_all(user_request, *_args, **_kwargs):
"""Check if an user can use every MachineType.
Args:
user_request: The user requesting edition.
Returns:
A tuple with a boolean stating if user can acces and an explanation
message is acces is not allowed.
"""
if not user_request.has_perm("machines.use_all_machinetype"):
return (
False,
_("You don't have the right to use all machine types."),
("machines.use_all_machinetype",),
)
return True, None, None
def __str__(self):
return self.name
class IpType(RevMixin, AclMixin, models.Model):
""" Type d'ip, définissant un range d'ip, affecté aux machine types"""
name = models.CharField(max_length=255)
extension = models.ForeignKey("Extension", on_delete=models.PROTECT)
need_infra = models.BooleanField(default=False)
domaine_ip_start = models.GenericIPAddressField(protocol="IPv4")
domaine_ip_stop = models.GenericIPAddressField(protocol="IPv4")
domaine_ip_network = models.GenericIPAddressField(
protocol="IPv4",
null=True,
blank=True,
help_text=_("Network containing the domain's IPv4 range (optional)."),
)
domaine_ip_netmask = models.IntegerField(
default=24,
validators=[MaxValueValidator(31), MinValueValidator(8)],
help_text=_("Netmask for the domain's IPv4 range."),
)
reverse_v4 = models.BooleanField(
default=False, help_text=_("Enable reverse DNS for IPv4.")
)
prefix_v6 = models.GenericIPAddressField(protocol="IPv6", null=True, blank=True)
prefix_v6_length = models.IntegerField(
default=64, validators=[MaxValueValidator(128), MinValueValidator(0)]
)
reverse_v6 = models.BooleanField(
default=False, help_text=_("Enable reverse DNS for IPv6.")
)
vlan = models.ForeignKey("Vlan", on_delete=models.PROTECT, blank=True, null=True)
ouverture_ports = models.ForeignKey("OuverturePortList", blank=True, null=True)
class Meta:
permissions = (
("view_iptype", _("Can view an IP type object")),
("use_all_iptype", _("Can use all IP types")),
)
verbose_name = _("IP type")
verbose_name_plural = _("IP types")
@cached_property
def ip_range(self):
""" Renvoie un objet IPRange à partir de l'objet IpType"""
return IPRange(self.domaine_ip_start, end=self.domaine_ip_stop)
@cached_property
def ip_set(self):
""" Renvoie une IPSet à partir de l'iptype"""
return IPSet(self.ip_range)
@cached_property
def ip_set_as_str(self):
""" Renvoie une liste des ip en string"""
return [str(x) for x in self.ip_set]
@cached_property
def ip_set_cidrs_as_str(self):
"""Renvoie la liste des cidrs du range en str"""
return [str(ip_range) for ip_range in self.ip_set.iter_cidrs()]
@cached_property
def ip_set_full_info(self):
"""Iter sur les range cidr, et renvoie network, broacast , etc"""
return [
{
"network": str(ip_set.network),
"netmask": str(ip_set.netmask),
"netmask_cidr": str(ip_set.prefixlen),
"broadcast": str(ip_set.broadcast),
"vlan": str(self.vlan),
"vlan_id": self.vlan.vlan_id,
}
for ip_set in self.ip_set.iter_cidrs()
]
@cached_property
def ip6_set_full_info(self):
if self.prefix_v6:
return {
"network": str(self.prefix_v6),
"netmask": "ffff:ffff:ffff:ffff::",
"netmask_cidr": str(self.prefix_v6_length),
"vlan": str(self.vlan),
"vlan_id": self.vlan.vlan_id,
}
else:
return None
@cached_property
def ip_network(self):
"""Renvoie le network parent du range start-stop, si spécifié
Différent de ip_set_cidrs ou iP_set, car lui est supérieur ou égal"""
if self.domaine_ip_network:
return IPNetwork(
str(self.domaine_ip_network) + "/" + str(self.domaine_ip_netmask)
)
return None
@cached_property
def ip_net_full_info(self):
"""Renvoie les infos du network contenant du range"""
if self.ip_network:
return {
"network": str(self.ip_network.network),
"netmask": str(self.ip_network.netmask),
"broadcast": str(self.ip_network.broadcast),
"netmask_cidr": str(self.ip_network.prefixlen),
"vlan": str(self.vlan),
"vlan_id": self.vlan.vlan_id,
}
else:
return None
@cached_property
def complete_prefixv6(self):
"""Return the complete prefix v6 as cidr"""
return str(self.prefix_v6) + "/" + str(self.prefix_v6_length)
def ip_objects(self):
""" Renvoie tous les objets ipv4 relié à ce type"""
return IpList.objects.filter(ip_type=self)
def free_ip(self):
""" Renvoie toutes les ip libres associées au type donné (self)"""
return IpList.objects.filter(interface__isnull=True).filter(ip_type=self)
def gen_ip_range(self):
""" Cree les IpList associées au type self. Parcours pédestrement et
crée les ip une par une. Si elles existent déjà, met à jour le type
associé à l'ip"""
# Creation du range d'ip dans les objets iplist
ip_obj = [IpList(ip_type=self, ipv4=str(ip)) for ip in self.ip_range]
listes_ip = IpList.objects.filter(ipv4__in=[str(ip) for ip in self.ip_range])
# Si il n'y a pas d'ip, on les crée
if not listes_ip:
IpList.objects.bulk_create(ip_obj)
# Sinon on update l'ip_type
else:
listes_ip.update(ip_type=self)
return
def del_ip_range(self):
""" Methode dépréciée, IpList est en mode cascade et supprimé
automatiquement"""
if Interface.objects.filter(ipv4__in=self.ip_objects()):
raise ValidationError(
_(
"One or several IP addresses from the"
" range are affected, impossible to delete"
" the range."
)
)
for ip in self.ip_objects():
ip.delete()
def check_replace_prefixv6(self):
"""Remplace les prefixv6 des interfaces liées à ce type d'ip"""
if not self.prefix_v6:
return
else:
for ipv6 in Ipv6List.objects.filter(
interface__in=Interface.objects.filter(
machine_type__in=MachineType.objects.filter(ip_type=self)
)
):
ipv6.check_and_replace_prefix(prefix=self.prefix_v6)
def get_associated_ptr_records(self):
from re2o.utils import all_active_assigned_interfaces
if self.reverse_v4:
return (
all_active_assigned_interfaces()
.filter(machine_type__ip_type=self)
.filter(ipv4__isnull=False)
)
else:
return None
def get_associated_ptr_v6_records(self):
from re2o.utils import all_active_interfaces
if self.reverse_v6:
return all_active_interfaces(full=True).filter(machine_type__ip_type=self)
else:
return None
def clean(self):
""" Nettoyage. Vérifie :
- Que ip_stop est après ip_start
- Qu'on ne crée pas plus gros qu'un /16
- Que le range crée ne recoupe pas un range existant
- Formate l'ipv6 donnée en /64"""
if IPAddress(self.domaine_ip_start) > IPAddress(self.domaine_ip_stop):
raise ValidationError(_("Range end must be after range start..."))
# On ne crée pas plus grand qu'un /16
if self.ip_range.size > 65536:
raise ValidationError(
_(
"The range is too large, you can't create"
" a larger one than a /16."
)
)
# On check que les / ne se recoupent pas
for element in IpType.objects.all().exclude(pk=self.pk):
if not self.ip_set.isdisjoint(element.ip_set):
raise ValidationError(
_("The specified range is not disjoint from existing"
" ranges.")
)
# On formate le prefix v6
if self.prefix_v6:
self.prefix_v6 = str(IPNetwork(self.prefix_v6 + "/64").network)
# On vérifie qu'un domaine network/netmask contiens bien le domaine ip start-stop
if self.domaine_ip_network:
if (
not self.domaine_ip_start in self.ip_network
or not self.domaine_ip_stop in self.ip_network
):
raise ValidationError(
_(
"If you specify a domain network or"
" netmask, it must contain the"
" domain's IP range."
)
)
return
def save(self, *args, **kwargs):
self.clean()
super(IpType, self).save(*args, **kwargs)
@staticmethod
def can_use_all(user_request, *_args, **_kwargs):
"""Superdroit qui permet d'utiliser toutes les extensions sans
restrictions
:param user_request: instance user qui fait l'edition
:return: True ou False avec la raison de l'échec le cas échéant"""
return (
user_request.has_perm("machines.use_all_iptype"),
None,
("machines.use_all_iptype",),
)
def __str__(self):
return self.name
class Vlan(RevMixin, AclMixin, models.Model):
""" Un vlan : vlan_id et nom
On limite le vlan id entre 0 et 4096, comme défini par la norme"""
vlan_id = models.PositiveIntegerField(validators=[MaxValueValidator(4095)])
name = models.CharField(max_length=256)
comment = models.CharField(max_length=256, blank=True)
# Réglages supplémentaires
arp_protect = models.BooleanField(default=False)
dhcp_snooping = models.BooleanField(default=False)
dhcpv6_snooping = models.BooleanField(default=False)
igmp = models.BooleanField(default=False, help_text=_("v4 multicast management."))
mld = models.BooleanField(default=False, help_text=_("v6 multicast management."))
class Meta:
permissions = (("view_vlan", _("Can view a VLAN object")),)
verbose_name = _("VLAN")
verbose_name_plural = _("VLANs")
def __str__(self):
return self.name
class Nas(RevMixin, AclMixin, models.Model):
""" Les nas. Associé à un machine_type.
Permet aussi de régler le port_access_mode (802.1X ou mac-address) pour
le radius. Champ autocapture de la mac à true ou false"""
default_mode = "802.1X"
AUTH = (("802.1X", "802.1X"), ("Mac-address", _("MAC-address")))
name = models.CharField(max_length=255, unique=True)
nas_type = models.ForeignKey(
"MachineType", on_delete=models.PROTECT, related_name="nas_type"
)
machine_type = models.ForeignKey(
"MachineType", on_delete=models.PROTECT, related_name="machinetype_on_nas"
)
port_access_mode = models.CharField(
choices=AUTH, default=default_mode, max_length=32
)
autocapture_mac = models.BooleanField(default=False)
class Meta:
permissions = (("view_nas", _("Can view a NAS device object")),)
verbose_name = _("NAS device")
verbose_name_plural = _("NAS devices")
def __str__(self):
return self.name
class SOA(RevMixin, AclMixin, models.Model):
"""
Un enregistrement SOA associé à une extension
Les valeurs par défault viennent des recommandations RIPE :
https://www.ripe.net/publications/docs/ripe-203
"""
name = models.CharField(max_length=255)
mail = models.EmailField(help_text=_("Contact email address for the zone."))
refresh = models.PositiveIntegerField(
default=86400, # 24 hours
help_text=_(
"Seconds before the secondary DNS have to ask the primary"
" DNS serial to detect a modification."
),
)
retry = models.PositiveIntegerField(
default=7200, # 2 hours
help_text=_(
"Seconds before the secondary DNS ask the serial again in"
" case of a primary DNS timeout."
),
)
expire = models.PositiveIntegerField(
default=3600000, # 1000 hours
help_text=_(
"Seconds before the secondary DNS stop answering requests"
" in case of primary DNS timeout."
),
)
ttl = models.PositiveIntegerField(
default=172800, help_text=_("Time To Live.") # 2 days
)
class Meta:
permissions = (("view_soa", _("Can view an SOA record object")),)
verbose_name = _("SOA record")
verbose_name_plural = _("SOA records")
def __str__(self):
return str(self.name)
@cached_property
def dns_soa_param(self):
"""
Renvoie la partie de l'enregistrement SOA correspondant aux champs :
<refresh> ; refresh
<retry> ; retry
<expire> ; expire
<ttl> ; TTL
"""
return (
" {refresh}; refresh\n"
" {retry}; retry\n"
" {expire}; expire\n"
" {ttl}; TTL"
).format(
refresh=str(self.refresh).ljust(12),
retry=str(self.retry).ljust(12),
expire=str(self.expire).ljust(12),
ttl=str(self.ttl).ljust(12),
)
@cached_property
def dns_soa_mail(self):
""" Renvoie le mail dans l'enregistrement SOA """
mail_fields = str(self.mail).split("@")
return mail_fields[0].replace(".", "\\.") + "." + mail_fields[1] + "."
@classmethod
def new_default_soa(cls):
""" Fonction pour créer un SOA par défaut, utile pour les nouvelles
extensions .
/!\ Ne jamais supprimer ou renommer cette fonction car elle est
utilisée dans les migrations de la BDD. """
return cls.objects.get_or_create(
name=_("SOA to edit"), mail="postmaster@example.com"
)[0].pk
class Extension(RevMixin, AclMixin, models.Model):
""" Extension dns type example.org. Précise si tout le monde peut
l'utiliser, associé à un origin (ip d'origine)"""
name = models.CharField(
max_length=255,
unique=True,
help_text=_("Zone name, must begin with a dot (.example.org)."),
)
need_infra = models.BooleanField(default=False)
origin = models.ForeignKey(
"IpList",
on_delete=models.PROTECT,
blank=True,
null=True,
help_text=_("A record associated with the zone."),
)
origin_v6 = models.GenericIPAddressField(
protocol="IPv6",
null=True,
blank=True,
help_text=_("AAAA record associated with the zone."),
)
soa = models.ForeignKey("SOA", on_delete=models.CASCADE)
dnssec = models.BooleanField(
default=False, help_text=_("Should the zone be signed with DNSSEC.")
)
class Meta:
permissions = (
("view_extension", _("Can view an extension object")),
("use_all_extension", _("Can use all extensions")),
)
verbose_name = _("DNS extension")
verbose_name_plural = _("DNS extensions")
@cached_property
def dns_entry(self):
""" Une entrée DNS A et AAAA sur origin (zone self)"""
entry = ""
if self.origin:
entry += "@ IN A " + str(self.origin)
if self.origin_v6:
if entry:
entry += "\n"
entry += "@ IN AAAA " + str(self.origin_v6)
return entry
def get_associated_sshfp_records(self):
from re2o.utils import all_active_assigned_interfaces
return (
all_active_assigned_interfaces()
.filter(machine_type__ip_type__extension=self)
.filter(machine__id__in=SshFp.objects.values("machine"))
)
def get_associated_a_records(self):
from re2o.utils import all_active_assigned_interfaces
return (
all_active_assigned_interfaces()
.filter(machine_type__ip_type__extension=self)
.filter(ipv4__isnull=False)
)
def get_associated_aaaa_records(self):
from re2o.utils import all_active_interfaces
return all_active_interfaces(full=True).filter(
machine_type__ip_type__extension=self
)
def get_associated_cname_records(self):
from re2o.utils import all_active_assigned_interfaces
return (
Domain.objects.filter(extension=self)
.filter(cname__interface_parent__in=all_active_assigned_interfaces())
.prefetch_related("cname")
)
def get_associated_dname_records(self):
return DName.objects.filter(alias=self)
@staticmethod
def can_use_all(user_request, *_args, **_kwargs):
"""Superdroit qui permet d'utiliser toutes les extensions sans
restrictions
:param user_request: instance user qui fait l'edition
:return: True ou False avec la raison de l'échec le cas échéant"""
can = user_request.has_perm("machines.use_all_extension")
return (
can,
_("You don't have the right to use all extensions.") if not can else None,
("machines.use_all_extension",),
)
def __str__(self):
return self.name
def clean(self, *args, **kwargs):
if self.name and self.name[0] != ".":
raise ValidationError(_("An extension must begin with a dot."))
super(Extension, self).clean(*args, **kwargs)
class Mx(RevMixin, AclMixin, models.Model):
""" Entrées des MX. Enregistre la zone (extension) associée et la
priorité
Todo : pouvoir associer un MX à une interface """
zone = models.ForeignKey("Extension", on_delete=models.PROTECT)
priority = models.PositiveIntegerField()
name = models.ForeignKey("Domain", on_delete=models.PROTECT)
ttl = models.PositiveIntegerField(
verbose_name=_("Time To Live (TTL)"), default=172800 # 2 days
)
class Meta:
permissions = (("view_mx", _("Can view an MX record object")),)
verbose_name = _("MX record")
verbose_name_plural = _("MX records")
@cached_property
def dns_entry(self):
"""Renvoie l'entrée DNS complète pour un MX à mettre dans les
fichiers de zones"""
return "@ IN MX {prior} {name}".format(
prior=str(self.priority).ljust(3), name=str(self.name)
)
def __str__(self):
return str(self.zone) + " " + str(self.priority) + " " + str(self.name)
class Ns(RevMixin, AclMixin, models.Model):
"""Liste des enregistrements name servers par zone considéérée"""
zone = models.ForeignKey("Extension", on_delete=models.PROTECT)
ns = models.ForeignKey("Domain", on_delete=models.PROTECT)
ttl = models.PositiveIntegerField(
verbose_name=_("Time To Live (TTL)"), default=172800 # 2 days
)
class Meta:
permissions = (("view_ns", _("Can view an NS record object")),)
verbose_name = _("NS record")
verbose_name_plural = _("NS records")
@cached_property
def dns_entry(self):
"""Renvoie un enregistrement NS complet pour les filezones"""
return "@ IN NS " + str(self.ns)
def __str__(self):
return str(self.zone) + " " + str(self.ns)
class Txt(RevMixin, AclMixin, models.Model):
""" Un enregistrement TXT associé à une extension"""
zone = models.ForeignKey("Extension", on_delete=models.PROTECT)
field1 = models.CharField(max_length=255)
field2 = models.TextField(max_length=2047)
ttl = models.PositiveIntegerField(
verbose_name=_("Time To Live (TTL)"), default=172800 # 2 days
)
class Meta:
permissions = (("view_txt", _("Can view a TXT record object")),)
verbose_name = _("TXT record")
verbose_name_plural = _("TXT records")
def __str__(self):
return str(self.zone) + " : " + str(self.field1) + " " + str(self.field2)
@cached_property
def dns_entry(self):
"""Renvoie l'enregistrement TXT complet pour le fichier de zone"""
return str(self.field1).ljust(15) + " IN TXT " + str(self.field2)
class DName(RevMixin, AclMixin, models.Model):
"""A DNAME entry for the DNS."""
zone = models.ForeignKey("Extension", on_delete=models.PROTECT)
alias = models.CharField(max_length=255)
ttl = models.PositiveIntegerField(
verbose_name=_("Time To Live (TTL)"), default=172800 # 2 days
)
class Meta:
permissions = (("view_dname", _("Can view a DNAME record object")),)
verbose_name = _("DNAME record")
verbose_name_plural = _("DNAME records")
def __str__(self):
return str(self.zone) + " : " + str(self.alias)
@cached_property
def dns_entry(self):
"""Returns the DNAME record for the DNS zone file."""
return str(self.alias).ljust(15) + " IN DNAME " + str(self.zone)
class Srv(RevMixin, AclMixin, models.Model):
""" A SRV record """
TCP = "TCP"
UDP = "UDP"
service = models.CharField(max_length=31)
protocole = models.CharField(
max_length=3, choices=((TCP, "TCP"), (UDP, "UDP")), default=TCP
)
extension = models.ForeignKey("Extension", on_delete=models.PROTECT)
ttl = models.PositiveIntegerField(
default=172800, help_text=_("Time To Live.") # 2 days
)
priority = models.PositiveIntegerField(
default=0,
validators=[MaxValueValidator(65535)],
help_text=_(
"Priority of the target server (positive integer value,"
" the lower it is, the more the server will be used if"
" available)."
),
)
weight = models.PositiveIntegerField(
default=0,
validators=[MaxValueValidator(65535)],
help_text=_(
"Relative weight for records with the same priority"
" (integer value between 0 and 65535)."
),
)
port = models.PositiveIntegerField(
validators=[MaxValueValidator(65535)], help_text=_("TCP/UDP port.")
)
target = models.ForeignKey(
"Domain", on_delete=models.PROTECT, help_text=_("Target server.")
)
class Meta:
permissions = (("view_srv", _("Can view an SRV record object")),)
verbose_name = _("SRV record")
verbose_name_plural = _("SRV records")
def __str__(self):
return (
str(self.service)
+ " "
+ str(self.protocole)
+ " "
+ str(self.extension)
+ " "
+ str(self.priority)
+ " "
+ str(self.weight)
+ str(self.port)
+ str(self.target)
)
@cached_property
def dns_entry(self):
"""Renvoie l'enregistrement SRV complet pour le fichier de zone"""
return (
str(self.service)
+ "._"
+ str(self.protocole).lower()
+ str(self.extension)
+ ". "
+ str(self.ttl)
+ " IN SRV "
+ str(self.priority)
+ " "
+ str(self.weight)
+ " "
+ str(self.port)
+ " "
+ str(self.target)
+ "."
)
class SshFp(RevMixin, AclMixin, models.Model):
"""A fingerprint of an SSH public key"""
ALGO = (
("ssh-rsa", "ssh-rsa"),
("ssh-ed25519", "ssh-ed25519"),
("ecdsa-sha2-nistp256", "ecdsa-sha2-nistp256"),
("ecdsa-sha2-nistp384", "ecdsa-sha2-nistp384"),
("ecdsa-sha2-nistp521", "ecdsa-sha2-nistp521"),
)
machine = models.ForeignKey("Machine", on_delete=models.CASCADE)
pub_key_entry = models.TextField(help_text=_("SSH public key."), max_length=2048)
algo = models.CharField(choices=ALGO, max_length=32)
comment = models.CharField(
help_text=_("Comment."), max_length=255, null=True, blank=True
)
@cached_property
def algo_id(self):
"""Return the id of the algorithm for this key"""
if "ecdsa" in self.algo:
return 3
elif "rsa" in self.algo:
return 1
else:
return 2
@cached_property
def hash(self):
"""Return the hashess for the pub key with correct id
cf RFC, 1 is sha1 , 2 sha256"""
return {
"1": hashlib.sha1(base64.b64decode(self.pub_key_entry)).hexdigest(),
"2": hashlib.sha256(base64.b64decode(self.pub_key_entry)).hexdigest(),
}
class Meta:
permissions = (("view_sshfp", _("Can view an SSHFP record object")),)
verbose_name = _("SSHFP record")
verbose_name_plural = _("SSHFP records")
def can_view(self, user_request, *_args, **_kwargs):
return self.machine.can_view(user_request, *_args, **_kwargs)
def can_edit(self, user_request, *args, **kwargs):
return self.machine.can_edit(user_request, *args, **kwargs)
def can_delete(self, user_request, *args, **kwargs):
return self.machine.can_delete(user_request, *args, **kwargs)
def __str__(self):
return str(self.algo) + " " + str(self.comment)
class Interface(RevMixin, AclMixin, FieldPermissionModelMixin, models.Model):
""" Une interface. Objet clef de l'application machine :
- une address mac unique. Possibilité de la rendre unique avec le
typemachine
- une onetoone vers IpList pour attribution ipv4
- le type parent associé au range ip et à l'extension
- un objet domain associé contenant son nom
- la liste des ports oiuvert"""
ipv4 = models.OneToOneField(
"IpList", on_delete=models.PROTECT, blank=True, null=True
)
mac_address = MACAddressField(integer=False)
machine = models.ForeignKey("Machine", on_delete=models.CASCADE)
machine_type = models.ForeignKey("MachineType", on_delete=models.PROTECT)
details = models.CharField(max_length=255, blank=True)
port_lists = models.ManyToManyField("OuverturePortList", blank=True)
class Meta:
permissions = (
("view_interface", _("Can view an interface object")),
("change_interface_machine", _("Can change the owner of an interface")),
)
verbose_name = _("interface")
verbose_name_plural = _("interfaces")
@cached_property
def is_active(self):
""" Renvoie si une interface doit avoir accès ou non """
machine = self.machine
user = self.machine.user
return machine.active and user.has_access()
@cached_property
def ipv6_slaac(self):
""" Renvoie un objet type ipv6 à partir du prefix associé à
l'iptype parent"""
if self.machine_type.ip_type.prefix_v6:
return EUI(self.mac_address).ipv6(
IPNetwork(self.machine_type.ip_type.prefix_v6).network
)
else:
return None
@cached_property
def gen_ipv6_dhcpv6(self):
"""Cree une ip, à assigner avec dhcpv6 sur une machine"""
prefix_v6 = self.machine_type.ip_type.prefix_v6.encode().decode("utf-8")
if not prefix_v6:
return None
return IPv6Address(
IPv6Address(prefix_v6).exploded[:20] + IPv6Address(self.id).exploded[20:]
)
@cached_property
def get_vendor(self):
"""Retourne le vendeur associé à la mac de l'interface"""
mac = EUI(self.mac_address)
try:
oui = mac.oui
vendor = oui.registration().org
except NotRegisteredError:
vendor = _("Unknown vendor.")
return vendor
def sync_ipv6_dhcpv6(self):
"""Affecte une ipv6 dhcpv6 calculée à partir de l'id de la machine"""
ipv6_dhcpv6 = self.gen_ipv6_dhcpv6
if not ipv6_dhcpv6:
return
ipv6 = Ipv6List.objects.filter(ipv6=str(ipv6_dhcpv6)).first()
if not ipv6:
ipv6 = Ipv6List(ipv6=str(ipv6_dhcpv6))
ipv6.interface = self
ipv6.save()
return
def sync_ipv6_slaac(self):
"""Cree, mets à jour et supprime si il y a lieu l'ipv6 slaac associée
à la machine
Sans prefixe ipv6, on return
Si l'ip slaac n'est pas celle qu'elle devrait être, on maj"""
ipv6_slaac = self.ipv6_slaac
if not ipv6_slaac:
return
ipv6_object = Ipv6List.objects.filter(interface=self, slaac_ip=True).first()
if not ipv6_object:
ipv6_object = Ipv6List(interface=self, slaac_ip=True)
if ipv6_object.ipv6 != str(ipv6_slaac):
ipv6_object.ipv6 = str(ipv6_slaac)
ipv6_object.save()
def sync_ipv6(self):
"""Cree et met à jour l'ensemble des ipv6 en fonction du mode choisi"""
if preferences.models.OptionalMachine.get_cached_value("ipv6_mode") == "SLAAC":
self.sync_ipv6_slaac()
elif (
preferences.models.OptionalMachine.get_cached_value("ipv6_mode") == "DHCPV6"
):
self.sync_ipv6_dhcpv6()
else:
return
def ipv6(self):
""" Renvoie le queryset de la liste des ipv6
On renvoie l'ipv6 slaac que si le mode slaac est activé
(et non dhcpv6)"""
if preferences.models.OptionalMachine.get_cached_value("ipv6_mode") == "SLAAC":
return self.ipv6list.all()
elif (
preferences.models.OptionalMachine.get_cached_value("ipv6_mode") == "DHCPV6"
):
return self.ipv6list.filter(slaac_ip=False)
else:
return []
def mac_bare(self):
""" Formatage de la mac type mac_bare"""
return str(EUI(self.mac_address, dialect=mac_bare)).lower()
def filter_macaddress(self):
""" Tente un formatage mac_bare, si échoue, lève une erreur de
validation"""
try:
self.mac_address = str(EUI(self.mac_address, dialect=default_dialect()))
except:
raise ValidationError(_("The given MAC address is invalid."))
def assign_ipv4(self):
""" Assigne une ip à l'interface """
free_ips = self.machine_type.ip_type.free_ip()
if free_ips:
self.ipv4 = free_ips[0]
else:
raise ValidationError(
_("There are no IP addresses available in the slash.")
)
return
def unassign_ipv4(self):
""" Sans commentaire, désassigne une ipv4"""
self.ipv4 = None
@classmethod
def mass_unassign_ipv4(cls, interface_list):
"""Unassign ipv4 to multiple interfaces"""
with transaction.atomic(), reversion.create_revision():
interface_list.update(ipv4=None)
reversion.set_comment("IPv4 unassignment")
@classmethod
def mass_assign_ipv4(cls, interface_list):
for interface in interface_list:
with transaction.atomic(), reversion.create_revision():
interface.assign_ipv4()
interface.save()
reversion.set_comment("IPv4 assignment")
def update_type(self):
""" Lorsque le machinetype est changé de type d'ip, on réassigne"""
self.clean()
self.save()
def has_private_ip(self):
""" True si l'ip associée est privée"""
if self.ipv4:
return IPAddress(str(self.ipv4)).is_private()
else:
return False
def may_have_port_open(self):
""" True si l'interface a une ip et une ip publique.
Permet de ne pas exporter des ouvertures sur des ip privées
(useless)"""
return self.ipv4 and not self.has_private_ip()
def clean(self, *args, **kwargs):
""" Formate l'addresse mac en mac_bare (fonction filter_mac)
et assigne une ipv4 dans le bon range si inexistante ou incohérente"""
# If type was an invalid value, django won't create an attribute type
# but try clean() as we may be able to create it from another value
# so even if the error as yet been detected at this point, django
# continues because the error might not prevent us from creating the
# instance.
# But in our case, it's impossible to create a type value so we raise
# the error.
if not hasattr(self, "machine_type"):
raise ValidationError(_("The selected IP type is invalid."))
self.filter_macaddress()
if not self.ipv4 or self.machine_type.ip_type != self.ipv4.ip_type:
self.assign_ipv4()
super(Interface, self).clean(*args, **kwargs)
def validate_unique(self, *args, **kwargs):
super(Interface, self).validate_unique(*args, **kwargs)
interfaces_similar = Interface.objects.filter(
mac_address=self.mac_address,
machine_type__ip_type=self.machine_type.ip_type,
)
if interfaces_similar and interfaces_similar.first() != self:
raise ValidationError(
_("MAC address already registered in this machine type/subnet.")
)
def save(self, *args, **kwargs):
self.filter_macaddress()
# On verifie la cohérence en forçant l'extension par la méthode
if self.ipv4:
if self.machine_type.ip_type != self.ipv4.ip_type:
raise ValidationError(
_("The IPv4 address and the machine type don't match.")
)
self.validate_unique()
super(Interface, self).save(*args, **kwargs)
@staticmethod
def can_create(user_request, machineid, *_args, **_kwargs):
"""Verifie que l'user a les bons droits infra pour créer
une interface, ou bien que la machine appartient bien à l'user
:param macineid: Id de la machine parente de l'interface
:param user_request: instance utilisateur qui fait la requête
:return: soit True, soit False avec la raison de l'échec"""
try:
machine = Machine.objects.get(pk=machineid)
except Machine.DoesNotExist:
return False, _("Nonexistent machine."), None
if not user_request.has_perm("machines.add_interface"):
if not (
preferences.models.OptionalMachine.get_cached_value("create_machine")
):
return False, _("You don't have the right to add a machine."), ("machines.add_interface",)
max_lambdauser_interfaces = preferences.models.OptionalMachine.get_cached_value(
"max_lambdauser_interfaces"
)
if machine.user != user_request:
return (
False,
_(
"You don't have the right to add an interface"
" to a machine of another user."
),
("machines.add_interface",),
)
if machine.user.user_interfaces().count() >= max_lambdauser_interfaces:
return (
False,
_(
"You reached the maximum number of interfaces"
" that you are allowed to create yourself"
" (%s)." % max_lambdauser_interfaces
),
("machines.add_interface",),
)
return True, None, None
@staticmethod
def can_change_machine(user_request, *_args, **_kwargs):
"""Check if a user can change the machine associated with an
Interface object """
can = user_request.has_perm("machines.change_interface_machine")
return (
can,
_("You don't have the right to edit the machine.") if not can else None,
("machines.change_interface_machine",),
)
def can_edit(self, user_request, *args, **kwargs):
"""Verifie que l'user a les bons droits infra pour editer
cette instance interface, ou qu'elle lui appartient
:param self: Instance interface à editer
:param user_request: Utilisateur qui fait la requête
:return: soit True, soit False avec la raison de l'échec"""
if self.machine.user != user_request:
can_user, _message, permissions = self.machine.user.can_edit(
user_request, *args, **kwargs
)
if not (user_request.has_perm("machines.change_interface") and can_user):
return (
False,
_("You don't have the right to edit a machine of another"
" user."),
("machines.change_interface",) + permissions,
)
return True, None, None
def can_delete(self, user_request, *args, **kwargs):
"""Verifie que l'user a les bons droits delete object pour del
cette instance interface, ou qu'elle lui appartient
:param self: Instance interface à del
:param user_request: Utilisateur qui fait la requête
:return: soit True, soit False avec la raison de l'échec"""
if self.machine.user != user_request:
can_user, _message, permissions = self.machine.user.can_edit(
user_request, *args, **kwargs
)
if not (user_request.has_perm("machines.change_interface") and can_user):
return (
False,
_("You don't have the right to edit a machine of another"
" user."),
("machines.change_interface",) + permissions,
)
return True, None, None
def can_view(self, user_request, *_args, **_kwargs):
"""Vérifie qu'on peut bien voir cette instance particulière avec
droit view objet ou qu'elle appartient à l'user
:param self: instance interface à voir
:param user_request: instance user qui fait l'edition
:return: True ou False avec la raison de l'échec le cas échéant"""
if (
not user_request.has_perm("machines.view_interface")
and self.machine.user != user_request
):
return (
False,
_("You don't have the right to view machines other than yours."),
("machines.view_interface",),
)
return True, None, None
def __init__(self, *args, **kwargs):
super(Interface, self).__init__(*args, **kwargs)
self.field_permissions = {"machine": self.can_change_machine}
def __str__(self):
try:
domain = self.domain
except:
domain = None
return str(domain)
class Ipv6List(RevMixin, AclMixin, FieldPermissionModelMixin, models.Model):
""" A list of IPv6 """
ipv6 = models.GenericIPAddressField(protocol="IPv6")
interface = models.ForeignKey(
"Interface", on_delete=models.CASCADE, related_name="ipv6list"
)
slaac_ip = models.BooleanField(default=False)
class Meta:
permissions = (
("view_ipv6list", _("Can view an IPv6 addresses list object")),
(
"change_ipv6list_slaac_ip",
_("Can change the SLAAC value of an IPv6 addresses list"),
),
)
verbose_name = _("IPv6 addresses list")
verbose_name_plural = _("IPv6 addresses lists")
@staticmethod
def can_create(user_request, interfaceid, *_args, **_kwargs):
"""Verifie que l'user a les bons droits infra pour créer
une ipv6, ou possède l'interface associée
:param interfaceid: Id de l'interface associée à cet objet domain
:param user_request: instance utilisateur qui fait la requête
:return: soit True, soit False avec la raison de l'échec"""
try:
interface = Interface.objects.get(pk=interfaceid)
except Interface.DoesNotExist:
return False, _("Nonexistent interface."), None
if not user_request.has_perm("machines.add_ipv6list"):
if interface.machine.user != user_request:
return (
False,
_(
"You don't have the right to add an alias to a"
" machine of another user."
),
("machines.add_ipv6list",),
)
return True, None, None
@staticmethod
def can_change_slaac_ip(user_request, *_args, **_kwargs):
""" Check if a user can change the slaac value """
can = user_request.has_perm("machines.change_ipv6list_slaac_ip")
return (
can,
_("You don't have the right to change the SLAAC value of an IPv6 address.")
if not can
else None,
("machines.change_ipv6list_slaac_ip",),
)
def can_edit(self, user_request, *args, **kwargs):
"""Verifie que l'user a les bons droits infra pour editer
cette instance interface, ou qu'elle lui appartient
:param self: Instance interface à editer
:param user_request: Utilisateur qui fait la requête
:return: soit True, soit False avec la raison de l'échec"""
if self.interface.machine.user != user_request:
can_user, _message, permissions = self.interface.machine.user.can_edit(
user_request, *args, **kwargs
)
if not (user_request.has_perm("machines.change_ipv6list") and can_user):
return (
False,
_("You don't have the right to edit a machine of another user."),
("machines.change_ipv6list",),
)
return True, None, None
def can_delete(self, user_request, *args, **kwargs):
"""Verifie que l'user a les bons droits delete object pour del
cette instance interface, ou qu'elle lui appartient
:param self: Instance interface à del
:param user_request: Utilisateur qui fait la requête
:return: soit True, soit False avec la raison de l'échec"""
if self.interface.machine.user != user_request:
can_user, _message, permissions = self.interface.machine.user.can_edit(
user_request, *args, **kwargs
)
if not (user_request.has_perm("machines.change_ipv6list") and can_user):
return (
False,
_("You don't have the right to edit a machine of another user."),
("machines.change_ipv6list",) + permissions,
)
return True, None, None
def can_view(self, user_request, *_args, **_kwargs):
"""Vérifie qu'on peut bien voir cette instance particulière avec
droit view objet ou qu'elle appartient à l'user
:param self: instance interface à voir
:param user_request: instance user qui fait l'edition
:return: True ou False avec la raison de l'échec le cas échéant"""
if (
not user_request.has_perm("machines.view_ipv6list")
and self.interface.machine.user != user_request
):
return (
False,
_("You don't have the right to view machines other than yours."),
("machines.view_ipv6list",),
)
return True, None, None
def __init__(self, *args, **kwargs):
super(Ipv6List, self).__init__(*args, **kwargs)
self.field_permissions = {"slaac_ip": self.can_change_slaac_ip}
def check_and_replace_prefix(self, prefix=None):
"""Si le prefixe v6 est incorrect, on maj l'ipv6"""
prefix_v6 = prefix or self.interface.machine_type.ip_type.prefix_v6.encode().decode(
"utf-8"
)
if not prefix_v6:
return
if (
IPv6Address(self.ipv6.encode().decode("utf-8")).exploded[:20]
!= IPv6Address(prefix_v6).exploded[:20]
):
self.ipv6 = IPv6Address(
IPv6Address(prefix_v6).exploded[:20]
+ IPv6Address(self.ipv6.encode().decode("utf-8")).exploded[20:]
)
self.save()
def clean(self, *args, **kwargs):
if self.slaac_ip and (
Ipv6List.objects.filter(interface=self.interface, slaac_ip=True).exclude(
id=self.id
)
):
raise ValidationError(_("A SLAAC IP address is already registered."))
try:
prefix_v6 = self.interface.machine_type.ip_type.prefix_v6.encode().decode(
"utf-8"
)
except AttributeError: # Prevents from crashing when there is no defined prefix_v6
prefix_v6 = None
if prefix_v6:
if (
IPv6Address(self.ipv6.encode().decode("utf-8")).exploded[:20]
!= IPv6Address(prefix_v6).exploded[:20]
):
raise ValidationError(
_(
"The v6 prefix is incorrect and"
" doesn't match the type associated"
" with the machine."
)
)
super(Ipv6List, self).clean(*args, **kwargs)
def save(self, *args, **kwargs):
"""Force à avoir appellé clean avant"""
self.full_clean()
super(Ipv6List, self).save(*args, **kwargs)
def __str__(self):
return str(self.ipv6)
class Domain(RevMixin, AclMixin, FieldPermissionModelMixin, models.Model):
""" Objet domain. Enregistrement A et CNAME en même temps : permet de
stocker les alias et les nom de machines, suivant si interface_parent
ou cname sont remplis"""
interface_parent = models.OneToOneField(
"Interface", on_delete=models.CASCADE, blank=True, null=True
)
name = models.CharField(
help_text=_("Mandatory and unique, must not contain dots."), max_length=255
)
extension = models.ForeignKey("Extension", on_delete=models.PROTECT)
cname = models.ForeignKey(
"self", null=True, blank=True, related_name="related_domain"
)
ttl = models.PositiveIntegerField(
verbose_name=_("Time To Live (TTL)"),
default=0 # 0 means that the re2o-service for DNS should retrieve the
# default TTL
)
class Meta:
unique_together = (("name", "extension"),)
permissions = (
("view_domain", _("Can view a domain object")),
("change_ttl", _("Can change the TTL of a domain object")),
)
verbose_name = _("domain")
verbose_name_plural = _("domains")
def get_extension(self):
""" Retourne l'extension de l'interface parente si c'est un A
Retourne l'extension propre si c'est un cname, renvoie None sinon"""
if self.interface_parent:
return self.interface_parent.machine_type.ip_type.extension
elif hasattr(self, "extension"):
return self.extension
else:
return None
def clean(self):
""" Validation :
- l'objet est bien soit A soit CNAME
- le cname est pas pointé sur lui-même
- le nom contient bien les caractères autorisés par la norme
dns et moins de 63 caractères au total
- le couple nom/extension est bien unique"""
if self.get_extension():
self.extension = self.get_extension()
if self.interface_parent and self.cname:
raise ValidationError(_("You can't create a both A and CNAME record."))
if self.cname == self:
raise ValidationError(
_("You can't create a CNAME record pointing to itself.")
)
HOSTNAME_LABEL_PATTERN = re.compile(r"(?!-)[A-Z\d-]+(?<!-)$", re.IGNORECASE)
dns = self.name.lower()
if len(dns) > 63:
raise ValidationError(
_("The domain name %s is too long (over 63 characters).") % dns
)
if not HOSTNAME_LABEL_PATTERN.match(dns):
raise ValidationError(
_("The domain name %s contains forbidden characters.") % dns
)
self.validate_unique()
super(Domain, self).clean()
@cached_property
def dns_entry(self):
""" Une entrée DNS"""
if self.cname:
return "{name} IN CNAME {cname}.".format(
name=str(self.name).ljust(15), cname=str(self.cname)
)
def save(self, *args, **kwargs):
""" Empèche le save sans extension valide.
Force à avoir appellé clean avant"""
if not self.get_extension():
raise ValidationError(_("Invalid extension."))
self.full_clean()
super(Domain, self).save(*args, **kwargs)
@cached_property
def get_source_interface(self):
"""Renvoie l'interface source :
- l'interface reliée si c'est un A
- si c'est un cname, suit le cname jusqu'à atteindre le A
et renvoie l'interface parente
Fonction récursive"""
if self.interface_parent:
return self.interface_parent
else:
return self.cname.get_source_interface
@staticmethod
def can_create(user_request, interfaceid, *_args, **_kwargs):
"""Verifie que l'user a les bons droits infra pour créer
un domain, ou possède l'interface associée
:param interfaceid: Id de l'interface associée à cet objet domain
:param user_request: instance utilisateur qui fait la requête
:return: soit True, soit False avec la raison de l'échec"""
try:
interface = Interface.objects.get(pk=interfaceid)
except Interface.DoesNotExist:
return False, _("Nonexistent interface."), None
if not user_request.has_perm("machines.add_domain"):
max_lambdauser_aliases = preferences.models.OptionalMachine.get_cached_value(
"max_lambdauser_aliases"
)
if interface.machine.user != user_request:
return (
False,
_(
"You don't have the right to add an alias to a"
" machine of another user."
),
("machines.add_domain",),
)
if (
Domain.objects.filter(
cname__in=Domain.objects.filter(
interface_parent__in=(interface.machine.user.user_interfaces())
)
).count()
>= max_lambdauser_aliases
):
return (
False,
_(
"You reached the maximum number of alias that"
" you are allowed to create yourself (%s). "
% max_lambdauser_aliases
),
("machines.add_domain",),
)
return True, None, None
def can_edit(self, user_request, *_args, **_kwargs):
"""Verifie que l'user a les bons droits pour editer
cette instance domain
:param self: Instance domain à editer
:param user_request: Utilisateur qui fait la requête
:return: soit True, soit False avec la raison de l'échec"""
if (
not user_request.has_perm("machines.change_domain")
and self.get_source_interface.machine.user != user_request
):
return (
False,
_(
"You don't have the right to edit an alias of a"
" machine of another user."
),
("machines.change_domain",),
)
return True, None, None
def can_delete(self, user_request, *_args, **_kwargs):
"""Verifie que l'user a les bons droits delete object pour del
cette instance domain, ou qu'elle lui appartient
:param self: Instance domain à del
:param user_request: Utilisateur qui fait la requête
:return: soit True, soit False avec la raison de l'échec"""
if (
not user_request.has_perm("machines.delete_domain")
and self.get_source_interface.machine.user != user_request
):
return (
False,
_(
"You don't have the right to delete an alias of a"
" machine of another user."
),
("machines.delete_domain",),
)
return True, None, None
def can_view(self, user_request, *_args, **_kwargs):
"""Vérifie qu'on peut bien voir cette instance particulière avec
droit view objet ou qu'elle appartient à l'user
:param self: instance domain à voir
:param user_request: instance user qui fait l'edition
:return: True ou False avec la raison de l'échec le cas échéant"""
if (
not user_request.has_perm("machines.view_domain")
and self.get_source_interface.machine.user != user_request
):
return (
False,
_("You don't have the right to view other machines than"
" yours."),
("machines.view_domain",),
)
return True, None, None
@staticmethod
def can_change_ttl(user_request, *_args, **_kwargs):
can = user_request.has_perm("machines.change_ttl")
return (
can,
_("You don't have the right to change the domain's TTL.")
if not can
else None,
("machines.change_ttl",),
)
def __str__(self):
return str(self.name) + str(self.extension)
class IpList(RevMixin, AclMixin, models.Model):
""" A list of IPv4 """
ipv4 = models.GenericIPAddressField(protocol="IPv4", unique=True)
ip_type = models.ForeignKey("IpType", on_delete=models.CASCADE)
class Meta:
permissions = (("view_iplist", _("Can view an IPv4 addresses list object")),)
verbose_name = _("IPv4 addresses list")
verbose_name_plural = _("IPv4 addresses lists")
@cached_property
def need_infra(self):
""" Permet de savoir si un user basique peut assigner cette ip ou
non"""
return self.ip_type.need_infra
def clean(self):
""" Erreur si l'ip_type est incorrect"""
if not str(self.ipv4) in self.ip_type.ip_set_as_str:
raise ValidationError(
_("The IPv4 address and the range of the IP type don't match.")
)
return
def save(self, *args, **kwargs):
self.clean()
super(IpList, self).save(*args, **kwargs)
def __str__(self):
return self.ipv4
class Role(RevMixin, AclMixin, models.Model):
"""Define the role of a machine.
Allow automated generation of the server configuration.
"""
ROLE = (
("dhcp-server", _("DHCP server")),
("switch-conf-server", _("Switches configuration server")),
("dns-recursive-server", _("Recursive DNS server")),
("ntp-server", _("NTP server")),
("radius-server", _("RADIUS server")),
("log-server", _("Log server")),
("ldap-master-server", _("LDAP master server")),
("ldap-backup-server", _("LDAP backup server")),
("smtp-server", _("SMTP server")),
("postgresql-server", _("postgreSQL server")),
("mysql-server", _("mySQL server")),
("sql-client", _("SQL client")),
("gateway", _("Gateway")),
)
role_type = models.CharField(max_length=255, unique=True)
servers = models.ManyToManyField("Interface")
specific_role = models.CharField(choices=ROLE, null=True, blank=True, max_length=32)
class Meta:
permissions = (("view_role", _("Can view a role object")),)
verbose_name = _("server role")
verbose_name_plural = _("server roles")
@classmethod
def interface_for_roletype(cls, roletype):
"""Return interfaces for a roletype"""
return Interface.objects.filter(role=cls.objects.filter(specific_role=roletype))
@classmethod
def all_interfaces_for_roletype(cls, roletype):
"""Return all interfaces for a roletype"""
return Interface.objects.filter(
machine__interface__role=cls.objects.filter(specific_role=roletype)
)
@classmethod
def interface_for_roletype(cls, roletype):
"""Return interfaces for a roletype"""
return Interface.objects.filter(role=cls.objects.filter(specific_role=roletype))
def save(self, *args, **kwargs):
super(Role, self).save(*args, **kwargs)
def __str__(self):
return str(self.role_type)
class Service(RevMixin, AclMixin, models.Model):
""" Definition d'un service (dhcp, dns, etc)"""
service_type = models.CharField(max_length=255, blank=True, unique=True)
min_time_regen = models.DurationField(
default=timedelta(minutes=1),
help_text=_("Minimal time before regeneration of the service."),
)
regular_time_regen = models.DurationField(
default=timedelta(hours=1),
help_text=_("Maximal time before regeneration of the service."),
)
servers = models.ManyToManyField("Interface", through="Service_link")
class Meta:
permissions = (("view_service", _("Can view a service object")),)
verbose_name = _("service to generate (DHCP, DNS, ...)")
verbose_name_plural = _("services to generate (DHCP, DNS, ...)")
def ask_regen(self):
""" Marque à True la demande de régénération pour un service x """
Service_link.objects.filter(service=self).exclude(asked_regen=True).update(
asked_regen=True
)
return
def process_link(self, servers):
""" Django ne peut créer lui meme les relations manytomany avec table
intermediaire explicite"""
for serv in servers.exclude(pk__in=Interface.objects.filter(service=self)):
link = Service_link(service=self, server=serv)
link.save()
Service_link.objects.filter(service=self).exclude(server__in=servers).delete()
return
def save(self, *args, **kwargs):
super(Service, self).save(*args, **kwargs)
def __str__(self):
return str(self.service_type)
def regen(service):
""" Fonction externe pour régérération d'un service, prend un objet service
en arg"""
obj = Service.objects.filter(service_type=service)
if obj:
obj[0].ask_regen()
return
class Service_link(RevMixin, AclMixin, models.Model):
""" Definition du lien entre serveurs et services"""
service = models.ForeignKey("Service", on_delete=models.CASCADE)
server = models.ForeignKey("Interface", on_delete=models.CASCADE)
last_regen = models.DateTimeField(auto_now_add=True)
asked_regen = models.BooleanField(default=False)
class Meta:
permissions = (
("view_service_link", _("Can view a service server link object")),
)
verbose_name = _("link between service and server")
verbose_name_plural = _("links between service and server")
def done_regen(self):
""" Appellé lorsqu'un serveur a regénéré son service"""
self.last_regen = timezone.now()
self.asked_regen = False
self.save()
@property
def need_regen(self):
""" Décide si le temps minimal écoulé est suffisant pour provoquer une
régénération de service"""
return bool(
(
self.asked_regen
and (self.last_regen + self.service.min_time_regen) < timezone.now()
)
or (self.last_regen + self.service.regular_time_regen) < timezone.now()
)
@need_regen.setter
def need_regen(self, value):
"""
Force to set the need_regen value. True means a regen is asked and False
means a regen has been done.
:param value: (bool) The value to set to
"""
if not value:
self.last_regen = timezone.now()
self.asked_regen = value
self.save()
def __str__(self):
return str(self.server) + " " + str(self.service)
class OuverturePortList(RevMixin, AclMixin, models.Model):
"""Liste des ports ouverts sur une interface."""
name = models.CharField(
help_text=_("Name of the ports configuration"), max_length=255
)
class Meta:
permissions = (
("view_ouvertureportlist", _("Can view a ports opening list"
" object")),
)
verbose_name = _("ports opening list")
verbose_name_plural = _("ports opening lists")
def can_delete(self, user_request, *_args, **_kwargs):
"""Verifie que l'user a les bons droits bureau pour delete
cette instance ouvertureportlist
:param self: Instance ouvertureportlist à delete
:param user_request: Utilisateur qui fait la requête
:return: soit True, soit False avec la raison de l'échec"""
if not user_request.has_perm("machines.delete_ouvertureportlist"):
return (
False,
_("You don't have the right to delete a ports opening list."),
("machines.delete_ouvertureportlist",),
)
if self.interface_set.all():
return False, _("This ports opening list is used."), None
return True, None, None
def __str__(self):
return self.name
def tcp_ports_in(self):
"""Renvoie la liste des ports ouverts en TCP IN pour ce profil"""
return self.ouvertureport_set.filter(
protocole=OuverturePort.TCP, io=OuverturePort.IN
)
def udp_ports_in(self):
"""Renvoie la liste des ports ouverts en UDP IN pour ce profil"""
return self.ouvertureport_set.filter(
protocole=OuverturePort.UDP, io=OuverturePort.IN
)
def tcp_ports_out(self):
"""Renvoie la liste des ports ouverts en TCP OUT pour ce profil"""
return self.ouvertureport_set.filter(
protocole=OuverturePort.TCP, io=OuverturePort.OUT
)
def udp_ports_out(self):
"""Renvoie la liste des ports ouverts en UDP OUT pour ce profil"""
return self.ouvertureport_set.filter(
protocole=OuverturePort.UDP, io=OuverturePort.OUT
)
class OuverturePort(RevMixin, AclMixin, models.Model):
"""
Représente un simple port ou une plage de ports.
Les ports de la plage sont compris entre begin et en inclus.
Si begin == end alors on ne représente qu'un seul port.
On limite les ports entre 0 et 65535, tels que défini par la RFC
"""
TCP = "T"
UDP = "U"
IN = "I"
OUT = "O"
begin = models.PositiveIntegerField(validators=[MaxValueValidator(65535)])
end = models.PositiveIntegerField(validators=[MaxValueValidator(65535)])
port_list = models.ForeignKey("OuverturePortList", on_delete=models.CASCADE)
protocole = models.CharField(
max_length=1, choices=((TCP, "TCP"), (UDP, "UDP")), default=TCP
)
io = models.CharField(max_length=1, choices=((IN, "IN"), (OUT, "OUT")), default=OUT)
class Meta:
verbose_name = _("ports opening")
verbose_name_plural = _("ports openings")
def __str__(self):
if self.begin == self.end:
return str(self.begin)
return ":".join([str(self.begin), str(self.end)])
def show_port(self):
"""Formatage plus joli, alias pour str"""
return str(self)
@receiver(post_save, sender=Machine)
def machine_post_save(**kwargs):
"""Synchronisation ldap et régen parefeu/dhcp lors de la modification
d'une machine"""
user = kwargs["instance"].user
user.ldap_sync(base=False, access_refresh=False, mac_refresh=True)
regen("dhcp")
regen("mac_ip_list")
@receiver(post_delete, sender=Machine)
def machine_post_delete(**kwargs):
"""Synchronisation ldap et régen parefeu/dhcp lors de la suppression
d'une machine"""
machine = kwargs["instance"]
user = machine.user
user.ldap_sync(base=False, access_refresh=False, mac_refresh=True)
regen("dhcp")
regen("mac_ip_list")
@receiver(post_save, sender=Interface)
def interface_post_save(**kwargs):
"""Synchronisation ldap et régen parefeu/dhcp lors de la modification
d'une interface"""
interface = kwargs["instance"]
interface.sync_ipv6()
user = interface.machine.user
user.ldap_sync(base=False, access_refresh=False, mac_refresh=True)
# Regen services
regen("dhcp")
regen("mac_ip_list")
@receiver(post_delete, sender=Interface)
def interface_post_delete(**kwargs):
"""Synchronisation ldap et régen parefeu/dhcp lors de la suppression
d'une interface"""
interface = kwargs["instance"]
user = interface.machine.user
user.ldap_sync(base=False, access_refresh=False, mac_refresh=True)
@receiver(post_save, sender=IpType)
def iptype_post_save(**kwargs):
"""Generation des objets ip après modification d'un range ip"""
iptype = kwargs["instance"]
iptype.gen_ip_range()
iptype.check_replace_prefixv6()
@receiver(post_save, sender=MachineType)
def machinetype_post_save(**kwargs):
"""Mise à jour des interfaces lorsque changement d'attribution
d'une machinetype (changement iptype parent)"""
machinetype = kwargs["instance"]
for interface in machinetype.all_interfaces():
interface.update_type()
@receiver(post_save, sender=Domain)
def domain_post_save(**_kwargs):
"""Regeneration dns après modification d'un domain object"""
regen("dns")
@receiver(post_delete, sender=Domain)
def domain_post_delete(**_kwargs):
"""Regeneration dns après suppression d'un domain object"""
regen("dns")
@receiver(post_save, sender=Extension)
def extension_post_save(**_kwargs):
"""Regeneration dns après modification d'une extension"""
regen("dns")
@receiver(post_delete, sender=Extension)
def extension_post_selete(**_kwargs):
"""Regeneration dns après suppression d'une extension"""
regen("dns")
@receiver(post_save, sender=SOA)
def soa_post_save(**_kwargs):
"""Regeneration dns après modification d'un SOA"""
regen("dns")
@receiver(post_delete, sender=SOA)
def soa_post_delete(**_kwargs):
"""Regeneration dns après suppresson d'un SOA"""
regen("dns")
@receiver(post_save, sender=Mx)
def mx_post_save(**_kwargs):
"""Regeneration dns après modification d'un MX"""
regen("dns")
@receiver(post_delete, sender=Mx)
def mx_post_delete(**_kwargs):
"""Regeneration dns après suppresson d'un MX"""
regen("dns")
@receiver(post_save, sender=Ns)
def ns_post_save(**_kwargs):
"""Regeneration dns après modification d'un NS"""
regen("dns")
@receiver(post_delete, sender=Ns)
def ns_post_delete(**_kwargs):
"""Regeneration dns après modification d'un NS"""
regen("dns")
@receiver(post_save, sender=Txt)
def text_post_save(**_kwargs):
"""Regeneration dns après modification d'un TXT"""
regen("dns")
@receiver(post_delete, sender=Txt)
def text_post_delete(**_kwargs):
"""Regeneration dns après modification d'un TX"""
regen("dns")
@receiver(post_save, sender=DName)
def dname_post_save(**_kwargs):
"""Updates the DNS regen after modification of a DName object."""
regen("dns")
@receiver(post_delete, sender=DName)
def dname_post_delete(**_kwargs):
"""Updates the DNS regen after deletion of a DName object."""
regen("dns")
@receiver(post_save, sender=Srv)
def srv_post_save(**_kwargs):
"""Regeneration dns après modification d'un SRV"""
regen("dns")
@receiver(post_delete, sender=Srv)
def srv_post_delete(**_kwargs):
"""Regeneration dns après modification d'un SRV"""
regen("dns")