# Copyright 2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for db scopes."""
import re
from typing import Any, Generic, TYPE_CHECKING, TypeAlias, TypeVar, Union, cast
from django.core.exceptions import ValidationError
from django.db import models
from django.db.models import QuerySet, UniqueConstraint
from debusine.db.context import context
from debusine.db.models import permissions
from debusine.db.models.permissions import (
PermissionUser,
ROLES,
permission_check,
permission_filter,
)
if TYPE_CHECKING:
from django.contrib.auth.models import AnonymousUser
from django_stubs_ext.db.models import TypedModelMeta
from debusine.db.models.auth import Group, User
else:
TypedModelMeta = object
#: Name of the fallback scope used for transitioning to scoped models
FALLBACK_SCOPE_NAME = "debusine"
#: Scope names reserved for use in toplevel URL path components
RESERVED_SCOPE_NAMES = frozenset(
(
"accounts",
"admin",
"api",
"api-auth",
"artifact",
"task-status",
"user",
"workers",
"work-request",
"workspace",
)
)
#: Regexp matching the structure of scope names
scope_name_regex = re.compile(r"^[A-Za-z][A-Za-z0-9+._-]*$")
def is_valid_scope_name(value: str) -> bool:
"""Check if value is a valid scope name."""
if value in RESERVED_SCOPE_NAMES:
return False
return bool(scope_name_regex.match(value))
def validate_scope_name(value: str) -> None:
"""Validate scope names."""
if not is_valid_scope_name(value):
raise ValidationError(
"%(value)r is not a valid scope name", params={"value": value}
)
A = TypeVar("A")
class ScopeQuerySet(QuerySet["Scope", A], Generic[A]):
"""Custom QuerySet for Scope."""
@permission_filter
def can_display(
self, user: PermissionUser # noqa: U100
) -> "ScopeQuerySet[A]":
"""Keep only Scopes that can be displayed."""
assert user is not None # Enforced by decorator
return self
@permission_filter
def can_create_workspace(self, user: PermissionUser) -> "ScopeQuerySet[A]":
"""Keep only Scopes that can be displayed."""
assert user is not None # Enforced by decorator
if not user.is_authenticated:
return self.none()
return self.filter(ROLES(user, Scope.Roles.OWNER))
class ScopeManager(models.Manager["Scope"]):
"""Manager for Scope model."""
def get_roles_model(self) -> type["ScopeRole"]:
"""Get the model used for role assignment."""
return ScopeRole
def get_queryset(self) -> ScopeQuerySet[Any]:
"""Use the custom QuerySet."""
return ScopeQuerySet(self.model, using=self._db)
class ScopeRoles(permissions.Roles):
"""Available roles for a Scope."""
OWNER = "owner", "Owner"
[docs]
class Scope(models.Model):
"""
Scope model.
This is used to create different distinct sets of groups and workspaces
"""
Roles: TypeAlias = ScopeRoles
objects = ScopeManager.from_queryset(ScopeQuerySet)()
name = models.CharField(
max_length=255, unique=True, validators=[validate_scope_name]
)
def __str__(self) -> str:
"""Return basic information of Scope."""
return self.name
[docs]
@permission_check
def can_display(self, user: PermissionUser) -> bool: # noqa: U100
"""Check if the scope can be displayed."""
assert user is not None # enforced by decorator
return True
[docs]
@permission_check
def can_create_workspace(self, user: PermissionUser) -> bool:
"""Check if the scope can be displayed."""
assert user is not None # enforced by decorator
# Token is not taken into account here
if not user.is_authenticated:
return False
if self == context.scope:
return Scope.Roles.OWNER in context.scope_roles
return (
Scope.objects.can_create_workspace(user).filter(pk=self.pk).exists()
)
[docs]
def assign_role(
self, role: "ScopeRole.Roles", group: "Group"
) -> "ScopeRole":
"""Assign a role to a group."""
if group.scope != self:
raise ValueError(f"group {group} is not in scope {self}")
scope_role, _ = ScopeRole.objects.get_or_create(
resource=self, group=group, role=role
)
return scope_role
# See https://github.com/typeddjango/django-stubs/issues/1047 for the typing
[docs]
def get_roles(
self, user: Union["User", "AnonymousUser"]
) -> QuerySet["ScopeRole", "ScopeRoles"]:
"""Get the roles of the user on this scope."""
if not user.is_authenticated:
result = ScopeRole.objects.none().values_list("role", flat=True)
else:
result = (
ScopeRole.objects.filter(resource=self, group__users=user)
.values_list("role", flat=True)
.distinct()
)
# QuerySet sees a CharField, but we know it's a ScopeRoles enum
return cast(QuerySet["ScopeRole", "ScopeRoles"], result)
class ScopeRole(models.Model):
"""Role assignments for scopes."""
Roles: TypeAlias = ScopeRoles
resource = models.ForeignKey(
Scope,
on_delete=models.PROTECT,
related_name="roles",
)
group = models.ForeignKey(
"Group",
on_delete=models.PROTECT,
related_name="scope_roles",
)
role = models.CharField(max_length=16, choices=Roles.choices)
class Meta(TypedModelMeta):
constraints = [
UniqueConstraint(
fields=["resource", "group", "role"],
name="%(app_label)s_%(class)s_unique_resource_group_role",
),
]
def __str__(self) -> str:
"""Return a description of the role assignment."""
return f"{self.group}─{self.role}⟶{self.resource}"