Code cleanup, layout cleanup, unit tests

This commit is contained in:
Rachid Mrad 2023-12-20 21:54:13 -05:00
parent 31031d054d
commit cb16f5eb96
No known key found for this signature in database
GPG key ID: EF38E4CEC4A8F3CF
5 changed files with 229 additions and 151 deletions

View file

@ -1,6 +1,6 @@
import csv
import logging
from datetime import datetime
from datetime import date, datetime
from registrar.models.domain import Domain
from registrar.models.domain_information import DomainInformation
from registrar.models.public_contact import PublicContact
@ -11,58 +11,66 @@ from django.utils import timezone
logger = logging.getLogger(__name__)
def export_domains_to_writer(writer, columns, sort_fields, filter_condition, filter_condition_for_additional_domains=None):
def get_domain_infos(filter_condition, sort_fields):
    """Return DomainInformation rows matching *filter_condition*, ordered by *sort_fields*.

    *filter_condition* is a dict of queryset lookups; *sort_fields* is an
    iterable of field names passed straight to ``order_by``.
    """
    return DomainInformation.objects.filter(**filter_condition).order_by(*sort_fields)
def write_row(writer, columns, domain_info):
    """Write one CSV row for *domain_info*, emitting only the requested *columns*.

    Columns with no matching field are written as empty strings; blank cells
    that must stay visually non-empty use a single space.
    """
    security_contacts = domain_info.domain.contacts.filter(
        contact_type=PublicContact.ContactTypeChoices.SECURITY
    )
    official = domain_info.authorizing_official
    # For linter — keep a single space rather than an empty cell
    ao = " "
    if official:
        ao = (official.first_name or "") + " " + (official.last_name or "")
    if domain_info.federal_type:
        domain_type = (
            domain_info.get_organization_type_display()
            + " - "
            + domain_info.get_federal_type_display()
        )
    else:
        domain_type = domain_info.get_organization_type_display()
    # dictionary of every field that can appear in the output
    FIELDS = {
        "Domain name": domain_info.domain.name,
        "Domain type": domain_type,
        "Agency": domain_info.federal_agency,
        "Organization name": domain_info.organization_name,
        "City": domain_info.city,
        "State": domain_info.state_territory,
        "AO": ao,
        "AO email": official.email if official else " ",
        "Security contact email": security_contacts[0].email if security_contacts else " ",
        "Status": domain_info.domain.state,
        "Expiration date": domain_info.domain.expiration_date,
        "Created at": domain_info.domain.created_at,
        "Deleted at": domain_info.domain.deleted_at,
    }
    writer.writerow([FIELDS.get(column, "") for column in columns])
def export_domains_to_writer(
    writer,
    columns,
    sort_fields,
    filter_condition,
    sort_fields_for_additional_domains=None,
    filter_condition_for_additional_domains=None,
):
    """
    Receives params from the parent methods and outputs a CSV with filtered and sorted domains.
    The 'additional' params enable us to concatenate 2 different filtered lists.
    """
    # write columns headers to writer
    writer.writerow(columns)
    # Get the domainInfos
    domain_infos = get_domain_infos(filter_condition, sort_fields)
    # Condition is true for export_data_growth_to_csv. This is an OR situation so we can't
    # combine the filters in one query.
    # NOTE(review): the growth report's additional filter appears to use
    # domain__created_at bounds, not domain__deleted_at__lt — confirm this key
    # check matches the caller's filter keys.
    if (
        filter_condition_for_additional_domains is not None
        and 'domain__deleted_at__lt' in filter_condition_for_additional_domains
    ):
        # Get the deleted domain infos
        deleted_domain_infos = get_domain_infos(
            filter_condition_for_additional_domains, sort_fields_for_additional_domains
        )
        # Combine the two querysets into a single iterable
        all_domain_infos = list(chain(domain_infos, deleted_domain_infos))
    else:
        all_domain_infos = list(domain_infos)
    # Write rows to CSV
    for domain_info in all_domain_infos:
        write_row(writer, columns, domain_info)
def export_data_type_to_csv(csv_file):
"""All domains report with extra columns"""
writer = csv.writer(csv_file)
# define columns to include in export
columns = [
@ -94,8 +102,9 @@ def export_data_type_to_csv(csv_file):
}
export_domains_to_writer(writer, columns, sort_fields, filter_condition)
def export_data_full_to_csv(csv_file):
"""All domains report"""
writer = csv.writer(csv_file)
# define columns to include in export
columns = [
@ -125,6 +134,8 @@ def export_data_full_to_csv(csv_file):
def export_data_federal_to_csv(csv_file):
"""Federal domains report"""
writer = csv.writer(csv_file)
# define columns to include in export
columns = [
@ -152,25 +163,38 @@ def export_data_federal_to_csv(csv_file):
],
}
export_domains_to_writer(writer, columns, sort_fields, filter_condition)
def get_default_start_date():
    """Fallback start date for reports: a date prior to our first deployment."""
    first_deployment_cutoff = datetime(2023, 11, 1)
    return timezone.make_aware(first_deployment_cutoff)
def get_default_end_date():
    """Fallback end date for reports: the current moment (timezone-aware)."""
    return timezone.now()
def export_data_growth_to_csv(csv_file, start_date, end_date):
"""
Growth report:
Receive start and end dates from the view, parse them.
Request from export_domains_to_writer READY domains that are created between
the start and end dates, as well as DELETED domains that are deleted between
the start and end dates. Specify sort params for both lists.
"""
if start_date:
start_date_formatted = timezone.make_aware(datetime.strptime(start_date, "%Y-%m-%d"))
else:
# Handle the case where start_date is missing or empty
# Default to a date that's prior to our first deployment
logger.error(f"Error fetching the start date, will default to 12023/1/1")
start_date_formatted = timezone.make_aware(datetime(2023, 11, 1)) # Replace with appropriate handling
start_date_formatted = (
timezone.make_aware(datetime.strptime(start_date, "%Y-%m-%d"))
if start_date
else get_default_start_date()
)
end_date_formatted = (
timezone.make_aware(datetime.strptime(end_date, "%Y-%m-%d"))
if end_date
else get_default_end_date()
)
if end_date:
end_date_formatted = timezone.make_aware(datetime.strptime(end_date, "%Y-%m-%d"))
else:
# Handle the case where end_date is missing or empty
logger.error(f"Error fetching the end date, will default to now()")
end_date_formatted = timezone.make_aware(datetime.now()) # Replace with appropriate handling
writer = csv.writer(csv_file)
# define columns to include in export
columns = [
"Domain name",
@ -189,17 +213,20 @@ def export_data_growth_to_csv(csv_file, start_date, end_date):
"domain__name",
]
filter_condition = {
"domain__state__in": [
Domain.State.READY,
],
"domain__state__in": [Domain.State.READY],
"domain__created_at__lt": end_date_formatted,
"domain__created_at__gt": start_date_formatted,
}
# We also want domains deleted between start and end dates, sorted
sort_fields_for_additional_domains = [
"domain__deleted_at",
"domain__name",
]
filter_condition_for_additional_domains = {
"domain__state__in": [
Domain.State.DELETED,
],
"domain__state__in": [Domain.State.DELETED],
"domain__created_at__lt": end_date_formatted,
"domain__created_at__gt": start_date_formatted,
}
export_domains_to_writer(writer, columns, sort_fields, filter_condition, filter_condition_for_additional_domains)
export_domains_to_writer(writer, columns, sort_fields, filter_condition, sort_fields_for_additional_domains, filter_condition_for_additional_domains)