diff --git a/docs/architecture/diagrams/model_timeline.md b/docs/architecture/diagrams/model_timeline.md index cc7dddc6c..f6e7bbb91 100644 --- a/docs/architecture/diagrams/model_timeline.md +++ b/docs/architecture/diagrams/model_timeline.md @@ -136,7 +136,7 @@ class DomainInvitation { -- } DomainInvitation -- Domain -DomainInvitation .[#green].> UserDomainRole : User.first_login() +DomainInvitation .[#green].> UserDomainRole : User.on_each_login() actor applicant #Red applicant -d-> DomainApplication : **/register** diff --git a/docs/architecture/diagrams/model_timeline.svg b/docs/architecture/diagrams/model_timeline.svg index 4e0400bb0..5410bf25f 100644 --- a/docs/architecture/diagrams/model_timeline.svg +++ b/docs/architecture/diagrams/model_timeline.svg @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/docs/operations/data_migration.md b/docs/operations/data_migration.md index 3f97511f3..7290349ad 100644 --- a/docs/operations/data_migration.md +++ b/docs/operations/data_migration.md @@ -246,7 +246,7 @@ This will allow Docker to mount the files to a container (under `/app`) for our *You are now ready to run migration scripts.* ## Transition Domains (Part 2) - Running the Migration Scripts -While keeping the same ssh instance open (if you are running on a sandbox), run through the following commands. If you run into the error that. If you cannot run `manage.py` commands, try running `/tmp/lifecycle/shell` in the ssh instance. +While keeping the same ssh instance open (if you are running on a sandbox), run through the following commands.If you cannot run `manage.py` commands, try running `/tmp/lifecycle/shell` in the ssh instance. ### STEP 1: Load Transition Domains @@ -274,8 +274,6 @@ Directs the script to load only the first 100 entries into the table. You can a This will delete all the data in transtion_domain. It is helpful if you want to see the entries reload from scratch or for clearing test data. ###### (arguments that override filepaths and directories if needed) -`--infer_filenames` -Determines if we should infer filenames or not. Recommended to be enabled only in a development or testing setting.. `--directory` Defines the directory where all data files and the JSON are stored. @@ -307,7 +305,8 @@ Defines the filename for domain type adhocs. `--authority_adhoc_filename` Defines the filename for domain type adhocs. - +`--infer_filenames` +Determines if we should infer filenames or not. This setting is not available for use in environments with the flag `settings.DEBUG` set to false, as it is intended for local development only. ### STEP 2: Transfer Transition Domain data into main Domain tables diff --git a/src/.flake8 b/src/.flake8 index db9f3094a..b3fa0993f 100644 --- a/src/.flake8 +++ b/src/.flake8 @@ -1,5 +1,5 @@ [flake8] -max-line-length = 88 +max-line-length = 120 max-complexity = 10 extend-ignore = E203 per-file-ignores = __init__.py:F401,F403,E402 diff --git a/src/api/views.py b/src/api/views.py index 694bea349..2cb23a9b2 100644 --- a/src/api/views.py +++ b/src/api/views.py @@ -10,9 +10,7 @@ from login_required import login_not_required from cachetools.func import ttl_cache -DOMAIN_FILE_URL = ( - "https://raw.githubusercontent.com/cisagov/dotgov-data/main/current-full.csv" -) +DOMAIN_FILE_URL = "https://raw.githubusercontent.com/cisagov/dotgov-data/main/current-full.csv" DOMAIN_API_MESSAGES = { @@ -22,8 +20,7 @@ DOMAIN_API_MESSAGES = { "extra_dots": "Enter the .gov domain you want without any periods.", "unavailable": "That domain isn’t available. Try entering another one." " Contact us if you need help coming up with a domain.", - "invalid": "Enter a domain using only letters," - " numbers, or hyphens (though we don't recommend using hyphens).", + "invalid": "Enter a domain using only letters, numbers, or hyphens (though we don't recommend using hyphens).", "success": "That domain is available!", "error": "Error finding domain availability.", } @@ -82,24 +79,13 @@ def available(request, domain=""): DraftDomain = apps.get_model("registrar.DraftDomain") # validate that the given domain could be a domain name and fail early if # not. - if not ( - DraftDomain.string_could_be_domain(domain) - or DraftDomain.string_could_be_domain(domain + ".gov") - ): - return JsonResponse( - {"available": False, "message": DOMAIN_API_MESSAGES["invalid"]} - ) + if not (DraftDomain.string_could_be_domain(domain) or DraftDomain.string_could_be_domain(domain + ".gov")): + return JsonResponse({"available": False, "message": DOMAIN_API_MESSAGES["invalid"]}) # a domain is available if it is NOT in the list of current domains try: if check_domain_available(domain): - return JsonResponse( - {"available": True, "message": DOMAIN_API_MESSAGES["success"]} - ) + return JsonResponse({"available": True, "message": DOMAIN_API_MESSAGES["success"]}) else: - return JsonResponse( - {"available": False, "message": DOMAIN_API_MESSAGES["unavailable"]} - ) + return JsonResponse({"available": False, "message": DOMAIN_API_MESSAGES["unavailable"]}) except Exception: - return JsonResponse( - {"available": False, "message": DOMAIN_API_MESSAGES["error"]} - ) + return JsonResponse({"available": False, "message": DOMAIN_API_MESSAGES["error"]}) diff --git a/src/djangooidc/backends.py b/src/djangooidc/backends.py index 08ca47d2e..ce6b3acd3 100644 --- a/src/djangooidc/backends.py +++ b/src/djangooidc/backends.py @@ -49,13 +49,13 @@ class OpenIdConnectBackend(ModelBackend): user, created = UserModel.objects.update_or_create(**args) if created: user = self.configure_user(user, **kwargs) - # run a newly created user's callback for a first-time login - user.first_login() else: try: user = UserModel.objects.get_by_natural_key(username) except UserModel.DoesNotExist: return None + # run this callback for a each login + user.on_each_login() return user def clean_username(self, username): diff --git a/src/djangooidc/oidc.py b/src/djangooidc/oidc.py index 286e519a4..b2b1acd8e 100644 --- a/src/djangooidc/oidc.py +++ b/src/djangooidc/oidc.py @@ -72,9 +72,7 @@ class Client(oic.Client): try: # discover and store the provider (OP) urls, etc self.provider_config(provider["srv_discovery_url"]) - self.store_registration_info( - RegistrationResponse(**provider["client_registration"]) - ) + self.store_registration_info(RegistrationResponse(**provider["client_registration"])) except Exception as err: logger.error(err) logger.error( @@ -169,9 +167,7 @@ class Client(oic.Client): if isinstance(authn_response, ErrorResponse): error = authn_response.get("error", "") if error == "login_required": - logger.warning( - "User was not logged in (%s), trying again for %s" % (error, state) - ) + logger.warning("User was not logged in (%s), trying again for %s" % (error, state)) return self.create_authn_request(session) else: logger.error("Unable to process response %s for %s" % (error, state)) @@ -190,9 +186,7 @@ class Client(oic.Client): if self.behaviour.get("response_type") == "code": # need an access token to get user info (and to log the user out later) - self._request_token( - authn_response["state"], authn_response["code"], session - ) + self._request_token(authn_response["state"], authn_response["code"], session) user_info = self._get_user_info(state, session) @@ -216,10 +210,7 @@ class Client(oic.Client): # ErrorResponse is not raised, it is passed back... if isinstance(info_response, ErrorResponse): - logger.error( - "Unable to get user info (%s) for %s" - % (info_response.get("error", ""), state) - ) + logger.error("Unable to get user info (%s) for %s" % (info_response.get("error", ""), state)) raise o_e.AuthenticationFailed(locator=state) logger.debug("user info: %s" % info_response) @@ -249,10 +240,7 @@ class Client(oic.Client): # ErrorResponse is not raised, it is passed back... if isinstance(token_response, ErrorResponse): - logger.error( - "Unable to get token (%s) for %s" - % (token_response.get("error", ""), state) - ) + logger.error("Unable to get token (%s) for %s" % (token_response.get("error", ""), state)) raise o_e.AuthenticationFailed(locator=state) logger.debug("token response %s" % token_response) diff --git a/src/djangooidc/tests/test_views.py b/src/djangooidc/tests/test_views.py index 30caf3713..5ff36a77c 100644 --- a/src/djangooidc/tests/test_views.py +++ b/src/djangooidc/tests/test_views.py @@ -85,12 +85,8 @@ class ViewsTest(TestCase): session.save() # mock mock_client.callback.side_effect = self.user_info - mock_client.registration_response = { - "post_logout_redirect_uris": ["http://example.com/back"] - } - mock_client.provider_info = { - "end_session_endpoint": "http://example.com/log_me_out" - } + mock_client.registration_response = {"post_logout_redirect_uris": ["http://example.com/back"]} + mock_client.provider_info = {"end_session_endpoint": "http://example.com/log_me_out"} mock_client.client_id = "TEST" # test with less_console_noise(): diff --git a/src/djangooidc/views.py b/src/djangooidc/views.py index 2db304509..ea893daf2 100644 --- a/src/djangooidc/views.py +++ b/src/djangooidc/views.py @@ -92,11 +92,7 @@ def logout(request, next_page=None): and len(CLIENT.registration_response["post_logout_redirect_uris"]) > 0 ): request_args.update( - { - "post_logout_redirect_uri": CLIENT.registration_response[ - "post_logout_redirect_uris" - ][0] - } + {"post_logout_redirect_uri": CLIENT.registration_response["post_logout_redirect_uris"][0]} ) url = CLIENT.provider_info["end_session_endpoint"] diff --git a/src/docker-compose.yml b/src/docker-compose.yml index a45ea2a51..90ce1acb0 100644 --- a/src/docker-compose.yml +++ b/src/docker-compose.yml @@ -29,6 +29,8 @@ services: - IS_PRODUCTION=False # Tell Django where it is being hosted - DJANGO_BASE_URL=http://localhost:8080 + # Is this a production environment + - IS_PRODUCTION # Public site URL link - GETGOV_PUBLIC_SITE_URL=https://beta.get.gov # Set a username for accessing the registry diff --git a/src/epplibwrapper/client.py b/src/epplibwrapper/client.py index e4b7a5d53..80ed278ff 100644 --- a/src/epplibwrapper/client.py +++ b/src/epplibwrapper/client.py @@ -95,9 +95,7 @@ class EPPLibWrapper: try: if not self.pool_status.connection_success: - raise LoginError( - "Couldn't connect to the registry after three attempts" - ) + raise LoginError("Couldn't connect to the registry after three attempts") with self._pool.get() as connection: response = connection.send(command) except Timeout as t: @@ -239,6 +237,4 @@ try: logger.info("registry client initialized") except Exception: CLIENT = None # type: ignore - logger.warning( - "Unable to configure epplib. Registrar cannot contact registry.", exc_info=True - ) + logger.warning("Unable to configure epplib. Registrar cannot contact registry.", exc_info=True) diff --git a/src/epplibwrapper/tests/test_pool.py b/src/epplibwrapper/tests/test_pool.py index 4e919ba76..1a5a2ccf7 100644 --- a/src/epplibwrapper/tests/test_pool.py +++ b/src/epplibwrapper/tests/test_pool.py @@ -103,9 +103,7 @@ class TestConnectionPool(TestCase): ], cl_id="gov2023-ote", cr_id="gov2023-ote", - cr_date=datetime.datetime( - 2023, 8, 15, 23, 56, 36, tzinfo=tzlocal() - ), + cr_date=datetime.datetime(2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()), up_id="gov2023-ote", up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()), tr_date=None, @@ -129,9 +127,7 @@ class TestConnectionPool(TestCase): # Mock what happens inside the "with" with ExitStack() as stack: - stack.enter_context( - patch.object(EPPConnectionPool, "_create_socket", self.fake_socket) - ) + stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket)) stack.enter_context(patch.object(Socket, "connect", self.fake_client)) stack.enter_context(patch.object(SocketTransport, "send", self.fake_send)) stack.enter_context(patch.object(SocketTransport, "receive", fake_receive)) @@ -176,9 +172,7 @@ class TestConnectionPool(TestCase): ], cl_id="gov2023-ote", cr_id="gov2023-ote", - cr_date=datetime.datetime( - 2023, 8, 15, 23, 56, 36, tzinfo=tzlocal() - ), + cr_date=datetime.datetime(2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()), up_id="gov2023-ote", up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()), tr_date=None, @@ -202,9 +196,7 @@ class TestConnectionPool(TestCase): # Mock what happens inside the "with" with ExitStack() as stack: - stack.enter_context( - patch.object(EPPConnectionPool, "_create_socket", self.fake_socket) - ) + stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket)) stack.enter_context(patch.object(Socket, "connect", self.fake_client)) stack.enter_context(patch.object(SocketTransport, "send", self.fake_send)) stack.enter_context(patch.object(SocketTransport, "receive", fake_receive)) @@ -218,9 +210,7 @@ class TestConnectionPool(TestCase): # that they cannot connect to EPP with self.assertRaises(RegistryError): expected = "InfoDomain failed to execute due to a connection error." - result = registry.send( - commands.InfoDomain(name="test.gov"), cleaned=True - ) + result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True) self.assertEqual(result, expected) # A subsequent command should be successful, as the pool restarts @@ -240,9 +230,7 @@ class TestConnectionPool(TestCase): right as we send a command.""" with ExitStack() as stack: - stack.enter_context( - patch.object(EPPConnectionPool, "_create_socket", self.fake_socket) - ) + stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket)) stack.enter_context(patch.object(Socket, "connect", self.fake_client)) # Pool should be running @@ -252,7 +240,5 @@ class TestConnectionPool(TestCase): # Try to send a command out - should fail with self.assertRaises(RegistryError): expected = "InfoDomain failed to execute due to a connection error." - result = registry.send( - commands.InfoDomain(name="test.gov"), cleaned=True - ) + result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True) self.assertEqual(result, expected) diff --git a/src/epplibwrapper/utility/pool.py b/src/epplibwrapper/utility/pool.py index 36771252b..e28270f50 100644 --- a/src/epplibwrapper/utility/pool.py +++ b/src/epplibwrapper/utility/pool.py @@ -125,9 +125,7 @@ class EPPConnectionPool(ConnectionPool): # Open multiple connections for i in range(self.size): - self.greenlets.append( - gevent.spawn_later(self.spawn_frequency * i, self._addOne) - ) + self.greenlets.append(gevent.spawn_later(self.spawn_frequency * i, self._addOne)) # Open a "keepalive" thread if we want to ping open connections if self.keepalive: diff --git a/src/epplibwrapper/utility/pool_error.py b/src/epplibwrapper/utility/pool_error.py index 821962774..bdf955afe 100644 --- a/src/epplibwrapper/utility/pool_error.py +++ b/src/epplibwrapper/utility/pool_error.py @@ -28,14 +28,8 @@ class PoolError(Exception): # Used variables due to linter requirements kill_failed = "Could not kill all connections. Are multiple pools running?" - conn_failed = ( - "Failed to execute due to a registry error." - " See previous logs to determine the cause of the error." - ) - alive_failed = ( - "Failed to keep the connection alive. " - "It is likely that the registry returned a LoginError." - ) + conn_failed = "Failed to execute due to a registry error. See previous logs to determine the cause of the error." + alive_failed = "Failed to keep the connection alive. It is likely that the registry returned a LoginError." _error_mapping = { PoolErrorCodes.KILL_ALL_FAILED: kill_failed, PoolErrorCodes.NEW_CONNECTION_FAILED: conn_failed, diff --git a/src/pyproject.toml b/src/pyproject.toml index af7fc4ee7..ba2b76f83 100644 --- a/src/pyproject.toml +++ b/src/pyproject.toml @@ -1,5 +1,5 @@ [tool.black] -line-length=88 +line-length=120 [tool.mypy] ignore_missing_imports = true diff --git a/src/registrar/admin.py b/src/registrar/admin.py index 8914e5c87..9de5f563c 100644 --- a/src/registrar/admin.py +++ b/src/registrar/admin.py @@ -73,9 +73,7 @@ class ListHeaderAdmin(AuditedAdmin): filters = self.get_filters(request) # Pass the filtered values to the template context extra_context["filters"] = filters - extra_context["search_query"] = request.GET.get( - "q", "" - ) # Assuming the search query parameter is 'q' + extra_context["search_query"] = request.GET.get("q", "") # Assuming the search query parameter is 'q' return super().changelist_view(request, extra_context=extra_context) def get_filters(self, request): @@ -91,11 +89,7 @@ class ListHeaderAdmin(AuditedAdmin): for param in request.GET.keys(): # Exclude the default search parameter 'q' if param != "q" and param != "o": - parameter_name = ( - param.replace("__exact", "") - .replace("_type", "") - .replace("__id", " id") - ) + parameter_name = param.replace("__exact", "").replace("_type", "").replace("__id", " id") if parameter_name == "investigator id": # Retrieves the corresponding contact from Users @@ -613,8 +607,7 @@ class DomainApplicationAdmin(ListHeaderAdmin): messages.error( request, - "This action is not permitted. The domain " - + "is already active.", + "This action is not permitted. The domain is already active.", ) else: @@ -627,9 +620,7 @@ class DomainApplicationAdmin(ListHeaderAdmin): models.DomainApplication.APPROVED: obj.approve, models.DomainApplication.WITHDRAWN: obj.withdraw, models.DomainApplication.REJECTED: obj.reject, - models.DomainApplication.INELIGIBLE: ( - obj.reject_with_prejudice - ), + models.DomainApplication.INELIGIBLE: (obj.reject_with_prejudice), } selected_method = status_method_mapping.get(obj.status) if selected_method is None: @@ -649,8 +640,7 @@ class DomainApplicationAdmin(ListHeaderAdmin): messages.error( request, - "This action is not permitted for applications " - + "with a restricted creator.", + "This action is not permitted for applications with a restricted creator.", ) def get_readonly_fields(self, request, obj=None): @@ -669,9 +659,7 @@ class DomainApplicationAdmin(ListHeaderAdmin): readonly_fields.extend([field.name for field in self.model._meta.fields]) # Add the multi-select fields to readonly_fields: # Complex fields like ManyToManyField require special handling - readonly_fields.extend( - ["current_websites", "other_contacts", "alternative_domains"] - ) + readonly_fields.extend(["current_websites", "other_contacts", "alternative_domains"]) if request.user.has_perm("registrar.full_access_permission"): return readonly_fields @@ -739,9 +727,7 @@ class DomainAdmin(ListHeaderAdmin): def organization_type(self, obj): return obj.domain_info.get_organization_type_display() - organization_type.admin_order_field = ( # type: ignore - "domain_info__organization_type" - ) + organization_type.admin_order_field = "domain_info__organization_type" # type: ignore # Filters list_filter = ["domain_info__organization_type", "state"] @@ -846,9 +832,7 @@ class DomainAdmin(ListHeaderAdmin): if not err.is_connection_error(): # If nothing is found, will default to returned err message = error_messages.get(err.code, err) - self.message_user( - request, f"Error deleting this Domain: {message}", messages.ERROR - ) + self.message_user(request, f"Error deleting this Domain: {message}", messages.ERROR) except TransitionNotAllowed: if obj.state == Domain.State.DELETED: self.message_user( @@ -886,8 +870,7 @@ class DomainAdmin(ListHeaderAdmin): else: self.message_user( request, - f"The registry statuses are {statuses}. " - "These statuses are from the provider of the .gov registry.", + f"The registry statuses are {statuses}. These statuses are from the provider of the .gov registry.", ) return HttpResponseRedirect(".") @@ -916,11 +899,7 @@ class DomainAdmin(ListHeaderAdmin): else: self.message_user( request, - ( - "%s is in client hold. This domain is no longer accessible on" - " the public internet." - ) - % obj.name, + ("%s is in client hold. This domain is no longer accessible on the public internet.") % obj.name, ) return HttpResponseRedirect(".") @@ -949,8 +928,7 @@ class DomainAdmin(ListHeaderAdmin): else: self.message_user( request, - ("%s is ready. This domain is accessible on the public internet.") - % obj.name, + ("%s is ready. This domain is accessible on the public internet.") % obj.name, ) return HttpResponseRedirect(".") @@ -973,9 +951,9 @@ class DomainAdmin(ListHeaderAdmin): # Fixes a bug wherein users which are only is_staff # can access 'change' when GET, # but cannot access this page when it is a request of type POST. - if request.user.has_perm( - "registrar.full_access_permission" - ) or request.user.has_perm("registrar.analyst_access_permission"): + if request.user.has_perm("registrar.full_access_permission") or request.user.has_perm( + "registrar.analyst_access_permission" + ): return True return super().has_change_permission(request, obj) diff --git a/src/registrar/assets/sass/_theme/_admin.scss b/src/registrar/assets/sass/_theme/_admin.scss index 080a343ee..56218c377 100644 --- a/src/registrar/assets/sass/_theme/_admin.scss +++ b/src/registrar/assets/sass/_theme/_admin.scss @@ -161,6 +161,26 @@ h1, h2, h3 { font-size: 14px; } +// right justify custom buttons and display as links +.submit-row input.custom-link-button, +.submit-row input.custom-link-button:hover { + background: none; + border: none; + color: var(--link-fg); + cursor: pointer; + text-decoration: none; + padding: 0; + font-size: inherit; + margin-left: auto; +} +.submit-row div.spacer { + flex-grow: 1; +} +.submit-row span { + margin-top: units(1); +} + +// Customize // Keep th from collapsing .min-width-25 { min-width: 25px; diff --git a/src/registrar/config/settings.py b/src/registrar/config/settings.py index 7c21cfbb7..f85143ea8 100644 --- a/src/registrar/config/settings.py +++ b/src/registrar/config/settings.py @@ -212,6 +212,7 @@ TEMPLATES = [ "registrar.context_processors.language_code", "registrar.context_processors.canonical_path", "registrar.context_processors.is_demo_site", + "registrar.context_processors.is_production", ], }, }, @@ -528,9 +529,7 @@ OIDC_PROVIDERS = { "acr_value": "http://idmanagement.gov/ns/assurance/ial/2", }, "client_registration": { - "client_id": ( - "urn:gov:cisa:openidconnect.profiles:sp:sso:cisa:dotgov_registrar" - ), + "client_id": ("urn:gov:cisa:openidconnect.profiles:sp:sso:cisa:dotgov_registrar"), "redirect_uris": [f"{env_base_url}/openid/callback/login/"], "post_logout_redirect_uris": [f"{env_base_url}/openid/callback/logout/"], "token_endpoint_auth_method": ["private_key_jwt"], diff --git a/src/registrar/context_processors.py b/src/registrar/context_processors.py index f778f1ab1..abab72dac 100644 --- a/src/registrar/context_processors.py +++ b/src/registrar/context_processors.py @@ -31,3 +31,8 @@ def is_demo_site(request): should not appear. """ return {"IS_DEMO_SITE": settings.IS_DEMO_SITE} + + +def is_production(request): + """Add a boolean if this is our production site.""" + return {"IS_PRODUCTION": settings.IS_PRODUCTION} diff --git a/src/registrar/fixtures_applications.py b/src/registrar/fixtures_applications.py index 18be79814..aea153aef 100644 --- a/src/registrar/fixtures_applications.py +++ b/src/registrar/fixtures_applications.py @@ -97,9 +97,7 @@ class DomainApplicationFixture: def _set_non_foreign_key_fields(cls, da: DomainApplication, app: dict): """Helper method used by `load`.""" da.status = app["status"] if "status" in app else "started" - da.organization_type = ( - app["organization_type"] if "organization_type" in app else "federal" - ) + da.organization_type = app["organization_type"] if "organization_type" in app else "federal" da.federal_agency = ( app["federal_agency"] if "federal_agency" in app @@ -112,40 +110,25 @@ class DomainApplicationFixture: if "federal_type" in app else random.choice(["executive", "judicial", "legislative"]) # nosec ) - da.address_line1 = ( - app["address_line1"] if "address_line1" in app else fake.street_address() - ) + da.address_line1 = app["address_line1"] if "address_line1" in app else fake.street_address() da.address_line2 = app["address_line2"] if "address_line2" in app else None da.city = app["city"] if "city" in app else fake.city() - da.state_territory = ( - app["state_territory"] if "state_territory" in app else fake.state_abbr() - ) + da.state_territory = app["state_territory"] if "state_territory" in app else fake.state_abbr() da.zipcode = app["zipcode"] if "zipcode" in app else fake.postalcode() da.urbanization = app["urbanization"] if "urbanization" in app else None da.purpose = app["purpose"] if "purpose" in app else fake.paragraph() da.anything_else = app["anything_else"] if "anything_else" in app else None - da.is_policy_acknowledged = ( - app["is_policy_acknowledged"] if "is_policy_acknowledged" in app else True - ) + da.is_policy_acknowledged = app["is_policy_acknowledged"] if "is_policy_acknowledged" in app else True @classmethod def _set_foreign_key_fields(cls, da: DomainApplication, app: dict, user: User): """Helper method used by `load`.""" if not da.investigator: - da.investigator = ( - User.objects.get(username=user.username) - if "investigator" in app - else None - ) + da.investigator = User.objects.get(username=user.username) if "investigator" in app else None if not da.authorizing_official: - if ( - "authorizing_official" in app - and app["authorizing_official"] is not None - ): - da.authorizing_official, _ = Contact.objects.get_or_create( - **app["authorizing_official"] - ) + if "authorizing_official" in app and app["authorizing_official"] is not None: + da.authorizing_official, _ = Contact.objects.get_or_create(**app["authorizing_official"]) else: da.authorizing_official = Contact.objects.create(**cls.fake_contact()) @@ -157,13 +140,9 @@ class DomainApplicationFixture: if not da.requested_domain: if "requested_domain" in app and app["requested_domain"] is not None: - da.requested_domain, _ = DraftDomain.objects.get_or_create( - name=app["requested_domain"] - ) + da.requested_domain, _ = DraftDomain.objects.get_or_create(name=app["requested_domain"]) else: - da.requested_domain = DraftDomain.objects.create( - name=cls.fake_dot_gov() - ) + da.requested_domain = DraftDomain.objects.create(name=cls.fake_dot_gov()) @classmethod def _set_many_to_many_relations(cls, da: DomainApplication, app: dict): @@ -173,32 +152,25 @@ class DomainApplicationFixture: da.other_contacts.add(Contact.objects.get_or_create(**contact)[0]) elif not da.other_contacts.exists(): other_contacts = [ - Contact.objects.create(**cls.fake_contact()) - for _ in range(random.randint(0, 3)) # nosec + Contact.objects.create(**cls.fake_contact()) for _ in range(random.randint(0, 3)) # nosec ] da.other_contacts.add(*other_contacts) if "current_websites" in app: for website in app["current_websites"]: - da.current_websites.add( - Website.objects.get_or_create(website=website)[0] - ) + da.current_websites.add(Website.objects.get_or_create(website=website)[0]) elif not da.current_websites.exists(): current_websites = [ - Website.objects.create(website=fake.uri()) - for _ in range(random.randint(0, 3)) # nosec + Website.objects.create(website=fake.uri()) for _ in range(random.randint(0, 3)) # nosec ] da.current_websites.add(*current_websites) if "alternative_domains" in app: for domain in app["alternative_domains"]: - da.alternative_domains.add( - Website.objects.get_or_create(website=domain)[0] - ) + da.alternative_domains.add(Website.objects.get_or_create(website=domain)[0]) elif not da.alternative_domains.exists(): alternative_domains = [ - Website.objects.create(website=cls.fake_dot_gov()) - for _ in range(random.randint(0, 3)) # nosec + Website.objects.create(website=cls.fake_dot_gov()) for _ in range(random.randint(0, 3)) # nosec ] da.alternative_domains.add(*alternative_domains) @@ -242,9 +214,7 @@ class DomainFixture(DomainApplicationFixture): for user in users: # approve one of each users in review status domains - application = DomainApplication.objects.filter( - creator=user, status=DomainApplication.IN_REVIEW - ).last() + application = DomainApplication.objects.filter(creator=user, status=DomainApplication.IN_REVIEW).last() logger.debug(f"Approving {application} for {user}") application.approve() application.save() diff --git a/src/registrar/forms/application_wizard.py b/src/registrar/forms/application_wizard.py index 2fd78cdd8..a70c23e52 100644 --- a/src/registrar/forms/application_wizard.py +++ b/src/registrar/forms/application_wizard.py @@ -50,9 +50,7 @@ class RegistrarForm(forms.Form): """Returns a dict of form field values gotten from `obj`.""" if obj is None: return {} - return { - name: getattr(obj, name) for name in cls.declared_fields.keys() - } # type: ignore + return {name: getattr(obj, name) for name in cls.declared_fields.keys()} # type: ignore class RegistrarFormSet(forms.BaseFormSet): @@ -178,10 +176,7 @@ class TribalGovernmentForm(RegistrarForm): def clean(self): """Needs to be either state or federally recognized.""" - if not ( - self.cleaned_data["federally_recognized_tribe"] - or self.cleaned_data["state_recognized_tribe"] - ): + if not (self.cleaned_data["federally_recognized_tribe"] or self.cleaned_data["state_recognized_tribe"]): raise forms.ValidationError( # no sec because we are using it to include an internal URL # into a link. There should be no user-facing input in the @@ -203,11 +198,7 @@ class OrganizationFederalForm(RegistrarForm): federal_type = forms.ChoiceField( choices=DomainApplication.BranchChoices.choices, widget=forms.RadioSelect, - error_messages={ - "required": ( - "Select the part of the federal government your organization is in." - ) - }, + error_messages={"required": ("Select the part of the federal government your organization is in.")}, ) @@ -227,10 +218,7 @@ class OrganizationElectionForm(RegistrarForm): is_election_board = self.cleaned_data["is_election_board"] if is_election_board is None: raise forms.ValidationError( - ( - "Select “Yes” if you represent an election office. Select “No” if" - " you don’t." - ), + ("Select “Yes” if you represent an election office. Select “No” if you don’t."), code="required", ) return is_election_board @@ -260,18 +248,13 @@ class OrganizationContactForm(RegistrarForm): ) city = forms.CharField( label="City", - error_messages={ - "required": "Enter the city where your organization is located." - }, + error_messages={"required": "Enter the city where your organization is located."}, ) state_territory = forms.ChoiceField( label="State, territory, or military post", choices=[("", "--Select--")] + DomainApplication.StateTerritoryChoices.choices, error_messages={ - "required": ( - "Select the state, territory, or military post where your organization" - " is located." - ) + "required": ("Select the state, territory, or military post where your organization is located.") }, ) zipcode = forms.CharField( @@ -320,9 +303,7 @@ class AboutYourOrganizationForm(RegistrarForm): message="Response must be less than 1000 characters.", ) ], - error_messages={ - "required": ("Enter more information about your organization.") - }, + error_messages={"required": ("Enter more information about your organization.")}, ) @@ -346,11 +327,7 @@ class AuthorizingOfficialForm(RegistrarForm): first_name = forms.CharField( label="First name / given name", - error_messages={ - "required": ( - "Enter the first name / given name of your authorizing official." - ) - }, + error_messages={"required": ("Enter the first name / given name of your authorizing official.")}, ) middle_name = forms.CharField( required=False, @@ -358,11 +335,7 @@ class AuthorizingOfficialForm(RegistrarForm): ) last_name = forms.CharField( label="Last name / family name", - error_messages={ - "required": ( - "Enter the last name / family name of your authorizing official." - ) - }, + error_messages={"required": ("Enter the last name / family name of your authorizing official.")}, ) title = forms.CharField( label="Title or role in your organization", @@ -375,17 +348,11 @@ class AuthorizingOfficialForm(RegistrarForm): ) email = forms.EmailField( label="Email", - error_messages={ - "invalid": ( - "Enter an email address in the required format, like name@example.com." - ) - }, + error_messages={"invalid": ("Enter an email address in the required format, like name@example.com.")}, ) phone = PhoneNumberField( label="Phone", - error_messages={ - "required": "Enter the phone number for your authorizing official." - }, + error_messages={"required": "Enter the phone number for your authorizing official."}, ) @@ -394,10 +361,7 @@ class CurrentSitesForm(RegistrarForm): required=False, label="Public website", error_messages={ - "invalid": ( - "Enter your organization's current website in the required format, like" - " www.city.com." - ) + "invalid": ("Enter your organization's current website in the required format, like www.city.com.") }, ) @@ -410,9 +374,7 @@ class BaseCurrentSitesFormSet(RegistrarFormSet): return website.strip() == "" def to_database(self, obj: DomainApplication): - self._to_database( - obj, self.JOIN, self.should_delete, self.pre_update, self.pre_create - ) + self._to_database(obj, self.JOIN, self.should_delete, self.pre_update, self.pre_create) @classmethod def from_database(cls, obj): @@ -434,13 +396,9 @@ class AlternativeDomainForm(RegistrarForm): requested = self.cleaned_data.get("alternative_domain", None) validated = DraftDomain.validate(requested, blank_ok=True) except errors.ExtraDotsError: - raise forms.ValidationError( - DOMAIN_API_MESSAGES["extra_dots"], code="extra_dots" - ) + raise forms.ValidationError(DOMAIN_API_MESSAGES["extra_dots"], code="extra_dots") except errors.DomainUnavailableError: - raise forms.ValidationError( - DOMAIN_API_MESSAGES["unavailable"], code="unavailable" - ) + raise forms.ValidationError(DOMAIN_API_MESSAGES["unavailable"], code="unavailable") except ValueError: raise forms.ValidationError(DOMAIN_API_MESSAGES["invalid"], code="invalid") return validated @@ -471,9 +429,7 @@ class BaseAlternativeDomainFormSet(RegistrarFormSet): return {} def to_database(self, obj: DomainApplication): - self._to_database( - obj, self.JOIN, self.should_delete, self.pre_update, self.pre_create - ) + self._to_database(obj, self.JOIN, self.should_delete, self.pre_update, self.pre_create) @classmethod def on_fetch(cls, query): @@ -523,17 +479,11 @@ class DotGovDomainForm(RegistrarForm): requested = self.cleaned_data.get("requested_domain", None) validated = DraftDomain.validate(requested) except errors.BlankValueError: - raise forms.ValidationError( - DOMAIN_API_MESSAGES["required"], code="required" - ) + raise forms.ValidationError(DOMAIN_API_MESSAGES["required"], code="required") except errors.ExtraDotsError: - raise forms.ValidationError( - DOMAIN_API_MESSAGES["extra_dots"], code="extra_dots" - ) + raise forms.ValidationError(DOMAIN_API_MESSAGES["extra_dots"], code="extra_dots") except errors.DomainUnavailableError: - raise forms.ValidationError( - DOMAIN_API_MESSAGES["unavailable"], code="unavailable" - ) + raise forms.ValidationError(DOMAIN_API_MESSAGES["unavailable"], code="unavailable") except ValueError: raise forms.ValidationError(DOMAIN_API_MESSAGES["invalid"], code="invalid") return validated @@ -551,9 +501,7 @@ class PurposeForm(RegistrarForm): message="Response must be less than 1000 characters.", ) ], - error_messages={ - "required": "Describe how you'll use the .gov domain you’re requesting." - }, + error_messages={"required": "Describe how you'll use the .gov domain you’re requesting."}, ) @@ -590,20 +538,12 @@ class YourContactForm(RegistrarForm): title = forms.CharField( label="Title or role in your organization", error_messages={ - "required": ( - "Enter your title or role in your organization (e.g., Chief Information" - " Officer)." - ) + "required": ("Enter your title or role in your organization (e.g., Chief Information Officer).") }, ) email = forms.EmailField( label="Email", - error_messages={ - "invalid": ( - "Enter your email address in the required format, like" - " name@example.com." - ) - }, + error_messages={"invalid": ("Enter your email address in the required format, like name@example.com.")}, ) phone = PhoneNumberField( label="Phone", @@ -614,9 +554,7 @@ class YourContactForm(RegistrarForm): class OtherContactsForm(RegistrarForm): first_name = forms.CharField( label="First name / given name", - error_messages={ - "required": "Enter the first name / given name of this contact." - }, + error_messages={"required": "Enter the first name / given name of this contact."}, ) middle_name = forms.CharField( required=False, @@ -624,26 +562,19 @@ class OtherContactsForm(RegistrarForm): ) last_name = forms.CharField( label="Last name / family name", - error_messages={ - "required": "Enter the last name / family name of this contact." - }, + error_messages={"required": "Enter the last name / family name of this contact."}, ) title = forms.CharField( label="Title or role in your organization", error_messages={ "required": ( - "Enter the title or role in your organization of this contact (e.g.," - " Chief Information Officer)." + "Enter the title or role in your organization of this contact (e.g., Chief Information Officer)." ) }, ) email = forms.EmailField( label="Email", - error_messages={ - "invalid": ( - "Enter an email address in the required format, like name@example.com." - ) - }, + error_messages={"invalid": ("Enter an email address in the required format, like name@example.com.")}, ) phone = PhoneNumberField( label="Phone", @@ -659,9 +590,7 @@ class BaseOtherContactsFormSet(RegistrarFormSet): return all(empty) def to_database(self, obj: DomainApplication): - self._to_database( - obj, self.JOIN, self.should_delete, self.pre_update, self.pre_create - ) + self._to_database(obj, self.JOIN, self.should_delete, self.pre_update, self.pre_create) @classmethod def from_database(cls, obj): @@ -706,9 +635,6 @@ class RequirementsForm(RegistrarForm): is_policy_acknowledged = forms.BooleanField( label="I read and agree to the requirements for operating .gov domains.", error_messages={ - "required": ( - "Check the box if you read and agree to the requirements for" - " operating .gov domains." - ) + "required": ("Check the box if you read and agree to the requirements for operating .gov domains.") }, ) diff --git a/src/registrar/forms/domain.py b/src/registrar/forms/domain.py index 2c95c3e74..6cb5b338f 100644 --- a/src/registrar/forms/domain.py +++ b/src/registrar/forms/domain.py @@ -95,23 +95,17 @@ class DomainNameserverForm(forms.Form): elif e.code == nsErrorCodes.MISSING_IP: self.add_error( "ip", - NameserverError( - code=nsErrorCodes.MISSING_IP, nameserver=domain, ip=ip_list - ), + NameserverError(code=nsErrorCodes.MISSING_IP, nameserver=domain, ip=ip_list), ) elif e.code == nsErrorCodes.MISSING_HOST: self.add_error( "server", - NameserverError( - code=nsErrorCodes.MISSING_HOST, nameserver=domain, ip=ip_list - ), + NameserverError(code=nsErrorCodes.MISSING_HOST, nameserver=domain, ip=ip_list), ) elif e.code == nsErrorCodes.INVALID_HOST: self.add_error( "server", - NameserverError( - code=nsErrorCodes.INVALID_HOST, nameserver=server, ip=ip_list - ), + NameserverError(code=nsErrorCodes.INVALID_HOST, nameserver=server, ip=ip_list), ) else: self.add_error("ip", str(e)) @@ -187,17 +181,12 @@ class DomainOrgNameAddressForm(forms.ModelForm): "urbanization", ] error_messages = { - "federal_agency": { - "required": "Select the federal agency for your organization." - }, + "federal_agency": {"required": "Select the federal agency for your organization."}, "organization_name": {"required": "Enter the name of your organization."}, - "address_line1": { - "required": "Enter the street address of your organization." - }, + "address_line1": {"required": "Enter the street address of your organization."}, "city": {"required": "Enter the city where your organization is located."}, "state_territory": { - "required": "Select the state, territory, or military post where your" - "organization is located." + "required": "Select the state, territory, or military post where your organization is located." }, } widgets = { @@ -205,9 +194,7 @@ class DomainOrgNameAddressForm(forms.ModelForm): # state/territory because for these fields we are creating an individual # instance of the Select. For the other fields we use the for loop to set # the class's required attribute to true. - "federal_agency": forms.Select( - attrs={"required": True}, choices=DomainInformation.AGENCY_CHOICES - ), + "federal_agency": forms.Select(attrs={"required": True}, choices=DomainInformation.AGENCY_CHOICES), "organization_name": forms.TextInput, "address_line1": forms.TextInput, "address_line2": forms.TextInput, diff --git a/src/registrar/management/commands/cat_files_into_getgov.py b/src/registrar/management/commands/cat_files_into_getgov.py index 993643ee4..4ccb1301b 100644 --- a/src/registrar/management/commands/cat_files_into_getgov.py +++ b/src/registrar/management/commands/cat_files_into_getgov.py @@ -23,9 +23,7 @@ class Command(BaseCommand): default="txt", help="What file extensions to look for, like txt or gz", ) - parser.add_argument( - "--directory", default="migrationdata", help="Desired directory" - ) + parser.add_argument("--directory", default="migrationdata", help="Desired directory") def handle(self, **options): file_extension: str = options.get("file_extension").lstrip(".") diff --git a/src/registrar/management/commands/lint.py b/src/registrar/management/commands/lint.py index 3d0ec0225..f867f5b6f 100644 --- a/src/registrar/management/commands/lint.py +++ b/src/registrar/management/commands/lint.py @@ -52,20 +52,15 @@ class Command(BaseCommand): if result.returncode: self.stderr.write( self.style.NOTICE( - "[manage.py lint] Re-try with: [docker-compose exec app] " - f"{' '.join(linter['args'])}" + "[manage.py lint] Re-try with: [docker-compose exec app] " f"{' '.join(linter['args'])}" ) ) errors.append(CalledProcessError(result.returncode, linter["args"])) else: - self.stdout.write( - f"[manage.py lint] {linter['purpose']} completed with success!" - ) + self.stdout.write(f"[manage.py lint] {linter['purpose']} completed with success!") if errors: self.stdout.write(f"[manage.py lint] {len(errors)} linter(s) failed.") raise LinterError(errors) except (CalledProcessError, LinterError) as e: raise CommandError(e) - self.stdout.write( - self.style.SUCCESS("[manage.py lint] All linters ran successfully.") - ) + self.stdout.write(self.style.SUCCESS("[manage.py lint] All linters ran successfully.")) diff --git a/src/registrar/management/commands/load_domain_invitations.py b/src/registrar/management/commands/load_domain_invitations.py index 83b7163d1..28eb09def 100644 --- a/src/registrar/management/commands/load_domain_invitations.py +++ b/src/registrar/management/commands/load_domain_invitations.py @@ -21,9 +21,7 @@ class Command(BaseCommand): "domain_contacts_filename", help="Data file with domain contact information", ) - parser.add_argument( - "contacts_filename", help="Data file with contact information" - ) + parser.add_argument("contacts_filename", help="Data file with contact information") parser.add_argument("--sep", default="|", help="Delimiter character") diff --git a/src/registrar/management/commands/load_domains_data.py b/src/registrar/management/commands/load_domains_data.py index b205e562e..fd6a560bd 100644 --- a/src/registrar/management/commands/load_domains_data.py +++ b/src/registrar/management/commands/load_domains_data.py @@ -43,9 +43,7 @@ class Command(BaseCommand): help = "Load domain data from a delimited text file on stdin." def add_arguments(self, parser): - parser.add_argument( - "--sep", default="|", help="Separator character for data file" - ) + parser.add_argument("--sep", default="|", help="Separator character for data file") def handle(self, *args, **options): separator_character = options.get("sep") diff --git a/src/registrar/management/commands/load_transition_domain.py b/src/registrar/management/commands/load_transition_domain.py index 4b48298ae..6566a2f16 100644 --- a/src/registrar/management/commands/load_transition_domain.py +++ b/src/registrar/management/commands/load_transition_domain.py @@ -1,10 +1,12 @@ import json +import os import sys import csv import logging import argparse from collections import defaultdict +from django.conf import settings from django.core.management import BaseCommand from registrar.management.commands.utility.epp_data_containers import EnumFilenames @@ -43,19 +45,14 @@ class Command(BaseCommand): """ parser.add_argument( "migration_json_filename", - help=( - "A JSON file that holds the location and filenames" - "of all the data files used for migrations" - ), + help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"), ) parser.add_argument("--sep", default="|", help="Delimiter character") parser.add_argument("--debug", action=argparse.BooleanOptionalAction) - parser.add_argument( - "--limitParse", default=0, help="Sets max number of entries to load" - ) + parser.add_argument("--limitParse", default=0, help="Sets max number of entries to load") parser.add_argument( "--resetTable", @@ -63,16 +60,17 @@ class Command(BaseCommand): action=argparse.BooleanOptionalAction, ) - parser.add_argument( - "--infer_filenames", - action=argparse.BooleanOptionalAction, - help="Determines if we should infer filenames or not." - "Recommended to be enabled only in a development or testing setting.", - ) + # This option should only be available when developing locally. + # This should not be available to the end user. + if settings.DEBUG: + parser.add_argument( + "--infer_filenames", + action=argparse.BooleanOptionalAction, + help="Determines if we should infer filenames or not." + "Recommended to be enabled only in a development or testing setting.", + ) - parser.add_argument( - "--directory", default="migrationdata", help="Desired directory" - ) + parser.add_argument("--directory", default="migrationdata", help="Desired directory") parser.add_argument( "--domain_contacts_filename", help="Data file with domain contact information", @@ -116,9 +114,7 @@ class Command(BaseCommand): help="Defines the filename for domain type adhocs", ) - def print_debug_mode_statements( - self, debug_on: bool, debug_max_entries_to_parse: int - ): + def print_debug_mode_statements(self, debug_on: bool, debug_max_entries_to_parse: int): """Prints additional terminal statements to indicate if --debug or --limitParse are in use""" if debug_on: @@ -140,9 +136,7 @@ class Command(BaseCommand): """ ) - def get_domain_user_dict( - self, domain_statuses_filename: str, sep: str - ) -> defaultdict[str, str]: + def get_domain_user_dict(self, domain_statuses_filename: str, sep: str) -> defaultdict[str, str]: """Creates a mapping of domain name -> status""" domain_status_dictionary = defaultdict(str) logger.info("Reading domain statuses data file %s", domain_statuses_filename) @@ -154,9 +148,7 @@ class Command(BaseCommand): logger.info("Loaded statuses for %d domains", len(domain_status_dictionary)) return domain_status_dictionary - def get_user_emails_dict( - self, contacts_filename: str, sep - ) -> defaultdict[str, str]: + def get_user_emails_dict(self, contacts_filename: str, sep) -> defaultdict[str, str]: """Creates mapping of userId -> emails""" user_emails_dictionary = defaultdict(str) logger.info("Reading contacts data file %s", contacts_filename) @@ -205,19 +197,13 @@ class Command(BaseCommand): total_duplicate_domains = len(duplicate_domains) total_users_without_email = len(users_without_email) if total_users_without_email > 0: - users_without_email_as_string = "{}".format( - ", ".join(map(str, duplicate_domain_user_combos)) - ) + users_without_email_as_string = "{}".format(", ".join(map(str, duplicate_domain_user_combos))) logger.warning( f"{TerminalColors.YELLOW} No e-mails found for users: {users_without_email_as_string}" # noqa ) if total_duplicate_pairs > 0 or total_duplicate_domains > 0: - duplicate_pairs_as_string = "{}".format( - ", ".join(map(str, duplicate_domain_user_combos)) - ) - duplicate_domains_as_string = "{}".format( - ", ".join(map(str, duplicate_domains)) - ) + duplicate_pairs_as_string = "{}".format(", ".join(map(str, duplicate_domain_user_combos))) + duplicate_domains_as_string = "{}".format(", ".join(map(str, duplicate_domains))) logger.warning( f"""{TerminalColors.YELLOW} @@ -235,9 +221,7 @@ class Command(BaseCommand): {TerminalColors.ENDC}""" ) - def print_summary_status_findings( - self, domains_without_status: list[str], outlier_statuses: list[str] - ): + def print_summary_status_findings(self, domains_without_status: list[str], outlier_statuses: list[str]): """Called at the end of the script execution to print out a summary of status anomolies in the imported Verisign data. Currently, we check for: - domains without a status @@ -247,9 +231,7 @@ class Command(BaseCommand): total_domains_without_status = len(domains_without_status) total_outlier_statuses = len(outlier_statuses) if total_domains_without_status > 0: - domains_without_status_as_string = "{}".format( - ", ".join(map(str, domains_without_status)) - ) + domains_without_status_as_string = "{}".format(", ".join(map(str, domains_without_status))) logger.warning( f"""{TerminalColors.YELLOW} @@ -263,9 +245,7 @@ class Command(BaseCommand): ) if total_outlier_statuses > 0: - domains_without_status_as_string = "{}".format( - ", ".join(map(str, outlier_statuses)) - ) # noqa + domains_without_status_as_string = "{}".format(", ".join(map(str, outlier_statuses))) # noqa logger.warning( f"""{TerminalColors.YELLOW} @@ -322,6 +302,9 @@ class Command(BaseCommand): **options, ): """Parse the data files and create TransitionDomains.""" + if not settings.DEBUG: + options["infer_filenames"] = False + args = TransitionDomainArguments(**options) # Desired directory for additional TransitionDomain data @@ -366,42 +349,31 @@ class Command(BaseCommand): debug_on = args.debug # Get --LimitParse argument - debug_max_entries_to_parse = int( - args.limitParse - ) # set to 0 to parse all entries + debug_max_entries_to_parse = int(args.limitParse) # set to 0 to parse all entries # Variables for Additional TransitionDomain Information # # Main script filenames - these do not have defaults domain_contacts_filename = None try: - domain_contacts_filename = directory + options.get( - "domain_contacts_filename" - ) + domain_contacts_filename = directory + options.get("domain_contacts_filename") except TypeError: logger.error( - f"Invalid filename of '{args.domain_contacts_filename}'" - " was provided for domain_contacts_filename" + f"Invalid filename of '{args.domain_contacts_filename}'" " was provided for domain_contacts_filename" ) contacts_filename = None try: contacts_filename = directory + options.get("contacts_filename") except TypeError: - logger.error( - f"Invalid filename of '{args.contacts_filename}'" - " was provided for contacts_filename" - ) + logger.error(f"Invalid filename of '{args.contacts_filename}'" " was provided for contacts_filename") domain_statuses_filename = None try: - domain_statuses_filename = directory + options.get( - "domain_statuses_filename" - ) + domain_statuses_filename = directory + options.get("domain_statuses_filename") except TypeError: logger.error( - f"Invalid filename of '{args.domain_statuses_filename}'" - " was provided for domain_statuses_filename" + f"Invalid filename of '{args.domain_statuses_filename}'" " was provided for domain_statuses_filename" ) # Agency information @@ -419,11 +391,25 @@ class Command(BaseCommand): # print message to terminal about which args are in use self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse) + filenames = [ + agency_adhoc_filename, + domain_adhoc_filename, + organization_adhoc_filename, + domain_escrow_filename, + domain_additional_filename, + ] + + # Do a top-level check to see if these files exist + for filename in filenames: + if not isinstance(filename, str): + raise TypeError(f"Filename must be a string, got {type(filename).__name__}") + full_path = os.path.join(directory, filename) + if not os.path.isfile(full_path): + raise FileNotFoundError(full_path) + # STEP 1: # Create mapping of domain name -> status - domain_status_dictionary = self.get_domain_user_dict( - domain_statuses_filename, sep - ) + domain_status_dictionary = self.get_domain_user_dict(domain_statuses_filename, sep) # STEP 2: # Create mapping of userId -> email @@ -518,12 +504,7 @@ class Command(BaseCommand): None, ) existing_domain_user_pair = next( - ( - x - for x in to_create - if x.username == new_entry_email - and x.domain_name == new_entry_domain_name - ), + (x for x in to_create if x.username == new_entry_email and x.domain_name == new_entry_domain_name), None, ) if existing_domain is not None: @@ -594,10 +575,7 @@ class Command(BaseCommand): ) # Check Parse limit and exit loop if needed - if ( - total_rows_parsed >= debug_max_entries_to_parse - and debug_max_entries_to_parse != 0 - ): + if total_rows_parsed >= debug_max_entries_to_parse and debug_max_entries_to_parse != 0: logger.info( f"{TerminalColors.YELLOW}" f"----PARSE LIMIT REACHED. HALTING PARSER.----" @@ -606,12 +584,35 @@ class Command(BaseCommand): break TransitionDomain.objects.bulk_create(to_create) + # Print a summary of findings (duplicate entries, + # missing data..etc.) + self.print_summary_duplications(duplicate_domain_user_combos, duplicate_domains, users_without_email) + self.print_summary_status_findings(domains_without_status, outlier_statuses) + + logger.info( + f"""{TerminalColors.OKGREEN} + ============= FINISHED =============== + Created {total_new_entries} transition domain entries, + Updated {total_updated_domain_entries} transition domain entries + + {TerminalColors.YELLOW} + ----- DUPLICATES FOUND ----- + {len(duplicate_domain_user_combos)} DOMAIN - USER pairs + were NOT unique in the supplied data files. + {len(duplicate_domains)} DOMAINS were NOT unique in + the supplied data files. + + ----- STATUSES ----- + {len(domains_without_status)} DOMAINS had NO status (defaulted to READY). + {len(outlier_statuses)} Statuses were invalid (defaulted to READY). + + {TerminalColors.ENDC} + """ + ) # Print a summary of findings (duplicate entries, # missing data..etc.) - self.print_summary_duplications( - duplicate_domain_user_combos, duplicate_domains, users_without_email - ) + self.print_summary_duplications(duplicate_domain_user_combos, duplicate_domains, users_without_email) self.print_summary_status_findings(domains_without_status, outlier_statuses) logger.info( diff --git a/src/registrar/management/commands/master_domain_migrations.py b/src/registrar/management/commands/master_domain_migrations.py index 6656458d7..9cb469078 100644 --- a/src/registrar/management/commands/master_domain_migrations.py +++ b/src/registrar/management/commands/master_domain_migrations.py @@ -94,33 +94,23 @@ class Command(BaseCommand): parser.add_argument( "--migrationJSON", default="migrationFilepaths.json", - help=( - "A JSON file that holds the location and filenames" - "of all the data files used for migrations" - ), + help=("A JSON file that holds the location and filenames" "of all the data files used for migrations"), ) # TODO: deprecate this once JSON module is done? (or keep as an override) parser.add_argument( "--migrationDirectory", default="migrationdata", - help=( - "The location of the files used for" - "load_transition_domain migration script" - ), + help=("The location of the files used for load_transition_domain migration script"), ) - parser.add_argument( - "--sep", default="|", help="Delimiter character for the migration files" - ) + parser.add_argument("--sep", default="|", help="Delimiter character for the migration files") parser.add_argument("--debug", action=argparse.BooleanOptionalAction) parser.add_argument("--disablePrompts", action=argparse.BooleanOptionalAction) - parser.add_argument( - "--limitParse", default=0, help="Sets max number of entries to load" - ) + parser.add_argument("--limitParse", default=0, help="Sets max number of entries to load") parser.add_argument( "--resetTable", @@ -171,9 +161,7 @@ class Command(BaseCommand): # Check Domain table matching_domains = Domain.objects.filter(name=transition_domain_name) # Check Domain Information table - matching_domain_informations = DomainInformation.objects.filter( - domain__name=transition_domain_name - ) + matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name) # Check Domain Invitation table matching_domain_invitations = DomainInvitation.objects.filter( email=transition_domain_email.lower(), @@ -213,15 +201,9 @@ class Command(BaseCommand): total_missing_domain_invitations = len(missing_domain_invites) missing_domains_as_string = "{}".format(", ".join(map(str, missing_domains))) - duplicate_domains_as_string = "{}".format( - ", ".join(map(str, duplicate_domains)) - ) - missing_domain_informations_as_string = "{}".format( - ", ".join(map(str, missing_domain_informations)) - ) - missing_domain_invites_as_string = "{}".format( - ", ".join(map(str, missing_domain_invites)) - ) + duplicate_domains_as_string = "{}".format(", ".join(map(str, duplicate_domains))) + missing_domain_informations_as_string = "{}".format(", ".join(map(str, missing_domain_informations))) + missing_domain_invites_as_string = "{}".format(", ".join(map(str, missing_domain_invites))) logger.info( f"""{TerminalColors.OKGREEN} diff --git a/src/registrar/management/commands/send_domain_invitations.py b/src/registrar/management/commands/send_domain_invitations.py index f6abb3cf0..68e805be9 100644 --- a/src/registrar/management/commands/send_domain_invitations.py +++ b/src/registrar/management/commands/send_domain_invitations.py @@ -37,14 +37,24 @@ class Command(BaseCommand): help="Send emails ", ) + parser.add_argument("emails", nargs="*", help="Email addresses to send invitations to") + def handle(self, **options): """Process the objects in TransitionDomain.""" logger.info("checking domains and preparing emails") - # Get all TransitionDomain objects - self.transition_domains = TransitionDomain.objects.filter( - email_sent=False, - ).order_by("username") + + if options["emails"]: + # this option is a list of email addresses + self.transition_domains = TransitionDomain.objects.filter( + username__in=options["emails"], + email_sent=False, + ).order_by("username") + else: + # Get all TransitionDomain objects + self.transition_domains = TransitionDomain.objects.filter( + email_sent=False, + ).order_by("username") logger.info("Found %d transition domains", len(self.transition_domains)) self.build_emails_to_send_array() @@ -83,10 +93,7 @@ class Command(BaseCommand): # domains_with_errors try: # if prior username does not match current username - if ( - not email_context["email"] - or email_context["email"] != transition_domain.username - ): + if not email_context["email"] or email_context["email"] != transition_domain.username: # if not first in list of transition_domains if email_context["email"]: # append the email context to the emails_to_send array @@ -96,12 +103,8 @@ class Command(BaseCommand): email_context["domains"].append(transition_domain.domain_name) except Exception as err: # error condition if domain not in database - self.domains_with_errors.append( - copy.deepcopy(transition_domain.domain_name) - ) - logger.error( - f"error retrieving domain {transition_domain.domain_name}: {err}" - ) + self.domains_with_errors.append(copy.deepcopy(transition_domain.domain_name)) + logger.error(f"error retrieving domain {transition_domain.domain_name}: {err}") # if there are at least one more transition domains than errors, # then append one more item if len(self.transition_domains) > len(self.domains_with_errors): diff --git a/src/registrar/management/commands/transfer_transition_domains_to_domains.py b/src/registrar/management/commands/transfer_transition_domains_to_domains.py index 277e28687..58b36c1e5 100644 --- a/src/registrar/management/commands/transfer_transition_domains_to_domains.py +++ b/src/registrar/management/commands/transfer_transition_domains_to_domains.py @@ -43,9 +43,7 @@ class Command(BaseCommand): # ====================================================== # ===================== PRINTING ====================== # ====================================================== - def print_debug_mode_statements( - self, debug_on: bool, debug_max_entries_to_parse: int - ): + def print_debug_mode_statements(self, debug_on: bool, debug_max_entries_to_parse: int): """Prints additional terminal statements to indicate if --debug or --limitParse are in use""" TerminalHelper.print_conditional( @@ -67,13 +65,8 @@ class Command(BaseCommand): """, ) - def parse_limit_reached( - self, debug_max_entries_to_parse: bool, total_rows_parsed: int - ) -> bool: - if ( - debug_max_entries_to_parse > 0 - and total_rows_parsed >= debug_max_entries_to_parse - ): + def parse_limit_reached(self, debug_max_entries_to_parse: bool, total_rows_parsed: int) -> bool: + if debug_max_entries_to_parse > 0 and total_rows_parsed >= debug_max_entries_to_parse: logger.info( f"""{TerminalColors.YELLOW} ----PARSE LIMIT REACHED. HALTING PARSER.---- @@ -160,9 +153,7 @@ class Command(BaseCommand): # ====================================================== # =================== DOMAIN ===================== # ====================================================== - def update_or_create_domain( - self, transition_domain: TransitionDomain, debug_on: bool - ): + def update_or_create_domain(self, transition_domain: TransitionDomain, debug_on: bool): """Given a transition domain, either finds & updates an existing corresponding domain, or creates a new corresponding domain in the Domain table. @@ -261,9 +252,7 @@ class Command(BaseCommand): ) return (target_domain, True) - def update_domain_status( - self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool - ) -> bool: + def update_domain_status(self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool) -> bool: """Given a transition domain that matches an existing domain, updates the existing domain object with that status of the transition domain. @@ -294,9 +283,7 @@ class Command(BaseCommand): # ====================================================== # ================ DOMAIN INVITATION ================== # ====================================================== - def try_add_domain_invitation( - self, domain_email: str, associated_domain: Domain - ) -> DomainInvitation | None: + def try_add_domain_invitation(self, domain_email: str, associated_domain: Domain) -> DomainInvitation | None: """If no domain invitation exists for the given domain and e-mail, create and return a new domain invitation object. If one already exists, or if the email is invalid, return NONE""" @@ -328,26 +315,18 @@ class Command(BaseCommand): ).exists() if not domain_email_already_in_domain_invites: # Create new domain invitation - new_domain_invitation = DomainInvitation( - email=domain_email.lower(), domain=associated_domain - ) + new_domain_invitation = DomainInvitation(email=domain_email.lower(), domain=associated_domain) return new_domain_invitation return None # ====================================================== # ================ DOMAIN INFORMATION ================= # ====================================================== - def update_domain_information( - self, current: DomainInformation, target: DomainInformation, debug_on: bool - ) -> bool: + def update_domain_information(self, current: DomainInformation, target: DomainInformation, debug_on: bool) -> bool: # DEBUG: TerminalHelper.print_conditional( debug_on, - ( - f"{TerminalColors.OKCYAN}" - f"Updating: {current}" - f"{TerminalColors.ENDC}" - ), # noqa + (f"{TerminalColors.OKCYAN}" f"Updating: {current}" f"{TerminalColors.ENDC}"), # noqa ) updated = False @@ -496,15 +475,11 @@ class Command(BaseCommand): debug_on, ) target_domain_information = None - domain_information_exists = DomainInformation.objects.filter( - domain__name=transition_domain_name - ).exists() + domain_information_exists = DomainInformation.objects.filter(domain__name=transition_domain_name).exists() if domain_information_exists: try: # get the existing domain information object - target_domain_information = DomainInformation.objects.get( - domain__name=transition_domain_name - ) + target_domain_information = DomainInformation.objects.get(domain__name=transition_domain_name) # DEBUG: TerminalHelper.print_conditional( debug_on, @@ -518,9 +493,7 @@ class Command(BaseCommand): # for existing entry, update the status to # the transition domain status - self.update_domain_information( - target_domain_information, template_domain_information, debug_on - ) + self.update_domain_information(target_domain_information, template_domain_information, debug_on) # TODO: not all domains need to be updated # (the information is the same). # Need to bubble this up to the final report. @@ -590,9 +563,7 @@ class Command(BaseCommand): if target_domain_information is None: # ---------------- SKIPPED ---------------- skipped_domain_information_entries.append(target_domain_information) - debug_string = ( - f"skipped domain information: {target_domain_information}" - ) + debug_string = f"skipped domain information: {target_domain_information}" elif was_created: # DEBUG: TerminalHelper.print_conditional( @@ -607,11 +578,7 @@ class Command(BaseCommand): # The unique key constraint does not allow multiple domain # information objects to share the same domain existing_domain_information_in_to_create = next( - ( - x - for x in domain_information_to_create - if x.domain.name == target_domain_information.domain.name - ), + (x for x in domain_information_to_create if x.domain.name == target_domain_information.domain.name), None, ) # TODO: this is redundant. @@ -620,10 +587,7 @@ class Command(BaseCommand): existing_domain_info = DomainInformation.objects.filter( domain__name=target_domain_information.domain.name ).exists() - if ( - existing_domain_information_in_to_create is not None - or existing_domain_info - ): + if existing_domain_information_in_to_create is not None or existing_domain_info: debug_string = f"""{TerminalColors.YELLOW} Duplicate Detected: {existing_domain_information_in_to_create}. Cannot add duplicate Domain Information object @@ -631,15 +595,11 @@ class Command(BaseCommand): else: # ---------------- CREATED ---------------- domain_information_to_create.append(target_domain_information) - debug_string = ( - f"created domain information: {target_domain_information}" - ) + debug_string = f"created domain information: {target_domain_information}" elif not was_created: # ---------------- UPDATED ---------------- updated_domain_information.append(target_domain_information) - debug_string = ( - f"updated domain information: {target_domain_information}" - ) + debug_string = f"updated domain information: {target_domain_information}" else: debug_string = "domain information already exists and " f"matches incoming data (NO CHANGES MADE): {target_domain_information}" @@ -694,9 +654,7 @@ class Command(BaseCommand): # ====================================================== # ====================== DOMAIN ======================= - target_domain, was_created = self.update_or_create_domain( - transition_domain, debug_on - ) + target_domain, was_created = self.update_or_create_domain(transition_domain, debug_on) debug_string = "" if target_domain is None: @@ -734,9 +692,7 @@ class Command(BaseCommand): # ====================================================== # ================ DOMAIN INVITATIONS ================== - new_domain_invitation = self.try_add_domain_invitation( - transition_domain_email, target_domain - ) + new_domain_invitation = self.try_add_domain_invitation(transition_domain_email, target_domain) if new_domain_invitation is None: logger.info( f"{TerminalColors.YELLOW} ! No new e-mail detected !" # noqa @@ -774,9 +730,7 @@ class Command(BaseCommand): # grab command line arguments and store locally... debug_on = options.get("debug") - debug_max_entries_to_parse = int( - options.get("limitParse") - ) # set to 0 to parse all entries + debug_max_entries_to_parse = int(options.get("limitParse")) # set to 0 to parse all entries self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse) @@ -844,19 +798,11 @@ class Command(BaseCommand): invitation.domain = existing_domain.get() else: # Raise an err for now - raise Exception( - f"Domain {existing_domain} wants to be added" - "but doesn't exist in the DB" - ) + raise Exception(f"Domain {existing_domain} wants to be added" "but doesn't exist in the DB") invitation.save() - valid_org_choices = [ - (name, value) - for name, value in DomainApplication.OrganizationChoices.choices - ] - valid_fed_choices = [ - value for name, value in DomainApplication.BranchChoices.choices - ] + valid_org_choices = [(name, value) for name, value in DomainApplication.OrganizationChoices.choices] + valid_fed_choices = [value for name, value in DomainApplication.BranchChoices.choices] valid_agency_choices = DomainApplication.AGENCIES # ====================================================== # ================= DOMAIN INFORMATION ================= @@ -884,11 +830,7 @@ class Command(BaseCommand): TerminalHelper.print_conditional( debug_on, - ( - f"{TerminalColors.YELLOW}" - f"Trying to add: {domain_information_to_create}" - f"{TerminalColors.ENDC}" - ), + (f"{TerminalColors.YELLOW}" f"Trying to add: {domain_information_to_create}" f"{TerminalColors.ENDC}"), ) DomainInformation.objects.bulk_create(domain_information_to_create) diff --git a/src/registrar/management/commands/utility/extra_transition_domain_helper.py b/src/registrar/management/commands/utility/extra_transition_domain_helper.py index 2e71ae600..20c8293e5 100644 --- a/src/registrar/management/commands/utility/extra_transition_domain_helper.py +++ b/src/registrar/management/commands/utility/extra_transition_domain_helper.py @@ -111,9 +111,7 @@ class FileTransitionLog: """Logs every LogItem contained in this object""" for parent_log in self.logs: for child_log in parent_log: - TerminalHelper.print_conditional( - True, child_log.message, child_log.severity - ) + TerminalHelper.print_conditional(True, child_log.message, child_log.severity) def display_logs_by_domain_name(self, domain_name, restrict_type=LogCode.DEFAULT): """Displays all logs of a given domain_name. @@ -130,9 +128,7 @@ class FileTransitionLog: return None for log in domain_logs: - TerminalHelper.print_conditional( - restrict_type != log.code, log.message, log.code - ) + TerminalHelper.print_conditional(restrict_type != log.code, log.message, log.code) def get_logs(self, file_type, domain_name): """Grabs the logs associated with @@ -166,38 +162,21 @@ class LoadExtraTransitionDomain: updated_transition_domain = transition_domain try: # STEP 1: Parse organization data - updated_transition_domain = self.parse_org_data( - domain_name, transition_domain - ) + updated_transition_domain = self.parse_org_data(domain_name, transition_domain) # STEP 2: Parse domain type data - updated_transition_domain = self.parse_domain_type_data( - domain_name, transition_domain - ) + updated_transition_domain = self.parse_domain_type_data(domain_name, transition_domain) - # STEP 3: Parse authority data - updated_transition_domain = self.parse_authority_data( - domain_name, transition_domain - ) + # STEP 3: Parse agency data + updated_transition_domain = self.parse_agency_data(domain_name, transition_domain) - # STEP 4: Parse agency data - updated_transition_domain = self.parse_agency_data( - domain_name, transition_domain - ) - - # STEP 5: Parse creation and expiration data - updated_transition_domain = self.parse_creation_expiration_data( - domain_name, transition_domain - ) + # STEP 4: Parse creation and expiration data + updated_transition_domain = self.parse_creation_expiration_data(domain_name, transition_domain) # Check if the instance has changed before saving updated_transition_domain.save() updated_transition_domains.append(updated_transition_domain) - logger.info( - f"{TerminalColors.OKCYAN}" - f"Successfully updated {domain_name}" - f"{TerminalColors.ENDC}" - ) + logger.info(f"{TerminalColors.OKCYAN}" f"Successfully updated {domain_name}" f"{TerminalColors.ENDC}") # If we run into an exception on this domain, # Just skip over it and log that it happened. @@ -272,8 +251,7 @@ class LoadExtraTransitionDomain: self.parse_logs.create_log_item( EnumFilenames.DOMAIN_ESCROW, LogCode.ERROR, - "Could not add epp_creation_date and epp_expiration_date " - f"on {domain_name}, no data exists.", + "Could not add epp_creation_date and epp_expiration_date " f"on {domain_name}, no data exists.", domain_name, not self.debug, ) @@ -375,10 +353,7 @@ class LoadExtraTransitionDomain: ) return transition_domain - agency_exists = ( - transition_domain.federal_agency is not None - and transition_domain.federal_agency.strip() != "" - ) + agency_exists = transition_domain.federal_agency is not None and transition_domain.federal_agency.strip() != "" if not isinstance(info.active, str) or not info.active.lower() == "y": self.parse_logs.create_log_item( @@ -393,12 +368,11 @@ class LoadExtraTransitionDomain: if not isinstance(info.isfederal, str) or not info.isfederal.lower() == "y": self.parse_logs.create_log_item( EnumFilenames.DOMAIN_ADHOC, - LogCode.ERROR, - f"Could not add non-federal agency {info.agencyname} on {domain_name}", + LogCode.INFO, + f"Adding non-federal agency {info.agencyname} on {domain_name}", domain_name, not self.debug, ) - return transition_domain transition_domain.federal_agency = info.agencyname @@ -414,9 +388,7 @@ class LoadExtraTransitionDomain: return transition_domain - def parse_domain_type_data( - self, domain_name, transition_domain: TransitionDomain - ) -> TransitionDomain: + def parse_domain_type_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain: """Grabs organization_type and federal_type from the parsed files and associates it with a transition_domain object, then returns that object.""" if not isinstance(transition_domain, TransitionDomain): @@ -461,12 +433,10 @@ class LoadExtraTransitionDomain: # Are we updating data that already exists, # or are we adding new data in its place? organization_type_exists = ( - transition_domain.organization_type is not None - and transition_domain.organization_type.strip() != "" + transition_domain.organization_type is not None and transition_domain.organization_type.strip() != "" ) federal_type_exists = ( - transition_domain.federal_type is not None - and transition_domain.federal_type.strip() != "" + transition_domain.federal_type is not None and transition_domain.federal_type.strip() != "" ) # If we get two records, then we know it is federal. @@ -500,9 +470,7 @@ class LoadExtraTransitionDomain: return transition_domain - def parse_org_data( - self, domain_name, transition_domain: TransitionDomain - ) -> TransitionDomain: + def parse_org_data(self, domain_name, transition_domain: TransitionDomain) -> TransitionDomain: """Grabs organization_name from the parsed files and associates it with a transition_domain object, then returns that object.""" if not isinstance(transition_domain, TransitionDomain): @@ -520,8 +488,7 @@ class LoadExtraTransitionDomain: return transition_domain desired_property_exists = ( - transition_domain.organization_name is not None - and transition_domain.organization_name.strip() != "" + transition_domain.organization_name is not None and transition_domain.organization_name.strip() != "" ) transition_domain.organization_name = org_info.orgname @@ -538,9 +505,7 @@ class LoadExtraTransitionDomain: return transition_domain - def _add_or_change_message( - self, file_type, var_name, changed_value, domain_name, is_update=False - ): + def _add_or_change_message(self, file_type, var_name, changed_value, domain_name, is_update=False): """Creates a log instance when a property is successfully changed on a given TransitionDomain.""" if not is_update: @@ -744,6 +709,10 @@ class FileDataHolder: # Object data # self.data: Dict[str, type] = {} + # This is used ONLY for development purposes. This behaviour + # is controlled by the --infer_filename flag which is defaulted + # to false. The purpose of this check is to speed up development, + # but it cannot be used by the enduser def try_infer_filename(self, current_file_name, default_file_name): """Tries to match a given filename to a regex, then uses that match to generate the filename.""" @@ -909,7 +878,6 @@ class ExtraTransitionDomain: infer_filenames: bool -> Determines if we should try to infer the filename if a default is passed in """ - self.clear_file_data() for name, value in self.file_data.items(): is_domain_escrow = name == EnumFilenames.DOMAIN_ESCROW filename = f"{value.filename}" @@ -924,8 +892,9 @@ class ExtraTransitionDomain: ) else: if not infer_filenames: - logger.error(f"Could not find file: {filename}") - continue + raise FileNotFoundError( + f"{TerminalColors.FAIL}" f"Could not find file {filename} for {name}" f"{TerminalColors.ENDC}" + ) # Infer filename logic # # This mode is used for @@ -956,25 +925,22 @@ class ExtraTransitionDomain: is_domain_escrow, ) continue - # Log if we can't find the desired file - logger.error(f"Could not find file: {filename}") + raise FileNotFoundError( + f"{TerminalColors.FAIL}" f"Could not find file {filename} for {name}" f"{TerminalColors.ENDC}" + ) def clear_file_data(self): for item in self.file_data.values(): file_type: FileDataHolder = item file_type.data = {} - def parse_csv_file( - self, file, seperator, dataclass_type, id_field, is_domain_escrow=False - ): + def parse_csv_file(self, file, seperator, dataclass_type, id_field, is_domain_escrow=False): # Domain escrow is an edge case if is_domain_escrow: item_to_return = self._read_domain_escrow(file, seperator) return item_to_return else: - item_to_return = self._read_csv_file( - file, seperator, dataclass_type, id_field - ) + item_to_return = self._read_csv_file(file, seperator, dataclass_type, id_field) return item_to_return # Domain escrow is an edgecase given that its structured differently data-wise. @@ -989,9 +955,7 @@ class ExtraTransitionDomain: creation_date = datetime.strptime(row[7], date_format) expiration_date = datetime.strptime(row[11], date_format) - dict_data[domain_name] = DomainEscrow( - domain_name, creation_date, expiration_date - ) + dict_data[domain_name] = DomainEscrow(domain_name, creation_date, expiration_date) return dict_data def _grab_row_id(self, row, id_field, file, dataclass_type): @@ -1024,9 +988,7 @@ class ExtraTransitionDomain: f"Found bad data in {file}. Attempting to clean." f"{TerminalColors.ENDC}" ) - updated_file_content = self.replace_bad_seperators( - file, f"{seperator}", ";badseperator;" - ) + updated_file_content = self.replace_bad_seperators(file, f"{seperator}", ";badseperator;") dict_data = {} break @@ -1040,11 +1002,7 @@ class ExtraTransitionDomain: # After we clean the data, try to parse it again if updated_file_content: - logger.info( - f"{TerminalColors.MAGENTA}" - f"Retrying load for {file}" - f"{TerminalColors.ENDC}" - ) + logger.info(f"{TerminalColors.MAGENTA}" f"Retrying load for {file}" f"{TerminalColors.ENDC}") # Store the file locally rather than writing to the file. # This is to avoid potential data corruption. updated_file = io.StringIO(updated_file_content) @@ -1055,9 +1013,7 @@ class ExtraTransitionDomain: # is wrong with the file. if None in row: logger.error( - f"{TerminalColors.FAIL}" - f"Corrupt data found for {row_id}. Skipping." - f"{TerminalColors.ENDC}" + f"{TerminalColors.FAIL}" f"Corrupt data found for {row_id}. Skipping." f"{TerminalColors.ENDC}" ) continue diff --git a/src/registrar/management/commands/utility/terminal_helper.py b/src/registrar/management/commands/utility/terminal_helper.py index b38176172..85bfc8193 100644 --- a/src/registrar/management/commands/utility/terminal_helper.py +++ b/src/registrar/management/commands/utility/terminal_helper.py @@ -150,9 +150,7 @@ class TerminalHelper: logger.info(print_statement) @staticmethod - def prompt_for_execution( - system_exit_on_terminate: bool, info_to_inspect: str, prompt_title: str - ) -> bool: + def prompt_for_execution(system_exit_on_terminate: bool, info_to_inspect: str, prompt_title: str) -> bool: """Create to reduce code complexity. Prompts the user to inspect the given string and asks if they wish to proceed. @@ -195,9 +193,7 @@ class TerminalHelper: return total_line @staticmethod - def print_to_file_conditional( - print_condition: bool, filename: str, file_directory: str, file_contents: str - ): + def print_to_file_conditional(print_condition: bool, filename: str, file_directory: str, file_contents: str): """Sometimes logger outputs get insanely huge.""" if print_condition: # Add a slash if the last character isn't one @@ -206,54 +202,6 @@ class TerminalHelper: # Assemble filepath filepath = f"{file_directory}{filename}.txt" # Write to file - logger.info( - f"{TerminalColors.MAGENTA}Writing to file " - f" {filepath}..." - f"{TerminalColors.ENDC}" - ) + logger.info(f"{TerminalColors.MAGENTA}Writing to file " f" {filepath}..." f"{TerminalColors.ENDC}") with open(f"{filepath}", "w+") as f: f.write(file_contents) - - @staticmethod - def printProgressBar( - iteration, - total, - prefix="Progress:", - suffix="Complete", - decimals=1, - length=100, - fill="█", - printEnd="\r", - ): - """ - Call in a loop to create terminal progress bar - @params: - iteration - Required : current iteration (Int) - total - Required : total iterations (Int) - prefix - Optional : prefix string (Str) - suffix - Optional : suffix string (Str) - decimals - Optional : positive number of decimals in percent complete (Int) - length - Optional : character length of bar (Int) - fill - Optional : bar fill character (Str) - printEnd - Optional : end character (e.g. "\r", "\r\n") (Str) - """ # noqa - - """ - # Initial call to print 0% progress - printProgressBar(0, l, prefix = 'Progress:', suffix = 'Complete', length = 50) - for i, item in enumerate(items): - # Do stuff... - time.sleep(0.1) - # Update Progress Bar - printProgressBar(i + 1, l, prefix = 'Progress:', suffix = 'Complete', length = 50) - """ # noqa - - percent = ("{0:." + str(decimals) + "f}").format( - 100 * (iteration / float(total)) - ) - filledLength = int(length * iteration // total) - bar = fill * filledLength + "-" * (length - filledLength) - print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd) - # Print New Line on Complete - if iteration == total: - print() diff --git a/src/registrar/management/commands/utility/transition_domain_arguments.py b/src/registrar/management/commands/utility/transition_domain_arguments.py index 3a31fb8e7..56425a7b7 100644 --- a/src/registrar/management/commands/utility/transition_domain_arguments.py +++ b/src/registrar/management/commands/utility/transition_domain_arguments.py @@ -37,26 +37,14 @@ class TransitionDomainArguments: # Filenames # # = Adhocs =# - agency_adhoc_filename: Optional[str] = field( - default=EnumFilenames.AGENCY_ADHOC.value[1], repr=True - ) - domain_adhoc_filename: Optional[str] = field( - default=EnumFilenames.DOMAIN_ADHOC.value[1], repr=True - ) - organization_adhoc_filename: Optional[str] = field( - default=EnumFilenames.ORGANIZATION_ADHOC.value[1], repr=True - ) - authority_adhoc_filename: Optional[str] = field( - default=EnumFilenames.AUTHORITY_ADHOC.value[1], repr=True - ) + agency_adhoc_filename: Optional[str] = field(default=EnumFilenames.AGENCY_ADHOC.value[1], repr=True) + domain_adhoc_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADHOC.value[1], repr=True) + organization_adhoc_filename: Optional[str] = field(default=EnumFilenames.ORGANIZATION_ADHOC.value[1], repr=True) + authority_adhoc_filename: Optional[str] = field(default=EnumFilenames.AUTHORITY_ADHOC.value[1], repr=True) # = Data files =# - domain_escrow_filename: Optional[str] = field( - default=EnumFilenames.DOMAIN_ESCROW.value[1], repr=True - ) - domain_additional_filename: Optional[str] = field( - default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], repr=True - ) + domain_escrow_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ESCROW.value[1], repr=True) + domain_additional_filename: Optional[str] = field(default=EnumFilenames.DOMAIN_ADDITIONAL.value[1], repr=True) domain_contacts_filename: Optional[str] = field(default=None, repr=True) domain_statuses_filename: Optional[str] = field(default=None, repr=True) contacts_filename: Optional[str] = field(default=None, repr=True) diff --git a/src/registrar/migrations/0001_initial.py b/src/registrar/migrations/0001_initial.py index 78f0c5b66..8c50c750d 100644 --- a/src/registrar/migrations/0001_initial.py +++ b/src/registrar/migrations/0001_initial.py @@ -32,9 +32,7 @@ class Migration(migrations.Migration): ("password", models.CharField(max_length=128, verbose_name="password")), ( "last_login", - models.DateTimeField( - blank=True, null=True, verbose_name="last login" - ), + models.DateTimeField(blank=True, null=True, verbose_name="last login"), ), ( "is_superuser", @@ -47,35 +45,25 @@ class Migration(migrations.Migration): ( "username", models.CharField( - error_messages={ - "unique": "A user with that username already exists." - }, + error_messages={"unique": "A user with that username already exists."}, help_text="Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.", max_length=150, unique=True, - validators=[ - django.contrib.auth.validators.UnicodeUsernameValidator() - ], + validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name="username", ), ), ( "first_name", - models.CharField( - blank=True, max_length=150, verbose_name="first name" - ), + models.CharField(blank=True, max_length=150, verbose_name="first name"), ), ( "last_name", - models.CharField( - blank=True, max_length=150, verbose_name="last name" - ), + models.CharField(blank=True, max_length=150, verbose_name="last name"), ), ( "email", - models.EmailField( - blank=True, max_length=254, verbose_name="email address" - ), + models.EmailField(blank=True, max_length=254, verbose_name="email address"), ), ( "is_staff", @@ -95,9 +83,7 @@ class Migration(migrations.Migration): ), ( "date_joined", - models.DateTimeField( - default=django.utils.timezone.now, verbose_name="date joined" - ), + models.DateTimeField(default=django.utils.timezone.now, verbose_name="date joined"), ), ( "groups", @@ -145,9 +131,7 @@ class Migration(migrations.Migration): ), ( "first_name", - models.TextField( - blank=True, db_index=True, help_text="First name", null=True - ), + models.TextField(blank=True, db_index=True, help_text="First name", null=True), ), ( "middle_name", @@ -155,22 +139,16 @@ class Migration(migrations.Migration): ), ( "last_name", - models.TextField( - blank=True, db_index=True, help_text="Last name", null=True - ), + models.TextField(blank=True, db_index=True, help_text="Last name", null=True), ), ("title", models.TextField(blank=True, help_text="Title", null=True)), ( "email", - models.TextField( - blank=True, db_index=True, help_text="Email", null=True - ), + models.TextField(blank=True, db_index=True, help_text="Email", null=True), ), ( "phone", - models.TextField( - blank=True, db_index=True, help_text="Phone", null=True - ), + models.TextField(blank=True, db_index=True, help_text="Phone", null=True), ), ], ), @@ -280,21 +258,15 @@ class Migration(migrations.Migration): ), ( "unit_type", - models.CharField( - blank=True, help_text="Unit type", max_length=15, null=True - ), + models.CharField(blank=True, help_text="Unit type", max_length=15, null=True), ), ( "unit_number", - models.CharField( - blank=True, help_text="Unit number", max_length=255, null=True - ), + models.CharField(blank=True, help_text="Unit number", max_length=255, null=True), ), ( "state_territory", - models.CharField( - blank=True, help_text="State/Territory", max_length=2, null=True - ), + models.CharField(blank=True, help_text="State/Territory", max_length=2, null=True), ), ( "zip_code", @@ -308,9 +280,7 @@ class Migration(migrations.Migration): ), ( "purpose", - models.TextField( - blank=True, help_text="Purpose of the domain", null=True - ), + models.TextField(blank=True, help_text="Purpose of the domain", null=True), ), ( "security_email", @@ -323,9 +293,7 @@ class Migration(migrations.Migration): ), ( "anything_else", - models.TextField( - blank=True, help_text="Anything else we should know?", null=True - ), + models.TextField(blank=True, help_text="Anything else we should know?", null=True), ), ( "acknowledged_policy", @@ -337,9 +305,7 @@ class Migration(migrations.Migration): ), ( "alternative_domains", - models.ManyToManyField( - blank=True, related_name="alternatives+", to="registrar.website" - ), + models.ManyToManyField(blank=True, related_name="alternatives+", to="registrar.website"), ), ( "authorizing_official", @@ -361,9 +327,7 @@ class Migration(migrations.Migration): ), ( "current_websites", - models.ManyToManyField( - blank=True, related_name="current+", to="registrar.website" - ), + models.ManyToManyField(blank=True, related_name="current+", to="registrar.website"), ), ( "investigator", diff --git a/src/registrar/migrations/0003_rename_is_election_office_domainapplication_is_election_board_and_more.py b/src/registrar/migrations/0003_rename_is_election_office_domainapplication_is_election_board_and_more.py index c12ca9d34..a6844bfaf 100644 --- a/src/registrar/migrations/0003_rename_is_election_office_domainapplication_is_election_board_and_more.py +++ b/src/registrar/migrations/0003_rename_is_election_office_domainapplication_is_election_board_and_more.py @@ -48,9 +48,7 @@ class Migration(migrations.Migration): migrations.AddField( model_name="domainapplication", name="address_line2", - field=models.CharField( - blank=True, help_text="Address line 2", max_length=15, null=True - ), + field=models.CharField(blank=True, help_text="Address line 2", max_length=15, null=True), ), migrations.AddField( model_name="domainapplication", diff --git a/src/registrar/migrations/0005_domainapplication_city_and_more.py b/src/registrar/migrations/0005_domainapplication_city_and_more.py index 3d1fc1de1..bd4683b8c 100644 --- a/src/registrar/migrations/0005_domainapplication_city_and_more.py +++ b/src/registrar/migrations/0005_domainapplication_city_and_more.py @@ -22,8 +22,6 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="domainapplication", name="federal_agency", - field=models.TextField( - blank=True, help_text="Top level federal agency", null=True - ), + field=models.TextField(blank=True, help_text="Top level federal agency", null=True), ), ] diff --git a/src/registrar/migrations/0007_domainapplication_more_organization_information_and_more.py b/src/registrar/migrations/0007_domainapplication_more_organization_information_and_more.py index 909b301ab..49df16fbb 100644 --- a/src/registrar/migrations/0007_domainapplication_more_organization_information_and_more.py +++ b/src/registrar/migrations/0007_domainapplication_more_organization_information_and_more.py @@ -21,9 +21,7 @@ class Migration(migrations.Migration): migrations.AddField( model_name="domainapplication", name="type_of_work", - field=models.TextField( - blank=True, help_text="Type of work of the organization", null=True - ), + field=models.TextField(blank=True, help_text="Type of work of the organization", null=True), ), migrations.AlterField( model_name="domainapplication", @@ -33,9 +31,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="domainapplication", name="address_line2", - field=models.CharField( - blank=True, help_text="Street address line 2", max_length=15, null=True - ), + field=models.CharField(blank=True, help_text="Street address line 2", max_length=15, null=True), ), migrations.AlterField( model_name="domainapplication", @@ -95,9 +91,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="domainapplication", name="purpose", - field=models.TextField( - blank=True, help_text="Purpose of your domain", null=True - ), + field=models.TextField(blank=True, help_text="Purpose of your domain", null=True), ), migrations.AlterField( model_name="domainapplication", @@ -112,9 +106,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="domainapplication", name="urbanization", - field=models.TextField( - blank=True, help_text="Urbanization (Puerto Rico only)", null=True - ), + field=models.TextField(blank=True, help_text="Urbanization (Puerto Rico only)", null=True), ), migrations.AlterField( model_name="domainapplication", diff --git a/src/registrar/migrations/0008_remove_userprofile_created_at_and_more.py b/src/registrar/migrations/0008_remove_userprofile_created_at_and_more.py index bb52e4320..2e2dbed6c 100644 --- a/src/registrar/migrations/0008_remove_userprofile_created_at_and_more.py +++ b/src/registrar/migrations/0008_remove_userprofile_created_at_and_more.py @@ -21,9 +21,7 @@ class Migration(migrations.Migration): migrations.AddField( model_name="contact", name="created_at", - field=models.DateTimeField( - auto_now_add=True, default=django.utils.timezone.now - ), + field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False, ), migrations.AddField( @@ -34,9 +32,7 @@ class Migration(migrations.Migration): migrations.AddField( model_name="website", name="created_at", - field=models.DateTimeField( - auto_now_add=True, default=django.utils.timezone.now - ), + field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False, ), migrations.AddField( diff --git a/src/registrar/migrations/0009_domainapplication_federally_recognized_tribe_and_more.py b/src/registrar/migrations/0009_domainapplication_federally_recognized_tribe_and_more.py index 3010b6cd4..7fce25b9e 100644 --- a/src/registrar/migrations/0009_domainapplication_federally_recognized_tribe_and_more.py +++ b/src/registrar/migrations/0009_domainapplication_federally_recognized_tribe_and_more.py @@ -12,16 +12,12 @@ class Migration(migrations.Migration): migrations.AddField( model_name="domainapplication", name="federally_recognized_tribe", - field=models.BooleanField( - help_text="Is the tribe federally recognized", null=True - ), + field=models.BooleanField(help_text="Is the tribe federally recognized", null=True), ), migrations.AddField( model_name="domainapplication", name="state_recognized_tribe", - field=models.BooleanField( - help_text="Is the tribe recognized by a state", null=True - ), + field=models.BooleanField(help_text="Is the tribe recognized by a state", null=True), ), migrations.AddField( model_name="domainapplication", diff --git a/src/registrar/migrations/0015_remove_domain_owners_userdomainrole_user_domains_and_more.py b/src/registrar/migrations/0015_remove_domain_owners_userdomainrole_user_domains_and_more.py index 2fbc5ab19..91ac00ff9 100644 --- a/src/registrar/migrations/0015_remove_domain_owners_userdomainrole_user_domains_and_more.py +++ b/src/registrar/migrations/0015_remove_domain_owners_userdomainrole_user_domains_and_more.py @@ -59,8 +59,6 @@ class Migration(migrations.Migration): ), migrations.AddConstraint( model_name="userdomainrole", - constraint=models.UniqueConstraint( - fields=("user", "domain"), name="unique_user_domain_role" - ), + constraint=models.UniqueConstraint(fields=("user", "domain"), name="unique_user_domain_role"), ), ] diff --git a/src/registrar/migrations/0018_domaininformation.py b/src/registrar/migrations/0018_domaininformation.py index 408fa048b..582a6e244 100644 --- a/src/registrar/migrations/0018_domaininformation.py +++ b/src/registrar/migrations/0018_domaininformation.py @@ -64,15 +64,11 @@ class Migration(migrations.Migration): ), ( "federally_recognized_tribe", - models.BooleanField( - help_text="Is the tribe federally recognized", null=True - ), + models.BooleanField(help_text="Is the tribe federally recognized", null=True), ), ( "state_recognized_tribe", - models.BooleanField( - help_text="Is the tribe recognized by a state", null=True - ), + models.BooleanField(help_text="Is the tribe recognized by a state", null=True), ), ( "tribe_name", @@ -172,9 +168,7 @@ class Migration(migrations.Migration): ), ( "purpose", - models.TextField( - blank=True, help_text="Purpose of your domain", null=True - ), + models.TextField(blank=True, help_text="Purpose of your domain", null=True), ), ( "no_other_contacts_rationale", @@ -186,9 +180,7 @@ class Migration(migrations.Migration): ), ( "anything_else", - models.TextField( - blank=True, help_text="Anything else we should know?", null=True - ), + models.TextField(blank=True, help_text="Anything else we should know?", null=True), ), ( "is_policy_acknowledged", diff --git a/src/registrar/migrations/0021_publiccontact_domain_publiccontact_registry_id_and_more.py b/src/registrar/migrations/0021_publiccontact_domain_publiccontact_registry_id_and_more.py index 35e07fe71..89478f5d6 100644 --- a/src/registrar/migrations/0021_publiccontact_domain_publiccontact_registry_id_and_more.py +++ b/src/registrar/migrations/0021_publiccontact_domain_publiccontact_registry_id_and_more.py @@ -76,9 +76,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="publiccontact", name="org", - field=models.TextField( - help_text="Contact's organization (null ok)", null=True - ), + field=models.TextField(help_text="Contact's organization (null ok)", null=True), ), migrations.AlterField( model_name="publiccontact", @@ -88,9 +86,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="publiccontact", name="pw", - field=models.TextField( - help_text="Contact's authorization code. 16 characters minimum." - ), + field=models.TextField(help_text="Contact's authorization code. 16 characters minimum."), ), migrations.AlterField( model_name="publiccontact", @@ -115,8 +111,6 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="publiccontact", name="voice", - field=models.TextField( - help_text="Contact's phone number. Must be in ITU.E164.2005 format" - ), + field=models.TextField(help_text="Contact's phone number. Must be in ITU.E164.2005 format"), ), ] diff --git a/src/registrar/migrations/0024_alter_contact_email.py b/src/registrar/migrations/0024_alter_contact_email.py index f512d5d82..8dcfc4ffe 100644 --- a/src/registrar/migrations/0024_alter_contact_email.py +++ b/src/registrar/migrations/0024_alter_contact_email.py @@ -12,8 +12,6 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="contact", name="email", - field=models.EmailField( - blank=True, db_index=True, help_text="Email", max_length=254, null=True - ), + field=models.EmailField(blank=True, db_index=True, help_text="Email", max_length=254, null=True), ), ] diff --git a/src/registrar/migrations/0026_alter_domainapplication_address_line2_and_more.py b/src/registrar/migrations/0026_alter_domainapplication_address_line2_and_more.py index 6e28f5cbb..77da9e79c 100644 --- a/src/registrar/migrations/0026_alter_domainapplication_address_line2_and_more.py +++ b/src/registrar/migrations/0026_alter_domainapplication_address_line2_and_more.py @@ -12,15 +12,11 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="domainapplication", name="address_line2", - field=models.TextField( - blank=True, help_text="Street address line 2", null=True - ), + field=models.TextField(blank=True, help_text="Street address line 2", null=True), ), migrations.AlterField( model_name="domaininformation", name="address_line2", - field=models.TextField( - blank=True, help_text="Street address line 2", null=True - ), + field=models.TextField(blank=True, help_text="Street address line 2", null=True), ), ] diff --git a/src/registrar/migrations/0031_transitiondomain_and_more.py b/src/registrar/migrations/0031_transitiondomain_and_more.py index e378a33de..9d1153764 100644 --- a/src/registrar/migrations/0031_transitiondomain_and_more.py +++ b/src/registrar/migrations/0031_transitiondomain_and_more.py @@ -95,16 +95,12 @@ class Migration(migrations.Migration): migrations.AddField( model_name="domainapplication", name="about_your_organization", - field=models.TextField( - blank=True, help_text="Information about your organization", null=True - ), + field=models.TextField(blank=True, help_text="Information about your organization", null=True), ), migrations.AddField( model_name="domaininformation", name="about_your_organization", - field=models.TextField( - blank=True, help_text="Information about your organization", null=True - ), + field=models.TextField(blank=True, help_text="Information about your organization", null=True), ), migrations.AlterField( model_name="domainapplication", diff --git a/src/registrar/migrations/0043_domain_expiration_date.py b/src/registrar/migrations/0043_domain_expiration_date.py index f2269094c..3ff9add6a 100644 --- a/src/registrar/migrations/0043_domain_expiration_date.py +++ b/src/registrar/migrations/0043_domain_expiration_date.py @@ -13,7 +13,7 @@ class Migration(migrations.Migration): model_name="domain", name="expiration_date", field=models.DateField( - help_text="Duplication of registry's expirationdate saved for ease of reporting", + help_text="Duplication of registry's expiration date saved for ease of reporting", null=True, ), ), diff --git a/src/registrar/migrations/0045_transitiondomain_federal_agency_and_more.py b/src/registrar/migrations/0045_transitiondomain_federal_agency_and_more.py index 742ab4453..1cbcf90c5 100644 --- a/src/registrar/migrations/0045_transitiondomain_federal_agency_and_more.py +++ b/src/registrar/migrations/0045_transitiondomain_federal_agency_and_more.py @@ -27,16 +27,12 @@ class Migration(migrations.Migration): migrations.AddField( model_name="transitiondomain", name="organization_type", - field=models.TextField( - blank=True, help_text="Type of organization", max_length=255, null=True - ), + field=models.TextField(blank=True, help_text="Type of organization", max_length=255, null=True), ), migrations.AddField( model_name="transitiondomain", name="organization_name", - field=models.TextField( - blank=True, db_index=True, help_text="Organization name", null=True - ), + field=models.TextField(blank=True, db_index=True, help_text="Organization name", null=True), ), migrations.AddField( model_name="transitiondomain", diff --git a/src/registrar/models/domain.py b/src/registrar/models/domain.py index c5c0b63f7..cb3b784e9 100644 --- a/src/registrar/models/domain.py +++ b/src/registrar/models/domain.py @@ -262,10 +262,7 @@ class Domain(TimeStampedModel, DomainHelper): doesn't add the created host to the domain returns ErrorCode (int)""" if addrs is not None and addrs != []: - addresses = [ - epp.Ip(addr=addr, ip="v6" if self.is_ipv6(addr) else None) - for addr in addrs - ] + addresses = [epp.Ip(addr=addr, ip="v6" if self.is_ipv6(addr) else None) for addr in addrs] request = commands.CreateHost(name=host, addrs=addresses) else: request = commands.CreateHost(name=host) @@ -358,15 +355,11 @@ class Domain(TimeStampedModel, DomainHelper): raise NameserverError(code=nsErrorCodes.MISSING_IP, nameserver=nameserver) elif not cls.isSubdomain(name, nameserver) and (ip is not None and ip != []): - raise NameserverError( - code=nsErrorCodes.GLUE_RECORD_NOT_ALLOWED, nameserver=nameserver, ip=ip - ) + raise NameserverError(code=nsErrorCodes.GLUE_RECORD_NOT_ALLOWED, nameserver=nameserver, ip=ip) elif ip is not None and ip != []: for addr in ip: if not cls._valid_ip_addr(addr): - raise NameserverError( - code=nsErrorCodes.INVALID_IP, nameserver=nameserver[:40], ip=ip - ) + raise NameserverError(code=nsErrorCodes.INVALID_IP, nameserver=nameserver[:40], ip=ip) return None @classmethod @@ -382,9 +375,7 @@ class Domain(TimeStampedModel, DomainHelper): except ValueError: return False - def getNameserverChanges( - self, hosts: list[tuple[str, list]] - ) -> tuple[list, list, dict, dict]: + def getNameserverChanges(self, hosts: list[tuple[str, list]]) -> tuple[list, list, dict, dict]: """ calls self.nameserver, it should pull from cache but may result in an epp call @@ -420,40 +411,27 @@ class Domain(TimeStampedModel, DomainHelper): else: # TODO - host is being updated when previous was None+new is empty list # add check here - if newHostDict[prevHost] is not None and set( - newHostDict[prevHost] - ) != set(addrs): - self.__class__.checkHostIPCombo( - name=self.name, nameserver=prevHost, ip=newHostDict[prevHost] - ) + if newHostDict[prevHost] is not None and set(newHostDict[prevHost]) != set(addrs): + self.__class__.checkHostIPCombo(name=self.name, nameserver=prevHost, ip=newHostDict[prevHost]) updated_values.append((prevHost, newHostDict[prevHost])) new_values = { - key: newHostDict.get(key) - for key in newHostDict - if key not in previousHostDict and key.strip() != "" + key: newHostDict.get(key) for key in newHostDict if key not in previousHostDict and key.strip() != "" } for nameserver, ip in new_values.items(): - self.__class__.checkHostIPCombo( - name=self.name, nameserver=nameserver, ip=ip - ) + self.__class__.checkHostIPCombo(name=self.name, nameserver=nameserver, ip=ip) return (deleted_values, updated_values, new_values, previousHostDict) def _update_host_values(self, updated_values, oldNameservers): for hostTuple in updated_values: - updated_response_code = self._update_host( - hostTuple[0], hostTuple[1], oldNameservers.get(hostTuple[0]) - ) + updated_response_code = self._update_host(hostTuple[0], hostTuple[1], oldNameservers.get(hostTuple[0])) if updated_response_code not in [ ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY, ErrorCode.OBJECT_EXISTS, ]: - logger.warning( - "Could not update host %s. Error code was: %s " - % (hostTuple[0], updated_response_code) - ) + logger.warning("Could not update host %s. Error code was: %s " % (hostTuple[0], updated_response_code)) def createNewHostList(self, new_values: dict): """convert the dictionary of new values to a list of HostObjSet @@ -469,13 +447,8 @@ class Domain(TimeStampedModel, DomainHelper): hostStringList = [] for key, value in new_values.items(): - createdCode = self._create_host( - host=key, addrs=value - ) # creates in registry - if ( - createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY - or createdCode == ErrorCode.OBJECT_EXISTS - ): + createdCode = self._create_host(host=key, addrs=value) # creates in registry + if createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY or createdCode == ErrorCode.OBJECT_EXISTS: hostStringList.append(key) if hostStringList == []: return [], 0 @@ -522,9 +495,7 @@ class Domain(TimeStampedModel, DomainHelper): logger.info("Domain does not have dnssec data defined %s" % err) return None - def getDnssecdataChanges( - self, _dnssecdata: Optional[extensions.DNSSECExtension] - ) -> tuple[dict, dict]: + def getDnssecdataChanges(self, _dnssecdata: Optional[extensions.DNSSECExtension]) -> tuple[dict, dict]: """ calls self.dnssecdata, it should pull from cache but may result in an epp call @@ -550,20 +521,12 @@ class Domain(TimeStampedModel, DomainHelper): if oldDnssecdata and len(oldDnssecdata.dsData) > 0: # if existing dsData not in new dsData, mark for removal - dsDataForRemoval = [ - dsData - for dsData in oldDnssecdata.dsData - if dsData not in _dnssecdata.dsData - ] + dsDataForRemoval = [dsData for dsData in oldDnssecdata.dsData if dsData not in _dnssecdata.dsData] if len(dsDataForRemoval) > 0: remDnssecdata["dsData"] = dsDataForRemoval # if new dsData not in existing dsData, mark for add - dsDataForAdd = [ - dsData - for dsData in _dnssecdata.dsData - if dsData not in oldDnssecdata.dsData - ] + dsDataForAdd = [dsData for dsData in _dnssecdata.dsData if dsData not in oldDnssecdata.dsData] if len(dsDataForAdd) > 0: addDnssecdata["dsData"] = dsDataForAdd else: @@ -598,9 +561,7 @@ class Domain(TimeStampedModel, DomainHelper): if "dsData" in _remDnssecdata and _remDnssecdata["dsData"] is not None: registry.send(remRequest, cleaned=True) except RegistryError as e: - logger.error( - "Error updating DNSSEC, code was %s error was %s" % (e.code, e) - ) + logger.error("Error updating DNSSEC, code was %s error was %s" % (e.code, e)) raise e @nameservers.setter # type: ignore @@ -630,14 +591,10 @@ class Domain(TimeStampedModel, DomainHelper): oldNameservers, ) = self.getNameserverChanges(hosts=hosts) - _ = self._update_host_values( - updated_values, oldNameservers - ) # returns nothing, just need to be run and errors + _ = self._update_host_values(updated_values, oldNameservers) # returns nothing, just need to be run and errors addToDomainList, addToDomainCount = self.createNewHostList(new_values) deleteHostList, deleteCount = self.createDeleteHostList(deleted_values) - responseCode = self.addAndRemoveHostsFromDomain( - hostsToAdd=addToDomainList, hostsToDelete=deleteHostList - ) + responseCode = self.addAndRemoveHostsFromDomain(hostsToAdd=addToDomainList, hostsToDelete=deleteHostList) # if unable to update domain raise error and stop if responseCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY: @@ -651,19 +608,13 @@ class Domain(TimeStampedModel, DomainHelper): self.dns_needed() self.save() except Exception as err: - logger.info( - "nameserver setter checked for dns_needed state " - "and it did not succeed. Warning: %s" % err - ) + logger.info("nameserver setter checked for dns_needed state and it did not succeed. Warning: %s" % err) elif successTotalNameservers >= 2 and successTotalNameservers <= 13: try: self.ready() self.save() except Exception as err: - logger.info( - "nameserver setter checked for create state " - "and it did not succeed. Warning: %s" % err - ) + logger.info("nameserver setter checked for create state and it did not succeed. Warning: %s" % err) @Cache def statuses(self) -> list[str]: @@ -698,9 +649,7 @@ class Domain(TimeStampedModel, DomainHelper): so follow on additions will update the current registrant""" logger.info("making registrant contact") - self._set_singleton_contact( - contact=contact, expectedType=contact.ContactTypeChoices.REGISTRANT - ) + self._set_singleton_contact(contact=contact, expectedType=contact.ContactTypeChoices.REGISTRANT) @Cache def administrative_contact(self) -> PublicContact | None: @@ -712,9 +661,7 @@ class Domain(TimeStampedModel, DomainHelper): def administrative_contact(self, contact: PublicContact): logger.info("making admin contact") if contact.contact_type != contact.ContactTypeChoices.ADMINISTRATIVE: - raise ValueError( - "Cannot set a registrant contact with a different contact type" - ) + raise ValueError("Cannot set a registrant contact with a different contact type") self._make_contact_in_registry(contact=contact) self._update_domain_with_contact(contact, rem=False) @@ -736,20 +683,14 @@ class Domain(TimeStampedModel, DomainHelper): try: registry.send(updateContact, cleaned=True) except RegistryError as e: - logger.error( - "Error updating contact, code was %s error was %s" % (e.code, e) - ) + logger.error("Error updating contact, code was %s error was %s" % (e.code, e)) # TODO - ticket 433 human readable error handling here def _update_domain_with_contact(self, contact: PublicContact, rem=False): """adds or removes a contact from a domain rem being true indicates the contact will be removed from registry""" - logger.info( - "_update_domain_with_contact() received type %s " % contact.contact_type - ) - domainContact = epp.DomainContact( - contact=contact.registry_id, type=contact.contact_type - ) + logger.info("_update_domain_with_contact() received type %s " % contact.contact_type) + domainContact = epp.DomainContact(contact=contact.registry_id, type=contact.contact_type) updateDomain = commands.UpdateDomain(name=self.name, add=[domainContact]) if rem: @@ -758,17 +699,12 @@ class Domain(TimeStampedModel, DomainHelper): try: registry.send(updateDomain, cleaned=True) except RegistryError as e: - logger.error( - "Error changing contact on a domain. Error code is %s error was %s" - % (e.code, e) - ) + logger.error("Error changing contact on a domain. Error code is %s error was %s" % (e.code, e)) action = "add" if rem: action = "remove" - raise Exception( - "Can't %s the contact of type %s" % (action, contact.contact_type) - ) + raise Exception("Can't %s the contact of type %s" % (action, contact.contact_type)) @Cache def security_contact(self) -> PublicContact | None: @@ -778,16 +714,11 @@ class Domain(TimeStampedModel, DomainHelper): def _add_registrant_to_existing_domain(self, contact: PublicContact): """Used to change the registrant contact on an existing domain""" - updateDomain = commands.UpdateDomain( - name=self.name, registrant=contact.registry_id - ) + updateDomain = commands.UpdateDomain(name=self.name, registrant=contact.registry_id) try: registry.send(updateDomain, cleaned=True) except RegistryError as e: - logger.error( - "Error changing to new registrant error code is %s, error is %s" - % (e.code, e) - ) + logger.error("Error changing to new registrant error code is %s, error is %s" % (e.code, e)) # TODO-error handling better here? def _set_singleton_contact(self, contact: PublicContact, expectedType: str): # noqa @@ -800,16 +731,10 @@ class Domain(TimeStampedModel, DomainHelper): Will throw error if contact type is not the same as expectType Raises ValueError if expected type doesn't match the contact type""" if expectedType != contact.contact_type: - raise ValueError( - "Cannot set a contact with a different contact type," - " expected type was %s" % expectedType - ) + raise ValueError("Cannot set a contact with a different contact type, expected type was %s" % expectedType) isRegistrant = contact.contact_type == contact.ContactTypeChoices.REGISTRANT - isEmptySecurity = ( - contact.contact_type == contact.ContactTypeChoices.SECURITY - and contact.email == "" - ) + isEmptySecurity = contact.contact_type == contact.ContactTypeChoices.SECURITY and contact.email == "" # get publicContact objects that have the matching # domain and type but a different id @@ -827,10 +752,7 @@ class Domain(TimeStampedModel, DomainHelper): # contact is already added to the domain, but something may have changed on it alreadyExistsInRegistry = errorCode == ErrorCode.OBJECT_EXISTS # if an error occured besides duplication, stop - if ( - not alreadyExistsInRegistry - and errorCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY - ): + if not alreadyExistsInRegistry and errorCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY: # TODO- ticket #433 look here for error handling raise RegistryError(code=errorCode) @@ -839,9 +761,7 @@ class Domain(TimeStampedModel, DomainHelper): # if has conflicting contacts in our db remove them if hasOtherContact: - logger.info( - "_set_singleton_contact()-> updating domain, removing old contact" - ) + logger.info("_set_singleton_contact()-> updating domain, removing old contact") existing_contact = ( PublicContact.objects.exclude(registry_id=contact.registry_id) @@ -859,9 +779,7 @@ class Domain(TimeStampedModel, DomainHelper): self._update_domain_with_contact(contact=existing_contact, rem=True) existing_contact.delete() except Exception as err: - logger.error( - "Raising error after removing and adding a new contact" - ) + logger.error("Raising error after removing and adding a new contact") raise (err) # update domain with contact or update the contact itself @@ -870,9 +788,7 @@ class Domain(TimeStampedModel, DomainHelper): self._update_domain_with_contact(contact=contact, rem=False) # if already exists just update elif alreadyExistsInRegistry: - current_contact = PublicContact.objects.filter( - registry_id=contact.registry_id - ).get() + current_contact = PublicContact.objects.filter(registry_id=contact.registry_id).get() if current_contact.email != contact.email: self._update_epp_contact(contact=contact) @@ -880,9 +796,7 @@ class Domain(TimeStampedModel, DomainHelper): logger.info("removing security contact and setting default again") # get the current contact registry id for security - current_contact = PublicContact.objects.filter( - registry_id=contact.registry_id - ).get() + current_contact = PublicContact.objects.filter(registry_id=contact.registry_id).get() # don't let user delete the default without adding a new email if current_contact.email != PublicContact.get_default_security().email: @@ -900,9 +814,7 @@ class Domain(TimeStampedModel, DomainHelper): from domain information (not domain application) and should have the security email from DomainApplication""" logger.info("making security contact in registry") - self._set_singleton_contact( - contact, expectedType=contact.ContactTypeChoices.SECURITY - ) + self._set_singleton_contact(contact, expectedType=contact.ContactTypeChoices.SECURITY) @Cache def technical_contact(self) -> PublicContact | None: @@ -913,9 +825,7 @@ class Domain(TimeStampedModel, DomainHelper): @technical_contact.setter # type: ignore def technical_contact(self, contact: PublicContact): logger.info("making technical contact") - self._set_singleton_contact( - contact, expectedType=contact.ContactTypeChoices.TECHNICAL - ) + self._set_singleton_contact(contact, expectedType=contact.ContactTypeChoices.TECHNICAL) def is_active(self) -> bool: """Currently just returns if the state is created, @@ -996,17 +906,13 @@ class Domain(TimeStampedModel, DomainHelper): expiration_date = DateField( null=True, - help_text=( - "Duplication of registry's expiration" "date saved for ease of reporting" - ), + help_text=("Duplication of registry's expiration date saved for ease of reporting"), ) def isActive(self): return self.state == Domain.State.CREATED - def map_epp_contact_to_public_contact( - self, contact: eppInfo.InfoContactResultData, contact_id, contact_type - ): + def map_epp_contact_to_public_contact(self, contact: eppInfo.InfoContactResultData, contact_id, contact_type): """Maps the Epp contact representation to a PublicContact object. contact -> eppInfo.InfoContactResultData: The converted contact object @@ -1028,10 +934,7 @@ class Domain(TimeStampedModel, DomainHelper): # Since contact_id is registry_id, # check that its the right length contact_id_length = len(contact_id) - if ( - contact_id_length > PublicContact.get_max_id_length() - or contact_id_length < 1 - ): + if contact_id_length > PublicContact.get_max_id_length() or contact_id_length < 1: raise ContactError(code=ContactErrorCodes.CONTACT_ID_INVALID_LENGTH) if not isinstance(contact, eppInfo.InfoContactResultData): @@ -1114,9 +1017,7 @@ class Domain(TimeStampedModel, DomainHelper): ) raise error - def generic_contact_getter( - self, contact_type_choice: PublicContact.ContactTypeChoices - ) -> PublicContact | None: + def generic_contact_getter(self, contact_type_choice: PublicContact.ContactTypeChoices) -> PublicContact | None: """Retrieves the desired PublicContact from the registry. This abstracts the caching and EPP retrieval for all contact items and thus may result in EPP calls being sent. @@ -1187,9 +1088,7 @@ class Domain(TimeStampedModel, DomainHelper): if contact_type == PublicContact.ContactTypeChoices.REGISTRANT: desired_contact = None if isinstance(contacts, str): - desired_contact = self._registrant_to_public_contact( - self._cache["registrant"] - ) + desired_contact = self._registrant_to_public_contact(self._cache["registrant"]) # Set the cache with the updated object # for performance reasons. if "registrant" in self._cache: @@ -1203,9 +1102,7 @@ class Domain(TimeStampedModel, DomainHelper): if contacts is not None and contact_type in contacts: _registry_id = contacts.get(contact_type) - desired = PublicContact.objects.filter( - registry_id=_registry_id, domain=self, contact_type=contact_type - ) + desired = PublicContact.objects.filter(registry_id=_registry_id, domain=self, contact_type=contact_type) if desired.count() == 1: return desired.get() @@ -1214,10 +1111,7 @@ class Domain(TimeStampedModel, DomainHelper): return None def _handle_registrant_contact(self, contact): - if ( - contact.contact_type is not None - and contact.contact_type == PublicContact.ContactTypeChoices.REGISTRANT - ): + if contact.contact_type is not None and contact.contact_type == PublicContact.ContactTypeChoices.REGISTRANT: return contact else: raise ValueError("Invalid contact object for registrant_contact") @@ -1297,9 +1191,7 @@ class Domain(TimeStampedModel, DomainHelper): administrative_contact = self.get_default_administrative_contact() administrative_contact.save() - @transition( - field="state", source=[State.READY, State.ON_HOLD], target=State.ON_HOLD - ) + @transition(field="state", source=[State.READY, State.ON_HOLD], target=State.ON_HOLD) def place_client_hold(self, ignoreEPP=False): """place a clienthold on a domain (no longer should resolve) ignoreEPP (boolean) - set to true to by-pass EPP (used for transition domains) @@ -1325,9 +1217,7 @@ class Domain(TimeStampedModel, DomainHelper): self._remove_client_hold() # TODO -on the client hold ticket any additional error handling here - @transition( - field="state", source=[State.ON_HOLD, State.DNS_NEEDED], target=State.DELETED - ) + @transition(field="state", source=[State.ON_HOLD, State.DNS_NEEDED], target=State.DELETED) def deletedInEpp(self): """Domain is deleted in epp but is saved in our database. Error handling should be provided by the caller.""" @@ -1345,9 +1235,7 @@ class Domain(TimeStampedModel, DomainHelper): logger.error("Could not delete domain. FSM failure: {err}") raise err except Exception as err: - logger.error( - f"Could not delete domain. An unspecified error occured: {err}" - ) + logger.error(f"Could not delete domain. An unspecified error occured: {err}") raise err else: self._invalidate_cache() @@ -1407,9 +1295,7 @@ class Domain(TimeStampedModel, DomainHelper): is_security = contact.contact_type == contact.ContactTypeChoices.SECURITY DF = epp.DiscloseField fields = {DF.EMAIL} - disclose = ( - is_security and contact.email != PublicContact.get_default_security().email - ) + disclose = is_security and contact.email != PublicContact.get_default_security().email # Will only disclose DF.EMAIL if its not the default return epp.Disclose( flag=disclose, @@ -1421,9 +1307,7 @@ class Domain(TimeStampedModel, DomainHelper): name=contact.name, addr=epp.ContactAddr( street=[ - getattr(contact, street) - for street in ["street1", "street2", "street3"] - if hasattr(contact, street) + getattr(contact, street) for street in ["street1", "street2", "street3"] if hasattr(contact, street) ], # type: ignore city=contact.city, pc=contact.pc, @@ -1488,9 +1372,7 @@ class Domain(TimeStampedModel, DomainHelper): data = registry.send(req, cleaned=True).res_data[0] # Map the object we recieved from EPP to a PublicContact - mapped_object = self.map_epp_contact_to_public_contact( - data, domainContact.contact, domainContact.type - ) + mapped_object = self.map_epp_contact_to_public_contact(data, domainContact.contact, domainContact.type) # Find/create it in the DB in_db = self._get_or_create_public_contact(mapped_object) @@ -1503,9 +1385,7 @@ class Domain(TimeStampedModel, DomainHelper): return self._request_contact_info(contact) except RegistryError as e: if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST: - logger.info( - "_get_or_create_contact()-> contact doesn't exist so making it" - ) + logger.info("_get_or_create_contact()-> contact doesn't exist so making it") contact.domain = self contact.save() # this will call the function based on type of contact return self._request_contact_info(contact=contact) @@ -1560,9 +1440,7 @@ class Domain(TimeStampedModel, DomainHelper): return [] for ip_addr in ip_list: - edited_ip_list.append( - epp.Ip(addr=ip_addr, ip="v6" if self.is_ipv6(ip_addr) else None) - ) + edited_ip_list.append(epp.Ip(addr=ip_addr, ip="v6" if self.is_ipv6(ip_addr) else None)) return edited_ip_list @@ -1579,12 +1457,7 @@ class Domain(TimeStampedModel, DomainHelper): """ try: - if ( - ip_list is None - or len(ip_list) == 0 - and isinstance(old_ip_list, list) - and len(old_ip_list) != 0 - ): + if ip_list is None or len(ip_list) == 0 and isinstance(old_ip_list, list) and len(old_ip_list) != 0: return ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY added_ip_list = set(ip_list).difference(old_ip_list) @@ -1607,9 +1480,7 @@ class Domain(TimeStampedModel, DomainHelper): else: raise e - def addAndRemoveHostsFromDomain( - self, hostsToAdd: list[str], hostsToDelete: list[str] - ): + def addAndRemoveHostsFromDomain(self, hostsToAdd: list[str], hostsToDelete: list[str]): """sends an UpdateDomain message to the registry with the hosts provided Args: hostsToDelete (list[epp.HostObjSet])- list of host objects to delete @@ -1624,22 +1495,14 @@ class Domain(TimeStampedModel, DomainHelper): return ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY try: - updateReq = commands.UpdateDomain( - name=self.name, rem=hostsToDelete, add=hostsToAdd - ) + updateReq = commands.UpdateDomain(name=self.name, rem=hostsToDelete, add=hostsToAdd) - logger.info( - "addAndRemoveHostsFromDomain()-> sending update domain req as %s" - % updateReq - ) + logger.info("addAndRemoveHostsFromDomain()-> sending update domain req as %s" % updateReq) response = registry.send(updateReq, cleaned=True) return response.code except RegistryError as e: - logger.error( - "Error addAndRemoveHostsFromDomain, code was %s error was %s" - % (e.code, e) - ) + logger.error("Error addAndRemoveHostsFromDomain, code was %s error was %s" % (e.code, e)) return e.code def _delete_hosts_if_not_used(self, hostsToDelete: list[str]): @@ -1657,22 +1520,13 @@ class Domain(TimeStampedModel, DomainHelper): for nameserver in hostsToDelete: deleteHostReq = commands.DeleteHost(name=nameserver) registry.send(deleteHostReq, cleaned=True) - logger.info( - "_delete_hosts_if_not_used()-> sending delete host req as %s" - % deleteHostReq - ) + logger.info("_delete_hosts_if_not_used()-> sending delete host req as %s" % deleteHostReq) except RegistryError as e: if e.code == ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION: - logger.info( - "Did not remove host %s because it is in use on another domain." - % nameserver - ) + logger.info("Did not remove host %s because it is in use on another domain." % nameserver) else: - logger.error( - "Error _delete_hosts_if_not_used, code was %s error was %s" - % (e.code, e) - ) + logger.error("Error _delete_hosts_if_not_used, code was %s error was %s" % (e.code, e)) def _fetch_cache(self, fetch_hosts=False, fetch_contacts=False): """Contact registry for info about a domain.""" @@ -1768,9 +1622,7 @@ class Domain(TimeStampedModel, DomainHelper): # Raise an error if we find duplicates. # This should not occur if db_contact.count() > 1: - raise Exception( - f"Multiple contacts found for {public_contact.contact_type}" - ) + raise Exception(f"Multiple contacts found for {public_contact.contact_type}") # Save to DB if it doesn't exist already. if db_contact.count() == 0: @@ -1784,10 +1636,7 @@ class Domain(TimeStampedModel, DomainHelper): # Does the item we're grabbing match # what we have in our DB? - if ( - existing_contact.email != public_contact.email - or existing_contact.registry_id != public_contact.registry_id - ): + if existing_contact.email != public_contact.email or existing_contact.registry_id != public_contact.registry_id: existing_contact.delete() public_contact.save() logger.warning("Requested PublicContact is out of sync " "with DB.") @@ -1809,9 +1658,7 @@ class Domain(TimeStampedModel, DomainHelper): # Grabs the expanded contact full_object = self._request_contact_info(contact) # Maps it to type PublicContact - mapped_object = self.map_epp_contact_to_public_contact( - full_object, contact.registry_id, contact.contact_type - ) + mapped_object = self.map_epp_contact_to_public_contact(full_object, contact.registry_id, contact.contact_type) return self._get_or_create_public_contact(mapped_object) def _invalidate_cache(self): @@ -1829,6 +1676,4 @@ class Domain(TimeStampedModel, DomainHelper): if property in self._cache: return self._cache[property] else: - raise KeyError( - "Requested key %s was not found in registry cache." % str(property) - ) + raise KeyError("Requested key %s was not found in registry cache." % str(property)) diff --git a/src/registrar/models/domain_application.py b/src/registrar/models/domain_application.py index 708fceb35..86b8a0f7a 100644 --- a/src/registrar/models/domain_application.py +++ b/src/registrar/models/domain_application.py @@ -131,8 +131,7 @@ class DomainApplication(TimeStampedModel): FEDERAL = ( "federal", - "Federal: an agency of the U.S. government's executive, " - "legislative, or judicial branches", + "Federal: an agency of the U.S. government's executive, legislative, or judicial branches", ) INTERSTATE = "interstate", "Interstate: an organization of two or more states" STATE_OR_TERRITORY = ( @@ -143,8 +142,7 @@ class DomainApplication(TimeStampedModel): ) TRIBAL = ( "tribal", - "Tribal: a tribal government recognized by the federal or a state " - "government", + "Tribal: a tribal government recognized by the federal or a state government", ) COUNTY = "county", "County: a county, parish, or borough" CITY = "city", "City: a city, town, township, village, etc." @@ -154,8 +152,7 @@ class DomainApplication(TimeStampedModel): ) SCHOOL_DISTRICT = ( "school_district", - "School district: a school district that is not part of a local " - "government", + "School district: a school district that is not part of a local government", ) class BranchChoices(models.TextChoices): @@ -169,10 +166,7 @@ class DomainApplication(TimeStampedModel): "American Battle Monuments Commission", "AMTRAK", "Appalachian Regional Commission", - ( - "Appraisal Subcommittee of the Federal Financial " - "Institutions Examination Council" - ), + ("Appraisal Subcommittee of the Federal Financial " "Institutions Examination Council"), "Appraisal Subcommittee", "Architect of the Capitol", "Armed Forces Retirement Home", @@ -572,9 +566,7 @@ class DomainApplication(TimeStampedModel): return not self.approved_domain.is_active() return True - def _send_status_update_email( - self, new_status, email_template, email_template_subject - ): + def _send_status_update_email(self, new_status, email_template, email_template_subject): """Send a atatus update email to the submitter. The email goes to the email address that the submitter gave as their @@ -583,9 +575,7 @@ class DomainApplication(TimeStampedModel): """ if self.submitter is None or self.submitter.email is None: - logger.warning( - f"Cannot send {new_status} email, no submitter email address." - ) + logger.warning(f"Cannot send {new_status} email, no submitter email address.") return try: send_templated_email( @@ -598,9 +588,7 @@ class DomainApplication(TimeStampedModel): except EmailSendingError: logger.warning("Failed to send confirmation email", exc_info=True) - @transition( - field="status", source=[STARTED, ACTION_NEEDED, WITHDRAWN], target=SUBMITTED - ) + @transition(field="status", source=[STARTED, ACTION_NEEDED, WITHDRAWN], target=SUBMITTED) def submit(self): """Submit an application that is started. diff --git a/src/registrar/models/domain_information.py b/src/registrar/models/domain_information.py index 377d75685..d2bc5c53d 100644 --- a/src/registrar/models/domain_information.py +++ b/src/registrar/models/domain_information.py @@ -54,6 +54,7 @@ class DomainInformation(TimeStampedModel): blank=True, help_text="Type of Organization", ) + federally_recognized_tribe = models.BooleanField( null=True, help_text="Is the tribe federally recognized", diff --git a/src/registrar/models/domain_invitation.py b/src/registrar/models/domain_invitation.py index dff03fb87..1e0b7fec8 100644 --- a/src/registrar/models/domain_invitation.py +++ b/src/registrar/models/domain_invitation.py @@ -57,9 +57,7 @@ class DomainInvitation(TimeStampedModel): except User.DoesNotExist: # should not happen because a matching user should exist before # we retrieve this invitation - raise RuntimeError( - "Cannot find the user to retrieve this domain invitation." - ) + raise RuntimeError("Cannot find the user to retrieve this domain invitation.") # and create a role for that user on this domain _, created = UserDomainRole.objects.get_or_create( @@ -68,6 +66,4 @@ class DomainInvitation(TimeStampedModel): if not created: # something strange happened and this role already existed when # the invitation was retrieved. Log that this occurred. - logger.warn( - "Invitation %s was retrieved for a role that already exists.", self - ) + logger.warn("Invitation %s was retrieved for a role that already exists.", self) diff --git a/src/registrar/models/public_contact.py b/src/registrar/models/public_contact.py index 4afe3c467..c49a66260 100644 --- a/src/registrar/models/public_contact.py +++ b/src/registrar/models/public_contact.py @@ -10,9 +10,7 @@ from .utility.time_stamped_model import TimeStampedModel def get_id(): """Generate a 16 character registry ID with a low probability of collision.""" day = datetime.today().strftime("%A")[:2] - rand = "".join( - choices(ascii_uppercase + ascii_lowercase + digits, k=14) # nosec B311 - ) + rand = "".join(choices(ascii_uppercase + ascii_lowercase + digits, k=14)) # nosec B311 return f"{day}{rand}" @@ -69,16 +67,12 @@ class PublicContact(TimeStampedModel): pc = models.TextField(null=False, help_text="Contact's postal code") cc = models.TextField(null=False, help_text="Contact's country code") email = models.TextField(null=False, help_text="Contact's email address") - voice = models.TextField( - null=False, help_text="Contact's phone number. Must be in ITU.E164.2005 format" - ) + voice = models.TextField(null=False, help_text="Contact's phone number. Must be in ITU.E164.2005 format") fax = models.TextField( null=True, help_text="Contact's fax number (null ok). Must be in ITU.E164.2005 format.", ) - pw = models.TextField( - null=False, help_text="Contact's authorization code. 16 characters minimum." - ) + pw = models.TextField(null=False, help_text="Contact's authorization code. 16 characters minimum.") @classmethod def get_default_registrant(cls): @@ -154,8 +148,4 @@ class PublicContact(TimeStampedModel): return cls._meta.get_field("registry_id").max_length def __str__(self): - return ( - f"{self.name} <{self.email}>" - f"id: {self.registry_id} " - f"type: {self.contact_type}" - ) + return f"{self.name} <{self.email}>" f"id: {self.registry_id} " f"type: {self.contact_type}" diff --git a/src/registrar/models/transition_domain.py b/src/registrar/models/transition_domain.py index d1283f1ee..bae07d18f 100644 --- a/src/registrar/models/transition_domain.py +++ b/src/registrar/models/transition_domain.py @@ -66,15 +66,11 @@ class TransitionDomain(TimeStampedModel): ) epp_creation_date = models.DateField( null=True, - help_text=( - "Duplication of registry's creation " "date saved for ease of reporting" - ), + help_text=("Duplication of registry's creation " "date saved for ease of reporting"), ) epp_expiration_date = models.DateField( null=True, - help_text=( - "Duplication of registry's expiration " "date saved for ease of reporting" - ), + help_text=("Duplication of registry's expiration " "date saved for ease of reporting"), ) first_name = models.TextField( null=True, diff --git a/src/registrar/models/user.py b/src/registrar/models/user.py index 7207a5a61..bde73d3cb 100644 --- a/src/registrar/models/user.py +++ b/src/registrar/models/user.py @@ -68,9 +68,7 @@ class User(AbstractUser): def check_domain_invitations_on_login(self): """When a user first arrives on the site, we need to retrieve any domain invitations that match their email address.""" - for invitation in DomainInvitation.objects.filter( - email=self.email, status=DomainInvitation.INVITED - ): + for invitation in DomainInvitation.objects.filter(email=self.email, status=DomainInvitation.INVITED): try: invitation.retrieve() invitation.save() @@ -78,9 +76,7 @@ class User(AbstractUser): # retrieving should not fail because of a missing user, but # if it does fail, log the error so a new user can continue # logging in - logger.warn( - "Failed to retrieve invitation %s", invitation, exc_info=True - ) + logger.warn("Failed to retrieve invitation %s", invitation, exc_info=True) def create_domain_and_invite(self, transition_domain: TransitionDomain): transition_domain_name = transition_domain.domain_name @@ -89,9 +85,7 @@ class User(AbstractUser): # type safety check. name should never be none if transition_domain_name is not None: - new_domain = Domain( - name=transition_domain_name, state=transition_domain_status - ) + new_domain = Domain(name=transition_domain_name, state=transition_domain_status) new_domain.save() # check that a domain invitation doesn't already # exist for this e-mail / Domain pair @@ -100,9 +94,7 @@ class User(AbstractUser): ).exists() if not domain_email_already_in_domain_invites: # Create new domain invitation - new_domain_invitation = DomainInvitation( - email=transition_domain_email.lower(), domain=new_domain - ) + new_domain_invitation = DomainInvitation(email=transition_domain_email.lower(), domain=new_domain) new_domain_invitation.save() def check_transition_domains_on_login(self): @@ -129,9 +121,7 @@ class User(AbstractUser): # with our data and migrations need to be run again. # Get the domain that corresponds with this transition domain - domain_exists = Domain.objects.filter( - name=transition_domain.domain_name - ).exists() + domain_exists = Domain.objects.filter(name=transition_domain.domain_name).exists() if not domain_exists: logger.warn( """There are transition domains without @@ -147,17 +137,15 @@ class User(AbstractUser): # Create a domain information object, if one doesn't # already exist - domain_info_exists = DomainInformation.objects.filter( - domain=domain - ).exists() + domain_info_exists = DomainInformation.objects.filter(domain=domain).exists() if not domain_info_exists: new_domain_info = DomainInformation(creator=self, domain=domain) new_domain_info.save() - def first_login(self): - """Callback when the user is authenticated for the very first time. + def on_each_login(self): + """Callback each time the user is authenticated. - When a user first arrives on the site, we need to retrieve any domain + When a user arrives on the site each time, we need to retrieve any domain invitations that match their email address. We also need to check if they are logging in with the same e-mail diff --git a/src/registrar/models/user_domain_role.py b/src/registrar/models/user_domain_role.py index 7b1f550d3..479f75089 100644 --- a/src/registrar/models/user_domain_role.py +++ b/src/registrar/models/user_domain_role.py @@ -44,7 +44,5 @@ class UserDomainRole(TimeStampedModel): constraints = [ # a user can have only one role on a given domain, that is, there can # be only a single row with a certain (user, domain) pair. - models.UniqueConstraint( - fields=["user", "domain"], name="unique_user_domain_role" - ) + models.UniqueConstraint(fields=["user", "domain"], name="unique_user_domain_role") ] diff --git a/src/registrar/models/user_group.py b/src/registrar/models/user_group.py index 568741786..cf261286e 100644 --- a/src/registrar/models/user_group.py +++ b/src/registrar/models/user_group.py @@ -87,14 +87,10 @@ class UserGroup(Group): permissions = permission["permissions"] # Retrieve the content type for the app and model - content_type = ContentType.objects.get( - app_label=app_label, model=model_name - ) + content_type = ContentType.objects.get(app_label=app_label, model=model_name) # Retrieve the permissions based on their codenames - permissions = Permission.objects.filter( - content_type=content_type, codename__in=permissions - ) + permissions = Permission.objects.filter(content_type=content_type, codename__in=permissions) # Assign the permissions to the group cisa_analysts_group.permissions.add(*permissions) @@ -113,9 +109,7 @@ class UserGroup(Group): ) cisa_analysts_group.save() - logger.debug( - "CISA Analyt permissions added to group " + cisa_analysts_group.name - ) + logger.debug("CISA Analyt permissions added to group " + cisa_analysts_group.name) except Exception as e: logger.error(f"Error creating analyst permissions group: {e}") diff --git a/src/registrar/templates/base.html b/src/registrar/templates/base.html index 68a5c69ea..af432e5e9 100644 --- a/src/registrar/templates/base.html +++ b/src/registrar/templates/base.html @@ -3,6 +3,14 @@
+ {% if IS_PRODUCTION %} + + + + {% endif %} +