mirror of
https://github.com/cisagov/manage.get.gov.git
synced 2025-07-23 03:06:01 +02:00
merge
This commit is contained in:
commit
f7856d52fe
2 changed files with 20 additions and 22 deletions
|
@ -1,12 +1,9 @@
|
||||||
import csv
|
import csv
|
||||||
import io
|
import io
|
||||||
from django.test import Client, RequestFactory, TestCase
|
from django.test import Client, RequestFactory
|
||||||
from io import StringIO
|
from io import StringIO
|
||||||
from registrar.models.domain_application import DomainApplication
|
from registrar.models.domain_application import DomainApplication
|
||||||
from registrar.models.domain_information import DomainInformation
|
|
||||||
from registrar.models.domain import Domain
|
from registrar.models.domain import Domain
|
||||||
from registrar.models.user import User
|
|
||||||
from django.contrib.auth import get_user_model
|
|
||||||
from registrar.utility.csv_export import (
|
from registrar.utility.csv_export import (
|
||||||
get_sliced_domains,
|
get_sliced_domains,
|
||||||
get_sliced_requests,
|
get_sliced_requests,
|
||||||
|
@ -23,7 +20,7 @@ from django.conf import settings
|
||||||
from botocore.exceptions import ClientError
|
from botocore.exceptions import ClientError
|
||||||
import boto3_mocking
|
import boto3_mocking
|
||||||
from registrar.utility.s3_bucket import S3ClientError, S3ClientErrorCodes # type: ignore
|
from registrar.utility.s3_bucket import S3ClientError, S3ClientErrorCodes # type: ignore
|
||||||
from datetime import date, datetime, timedelta
|
from datetime import datetime
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from .common import MockDb, MockEppLib, less_console_noise
|
from .common import MockDb, MockEppLib, less_console_noise
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,7 @@ def parse_domain_row(columns, domain_info: DomainInformation, security_emails_di
|
||||||
|
|
||||||
# Domain should never be none when parsing this information
|
# Domain should never be none when parsing this information
|
||||||
if domain_info.domain is None:
|
if domain_info.domain is None:
|
||||||
|
logger.error("Attemting to parse row for csv exports but Domain is none in a DomainInfo")
|
||||||
raise ValueError("Domain is none")
|
raise ValueError("Domain is none")
|
||||||
|
|
||||||
domain = domain_info.domain # type: ignore
|
domain = domain_info.domain # type: ignore
|
||||||
|
@ -127,16 +128,6 @@ def _get_security_emails(sec_contact_ids):
|
||||||
|
|
||||||
return security_emails_dict
|
return security_emails_dict
|
||||||
|
|
||||||
|
|
||||||
def update_columns_with_domain_managers(columns, max_dm_count):
|
|
||||||
"""
|
|
||||||
Update the columns list to include "Domain manager email {#}" headers
|
|
||||||
based on the maximum domain manager count.
|
|
||||||
"""
|
|
||||||
for i in range(1, max_dm_count + 1):
|
|
||||||
columns.append(f"Domain manager email {i}")
|
|
||||||
|
|
||||||
|
|
||||||
def write_domains_csv(
|
def write_domains_csv(
|
||||||
writer,
|
writer,
|
||||||
columns,
|
columns,
|
||||||
|
@ -162,16 +153,26 @@ def write_domains_csv(
|
||||||
# Reduce the memory overhead when performing the write operation
|
# Reduce the memory overhead when performing the write operation
|
||||||
paginator = Paginator(all_domain_infos, 1000)
|
paginator = Paginator(all_domain_infos, 1000)
|
||||||
|
|
||||||
if get_domain_managers and len(all_domain_infos) > 0:
|
# The maximum amount of domain managers an account has
|
||||||
# We want to get the max amont of domain managers an
|
# We get the max so we can set the column header accurately
|
||||||
# account has to set the column header dynamically
|
max_dm_count = 0
|
||||||
max_dm_count = max(len(domain_info.domain.permissions.all()) for domain_info in all_domain_infos)
|
total_body_rows = []
|
||||||
update_columns_with_domain_managers(columns, max_dm_count)
|
|
||||||
|
|
||||||
for page_num in paginator.page_range:
|
for page_num in paginator.page_range:
|
||||||
rows = []
|
rows = []
|
||||||
page = paginator.page(page_num)
|
page = paginator.page(page_num)
|
||||||
for domain_info in page.object_list:
|
for domain_info in page.object_list:
|
||||||
|
|
||||||
|
# Get count of all the domain managers for an account
|
||||||
|
if get_domain_managers:
|
||||||
|
dm_count = domain_info.domain.permissions.count()
|
||||||
|
if dm_count > max_dm_count:
|
||||||
|
max_dm_count = dm_count
|
||||||
|
for i in range(1, max_dm_count + 1):
|
||||||
|
column_name = f"Domain manager email {i}"
|
||||||
|
if column_name not in columns:
|
||||||
|
columns.append(column_name)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
row = parse_domain_row(columns, domain_info, security_emails_dict, get_domain_managers)
|
row = parse_domain_row(columns, domain_info, security_emails_dict, get_domain_managers)
|
||||||
rows.append(row)
|
rows.append(row)
|
||||||
|
@ -180,10 +181,11 @@ def write_domains_csv(
|
||||||
# It indicates that DomainInformation.domain is None.
|
# It indicates that DomainInformation.domain is None.
|
||||||
logger.error("csv_export -> Error when parsing row, domain was None")
|
logger.error("csv_export -> Error when parsing row, domain was None")
|
||||||
continue
|
continue
|
||||||
|
total_body_rows.extend(rows)
|
||||||
|
|
||||||
if should_write_header:
|
if should_write_header:
|
||||||
write_header(writer, columns)
|
write_header(writer, columns)
|
||||||
writer.writerows(rows)
|
writer.writerows(total_body_rows)
|
||||||
|
|
||||||
|
|
||||||
def get_requests(filter_condition, sort_fields):
|
def get_requests(filter_condition, sort_fields):
|
||||||
|
@ -254,7 +256,6 @@ def write_requests_csv(
|
||||||
|
|
||||||
if should_write_header:
|
if should_write_header:
|
||||||
write_header(writer, columns)
|
write_header(writer, columns)
|
||||||
|
|
||||||
writer.writerows(rows)
|
writer.writerows(rows)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue