import logging from django.contrib.auth.models import AbstractUser from django.db import models from registrar.models.user_domain_role import UserDomainRole from .domain_invitation import DomainInvitation from .transition_domain import TransitionDomain from .verified_by_staff import VerifiedByStaff from .domain import Domain from phonenumber_field.modelfields import PhoneNumberField # type: ignore logger = logging.getLogger(__name__) class User(AbstractUser): """ A custom user model that performs identically to the default user model but can be customized later. """ # #### Constants for choice fields #### RESTRICTED = "restricted" STATUS_CHOICES = ((RESTRICTED, RESTRICTED),) status = models.CharField( max_length=10, choices=STATUS_CHOICES, default=None, # Set the default value to None null=True, # Allow the field to be null blank=True, # Allow the field to be blank ) domains = models.ManyToManyField( "registrar.Domain", through="registrar.UserDomainRole", related_name="users", ) phone = PhoneNumberField( null=True, blank=True, help_text="Phone", db_index=True, ) def __str__(self): # this info is pulled from Login.gov if self.first_name or self.last_name: return f"{self.first_name or ''} {self.last_name or ''} {self.email or ''}" elif self.email: return self.email else: return self.username def restrict_user(self): self.status = self.RESTRICTED self.save() def unrestrict_user(self): self.status = None self.save() def is_restricted(self): return self.status == self.RESTRICTED @classmethod def needs_identity_verification(cls, email, uuid): """A method used by our oidc classes to test whether a user needs email/uuid verification or the full identity PII verification""" # An existing user who is a domain manager of a domain (that is, # they have an entry in UserDomainRole for their User) try: existing_user = cls.objects.get(username=uuid) if existing_user and UserDomainRole.objects.filter(user=existing_user).exists(): return False except cls.DoesNotExist: # Do nothing when the user is not found, as we're checking for existence. pass except Exception as err: raise err # A new incoming user who is a domain manager for one of the domains # that we inputted from Verisign (that is, their email address appears # in the username field of a TransitionDomain) if TransitionDomain.objects.filter(username=email).exists(): return False # New users flagged by Staff to bypass ial2 if VerifiedByStaff.objects.filter(email=email).exists(): return False # A new incoming user who is being invited to be a domain manager (that is, # their email address is in DomainInvitation for an invitation that is not yet "retrieved"). invited = DomainInvitation.DomainInvitationStatus.INVITED if DomainInvitation.objects.filter(email=email, status=invited).exists(): return False return True def check_domain_invitations_on_login(self): """When a user first arrives on the site, we need to retrieve any domain invitations that match their email address.""" for invitation in DomainInvitation.objects.filter( email__iexact=self.email, status=DomainInvitation.DomainInvitationStatus.INVITED ): try: invitation.retrieve() invitation.save() except RuntimeError: # retrieving should not fail because of a missing user, but # if it does fail, log the error so a new user can continue # logging in logger.warn("Failed to retrieve invitation %s", invitation, exc_info=True) def create_domain_and_invite(self, transition_domain: TransitionDomain): transition_domain_name = transition_domain.domain_name transition_domain_status = transition_domain.status transition_domain_email = transition_domain.username # type safety check. name should never be none if transition_domain_name is not None: new_domain = Domain(name=transition_domain_name, state=transition_domain_status) new_domain.save() # check that a domain invitation doesn't already # exist for this e-mail / Domain pair domain_email_already_in_domain_invites = DomainInvitation.objects.filter( email=transition_domain_email.lower(), domain=new_domain ).exists() if not domain_email_already_in_domain_invites: # Create new domain invitation new_domain_invitation = DomainInvitation(email=transition_domain_email.lower(), domain=new_domain) new_domain_invitation.save() def on_each_login(self): """Callback each time the user is authenticated. When a user arrives on the site each time, we need to retrieve any domain invitations that match their email address. We also need to check if they are logging in with the same e-mail as a transition domain and update our domainInfo objects accordingly. """ self.check_domain_invitations_on_login() class Meta: permissions = [ ("analyst_access_permission", "Analyst Access Permission"), ("full_access_permission", "Full Access Permission"), ]