mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-08-21 16:51:08 +02:00
Add to fixtures and add whitelist logic
This commit is contained in:
parent
42dec38b22
commit
056a3ecf36
3 changed files with 87 additions and 7 deletions
|
@ -6,6 +6,7 @@ from registrar.models import (
|
||||||
User,
|
User,
|
||||||
UserGroup,
|
UserGroup,
|
||||||
)
|
)
|
||||||
|
from registrar.models.allowed_email import AllowedEmail
|
||||||
|
|
||||||
|
|
||||||
fake = Faker()
|
fake = Faker()
|
||||||
|
@ -240,6 +241,11 @@ class UserFixture:
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# Additional emails to add to the AllowedEmail whitelist
|
||||||
|
ADDITIONAL_ALLOWED_EMAILS = [
|
||||||
|
"zander.adkinson@ecstech.com"
|
||||||
|
]
|
||||||
|
|
||||||
def load_users(cls, users, group_name, are_superusers=False):
|
def load_users(cls, users, group_name, are_superusers=False):
|
||||||
logger.info(f"Going to load {len(users)} users in group {group_name}")
|
logger.info(f"Going to load {len(users)} users in group {group_name}")
|
||||||
for user_data in users:
|
for user_data in users:
|
||||||
|
@ -264,6 +270,34 @@ class UserFixture:
|
||||||
logger.warning(e)
|
logger.warning(e)
|
||||||
logger.info(f"All users in group {group_name} loaded.")
|
logger.info(f"All users in group {group_name} loaded.")
|
||||||
|
|
||||||
|
def load_allowed_emails(cls, users, additional_emails):
|
||||||
|
"""Populates a whitelist of allowed emails (as defined in this list)"""
|
||||||
|
logger.info(f"Going to load allowed emails for {len(users)} users")
|
||||||
|
if additional_emails:
|
||||||
|
logger.info(f"Going to load {len(additional_emails)} additional allowed emails")
|
||||||
|
|
||||||
|
allowed_emails = []
|
||||||
|
|
||||||
|
# Load user emails
|
||||||
|
for user_data in users:
|
||||||
|
user_email = user_data.get("email")
|
||||||
|
if user_email and user_email not in allowed_emails:
|
||||||
|
allowed_emails.append(AllowedEmail(email=user_email))
|
||||||
|
else:
|
||||||
|
first_name = user_data.get("first_name")
|
||||||
|
last_name = user_data.get("last_name")
|
||||||
|
logger.warning(f"Could not load email for {first_name} {last_name}: No email exists.")
|
||||||
|
|
||||||
|
# Load additional emails
|
||||||
|
for email in additional_emails:
|
||||||
|
allowed_emails.append(AllowedEmail(email=email))
|
||||||
|
|
||||||
|
if allowed_emails:
|
||||||
|
AllowedEmail.objects.bulk_create(allowed_emails)
|
||||||
|
logger.info(f"Loaded {len(allowed_emails)} allowed emails")
|
||||||
|
else:
|
||||||
|
logger.info("No allowed emails to load")
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def load(cls):
|
def load(cls):
|
||||||
# Lumped under .atomic to ensure we don't make redundant DB calls.
|
# Lumped under .atomic to ensure we don't make redundant DB calls.
|
||||||
|
@ -275,3 +309,7 @@ class UserFixture:
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
cls.load_users(cls, cls.ADMINS, "full_access_group", are_superusers=True)
|
cls.load_users(cls, cls.ADMINS, "full_access_group", are_superusers=True)
|
||||||
cls.load_users(cls, cls.STAFF, "cisa_analysts_group")
|
cls.load_users(cls, cls.STAFF, "cisa_analysts_group")
|
||||||
|
|
||||||
|
# Combine ADMINS and STAFF lists
|
||||||
|
all_users = cls.ADMINS + cls.STAFF
|
||||||
|
cls.load_allowed_emails(cls, all_users, additional_emails=cls.ADDITIONAL_ALLOWED_EMAILS)
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
from django.db import models
|
from django.db import models
|
||||||
|
from django.db.models import Q
|
||||||
|
import re
|
||||||
from .utility.time_stamped_model import TimeStampedModel
|
from .utility.time_stamped_model import TimeStampedModel
|
||||||
|
|
||||||
|
|
||||||
|
@ -16,5 +17,33 @@ class AllowedEmail(TimeStampedModel):
|
||||||
max_length=320,
|
max_length=320,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def is_allowed_email(cls, email):
|
||||||
|
"""Given an email, check if this email exists within our AllowEmail whitelist"""
|
||||||
|
print(f"the email is: {email}")
|
||||||
|
if not email:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Split the email into a local part and a domain part
|
||||||
|
local, domain = email.split('@')
|
||||||
|
|
||||||
|
# Check if there's a '+' in the local part
|
||||||
|
if "+" in local:
|
||||||
|
base_local = local.split("+")[0]
|
||||||
|
base_email = f"{base_local}@{domain}"
|
||||||
|
allowed_emails = cls.objects.filter(email__iexact=base_email)
|
||||||
|
|
||||||
|
# The string must start with the local, and the plus must be a digit
|
||||||
|
# and occur immediately after the local. The domain should still exist in the email.
|
||||||
|
pattern = f'^{re.escape(base_local)}\\+\\d+@{re.escape(domain)}$'
|
||||||
|
|
||||||
|
# If the base email exists AND the email matches our expected regex,
|
||||||
|
# then we can let the email through.
|
||||||
|
return allowed_emails.exists() and re.match(pattern, email)
|
||||||
|
else:
|
||||||
|
# If no '+' exists in the email, just do an exact match
|
||||||
|
allowed_emails = cls.objects.filter(email__iexact=email)
|
||||||
|
return allowed_emails.exists()
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return str(self.email)
|
return str(self.email)
|
||||||
|
|
|
@ -4,6 +4,7 @@ import boto3
|
||||||
import logging
|
import logging
|
||||||
import textwrap
|
import textwrap
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from django.apps import apps
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.template.loader import get_template
|
from django.template.loader import get_template
|
||||||
from email.mime.application import MIMEApplication
|
from email.mime.application import MIMEApplication
|
||||||
|
@ -27,7 +28,7 @@ def send_templated_email(
|
||||||
to_address: str,
|
to_address: str,
|
||||||
bcc_address="",
|
bcc_address="",
|
||||||
context={},
|
context={},
|
||||||
attachment_file: str = None,
|
attachment_file = None,
|
||||||
wrap_email=False,
|
wrap_email=False,
|
||||||
):
|
):
|
||||||
"""Send an email built from a template to one email address.
|
"""Send an email built from a template to one email address.
|
||||||
|
@ -39,9 +40,21 @@ def send_templated_email(
|
||||||
Raises EmailSendingError if SES client could not be accessed
|
Raises EmailSendingError if SES client could not be accessed
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if flag_is_active(None, "disable_email_sending") and not settings.IS_PRODUCTION: # type: ignore
|
|
||||||
|
|
||||||
|
if not settings.IS_PRODUCTION: # type: ignore
|
||||||
|
if flag_is_active(None, "disable_email_sending"): # type: ignore
|
||||||
message = "Could not send email. Email sending is disabled due to flag 'disable_email_sending'."
|
message = "Could not send email. Email sending is disabled due to flag 'disable_email_sending'."
|
||||||
raise EmailSendingError(message)
|
raise EmailSendingError(message)
|
||||||
|
else:
|
||||||
|
# Raise an email sending error if these doesn't exist within our whitelist.
|
||||||
|
# If these emails don't exist, this function can handle that elsewhere.
|
||||||
|
AllowedEmail = apps.get_model('registrar', 'AllowedEmail')
|
||||||
|
message = "Could not send email. The email '{}' does not exist within the whitelist."
|
||||||
|
if to_address and not AllowedEmail.is_allowed_email(to_address):
|
||||||
|
raise EmailSendingError(message.format(to_address))
|
||||||
|
if bcc_address and not AllowedEmail.is_allowed_email(bcc_address):
|
||||||
|
raise EmailSendingError(message.format(bcc_address))
|
||||||
|
|
||||||
template = get_template(template_name)
|
template = get_template(template_name)
|
||||||
email_body = template.render(context=context)
|
email_body = template.render(context=context)
|
||||||
|
@ -63,7 +76,7 @@ def send_templated_email(
|
||||||
)
|
)
|
||||||
logger.info(f"An email was sent! Template name: {template_name} to {to_address}")
|
logger.info(f"An email was sent! Template name: {template_name} to {to_address}")
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.debug("E-mail unable to send! Could not access the SES client.")
|
logger.debug("An email was unable to send! Could not access the SES client.")
|
||||||
raise EmailSendingError("Could not access the SES client.") from exc
|
raise EmailSendingError("Could not access the SES client.") from exc
|
||||||
|
|
||||||
destination = {"ToAddresses": [to_address]}
|
destination = {"ToAddresses": [to_address]}
|
||||||
|
@ -71,7 +84,7 @@ def send_templated_email(
|
||||||
destination["BccAddresses"] = [bcc_address]
|
destination["BccAddresses"] = [bcc_address]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if attachment_file is None:
|
if not attachment_file:
|
||||||
# Wrap the email body to a maximum width of 80 characters per line.
|
# Wrap the email body to a maximum width of 80 characters per line.
|
||||||
# Not all email clients support CSS to do this, and our .txt files require parsing.
|
# Not all email clients support CSS to do this, and our .txt files require parsing.
|
||||||
if wrap_email:
|
if wrap_email:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue