8
0
Fork 0
mirror of https://gitlab2.federez.net/re2o/re2o synced 2024-12-24 07:53:47 +00:00
re2o/logs/models.py

687 lines
23 KiB
Python
Raw Normal View History

2020-04-22 16:17:06 +00:00
# -*- mode: python; coding: utf-8 -*-
# Re2o est un logiciel d'administration développé initiallement au rezometz. Il
# se veut agnostique au réseau considéré, de manière à être installable en
# quelques clics.
#
# Copyright © 2020 Jean-Romain Garnier
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""logs.models
The models definitions for the logs app
2020-04-22 16:17:06 +00:00
"""
2020-04-24 13:37:05 +00:00
from reversion.models import Version, Revision
from django.utils.translation import ugettext_lazy as _
from django.contrib.auth.models import Group
2020-04-24 13:37:05 +00:00
from django.db.models import Q
2020-04-22 16:17:06 +00:00
from machines.models import IpList
from machines.models import Interface
from machines.models import Machine
from machines.models import MachineType
2020-04-22 16:17:06 +00:00
from users.models import User
from users.models import Adherent
from users.models import Club
from topologie.models import Room
from topologie.models import Port
2020-04-22 16:17:06 +00:00
from .forms import classes_for_action_type
2020-04-22 16:17:06 +00:00
2020-04-24 14:43:11 +00:00
2020-04-24 13:37:05 +00:00
class ActionsSearch:
def get(self, params):
"""
:param params: dict built by the search view
:return: QuerySet of Revision objects
"""
user = params.get("u", None)
start = params.get("s", None)
end = params.get("e", None)
action_types = params.get("t", None)
2020-04-24 13:37:05 +00:00
query = Q()
if user:
query &= Q(user=user)
if start:
2020-04-24 14:43:11 +00:00
query &= Q(date_created__gte=start)
2020-04-24 13:37:05 +00:00
if end:
2020-04-24 14:43:11 +00:00
query &= Q(date_created__lte=end)
2020-04-24 13:37:05 +00:00
2020-04-24 14:34:36 +00:00
action_models = self.models_for_action_types(action_types)
if action_models:
query &= Q(version__content_type__model__in=action_models)
2020-04-24 13:37:05 +00:00
return (
Revision.objects.all()
.filter(query)
.select_related("user")
.prefetch_related("version_set__object")
)
2020-04-24 14:34:36 +00:00
def models_for_action_types(self, action_types):
if action_types is None:
return None
classes = []
for action_type in action_types:
c = classes_for_action_type(action_type)
# Selecting "all" removes the filter
if c is None:
return None
classes += c
return classes
2020-04-24 13:37:05 +00:00
class MachineHistorySearchEvent:
def __init__(self, user, machine, interface, start=None, end=None):
"""
:param user: User, The user owning the maching at the time of the event
:param machine: Version, the machine version related to the interface
:param interface: Version, the interface targeted by this event
:param start: datetime, the date at which this version was created
:param end: datetime, the date at which this version was replace by a new one
"""
2020-04-22 16:17:06 +00:00
self.user = user
self.machine = machine
self.interface = interface
self.ipv4 = IpList.objects.get(id=interface.field_dict["ipv4_id"]).ipv4
self.mac = self.interface.field_dict["mac_address"]
self.start_date = start
self.end_date = end
self.comment = interface.revision.get_comment() or None
def is_similar(self, elt2):
2020-04-22 16:39:05 +00:00
"""
Checks whether two events are similar enough to be merged
:return: bool
2020-04-22 16:39:05 +00:00
"""
2020-04-22 16:17:06 +00:00
return (
elt2 is not None
and self.user.id == elt2.user.id
and self.ipv4 == elt2.ipv4
and self.machine.field_dict["id"] == elt2.machine.field_dict["id"]
and self.interface.field_dict["id"] == elt2.interface.field_dict["id"]
)
def __repr__(self):
return "{} ({} - ): from {} to {} ({})".format(
self.machine,
self.mac,
self.ipv4,
self.start_date,
self.end_date,
self.comment or "No comment"
)
class MachineHistorySearch:
2020-04-22 16:17:06 +00:00
def __init__(self):
self.events = []
2020-04-23 15:46:30 +00:00
self._last_evt = None
2020-04-22 16:17:06 +00:00
def get(self, search, params):
"""
:param search: ip or mac to lookup
:param params: dict built by the search view
:return: list or None, a list of MachineHistorySearchEvent in reverse chronological order
"""
2020-04-22 16:17:06 +00:00
self.start = params.get("s", None)
self.end = params.get("e", None)
search_type = params.get("t", 0)
self.events = []
if search_type == "ip":
2020-04-23 15:46:30 +00:00
return self._get_by_ip(search)[::-1]
2020-04-22 16:17:06 +00:00
elif search_type == "mac":
2020-04-23 15:46:30 +00:00
return self._get_by_mac(search)[::-1]
2020-04-22 16:17:06 +00:00
return None
2020-04-23 15:46:30 +00:00
def _add_revision(self, user, machine, interface):
2020-04-22 16:39:05 +00:00
"""
Add a new revision to the chronological order
:param user: User, The user owning the maching at the time of the event
:param machine: Version, the machine version related to the interface
:param interface: Version, the interface targeted by this event
2020-04-22 16:39:05 +00:00
"""
evt = MachineHistorySearchEvent(user, machine, interface)
2020-04-22 16:17:06 +00:00
evt.start_date = interface.revision.date_created
2020-04-22 16:39:05 +00:00
# Try not to recreate events if it's unnecessary
2020-04-23 15:46:30 +00:00
if evt.is_similar(self._last_evt):
2020-04-22 16:17:06 +00:00
return
# Mark the end of validity of the last element
2020-04-23 15:46:30 +00:00
if self._last_evt and not self._last_evt.end_date:
self._last_evt.end_date = evt.start_date
2020-04-22 16:17:06 +00:00
# If the event ends before the given date, remove it
if self.start and evt.start_date.date() < self.start:
2020-04-23 15:46:30 +00:00
self._last_evt = None
2020-04-22 16:17:06 +00:00
self.events.pop()
# Make sure the new event starts before the given end date
if self.end and evt.start_date.date() > self.end:
2020-04-22 16:17:06 +00:00
return
# Save the new element
self.events.append(evt)
2020-04-23 15:46:30 +00:00
self._last_evt = evt
2020-04-22 16:17:06 +00:00
2020-04-23 15:46:30 +00:00
def _get_interfaces_for_ip(self, ip):
2020-04-22 16:17:06 +00:00
"""
:param ip: str
:return: An iterable object with the Version objects
of Interfaces with the given IP
2020-04-22 16:17:06 +00:00
"""
2020-04-22 16:39:05 +00:00
# TODO: What if ip list was deleted?
try:
ip_id = IpList.objects.get(ipv4=ip).id
except IpList.DoesNotExist:
return []
2020-04-22 16:17:06 +00:00
return filter(
lambda x: x.field_dict["ipv4_id"] == ip_id,
Version.objects.get_for_model(Interface).order_by("revision__date_created")
)
2020-04-23 15:46:30 +00:00
def _get_interfaces_for_mac(self, mac):
2020-04-22 16:17:06 +00:00
"""
:param mac: str
:return: An iterable object with the Version objects
of Interfaces with the given MAC address
2020-04-22 16:17:06 +00:00
"""
return filter(
lambda x: str(x.field_dict["mac_address"]) == mac,
Version.objects.get_for_model(Interface).order_by("revision__date_created")
)
2020-04-23 15:46:30 +00:00
def _get_machines_for_interface(self, interface):
2020-04-22 16:17:06 +00:00
"""
:param interface: Version, the interface for which to find the machines
:return: An iterable object with the Version objects of Machine to
which the given interface was attributed
2020-04-22 16:17:06 +00:00
"""
machine_id = interface.field_dict["machine_id"]
return filter(
lambda x: x.field_dict["id"] == machine_id,
Version.objects.get_for_model(Machine).order_by("revision__date_created")
)
2020-04-23 15:46:30 +00:00
def _get_user_for_machine(self, machine):
2020-04-22 16:17:06 +00:00
"""
:param machine: Version, the machine of which the owner must be found
:return: The user to which the given machine belongs
2020-04-22 16:17:06 +00:00
"""
# TODO: What if user was deleted?
user_id = machine.field_dict["user_id"]
return User.objects.get(id=user_id)
2020-04-23 15:46:30 +00:00
def _get_by_ip(self, ip):
"""
:param ip: str, The IP to lookup
:returns: list, a list of MachineHistorySearchEvent
"""
2020-04-23 15:46:30 +00:00
interfaces = self._get_interfaces_for_ip(ip)
2020-04-22 16:17:06 +00:00
for interface in interfaces:
2020-04-23 15:46:30 +00:00
machines = self._get_machines_for_interface(interface)
2020-04-22 16:17:06 +00:00
for machine in machines:
2020-04-23 15:46:30 +00:00
user = self._get_user_for_machine(machine)
self._add_revision(user, machine, interface)
2020-04-22 16:17:06 +00:00
return self.events
2020-04-23 15:46:30 +00:00
def _get_by_mac(self, mac):
"""
:param mac: str, The MAC address to lookup
:returns: list, a list of MachineHistorySearchEvent
"""
2020-04-23 15:46:30 +00:00
interfaces = self._get_interfaces_for_mac(mac)
2020-04-22 16:17:06 +00:00
for interface in interfaces:
2020-04-23 15:46:30 +00:00
machines = self._get_machines_for_interface(interface)
2020-04-22 16:17:06 +00:00
for machine in machines:
2020-04-23 15:46:30 +00:00
user = self._get_user_for_machine(machine)
self._add_revision(user, machine, interface)
2020-04-22 16:17:06 +00:00
return self.events
2020-04-23 11:18:16 +00:00
2020-04-23 17:06:59 +00:00
class RelatedHistory:
2020-04-23 18:00:38 +00:00
def __init__(self, name, model_name, object_id):
2020-04-23 17:06:59 +00:00
"""
2020-04-23 18:00:38 +00:00
:param name: Name of this instance
2020-04-23 17:06:59 +00:00
:param model_name: Name of the related model (e.g. "user")
:param object_id: ID of the related object
"""
2020-04-23 18:00:38 +00:00
self.name = "{} (id = {})".format(name, object_id)
self.model_name = model_name
self.object_id = object_id
2020-04-23 17:06:59 +00:00
def __eq__(self, other):
return (
self.model_name == other.model_name
2020-04-23 18:00:38 +00:00
and self.object_id == other.object_id
)
def __hash__(self):
return hash((self.model_name, self.object_id))
2020-04-23 17:06:59 +00:00
2020-04-23 15:39:45 +00:00
class HistoryEvent:
def __init__(self, version, previous_version=None, edited_fields=None):
2020-04-23 11:18:16 +00:00
"""
2020-04-23 15:39:45 +00:00
:param version: Version, the version of the object for this event
:param previous_version: Version, the version of the object before this event
2020-04-23 11:18:16 +00:00
:param edited_fields: list, The list of modified fields by this event
"""
self.version = version
self.previous_version = previous_version
self.edited_fields = edited_fields
self.date = version.revision.date_created
self.performed_by = version.revision.user
self.comment = version.revision.get_comment() or None
2020-04-23 15:46:30 +00:00
def _repr(self, name, value):
2020-04-23 15:39:45 +00:00
"""
Returns the best representation of the given field
:param name: the name of the field
:param value: the value of the field
:return: object
"""
if value is None:
return _("None")
return value
def edits(self, hide=[]):
"""
Build a list of the changes performed during this event
:param hide: list, the list of fields for which not to show details
:return: str
"""
edits = []
for field in self.edited_fields:
if field in hide:
# Don't show sensitive information
edits.append((field, None, None))
else:
edits.append((
field,
2020-04-23 15:46:30 +00:00
self._repr(field, self.previous_version.field_dict[field]),
self._repr(field, self.version.field_dict[field])
2020-04-23 15:39:45 +00:00
))
return edits
class History:
def __init__(self):
self.name = None
2020-04-23 15:39:45 +00:00
self.events = []
2020-04-23 17:06:59 +00:00
self.related = [] # For example, a machine has a list of its interfaces
2020-04-23 15:46:30 +00:00
self._last_version = None
2020-04-23 17:06:59 +00:00
self.event_type = HistoryEvent
def get(self, instance_id, model):
2020-04-23 17:06:59 +00:00
"""
:param instance_id: int, The id of the instance to lookup
:param model: class, The type of object to lookup
2020-04-23 17:06:59 +00:00
:return: list or None, a list of HistoryEvent, in reverse chronological order
"""
self.events = []
# Get all the versions for this instance, with the oldest first
2020-04-23 17:06:59 +00:00
self._last_version = None
interface_versions = filter(
lambda x: x.field_dict["id"] == instance_id,
Version.objects.get_for_model(model).order_by("revision__date_created")
2020-04-23 17:06:59 +00:00
)
for version in interface_versions:
self._add_revision(version)
# Return None if interface_versions was empty
if self._last_version is None:
return None
2020-04-23 17:06:59 +00:00
return self.events[::-1]
2020-04-23 15:39:45 +00:00
2020-04-23 15:46:30 +00:00
def _compute_diff(self, v1, v2, ignoring=[]):
2020-04-23 15:39:45 +00:00
"""
Find the edited field between two versions
:param v1: Version
:param v2: Version
:param ignoring: List, a list of fields to ignore
:return: List of field names
"""
fields = []
for key in v1.field_dict.keys():
if key not in ignoring and v1.field_dict[key] != v2.field_dict[key]:
fields.append(key)
return fields
2020-04-23 17:06:59 +00:00
def _add_revision(self, version):
"""
Add a new revision to the chronological order
:param version: Version, The version of the interface for this event
"""
diff = None
if self._last_version is not None:
diff = self._compute_diff(version, self._last_version)
# Ignore "empty" events
if not diff:
self._last_version = version
return
evt = self.event_type(version, self._last_version, diff)
self.events.append(evt)
self._last_version = version
2020-04-23 15:39:45 +00:00
class UserHistoryEvent(HistoryEvent):
2020-04-23 15:46:30 +00:00
def _repr(self, name, value):
"""
Returns the best representation of the given field
:param name: the name of the field
:param value: the value of the field
:return: object
"""
2020-04-23 12:44:23 +00:00
if name == "groups":
if len(value) == 0:
# Removed all the user's groups
return _("None")
# value is a list of ints
groups = []
for gid in value:
# Try to get the group name, if it's not deleted
try:
2020-04-23 12:44:23 +00:00
groups.append(Group.objects.get(id=gid).name)
except Group.DoesNotExist:
# TODO: Find the group name in the versions?
groups.append("{} ({})".format(_("Deleted"), gid))
2020-04-23 12:44:23 +00:00
return ", ".join(groups)
elif name == "state":
if value is not None:
2020-04-23 12:44:23 +00:00
return User.STATES[value][1]
else:
return _("Unknown")
elif name == "email_state":
if value is not None:
2020-04-23 12:44:23 +00:00
return User.EMAIL_STATES[value][1]
else:
return _("Unknown")
elif name == "room_id" and value is not None:
# Try to get the room name, if it's not deleted
try:
return Room.objects.get(id=value)
except Room.DoesNotExist:
# TODO: Find the room name in the versions?
return "{} ({})".format(_("Deleted"), value)
elif name == "members" or name == "administrators":
if len(value) == 0:
# Removed all the club's members
return _("None")
# value is a list of ints
users = []
for uid in value:
# Try to get the user's name, if theyr're not deleted
try:
users.append(User.objects.get(id=uid).pseudo)
except User.DoesNotExist:
# TODO: Find the user's name in the versions?
2020-04-23 14:13:58 +00:00
users.append("{} ({})".format(_("Deleted"), uid))
return ", ".join(users)
return super(UserHistoryEvent, self)._repr(name, value)
def edits(self, hide=["password", "pwd_ntlm", "gpg_fingerprint"]):
2020-04-23 11:18:16 +00:00
"""
Build a list of the changes performed during this event
:param hide: list, the list of fields for which not to show details
:return: str
"""
2020-04-23 15:39:45 +00:00
return super(UserHistoryEvent, self).edits(hide)
2020-04-23 11:18:16 +00:00
def __eq__(self, other):
return (
2020-04-23 18:30:54 +00:00
self.edited_fields == other.edited_fields
and self.date == other.date
and self.performed_by == other.performed_by
and self.comment == other.comment
)
def __hash__(self):
return hash((frozenset(self.edited_fields), self.date, self.performed_by, self.comment))
2020-04-23 11:18:16 +00:00
def __repr__(self):
return "{} edited fields {} ({})".format(
2020-04-23 11:18:16 +00:00
self.performed_by,
self.edited_fields or "nothing",
self.comment or "No comment"
)
2020-04-23 15:39:45 +00:00
class UserHistory(History):
2020-04-23 11:18:16 +00:00
def __init__(self):
2020-04-23 15:46:30 +00:00
super(UserHistory, self).__init__()
2020-04-23 17:06:59 +00:00
self.event_type = UserHistoryEvent
2020-04-23 11:18:16 +00:00
def get(self, user_id):
2020-04-23 11:18:16 +00:00
"""
:param user_id: int, the id of the user to lookup
2020-04-23 11:18:16 +00:00
:return: list or None, a list of UserHistoryEvent, in reverse chronological order
"""
self.events = []
# Try to find an Adherent object
adherents = filter(
lambda x: x.field_dict["user_ptr_id"] == user_id,
Version.objects.get_for_model(Adherent)
)
obj = next(adherents, None)
model = Adherent
# Fallback on a Club
if obj is None:
clubs = filter(
lambda x: x.field_dict["user_ptr_id"] == user_id,
Version.objects.get_for_model(Club)
)
obj = next(clubs, None)
model = Club
# If nothing was found, abort
if obj is None:
return None
# Add in "related" elements the list of Machine objects
2020-04-23 17:06:59 +00:00
# that were once owned by this user
self.related = filter(
lambda x: x.field_dict["user_id"] == user_id,
Version.objects.get_for_model(Machine).order_by("-revision__date_created")
2020-04-23 17:06:59 +00:00
)
2020-04-23 18:00:38 +00:00
self.related = [RelatedHistory(
m.field_dict["name"] or _("None"),
"machine",
m.field_dict["id"]) for m in self.related]
self.related = list(dict.fromkeys(self.related))
2020-04-23 17:06:59 +00:00
2020-04-23 11:18:16 +00:00
# Get all the versions for this user, with the oldest first
2020-04-23 15:46:30 +00:00
self._last_version = None
2020-04-23 11:18:16 +00:00
user_versions = filter(
lambda x: x.field_dict["id"] == user_id,
2020-04-23 11:18:16 +00:00
Version.objects.get_for_model(User).order_by("revision__date_created")
)
for version in user_versions:
2020-04-23 18:30:54 +00:00
self._add_revision(version)
2020-04-23 11:18:16 +00:00
# Update name
self.name = self._last_version.field_dict["pseudo"]
# Do the same thing for the Adherent of Club
2020-04-23 15:46:30 +00:00
self._last_version = None
obj_versions = filter(
lambda x: x.field_dict["id"] == user_id,
Version.objects.get_for_model(model).order_by("revision__date_created")
)
for version in obj_versions:
2020-04-23 18:30:54 +00:00
self._add_revision(version)
# Remove duplicates and sort
self.events = list(dict.fromkeys(self.events))
return sorted(
self.events,
key=lambda e: e.date,
reverse=True
)
2020-04-23 11:18:16 +00:00
2020-04-23 18:30:54 +00:00
def _add_revision(self, version):
2020-04-23 11:18:16 +00:00
"""
2020-04-23 15:39:45 +00:00
Add a new revision to the chronological order
:param user: User, The user displayed in this history
:param version: Version, The version of the user for this event
2020-04-23 11:18:16 +00:00
"""
2020-04-23 15:39:45 +00:00
diff = None
2020-04-23 15:46:30 +00:00
if self._last_version is not None:
diff = self._compute_diff(
2020-04-23 15:39:45 +00:00
version,
2020-04-23 15:46:30 +00:00
self._last_version,
2020-04-23 15:39:45 +00:00
ignoring=["last_login", "pwd_ntlm", "email_change_date"]
)
2020-04-23 11:18:16 +00:00
2020-04-23 15:39:45 +00:00
# Ignore "empty" events like login
if not diff:
2020-04-23 15:46:30 +00:00
self._last_version = version
2020-04-23 15:39:45 +00:00
return
2020-04-23 11:18:16 +00:00
2020-04-23 18:30:54 +00:00
evt = UserHistoryEvent(version, self._last_version, diff)
2020-04-23 15:39:45 +00:00
self.events.append(evt)
2020-04-23 15:46:30 +00:00
self._last_version = version
2020-04-23 11:18:16 +00:00
2020-04-23 15:39:45 +00:00
2020-04-23 17:06:59 +00:00
class MachineHistoryEvent(HistoryEvent):
def _repr(self, name, value):
"""
Returns the best representation of the given field
:param name: the name of the field
:param value: the value of the field
:return: object
"""
if name == "user_id":
try:
return User.objects.get(id=value).pseudo
except User.DoesNotExist:
return "{} ({})".format(_("Deleted"), value)
return super(MachineHistoryEvent, self)._repr(name, value)
class MachineHistory(History):
def __init__(self):
super(MachineHistory, self).__init__()
self.event_type = MachineHistoryEvent
def get(self, machine_id):
2020-04-23 17:06:59 +00:00
# Add as "related" histories the list of Interface objects
# that were once assigned to this machine
self.related = list(filter(
lambda x: x.field_dict["machine_id"] == machine_id,
Version.objects.get_for_model(Interface).order_by("-revision__date_created")
2020-04-23 17:06:59 +00:00
))
# Create RelatedHistory objects and remove duplicates
2020-04-23 18:00:38 +00:00
self.related = [RelatedHistory(
i.field_dict["mac_address"] or _("None"),
"interface",
i.field_dict["id"]) for i in self.related]
self.related = list(dict.fromkeys(self.related))
events = super(MachineHistory, self).get(machine_id, Machine)
# Update name
self.name = self._last_version.field_dict["name"]
return events
2020-04-23 17:06:59 +00:00
2020-04-23 15:39:45 +00:00
class InterfaceHistoryEvent(HistoryEvent):
def _repr(self, name, value):
"""
Returns the best representation of the given field
:param name: the name of the field
:param value: the value of the field
:return: object
"""
if name == "ipv4_id" and value is not None:
try:
return IpList.objects.get(id=value)
except IpList.DoesNotExist:
return "{} ({})".format(_("Deleted"), value)
elif name == "machine_type_id":
try:
return MachineType.objects.get(id=value).name
except MachineType.DoesNotExist:
return "{} ({})".format(_("Deleted"), value)
elif name == "machine_id":
try:
return Machine.objects.get(id=value).get_name() or _("No name")
except Machine.DoesNotExist:
return "{} ({})".format(_("Deleted"), value)
elif name == "port_lists":
if len(value) == 0:
return _("None")
ports = []
for pid in value:
try:
ports.append(Port.objects.get(id=pid).pretty_name())
except Group.DoesNotExist:
ports.append("{} ({})".format(_("Deleted"), pid))
return super(InterfaceHistoryEvent, self)._repr(name, value)
class InterfaceHistory(History):
2020-04-23 17:06:59 +00:00
def __init__(self):
super(InterfaceHistory, self).__init__()
self.event_type = InterfaceHistoryEvent
def get(self, interface_id):
events = super(InterfaceHistory, self).get(interface_id, Interface)
# Update name
self.name = self._last_version.field_dict["mac_address"]
return events