diff --git a/src/registrar/management/commands/utility/epp_data_containers.py b/src/registrar/management/commands/utility/epp_data_containers.py index 36ec9e19a..6dabb78a2 100644 --- a/src/registrar/management/commands/utility/epp_data_containers.py +++ b/src/registrar/management/commands/utility/epp_data_containers.py @@ -5,7 +5,7 @@ Regarding our dataclasses: Not intended to be used as models but rather as an alternative to storing as a dictionary. By keeping it as a dataclass instead of a dictionary, we can maintain data consistency. """ -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import date from enum import Enum from typing import List, Optional @@ -15,69 +15,69 @@ from typing import List, Optional class AgencyAdhoc: """Defines the structure given in the AGENCY_ADHOC file""" - agencyid: Optional[int] = None - agencyname: Optional[str] = None - active: Optional[str] = None - isfederal: Optional[str] = None + agencyid: Optional[int] = field(default=None, repr=True) + agencyname: Optional[str] = field(default=None, repr=True) + active: Optional[str] = field(default=None, repr=True) + isfederal: Optional[str] = field(default=None, repr=True) @dataclass class DomainAdditionalData: """Defines the structure given in the DOMAIN_ADDITIONAL file""" - domainname: Optional[str] = None - domaintypeid: Optional[int] = None - authorityid: Optional[int] = None - orgid: Optional[int] = None - securitycontactemail: Optional[str] = None - dnsseckeymonitor: Optional[str] = None - domainpurpose: Optional[str] = None + domainname: Optional[str] = field(default=None, repr=True) + domaintypeid: Optional[int] = field(default=None, repr=True) + authorityid: Optional[int] = field(default=None, repr=True) + orgid: Optional[int] = field(default=None, repr=True) + securitycontactemail: Optional[str] = field(default=None, repr=True) + dnsseckeymonitor: Optional[str] = field(default=None, repr=True) + domainpurpose: Optional[str] = field(default=None, repr=True) @dataclass class DomainTypeAdhoc: """Defines the structure given in the DOMAIN_ADHOC file""" - domaintypeid: Optional[int] = None - domaintype: Optional[str] = None - code: Optional[str] = None - active: Optional[str] = None + domaintypeid: Optional[int] = field(default=None, repr=True) + domaintype: Optional[str] = field(default=None, repr=True) + code: Optional[str] = field(default=None, repr=True) + active: Optional[str] = field(default=None, repr=True) @dataclass class OrganizationAdhoc: """Defines the structure given in the ORGANIZATION_ADHOC file""" - orgid: Optional[int] = None - orgname: Optional[str] = None - orgstreet: Optional[str] = None - orgcity: Optional[str] = None - orgstate: Optional[str] = None - orgzip: Optional[str] = None - orgcountrycode: Optional[str] = None + orgid: Optional[int] = field(default=None, repr=True) + orgname: Optional[str] = field(default=None, repr=True) + orgstreet: Optional[str] = field(default=None, repr=True) + orgcity: Optional[str] = field(default=None, repr=True) + orgstate: Optional[str] = field(default=None, repr=True) + orgzip: Optional[str] = field(default=None, repr=True) + orgcountrycode: Optional[str] = field(default=None, repr=True) @dataclass class AuthorityAdhoc: """Defines the structure given in the AUTHORITY_ADHOC file""" - authorityid: Optional[int] = None - firstname: Optional[str] = None - middlename: Optional[str] = None - lastname: Optional[str] = None - email: Optional[str] = None - phonenumber: Optional[str] = None - agencyid: Optional[int] = None - addlinfo: Optional[List[str]] = None + authorityid: Optional[int] = field(default=None, repr=True) + firstname: Optional[str] = field(default=None, repr=True) + middlename: Optional[str] = field(default=None, repr=True) + lastname: Optional[str] = field(default=None, repr=True) + email: Optional[str] = field(default=None, repr=True) + phonenumber: Optional[str] = field(default=None, repr=True) + agencyid: Optional[int] = field(default=None, repr=True) + addlinfo: Optional[List[str]] = field(default=None, repr=True) @dataclass class DomainEscrow: """Defines the structure given in the DOMAIN_ESCROW file""" - domainname: Optional[str] = None - creationdate: Optional[date] = None - expirationdate: Optional[date] = None + domainname: Optional[str] = field(default=None, repr=True) + creationdate: Optional[date] = field(default=None, repr=True) + expirationdate: Optional[date] = field(default=None, repr=True) class EnumFilenames(Enum): diff --git a/src/registrar/management/commands/utility/extra_transition_domain_helper.py b/src/registrar/management/commands/utility/extra_transition_domain_helper.py index 111ac7055..97b7b2ffb 100644 --- a/src/registrar/management/commands/utility/extra_transition_domain_helper.py +++ b/src/registrar/management/commands/utility/extra_transition_domain_helper.py @@ -93,6 +93,11 @@ class FileTransitionLog: self.logs[file_type].append(log) return log + def display_all_logs(self): + """Logs every LogItem contained in this object""" + for file_type in self.logs: + self.display_logs(file_type) + def display_logs(self, file_type): """Displays all logs in the given file_type in EnumFilenames. Will log with the correct severity depending on code. @@ -107,6 +112,21 @@ class FileTransitionLog: logger.info(log.message) case LogCode.DEBUG: logger.debug(log.message) + + def clear_logs(self): + """Clears log information""" + self.logs = { + EnumFilenames.DOMAIN_ADHOC: [], + EnumFilenames.AGENCY_ADHOC: [], + EnumFilenames.ORGANIZATION_ADHOC: [], + EnumFilenames.DOMAIN_ADDITIONAL: [], + EnumFilenames.DOMAIN_ESCROW: [], + } + + def get_logs(self, file_type): + """Grabs the logs associated with + a particular file_type""" + return self.logs.get(file_type) class LoadExtraTransitionDomain: @@ -118,46 +138,90 @@ class LoadExtraTransitionDomain: arguments = options.args_extra_transition_domain() # Reads and parses migration files - self.domain_object = ExtraTransitionDomain(**arguments) - self.domain_object.parse_all_files() + self.parsed_data_container = ExtraTransitionDomain(**arguments) + self.parsed_data_container.parse_all_files() + + def create_update_model_logs(self, file_type): + """Associates runtime logs to the file_type, + such that we can determine where errors occured when + updating a TransitionDomain model.""" + logs = self.parse_logs.get_logs(file_type) + self.parsed_data_container.set_logs(file_type, logs) def update_transition_domain_models(self): """Updates TransitionDomain objects based off the file content - given in self.domain_object""" + given in self.parsed_data_container""" all_transition_domains = TransitionDomain.objects.all() if not all_transition_domains.exists(): raise Exception("No TransitionDomain objects exist.") - for transition_domain in all_transition_domains: - domain_name = transition_domain.domain_name.upper() - updated_transition_domain = transition_domain + try: + for transition_domain in all_transition_domains: + domain_name = transition_domain.domain_name.upper() + updated_transition_domain = transition_domain - # STEP 1: Parse organization data - updated_transition_domain = self.parse_org_data( - domain_name, transition_domain - ) - self.parse_logs.display_logs(EnumFilenames.ORGANIZATION_ADHOC) + # STEP 1: Parse organization data + updated_transition_domain = self.parse_org_data( + domain_name, transition_domain + ) + # Store the event logs + self.create_update_model_logs(EnumFilenames.ORGANIZATION_ADHOC) - # STEP 2: Parse domain type data - updated_transition_domain = self.parse_domain_type_data( - domain_name, transition_domain - ) - self.parse_logs.display_logs(EnumFilenames.DOMAIN_ADHOC) + # STEP 2: Parse domain type data + updated_transition_domain = self.parse_domain_type_data( + domain_name, transition_domain + ) + # Store the event logs + self.create_update_model_logs(EnumFilenames.DOMAIN_ADHOC) - # STEP 3: Parse agency data - updated_transition_domain = self.parse_agency_data( - domain_name, transition_domain - ) - self.parse_logs.display_logs(EnumFilenames.AGENCY_ADHOC) + # STEP 3: Parse agency data + updated_transition_domain = self.parse_agency_data( + domain_name, transition_domain + ) + # Store the event logs + self.create_update_model_logs(EnumFilenames.AGENCY_ADHOC) - # STEP 4: Parse creation and expiration data - updated_transition_domain = self.parse_creation_expiration_data( - domain_name, transition_domain - ) - self.parse_logs.display_logs(EnumFilenames.DOMAIN_ESCROW) + # STEP 4: Parse creation and expiration data + updated_transition_domain = self.parse_creation_expiration_data( + domain_name, transition_domain + ) + # Store the event logs + self.create_update_model_logs(EnumFilenames.DOMAIN_ADHOC) - updated_transition_domain.save() + updated_transition_domain.save() + + logger.info(f"Succesfully updated TransitionDomain {domain_name}") + self.parse_logs.clear_logs() + except Exception as err: + logger.error("Could not update all TransitionDomain objects.") + # Regardless of what occurred, log what happened. + logger.info("======Printing log stack======") + self.parse_logs.display_all_logs() + + raise err + else: + self.display_run_summary() + + def display_run_summary(self): + """Prints information about this particular run. + Organizes like data together. + """ + container = self.parsed_data_container + agency_adhoc = container.get_logs_for_type(EnumFilenames.AGENCY_ADHOC) + authority_adhoc = container.get_logs_for_type(EnumFilenames.AUTHORITY_ADHOC) + domain_additional = container.get_logs_for_type(EnumFilenames.DOMAIN_ADDITIONAL) + domain_adhoc = container.get_logs_for_type(EnumFilenames.DOMAIN_ADHOC) + domain_escrow = container.get_logs_for_type(EnumFilenames.DOMAIN_ESCROW) + organization_adhoc = container.get_logs_for_type(EnumFilenames.ORGANIZATION_ADHOC) + variable_data = [] + for file_type in self.parsed_data_container.file_data: + # Grab all logs for + logs = self.parsed_data_container.get_logs_for_type(file_type) + variable_data.append(logs) + #agency_adhoc, authority_adhoc, domain_additional, domain_adhoc, domain_escrow, organization_adhoc = variable_data + + def parse_creation_expiration_data(self, domain_name, transition_domain): """Grabs expiration_date from the parsed files and associates it with a transition_domain object, then returns that object.""" @@ -384,14 +448,14 @@ class LoadExtraTransitionDomain: if not is_update: self.parse_logs.create_log_item( file_type, - LogCode.DEBUG, + LogCode.INFO, f"Added {var_name} as '{changed_value}' on {domain_name}", domain_name, ) else: self.parse_logs.create_log_item( file_type, - LogCode.INFO, + LogCode.WARNING, f"Updated existing {var_name} to '{changed_value}' on {domain_name}", domain_name, ) @@ -501,7 +565,7 @@ class LoadExtraTransitionDomain: """ # Grabs a dict associated with the file_type. # For example, EnumFilenames.DOMAIN_ADDITIONAL. - desired_type = self.domain_object.file_data.get(file_type) + desired_type = self.parsed_data_container.file_data.get(file_type) if desired_type is None: self.parse_logs.create_log_item( file_type, LogCode.ERROR, f"Type {file_type} does not exist" @@ -565,7 +629,7 @@ class PatternMap: # Object data # self.data = {} - self.logs = [] + self.logs = {} def try_infer_filename(self, current_file_name, default_file_name): """Tries to match a given filename to a regex, @@ -790,16 +854,34 @@ class ExtraTransitionDomain: file_type.data = {} def parse_csv_file( - self, file, seperator, dataclass_type, id_field, is_domain_escrow=False + self, + file_type, + file, + seperator, + dataclass_type, + id_field, + is_domain_escrow=False ): # Domain escrow is an edge case if is_domain_escrow: - return self._read_domain_escrow(file, seperator) + item_to_return = self._read_domain_escrow( + file_type, + file, + seperator + ) + return item_to_return else: - return self._read_csv_file(file, seperator, dataclass_type, id_field) + item_to_return = self._read_csv_file( + file_type, + file, + seperator, + dataclass_type, + id_field + ) + return item_to_return # Domain escrow is an edgecase given that its structured differently data-wise. - def _read_domain_escrow(self, file, seperator): + def _read_domain_escrow(self, file_type, file, seperator): dict_data = {} with open(file, "r", encoding="utf-8-sig") as requested_file: reader = csv.reader(requested_file, delimiter=seperator) @@ -813,9 +895,13 @@ class ExtraTransitionDomain: dict_data[domain_name] = DomainEscrow( domain_name, creation_date, expiration_date ) + + # Given this row_id, create a default log object. + # So that we can track logs on it later. + self.set_log(file_type, domain_name, []) return dict_data - def _read_csv_file(self, file, seperator, dataclass_type, id_field): + def _read_csv_file(self, file_type, file, seperator, dataclass_type, id_field): with open(file, "r", encoding="utf-8-sig") as requested_file: reader = csv.DictReader(requested_file, delimiter=seperator) """ @@ -832,5 +918,27 @@ class ExtraTransitionDomain: continue row_id = row[id_field] dict_data[row_id] = dataclass_type(**row) + + # Given this row_id, create a default log object. + # So that we can track logs on it later. + self.set_log(file_type, row_id, []) # dict_data = {row[id_field]: dataclass_type(**row) for row in reader} return dict_data + + # Logging logic # + def get_logs_for_type(self, file_type): + """Returns all logs for the given file_type""" + return self.file_data.get(file_type).logs + + def get_log(self, file_type, item_id): + """Returns a log of a particular id""" + logs = self.get_logs_for_type(file_type) + return logs.get(item_id) + + def set_logs_for_type(self, file_type, logs): + """Sets all logs for a given file_type""" + self.file_data[file_type] = logs + + def set_log(self, file_type, item_id, log): + """Creates a single log item under a given file_type""" + self.file_data.get(file_type)[item_id] = log