diff --git a/src/.flake8 b/src/.flake8 index db9f3094a..b3fa0993f 100644 --- a/src/.flake8 +++ b/src/.flake8 @@ -1,5 +1,5 @@ [flake8] -max-line-length = 88 +max-line-length = 120 max-complexity = 10 extend-ignore = E203 per-file-ignores = __init__.py:F401,F403,E402 diff --git a/src/api/views.py b/src/api/views.py index 694bea349..2cb23a9b2 100644 --- a/src/api/views.py +++ b/src/api/views.py @@ -10,9 +10,7 @@ from login_required import login_not_required from cachetools.func import ttl_cache -DOMAIN_FILE_URL = ( - "https://raw.githubusercontent.com/cisagov/dotgov-data/main/current-full.csv" -) +DOMAIN_FILE_URL = "https://raw.githubusercontent.com/cisagov/dotgov-data/main/current-full.csv" DOMAIN_API_MESSAGES = { @@ -22,8 +20,7 @@ DOMAIN_API_MESSAGES = { "extra_dots": "Enter the .gov domain you want without any periods.", "unavailable": "That domain isn’t available. Try entering another one." " Contact us if you need help coming up with a domain.", - "invalid": "Enter a domain using only letters," - " numbers, or hyphens (though we don't recommend using hyphens).", + "invalid": "Enter a domain using only letters, numbers, or hyphens (though we don't recommend using hyphens).", "success": "That domain is available!", "error": "Error finding domain availability.", } @@ -82,24 +79,13 @@ def available(request, domain=""): DraftDomain = apps.get_model("registrar.DraftDomain") # validate that the given domain could be a domain name and fail early if # not. - if not ( - DraftDomain.string_could_be_domain(domain) - or DraftDomain.string_could_be_domain(domain + ".gov") - ): - return JsonResponse( - {"available": False, "message": DOMAIN_API_MESSAGES["invalid"]} - ) + if not (DraftDomain.string_could_be_domain(domain) or DraftDomain.string_could_be_domain(domain + ".gov")): + return JsonResponse({"available": False, "message": DOMAIN_API_MESSAGES["invalid"]}) # a domain is available if it is NOT in the list of current domains try: if check_domain_available(domain): - return JsonResponse( - {"available": True, "message": DOMAIN_API_MESSAGES["success"]} - ) + return JsonResponse({"available": True, "message": DOMAIN_API_MESSAGES["success"]}) else: - return JsonResponse( - {"available": False, "message": DOMAIN_API_MESSAGES["unavailable"]} - ) + return JsonResponse({"available": False, "message": DOMAIN_API_MESSAGES["unavailable"]}) except Exception: - return JsonResponse( - {"available": False, "message": DOMAIN_API_MESSAGES["error"]} - ) + return JsonResponse({"available": False, "message": DOMAIN_API_MESSAGES["error"]}) diff --git a/src/djangooidc/oidc.py b/src/djangooidc/oidc.py index 286e519a4..b2b1acd8e 100644 --- a/src/djangooidc/oidc.py +++ b/src/djangooidc/oidc.py @@ -72,9 +72,7 @@ class Client(oic.Client): try: # discover and store the provider (OP) urls, etc self.provider_config(provider["srv_discovery_url"]) - self.store_registration_info( - RegistrationResponse(**provider["client_registration"]) - ) + self.store_registration_info(RegistrationResponse(**provider["client_registration"])) except Exception as err: logger.error(err) logger.error( @@ -169,9 +167,7 @@ class Client(oic.Client): if isinstance(authn_response, ErrorResponse): error = authn_response.get("error", "") if error == "login_required": - logger.warning( - "User was not logged in (%s), trying again for %s" % (error, state) - ) + logger.warning("User was not logged in (%s), trying again for %s" % (error, state)) return self.create_authn_request(session) else: logger.error("Unable to process response %s for %s" % (error, state)) @@ -190,9 +186,7 @@ class Client(oic.Client): if self.behaviour.get("response_type") == "code": # need an access token to get user info (and to log the user out later) - self._request_token( - authn_response["state"], authn_response["code"], session - ) + self._request_token(authn_response["state"], authn_response["code"], session) user_info = self._get_user_info(state, session) @@ -216,10 +210,7 @@ class Client(oic.Client): # ErrorResponse is not raised, it is passed back... if isinstance(info_response, ErrorResponse): - logger.error( - "Unable to get user info (%s) for %s" - % (info_response.get("error", ""), state) - ) + logger.error("Unable to get user info (%s) for %s" % (info_response.get("error", ""), state)) raise o_e.AuthenticationFailed(locator=state) logger.debug("user info: %s" % info_response) @@ -249,10 +240,7 @@ class Client(oic.Client): # ErrorResponse is not raised, it is passed back... if isinstance(token_response, ErrorResponse): - logger.error( - "Unable to get token (%s) for %s" - % (token_response.get("error", ""), state) - ) + logger.error("Unable to get token (%s) for %s" % (token_response.get("error", ""), state)) raise o_e.AuthenticationFailed(locator=state) logger.debug("token response %s" % token_response) diff --git a/src/djangooidc/tests/test_views.py b/src/djangooidc/tests/test_views.py index 30caf3713..5ff36a77c 100644 --- a/src/djangooidc/tests/test_views.py +++ b/src/djangooidc/tests/test_views.py @@ -85,12 +85,8 @@ class ViewsTest(TestCase): session.save() # mock mock_client.callback.side_effect = self.user_info - mock_client.registration_response = { - "post_logout_redirect_uris": ["http://example.com/back"] - } - mock_client.provider_info = { - "end_session_endpoint": "http://example.com/log_me_out" - } + mock_client.registration_response = {"post_logout_redirect_uris": ["http://example.com/back"]} + mock_client.provider_info = {"end_session_endpoint": "http://example.com/log_me_out"} mock_client.client_id = "TEST" # test with less_console_noise(): diff --git a/src/djangooidc/views.py b/src/djangooidc/views.py index 2db304509..ea893daf2 100644 --- a/src/djangooidc/views.py +++ b/src/djangooidc/views.py @@ -92,11 +92,7 @@ def logout(request, next_page=None): and len(CLIENT.registration_response["post_logout_redirect_uris"]) > 0 ): request_args.update( - { - "post_logout_redirect_uri": CLIENT.registration_response[ - "post_logout_redirect_uris" - ][0] - } + {"post_logout_redirect_uri": CLIENT.registration_response["post_logout_redirect_uris"][0]} ) url = CLIENT.provider_info["end_session_endpoint"] diff --git a/src/epplibwrapper/client.py b/src/epplibwrapper/client.py index e4b7a5d53..80ed278ff 100644 --- a/src/epplibwrapper/client.py +++ b/src/epplibwrapper/client.py @@ -95,9 +95,7 @@ class EPPLibWrapper: try: if not self.pool_status.connection_success: - raise LoginError( - "Couldn't connect to the registry after three attempts" - ) + raise LoginError("Couldn't connect to the registry after three attempts") with self._pool.get() as connection: response = connection.send(command) except Timeout as t: @@ -239,6 +237,4 @@ try: logger.info("registry client initialized") except Exception: CLIENT = None # type: ignore - logger.warning( - "Unable to configure epplib. Registrar cannot contact registry.", exc_info=True - ) + logger.warning("Unable to configure epplib. Registrar cannot contact registry.", exc_info=True) diff --git a/src/epplibwrapper/tests/test_pool.py b/src/epplibwrapper/tests/test_pool.py index 4e919ba76..1a5a2ccf7 100644 --- a/src/epplibwrapper/tests/test_pool.py +++ b/src/epplibwrapper/tests/test_pool.py @@ -103,9 +103,7 @@ class TestConnectionPool(TestCase): ], cl_id="gov2023-ote", cr_id="gov2023-ote", - cr_date=datetime.datetime( - 2023, 8, 15, 23, 56, 36, tzinfo=tzlocal() - ), + cr_date=datetime.datetime(2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()), up_id="gov2023-ote", up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()), tr_date=None, @@ -129,9 +127,7 @@ class TestConnectionPool(TestCase): # Mock what happens inside the "with" with ExitStack() as stack: - stack.enter_context( - patch.object(EPPConnectionPool, "_create_socket", self.fake_socket) - ) + stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket)) stack.enter_context(patch.object(Socket, "connect", self.fake_client)) stack.enter_context(patch.object(SocketTransport, "send", self.fake_send)) stack.enter_context(patch.object(SocketTransport, "receive", fake_receive)) @@ -176,9 +172,7 @@ class TestConnectionPool(TestCase): ], cl_id="gov2023-ote", cr_id="gov2023-ote", - cr_date=datetime.datetime( - 2023, 8, 15, 23, 56, 36, tzinfo=tzlocal() - ), + cr_date=datetime.datetime(2023, 8, 15, 23, 56, 36, tzinfo=tzlocal()), up_id="gov2023-ote", up_date=datetime.datetime(2023, 8, 17, 2, 3, 19, tzinfo=tzlocal()), tr_date=None, @@ -202,9 +196,7 @@ class TestConnectionPool(TestCase): # Mock what happens inside the "with" with ExitStack() as stack: - stack.enter_context( - patch.object(EPPConnectionPool, "_create_socket", self.fake_socket) - ) + stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket)) stack.enter_context(patch.object(Socket, "connect", self.fake_client)) stack.enter_context(patch.object(SocketTransport, "send", self.fake_send)) stack.enter_context(patch.object(SocketTransport, "receive", fake_receive)) @@ -218,9 +210,7 @@ class TestConnectionPool(TestCase): # that they cannot connect to EPP with self.assertRaises(RegistryError): expected = "InfoDomain failed to execute due to a connection error." - result = registry.send( - commands.InfoDomain(name="test.gov"), cleaned=True - ) + result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True) self.assertEqual(result, expected) # A subsequent command should be successful, as the pool restarts @@ -240,9 +230,7 @@ class TestConnectionPool(TestCase): right as we send a command.""" with ExitStack() as stack: - stack.enter_context( - patch.object(EPPConnectionPool, "_create_socket", self.fake_socket) - ) + stack.enter_context(patch.object(EPPConnectionPool, "_create_socket", self.fake_socket)) stack.enter_context(patch.object(Socket, "connect", self.fake_client)) # Pool should be running @@ -252,7 +240,5 @@ class TestConnectionPool(TestCase): # Try to send a command out - should fail with self.assertRaises(RegistryError): expected = "InfoDomain failed to execute due to a connection error." - result = registry.send( - commands.InfoDomain(name="test.gov"), cleaned=True - ) + result = registry.send(commands.InfoDomain(name="test.gov"), cleaned=True) self.assertEqual(result, expected) diff --git a/src/epplibwrapper/utility/pool.py b/src/epplibwrapper/utility/pool.py index 36771252b..e28270f50 100644 --- a/src/epplibwrapper/utility/pool.py +++ b/src/epplibwrapper/utility/pool.py @@ -125,9 +125,7 @@ class EPPConnectionPool(ConnectionPool): # Open multiple connections for i in range(self.size): - self.greenlets.append( - gevent.spawn_later(self.spawn_frequency * i, self._addOne) - ) + self.greenlets.append(gevent.spawn_later(self.spawn_frequency * i, self._addOne)) # Open a "keepalive" thread if we want to ping open connections if self.keepalive: diff --git a/src/epplibwrapper/utility/pool_error.py b/src/epplibwrapper/utility/pool_error.py index 821962774..bdf955afe 100644 --- a/src/epplibwrapper/utility/pool_error.py +++ b/src/epplibwrapper/utility/pool_error.py @@ -28,14 +28,8 @@ class PoolError(Exception): # Used variables due to linter requirements kill_failed = "Could not kill all connections. Are multiple pools running?" - conn_failed = ( - "Failed to execute due to a registry error." - " See previous logs to determine the cause of the error." - ) - alive_failed = ( - "Failed to keep the connection alive. " - "It is likely that the registry returned a LoginError." - ) + conn_failed = "Failed to execute due to a registry error. See previous logs to determine the cause of the error." + alive_failed = "Failed to keep the connection alive. It is likely that the registry returned a LoginError." _error_mapping = { PoolErrorCodes.KILL_ALL_FAILED: kill_failed, PoolErrorCodes.NEW_CONNECTION_FAILED: conn_failed, diff --git a/src/pyproject.toml b/src/pyproject.toml index af7fc4ee7..ba2b76f83 100644 --- a/src/pyproject.toml +++ b/src/pyproject.toml @@ -1,5 +1,5 @@ [tool.black] -line-length=88 +line-length=120 [tool.mypy] ignore_missing_imports = true diff --git a/src/registrar/admin.py b/src/registrar/admin.py index 8914e5c87..9de5f563c 100644 --- a/src/registrar/admin.py +++ b/src/registrar/admin.py @@ -73,9 +73,7 @@ class ListHeaderAdmin(AuditedAdmin): filters = self.get_filters(request) # Pass the filtered values to the template context extra_context["filters"] = filters - extra_context["search_query"] = request.GET.get( - "q", "" - ) # Assuming the search query parameter is 'q' + extra_context["search_query"] = request.GET.get("q", "") # Assuming the search query parameter is 'q' return super().changelist_view(request, extra_context=extra_context) def get_filters(self, request): @@ -91,11 +89,7 @@ class ListHeaderAdmin(AuditedAdmin): for param in request.GET.keys(): # Exclude the default search parameter 'q' if param != "q" and param != "o": - parameter_name = ( - param.replace("__exact", "") - .replace("_type", "") - .replace("__id", " id") - ) + parameter_name = param.replace("__exact", "").replace("_type", "").replace("__id", " id") if parameter_name == "investigator id": # Retrieves the corresponding contact from Users @@ -613,8 +607,7 @@ class DomainApplicationAdmin(ListHeaderAdmin): messages.error( request, - "This action is not permitted. The domain " - + "is already active.", + "This action is not permitted. The domain is already active.", ) else: @@ -627,9 +620,7 @@ class DomainApplicationAdmin(ListHeaderAdmin): models.DomainApplication.APPROVED: obj.approve, models.DomainApplication.WITHDRAWN: obj.withdraw, models.DomainApplication.REJECTED: obj.reject, - models.DomainApplication.INELIGIBLE: ( - obj.reject_with_prejudice - ), + models.DomainApplication.INELIGIBLE: (obj.reject_with_prejudice), } selected_method = status_method_mapping.get(obj.status) if selected_method is None: @@ -649,8 +640,7 @@ class DomainApplicationAdmin(ListHeaderAdmin): messages.error( request, - "This action is not permitted for applications " - + "with a restricted creator.", + "This action is not permitted for applications with a restricted creator.", ) def get_readonly_fields(self, request, obj=None): @@ -669,9 +659,7 @@ class DomainApplicationAdmin(ListHeaderAdmin): readonly_fields.extend([field.name for field in self.model._meta.fields]) # Add the multi-select fields to readonly_fields: # Complex fields like ManyToManyField require special handling - readonly_fields.extend( - ["current_websites", "other_contacts", "alternative_domains"] - ) + readonly_fields.extend(["current_websites", "other_contacts", "alternative_domains"]) if request.user.has_perm("registrar.full_access_permission"): return readonly_fields @@ -739,9 +727,7 @@ class DomainAdmin(ListHeaderAdmin): def organization_type(self, obj): return obj.domain_info.get_organization_type_display() - organization_type.admin_order_field = ( # type: ignore - "domain_info__organization_type" - ) + organization_type.admin_order_field = "domain_info__organization_type" # type: ignore # Filters list_filter = ["domain_info__organization_type", "state"] @@ -846,9 +832,7 @@ class DomainAdmin(ListHeaderAdmin): if not err.is_connection_error(): # If nothing is found, will default to returned err message = error_messages.get(err.code, err) - self.message_user( - request, f"Error deleting this Domain: {message}", messages.ERROR - ) + self.message_user(request, f"Error deleting this Domain: {message}", messages.ERROR) except TransitionNotAllowed: if obj.state == Domain.State.DELETED: self.message_user( @@ -886,8 +870,7 @@ class DomainAdmin(ListHeaderAdmin): else: self.message_user( request, - f"The registry statuses are {statuses}. " - "These statuses are from the provider of the .gov registry.", + f"The registry statuses are {statuses}. These statuses are from the provider of the .gov registry.", ) return HttpResponseRedirect(".") @@ -916,11 +899,7 @@ class DomainAdmin(ListHeaderAdmin): else: self.message_user( request, - ( - "%s is in client hold. This domain is no longer accessible on" - " the public internet." - ) - % obj.name, + ("%s is in client hold. This domain is no longer accessible on the public internet.") % obj.name, ) return HttpResponseRedirect(".") @@ -949,8 +928,7 @@ class DomainAdmin(ListHeaderAdmin): else: self.message_user( request, - ("%s is ready. This domain is accessible on the public internet.") - % obj.name, + ("%s is ready. This domain is accessible on the public internet.") % obj.name, ) return HttpResponseRedirect(".") @@ -973,9 +951,9 @@ class DomainAdmin(ListHeaderAdmin): # Fixes a bug wherein users which are only is_staff # can access 'change' when GET, # but cannot access this page when it is a request of type POST. - if request.user.has_perm( - "registrar.full_access_permission" - ) or request.user.has_perm("registrar.analyst_access_permission"): + if request.user.has_perm("registrar.full_access_permission") or request.user.has_perm( + "registrar.analyst_access_permission" + ): return True return super().has_change_permission(request, obj) diff --git a/src/registrar/config/settings.py b/src/registrar/config/settings.py index c6fc0f65b..d38de89e5 100644 --- a/src/registrar/config/settings.py +++ b/src/registrar/config/settings.py @@ -526,9 +526,7 @@ OIDC_PROVIDERS = { "acr_value": "http://idmanagement.gov/ns/assurance/ial/2", }, "client_registration": { - "client_id": ( - "urn:gov:cisa:openidconnect.profiles:sp:sso:cisa:dotgov_registrar" - ), + "client_id": ("urn:gov:cisa:openidconnect.profiles:sp:sso:cisa:dotgov_registrar"), "redirect_uris": [f"{env_base_url}/openid/callback/login/"], "post_logout_redirect_uris": [f"{env_base_url}/openid/callback/logout/"], "token_endpoint_auth_method": ["private_key_jwt"], diff --git a/src/registrar/fixtures_applications.py b/src/registrar/fixtures_applications.py index 18be79814..aea153aef 100644 --- a/src/registrar/fixtures_applications.py +++ b/src/registrar/fixtures_applications.py @@ -97,9 +97,7 @@ class DomainApplicationFixture: def _set_non_foreign_key_fields(cls, da: DomainApplication, app: dict): """Helper method used by `load`.""" da.status = app["status"] if "status" in app else "started" - da.organization_type = ( - app["organization_type"] if "organization_type" in app else "federal" - ) + da.organization_type = app["organization_type"] if "organization_type" in app else "federal" da.federal_agency = ( app["federal_agency"] if "federal_agency" in app @@ -112,40 +110,25 @@ class DomainApplicationFixture: if "federal_type" in app else random.choice(["executive", "judicial", "legislative"]) # nosec ) - da.address_line1 = ( - app["address_line1"] if "address_line1" in app else fake.street_address() - ) + da.address_line1 = app["address_line1"] if "address_line1" in app else fake.street_address() da.address_line2 = app["address_line2"] if "address_line2" in app else None da.city = app["city"] if "city" in app else fake.city() - da.state_territory = ( - app["state_territory"] if "state_territory" in app else fake.state_abbr() - ) + da.state_territory = app["state_territory"] if "state_territory" in app else fake.state_abbr() da.zipcode = app["zipcode"] if "zipcode" in app else fake.postalcode() da.urbanization = app["urbanization"] if "urbanization" in app else None da.purpose = app["purpose"] if "purpose" in app else fake.paragraph() da.anything_else = app["anything_else"] if "anything_else" in app else None - da.is_policy_acknowledged = ( - app["is_policy_acknowledged"] if "is_policy_acknowledged" in app else True - ) + da.is_policy_acknowledged = app["is_policy_acknowledged"] if "is_policy_acknowledged" in app else True @classmethod def _set_foreign_key_fields(cls, da: DomainApplication, app: dict, user: User): """Helper method used by `load`.""" if not da.investigator: - da.investigator = ( - User.objects.get(username=user.username) - if "investigator" in app - else None - ) + da.investigator = User.objects.get(username=user.username) if "investigator" in app else None if not da.authorizing_official: - if ( - "authorizing_official" in app - and app["authorizing_official"] is not None - ): - da.authorizing_official, _ = Contact.objects.get_or_create( - **app["authorizing_official"] - ) + if "authorizing_official" in app and app["authorizing_official"] is not None: + da.authorizing_official, _ = Contact.objects.get_or_create(**app["authorizing_official"]) else: da.authorizing_official = Contact.objects.create(**cls.fake_contact()) @@ -157,13 +140,9 @@ class DomainApplicationFixture: if not da.requested_domain: if "requested_domain" in app and app["requested_domain"] is not None: - da.requested_domain, _ = DraftDomain.objects.get_or_create( - name=app["requested_domain"] - ) + da.requested_domain, _ = DraftDomain.objects.get_or_create(name=app["requested_domain"]) else: - da.requested_domain = DraftDomain.objects.create( - name=cls.fake_dot_gov() - ) + da.requested_domain = DraftDomain.objects.create(name=cls.fake_dot_gov()) @classmethod def _set_many_to_many_relations(cls, da: DomainApplication, app: dict): @@ -173,32 +152,25 @@ class DomainApplicationFixture: da.other_contacts.add(Contact.objects.get_or_create(**contact)[0]) elif not da.other_contacts.exists(): other_contacts = [ - Contact.objects.create(**cls.fake_contact()) - for _ in range(random.randint(0, 3)) # nosec + Contact.objects.create(**cls.fake_contact()) for _ in range(random.randint(0, 3)) # nosec ] da.other_contacts.add(*other_contacts) if "current_websites" in app: for website in app["current_websites"]: - da.current_websites.add( - Website.objects.get_or_create(website=website)[0] - ) + da.current_websites.add(Website.objects.get_or_create(website=website)[0]) elif not da.current_websites.exists(): current_websites = [ - Website.objects.create(website=fake.uri()) - for _ in range(random.randint(0, 3)) # nosec + Website.objects.create(website=fake.uri()) for _ in range(random.randint(0, 3)) # nosec ] da.current_websites.add(*current_websites) if "alternative_domains" in app: for domain in app["alternative_domains"]: - da.alternative_domains.add( - Website.objects.get_or_create(website=domain)[0] - ) + da.alternative_domains.add(Website.objects.get_or_create(website=domain)[0]) elif not da.alternative_domains.exists(): alternative_domains = [ - Website.objects.create(website=cls.fake_dot_gov()) - for _ in range(random.randint(0, 3)) # nosec + Website.objects.create(website=cls.fake_dot_gov()) for _ in range(random.randint(0, 3)) # nosec ] da.alternative_domains.add(*alternative_domains) @@ -242,9 +214,7 @@ class DomainFixture(DomainApplicationFixture): for user in users: # approve one of each users in review status domains - application = DomainApplication.objects.filter( - creator=user, status=DomainApplication.IN_REVIEW - ).last() + application = DomainApplication.objects.filter(creator=user, status=DomainApplication.IN_REVIEW).last() logger.debug(f"Approving {application} for {user}") application.approve() application.save() diff --git a/src/registrar/forms/application_wizard.py b/src/registrar/forms/application_wizard.py index 2fd78cdd8..a70c23e52 100644 --- a/src/registrar/forms/application_wizard.py +++ b/src/registrar/forms/application_wizard.py @@ -50,9 +50,7 @@ class RegistrarForm(forms.Form): """Returns a dict of form field values gotten from `obj`.""" if obj is None: return {} - return { - name: getattr(obj, name) for name in cls.declared_fields.keys() - } # type: ignore + return {name: getattr(obj, name) for name in cls.declared_fields.keys()} # type: ignore class RegistrarFormSet(forms.BaseFormSet): @@ -178,10 +176,7 @@ class TribalGovernmentForm(RegistrarForm): def clean(self): """Needs to be either state or federally recognized.""" - if not ( - self.cleaned_data["federally_recognized_tribe"] - or self.cleaned_data["state_recognized_tribe"] - ): + if not (self.cleaned_data["federally_recognized_tribe"] or self.cleaned_data["state_recognized_tribe"]): raise forms.ValidationError( # no sec because we are using it to include an internal URL # into a link. There should be no user-facing input in the @@ -203,11 +198,7 @@ class OrganizationFederalForm(RegistrarForm): federal_type = forms.ChoiceField( choices=DomainApplication.BranchChoices.choices, widget=forms.RadioSelect, - error_messages={ - "required": ( - "Select the part of the federal government your organization is in." - ) - }, + error_messages={"required": ("Select the part of the federal government your organization is in.")}, ) @@ -227,10 +218,7 @@ class OrganizationElectionForm(RegistrarForm): is_election_board = self.cleaned_data["is_election_board"] if is_election_board is None: raise forms.ValidationError( - ( - "Select “Yes” if you represent an election office. Select “No” if" - " you don’t." - ), + ("Select “Yes” if you represent an election office. Select “No” if you don’t."), code="required", ) return is_election_board @@ -260,18 +248,13 @@ class OrganizationContactForm(RegistrarForm): ) city = forms.CharField( label="City", - error_messages={ - "required": "Enter the city where your organization is located." - }, + error_messages={"required": "Enter the city where your organization is located."}, ) state_territory = forms.ChoiceField( label="State, territory, or military post", choices=[("", "--Select--")] + DomainApplication.StateTerritoryChoices.choices, error_messages={ - "required": ( - "Select the state, territory, or military post where your organization" - " is located." - ) + "required": ("Select the state, territory, or military post where your organization is located.") }, ) zipcode = forms.CharField( @@ -320,9 +303,7 @@ class AboutYourOrganizationForm(RegistrarForm): message="Response must be less than 1000 characters.", ) ], - error_messages={ - "required": ("Enter more information about your organization.") - }, + error_messages={"required": ("Enter more information about your organization.")}, ) @@ -346,11 +327,7 @@ class AuthorizingOfficialForm(RegistrarForm): first_name = forms.CharField( label="First name / given name", - error_messages={ - "required": ( - "Enter the first name / given name of your authorizing official." - ) - }, + error_messages={"required": ("Enter the first name / given name of your authorizing official.")}, ) middle_name = forms.CharField( required=False, @@ -358,11 +335,7 @@ class AuthorizingOfficialForm(RegistrarForm): ) last_name = forms.CharField( label="Last name / family name", - error_messages={ - "required": ( - "Enter the last name / family name of your authorizing official." - ) - }, + error_messages={"required": ("Enter the last name / family name of your authorizing official.")}, ) title = forms.CharField( label="Title or role in your organization", @@ -375,17 +348,11 @@ class AuthorizingOfficialForm(RegistrarForm): ) email = forms.EmailField( label="Email", - error_messages={ - "invalid": ( - "Enter an email address in the required format, like name@example.com." - ) - }, + error_messages={"invalid": ("Enter an email address in the required format, like name@example.com.")}, ) phone = PhoneNumberField( label="Phone", - error_messages={ - "required": "Enter the phone number for your authorizing official." - }, + error_messages={"required": "Enter the phone number for your authorizing official."}, ) @@ -394,10 +361,7 @@ class CurrentSitesForm(RegistrarForm): required=False, label="Public website", error_messages={ - "invalid": ( - "Enter your organization's current website in the required format, like" - " www.city.com." - ) + "invalid": ("Enter your organization's current website in the required format, like www.city.com.") }, ) @@ -410,9 +374,7 @@ class BaseCurrentSitesFormSet(RegistrarFormSet): return website.strip() == "" def to_database(self, obj: DomainApplication): - self._to_database( - obj, self.JOIN, self.should_delete, self.pre_update, self.pre_create - ) + self._to_database(obj, self.JOIN, self.should_delete, self.pre_update, self.pre_create) @classmethod def from_database(cls, obj): @@ -434,13 +396,9 @@ class AlternativeDomainForm(RegistrarForm): requested = self.cleaned_data.get("alternative_domain", None) validated = DraftDomain.validate(requested, blank_ok=True) except errors.ExtraDotsError: - raise forms.ValidationError( - DOMAIN_API_MESSAGES["extra_dots"], code="extra_dots" - ) + raise forms.ValidationError(DOMAIN_API_MESSAGES["extra_dots"], code="extra_dots") except errors.DomainUnavailableError: - raise forms.ValidationError( - DOMAIN_API_MESSAGES["unavailable"], code="unavailable" - ) + raise forms.ValidationError(DOMAIN_API_MESSAGES["unavailable"], code="unavailable") except ValueError: raise forms.ValidationError(DOMAIN_API_MESSAGES["invalid"], code="invalid") return validated @@ -471,9 +429,7 @@ class BaseAlternativeDomainFormSet(RegistrarFormSet): return {} def to_database(self, obj: DomainApplication): - self._to_database( - obj, self.JOIN, self.should_delete, self.pre_update, self.pre_create - ) + self._to_database(obj, self.JOIN, self.should_delete, self.pre_update, self.pre_create) @classmethod def on_fetch(cls, query): @@ -523,17 +479,11 @@ class DotGovDomainForm(RegistrarForm): requested = self.cleaned_data.get("requested_domain", None) validated = DraftDomain.validate(requested) except errors.BlankValueError: - raise forms.ValidationError( - DOMAIN_API_MESSAGES["required"], code="required" - ) + raise forms.ValidationError(DOMAIN_API_MESSAGES["required"], code="required") except errors.ExtraDotsError: - raise forms.ValidationError( - DOMAIN_API_MESSAGES["extra_dots"], code="extra_dots" - ) + raise forms.ValidationError(DOMAIN_API_MESSAGES["extra_dots"], code="extra_dots") except errors.DomainUnavailableError: - raise forms.ValidationError( - DOMAIN_API_MESSAGES["unavailable"], code="unavailable" - ) + raise forms.ValidationError(DOMAIN_API_MESSAGES["unavailable"], code="unavailable") except ValueError: raise forms.ValidationError(DOMAIN_API_MESSAGES["invalid"], code="invalid") return validated @@ -551,9 +501,7 @@ class PurposeForm(RegistrarForm): message="Response must be less than 1000 characters.", ) ], - error_messages={ - "required": "Describe how you'll use the .gov domain you’re requesting." - }, + error_messages={"required": "Describe how you'll use the .gov domain you’re requesting."}, ) @@ -590,20 +538,12 @@ class YourContactForm(RegistrarForm): title = forms.CharField( label="Title or role in your organization", error_messages={ - "required": ( - "Enter your title or role in your organization (e.g., Chief Information" - " Officer)." - ) + "required": ("Enter your title or role in your organization (e.g., Chief Information Officer).") }, ) email = forms.EmailField( label="Email", - error_messages={ - "invalid": ( - "Enter your email address in the required format, like" - " name@example.com." - ) - }, + error_messages={"invalid": ("Enter your email address in the required format, like name@example.com.")}, ) phone = PhoneNumberField( label="Phone", @@ -614,9 +554,7 @@ class YourContactForm(RegistrarForm): class OtherContactsForm(RegistrarForm): first_name = forms.CharField( label="First name / given name", - error_messages={ - "required": "Enter the first name / given name of this contact." - }, + error_messages={"required": "Enter the first name / given name of this contact."}, ) middle_name = forms.CharField( required=False, @@ -624,26 +562,19 @@ class OtherContactsForm(RegistrarForm): ) last_name = forms.CharField( label="Last name / family name", - error_messages={ - "required": "Enter the last name / family name of this contact." - }, + error_messages={"required": "Enter the last name / family name of this contact."}, ) title = forms.CharField( label="Title or role in your organization", error_messages={ "required": ( - "Enter the title or role in your organization of this contact (e.g.," - " Chief Information Officer)." + "Enter the title or role in your organization of this contact (e.g., Chief Information Officer)." ) }, ) email = forms.EmailField( label="Email", - error_messages={ - "invalid": ( - "Enter an email address in the required format, like name@example.com." - ) - }, + error_messages={"invalid": ("Enter an email address in the required format, like name@example.com.")}, ) phone = PhoneNumberField( label="Phone", @@ -659,9 +590,7 @@ class BaseOtherContactsFormSet(RegistrarFormSet): return all(empty) def to_database(self, obj: DomainApplication): - self._to_database( - obj, self.JOIN, self.should_delete, self.pre_update, self.pre_create - ) + self._to_database(obj, self.JOIN, self.should_delete, self.pre_update, self.pre_create) @classmethod def from_database(cls, obj): @@ -706,9 +635,6 @@ class RequirementsForm(RegistrarForm): is_policy_acknowledged = forms.BooleanField( label="I read and agree to the requirements for operating .gov domains.", error_messages={ - "required": ( - "Check the box if you read and agree to the requirements for" - " operating .gov domains." - ) + "required": ("Check the box if you read and agree to the requirements for operating .gov domains.") }, ) diff --git a/src/registrar/forms/domain.py b/src/registrar/forms/domain.py index 2c95c3e74..6cb5b338f 100644 --- a/src/registrar/forms/domain.py +++ b/src/registrar/forms/domain.py @@ -95,23 +95,17 @@ class DomainNameserverForm(forms.Form): elif e.code == nsErrorCodes.MISSING_IP: self.add_error( "ip", - NameserverError( - code=nsErrorCodes.MISSING_IP, nameserver=domain, ip=ip_list - ), + NameserverError(code=nsErrorCodes.MISSING_IP, nameserver=domain, ip=ip_list), ) elif e.code == nsErrorCodes.MISSING_HOST: self.add_error( "server", - NameserverError( - code=nsErrorCodes.MISSING_HOST, nameserver=domain, ip=ip_list - ), + NameserverError(code=nsErrorCodes.MISSING_HOST, nameserver=domain, ip=ip_list), ) elif e.code == nsErrorCodes.INVALID_HOST: self.add_error( "server", - NameserverError( - code=nsErrorCodes.INVALID_HOST, nameserver=server, ip=ip_list - ), + NameserverError(code=nsErrorCodes.INVALID_HOST, nameserver=server, ip=ip_list), ) else: self.add_error("ip", str(e)) @@ -187,17 +181,12 @@ class DomainOrgNameAddressForm(forms.ModelForm): "urbanization", ] error_messages = { - "federal_agency": { - "required": "Select the federal agency for your organization." - }, + "federal_agency": {"required": "Select the federal agency for your organization."}, "organization_name": {"required": "Enter the name of your organization."}, - "address_line1": { - "required": "Enter the street address of your organization." - }, + "address_line1": {"required": "Enter the street address of your organization."}, "city": {"required": "Enter the city where your organization is located."}, "state_territory": { - "required": "Select the state, territory, or military post where your" - "organization is located." + "required": "Select the state, territory, or military post where your organization is located." }, } widgets = { @@ -205,9 +194,7 @@ class DomainOrgNameAddressForm(forms.ModelForm): # state/territory because for these fields we are creating an individual # instance of the Select. For the other fields we use the for loop to set # the class's required attribute to true. - "federal_agency": forms.Select( - attrs={"required": True}, choices=DomainInformation.AGENCY_CHOICES - ), + "federal_agency": forms.Select(attrs={"required": True}, choices=DomainInformation.AGENCY_CHOICES), "organization_name": forms.TextInput, "address_line1": forms.TextInput, "address_line2": forms.TextInput, diff --git a/src/registrar/management/commands/cat_files_into_getgov.py b/src/registrar/management/commands/cat_files_into_getgov.py index 993643ee4..4ccb1301b 100644 --- a/src/registrar/management/commands/cat_files_into_getgov.py +++ b/src/registrar/management/commands/cat_files_into_getgov.py @@ -23,9 +23,7 @@ class Command(BaseCommand): default="txt", help="What file extensions to look for, like txt or gz", ) - parser.add_argument( - "--directory", default="migrationdata", help="Desired directory" - ) + parser.add_argument("--directory", default="migrationdata", help="Desired directory") def handle(self, **options): file_extension: str = options.get("file_extension").lstrip(".") diff --git a/src/registrar/management/commands/lint.py b/src/registrar/management/commands/lint.py index 3d0ec0225..f867f5b6f 100644 --- a/src/registrar/management/commands/lint.py +++ b/src/registrar/management/commands/lint.py @@ -52,20 +52,15 @@ class Command(BaseCommand): if result.returncode: self.stderr.write( self.style.NOTICE( - "[manage.py lint] Re-try with: [docker-compose exec app] " - f"{' '.join(linter['args'])}" + "[manage.py lint] Re-try with: [docker-compose exec app] " f"{' '.join(linter['args'])}" ) ) errors.append(CalledProcessError(result.returncode, linter["args"])) else: - self.stdout.write( - f"[manage.py lint] {linter['purpose']} completed with success!" - ) + self.stdout.write(f"[manage.py lint] {linter['purpose']} completed with success!") if errors: self.stdout.write(f"[manage.py lint] {len(errors)} linter(s) failed.") raise LinterError(errors) except (CalledProcessError, LinterError) as e: raise CommandError(e) - self.stdout.write( - self.style.SUCCESS("[manage.py lint] All linters ran successfully.") - ) + self.stdout.write(self.style.SUCCESS("[manage.py lint] All linters ran successfully.")) diff --git a/src/registrar/management/commands/load_domain_invitations.py b/src/registrar/management/commands/load_domain_invitations.py index 83b7163d1..28eb09def 100644 --- a/src/registrar/management/commands/load_domain_invitations.py +++ b/src/registrar/management/commands/load_domain_invitations.py @@ -21,9 +21,7 @@ class Command(BaseCommand): "domain_contacts_filename", help="Data file with domain contact information", ) - parser.add_argument( - "contacts_filename", help="Data file with contact information" - ) + parser.add_argument("contacts_filename", help="Data file with contact information") parser.add_argument("--sep", default="|", help="Delimiter character") diff --git a/src/registrar/management/commands/load_domains_data.py b/src/registrar/management/commands/load_domains_data.py index b205e562e..fd6a560bd 100644 --- a/src/registrar/management/commands/load_domains_data.py +++ b/src/registrar/management/commands/load_domains_data.py @@ -43,9 +43,7 @@ class Command(BaseCommand): help = "Load domain data from a delimited text file on stdin." def add_arguments(self, parser): - parser.add_argument( - "--sep", default="|", help="Separator character for data file" - ) + parser.add_argument("--sep", default="|", help="Separator character for data file") def handle(self, *args, **options): separator_character = options.get("sep") diff --git a/src/registrar/management/commands/load_transition_domain.py b/src/registrar/management/commands/load_transition_domain.py index 687750ed8..c61741506 100644 --- a/src/registrar/management/commands/load_transition_domain.py +++ b/src/registrar/management/commands/load_transition_domain.py @@ -36,24 +36,18 @@ class Command(BaseCommand): Use this to trigger a prompt for deleting all table entries. Useful for testing purposes, but USE WITH CAUTION """ - parser.add_argument( - "domain_contacts_filename", help="Data file with domain contact information" - ) + parser.add_argument("domain_contacts_filename", help="Data file with domain contact information") parser.add_argument( "contacts_filename", help="Data file with contact information", ) - parser.add_argument( - "domain_statuses_filename", help="Data file with domain status information" - ) + parser.add_argument("domain_statuses_filename", help="Data file with domain status information") parser.add_argument("--sep", default="|", help="Delimiter character") parser.add_argument("--debug", action=argparse.BooleanOptionalAction) - parser.add_argument( - "--limitParse", default=0, help="Sets max number of entries to load" - ) + parser.add_argument("--limitParse", default=0, help="Sets max number of entries to load") parser.add_argument( "--resetTable", @@ -61,9 +55,7 @@ class Command(BaseCommand): action=argparse.BooleanOptionalAction, ) - def print_debug_mode_statements( - self, debug_on: bool, debug_max_entries_to_parse: int - ): + def print_debug_mode_statements(self, debug_on: bool, debug_max_entries_to_parse: int): """Prints additional terminal statements to indicate if --debug or --limitParse are in use""" if debug_on: @@ -85,9 +77,7 @@ class Command(BaseCommand): """ ) - def get_domain_user_dict( - self, domain_statuses_filename: str, sep: str - ) -> defaultdict[str, str]: + def get_domain_user_dict(self, domain_statuses_filename: str, sep: str) -> defaultdict[str, str]: """Creates a mapping of domain name -> status""" domain_status_dictionary = defaultdict(str) logger.info("Reading domain statuses data file %s", domain_statuses_filename) @@ -99,9 +89,7 @@ class Command(BaseCommand): logger.info("Loaded statuses for %d domains", len(domain_status_dictionary)) return domain_status_dictionary - def get_user_emails_dict( - self, contacts_filename: str, sep - ) -> defaultdict[str, str]: + def get_user_emails_dict(self, contacts_filename: str, sep) -> defaultdict[str, str]: """Creates mapping of userId -> emails""" user_emails_dictionary = defaultdict(str) logger.info("Reading contacts data file %s", contacts_filename) @@ -149,19 +137,13 @@ class Command(BaseCommand): total_duplicate_domains = len(duplicate_domains) total_users_without_email = len(users_without_email) if total_users_without_email > 0: - users_without_email_as_string = "{}".format( - ", ".join(map(str, duplicate_domain_user_combos)) - ) + users_without_email_as_string = "{}".format(", ".join(map(str, duplicate_domain_user_combos))) logger.warning( f"{TerminalColors.YELLOW} No e-mails found for users: {users_without_email_as_string}" # noqa ) if total_duplicate_pairs > 0 or total_duplicate_domains > 0: - duplicate_pairs_as_string = "{}".format( - ", ".join(map(str, duplicate_domain_user_combos)) - ) - duplicate_domains_as_string = "{}".format( - ", ".join(map(str, duplicate_domains)) - ) + duplicate_pairs_as_string = "{}".format(", ".join(map(str, duplicate_domain_user_combos))) + duplicate_domains_as_string = "{}".format(", ".join(map(str, duplicate_domains))) logger.warning( f"""{TerminalColors.YELLOW} @@ -179,9 +161,7 @@ class Command(BaseCommand): {TerminalColors.ENDC}""" ) - def print_summary_status_findings( - self, domains_without_status: list[str], outlier_statuses: list[str] - ): + def print_summary_status_findings(self, domains_without_status: list[str], outlier_statuses: list[str]): """Called at the end of the script execution to print out a summary of status anomolies in the imported Verisign data. Currently, we check for: - domains without a status @@ -191,9 +171,7 @@ class Command(BaseCommand): total_domains_without_status = len(domains_without_status) total_outlier_statuses = len(outlier_statuses) if total_domains_without_status > 0: - domains_without_status_as_string = "{}".format( - ", ".join(map(str, domains_without_status)) - ) + domains_without_status_as_string = "{}".format(", ".join(map(str, domains_without_status))) logger.warning( f"""{TerminalColors.YELLOW} @@ -207,9 +185,7 @@ class Command(BaseCommand): ) if total_outlier_statuses > 0: - domains_without_status_as_string = "{}".format( - ", ".join(map(str, outlier_statuses)) - ) # noqa + domains_without_status_as_string = "{}".format(", ".join(map(str, outlier_statuses))) # noqa logger.warning( f"""{TerminalColors.YELLOW} @@ -265,18 +241,14 @@ class Command(BaseCommand): debug_on = options.get("debug") # Get --LimitParse argument - debug_max_entries_to_parse = int( - options.get("limitParse") - ) # set to 0 to parse all entries + debug_max_entries_to_parse = int(options.get("limitParse")) # set to 0 to parse all entries # print message to terminal about which args are in use self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse) # STEP 1: # Create mapping of domain name -> status - domain_status_dictionary = self.get_domain_user_dict( - domain_statuses_filename, sep - ) + domain_status_dictionary = self.get_domain_user_dict(domain_statuses_filename, sep) # STEP 2: # Create mapping of userId -> email @@ -367,12 +339,7 @@ class Command(BaseCommand): None, ) existing_domain_user_pair = next( - ( - x - for x in to_create - if x.username == new_entry_email - and x.domain_name == new_entry_domain_name - ), + (x for x in to_create if x.username == new_entry_email and x.domain_name == new_entry_domain_name), None, ) if existing_domain is not None: @@ -443,10 +410,7 @@ class Command(BaseCommand): ) # Check Parse limit and exit loop if needed - if ( - total_rows_parsed >= debug_max_entries_to_parse - and debug_max_entries_to_parse != 0 - ): + if total_rows_parsed >= debug_max_entries_to_parse and debug_max_entries_to_parse != 0: logger.info( f"{TerminalColors.YELLOW}" f"----PARSE LIMIT REACHED. HALTING PARSER.----" @@ -467,7 +431,5 @@ class Command(BaseCommand): # Print a summary of findings (duplicate entries, # missing data..etc.) - self.print_summary_duplications( - duplicate_domain_user_combos, duplicate_domains, users_without_email - ) + self.print_summary_duplications(duplicate_domain_user_combos, duplicate_domains, users_without_email) self.print_summary_status_findings(domains_without_status, outlier_statuses) diff --git a/src/registrar/management/commands/master_domain_migrations.py b/src/registrar/management/commands/master_domain_migrations.py index 12149ed7b..2a472c5c9 100644 --- a/src/registrar/management/commands/master_domain_migrations.py +++ b/src/registrar/management/commands/master_domain_migrations.py @@ -94,10 +94,7 @@ class Command(BaseCommand): parser.add_argument( "--migrationDirectory", default="migrationdata", - help=( - "The location of the files used for" - "load_transition_domain migration script" - ), + help=("The location of the files used for load_transition_domain migration script"), ) parser.add_argument( "--migrationFilenames", @@ -116,17 +113,13 @@ class Command(BaseCommand): information""", ) - parser.add_argument( - "--sep", default="|", help="Delimiter character for the migration files" - ) + parser.add_argument("--sep", default="|", help="Delimiter character for the migration files") parser.add_argument("--debug", action=argparse.BooleanOptionalAction) parser.add_argument("--prompt", action=argparse.BooleanOptionalAction) - parser.add_argument( - "--limitParse", default=0, help="Sets max number of entries to load" - ) + parser.add_argument("--limitParse", default=0, help="Sets max number of entries to load") parser.add_argument( "--resetTable", @@ -173,9 +166,7 @@ class Command(BaseCommand): # Check Domain table matching_domains = Domain.objects.filter(name=transition_domain_name) # Check Domain Information table - matching_domain_informations = DomainInformation.objects.filter( - domain__name=transition_domain_name - ) + matching_domain_informations = DomainInformation.objects.filter(domain__name=transition_domain_name) # Check Domain Invitation table matching_domain_invitations = DomainInvitation.objects.filter( email=transition_domain_email.lower(), @@ -215,15 +206,9 @@ class Command(BaseCommand): total_missing_domain_invitations = len(missing_domain_invites) missing_domains_as_string = "{}".format(", ".join(map(str, missing_domains))) - duplicate_domains_as_string = "{}".format( - ", ".join(map(str, duplicate_domains)) - ) - missing_domain_informations_as_string = "{}".format( - ", ".join(map(str, missing_domain_informations)) - ) - missing_domain_invites_as_string = "{}".format( - ", ".join(map(str, missing_domain_invites)) - ) + duplicate_domains_as_string = "{}".format(", ".join(map(str, duplicate_domains))) + missing_domain_informations_as_string = "{}".format(", ".join(map(str, missing_domain_informations))) + missing_domain_invites_as_string = "{}".format(", ".join(map(str, missing_domain_invites))) logger.info( f"""{TerminalColors.OKGREEN} diff --git a/src/registrar/management/commands/send_domain_invitations.py b/src/registrar/management/commands/send_domain_invitations.py index f6abb3cf0..2d7839a3e 100644 --- a/src/registrar/management/commands/send_domain_invitations.py +++ b/src/registrar/management/commands/send_domain_invitations.py @@ -83,10 +83,7 @@ class Command(BaseCommand): # domains_with_errors try: # if prior username does not match current username - if ( - not email_context["email"] - or email_context["email"] != transition_domain.username - ): + if not email_context["email"] or email_context["email"] != transition_domain.username: # if not first in list of transition_domains if email_context["email"]: # append the email context to the emails_to_send array @@ -96,12 +93,8 @@ class Command(BaseCommand): email_context["domains"].append(transition_domain.domain_name) except Exception as err: # error condition if domain not in database - self.domains_with_errors.append( - copy.deepcopy(transition_domain.domain_name) - ) - logger.error( - f"error retrieving domain {transition_domain.domain_name}: {err}" - ) + self.domains_with_errors.append(copy.deepcopy(transition_domain.domain_name)) + logger.error(f"error retrieving domain {transition_domain.domain_name}: {err}") # if there are at least one more transition domains than errors, # then append one more item if len(self.transition_domains) > len(self.domains_with_errors): diff --git a/src/registrar/management/commands/transfer_transition_domains_to_domains.py b/src/registrar/management/commands/transfer_transition_domains_to_domains.py index 3697c74e1..fa8fba75a 100644 --- a/src/registrar/management/commands/transfer_transition_domains_to_domains.py +++ b/src/registrar/management/commands/transfer_transition_domains_to_domains.py @@ -33,9 +33,7 @@ class Command(BaseCommand): help="Sets max number of entries to load, set to 0 to load all entries", ) - def print_debug_mode_statements( - self, debug_on: bool, debug_max_entries_to_parse: int - ): + def print_debug_mode_statements(self, debug_on: bool, debug_max_entries_to_parse: int): """Prints additional terminal statements to indicate if --debug or --limitParse are in use""" TerminalHelper.print_conditional( @@ -57,9 +55,7 @@ class Command(BaseCommand): """, ) - def update_domain_status( - self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool - ) -> bool: + def update_domain_status(self, transition_domain: TransitionDomain, target_domain: Domain, debug_on: bool) -> bool: """Given a transition domain that matches an existing domain, updates the existing domain object with that status of the transition domain. @@ -153,9 +149,7 @@ class Command(BaseCommand): """, ) - def try_add_domain_invitation( - self, domain_email: str, associated_domain: Domain - ) -> DomainInvitation | None: + def try_add_domain_invitation(self, domain_email: str, associated_domain: Domain) -> DomainInvitation | None: """If no domain invitation exists for the given domain and e-mail, create and return a new domain invitation object. If one already exists, or if the email is invalid, return NONE""" @@ -187,9 +181,7 @@ class Command(BaseCommand): ).exists() if not domain_email_already_in_domain_invites: # Create new domain invitation - new_domain_invitation = DomainInvitation( - email=domain_email.lower(), domain=associated_domain - ) + new_domain_invitation = DomainInvitation(email=domain_email.lower(), domain=associated_domain) return new_domain_invitation return None @@ -203,9 +195,7 @@ class Command(BaseCommand): # grab command line arguments and store locally... debug_on = options.get("debug") - debug_max_entries_to_parse = int( - options.get("limitParse") - ) # set to 0 to parse all entries + debug_max_entries_to_parse = int(options.get("limitParse")) # set to 0 to parse all entries self.print_debug_mode_statements(debug_on, debug_max_entries_to_parse) @@ -258,18 +248,14 @@ class Command(BaseCommand): # for existing entry, update the status to # the transition domain status - update_made = self.update_domain_status( - transition_domain, domain_to_update, debug_on - ) + update_made = self.update_domain_status(transition_domain, domain_to_update, debug_on) if update_made: # keep track of updated domains for data analysis purposes updated_domain_entries.append(transition_domain.domain_name) # check if we need to add a domain invitation # (eg. for a new user) - new_domain_invitation = self.try_add_domain_invitation( - transition_domain_email, domain_to_update - ) + new_domain_invitation = self.try_add_domain_invitation(transition_domain_email, domain_to_update) except Domain.MultipleObjectsReturned: # This exception was thrown once before during testing. @@ -338,18 +324,14 @@ class Command(BaseCommand): ) else: # no matching entry, make one - new_domain = Domain( - name=transition_domain_name, state=transition_domain_status - ) + new_domain = Domain(name=transition_domain_name, state=transition_domain_status) domains_to_create.append(new_domain) # DEBUG: TerminalHelper.print_conditional( debug_on, f"{TerminalColors.OKCYAN} Adding domain: {new_domain} {TerminalColors.ENDC}", # noqa ) - new_domain_invitation = self.try_add_domain_invitation( - transition_domain_email, new_domain - ) + new_domain_invitation = self.try_add_domain_invitation(transition_domain_email, new_domain) if new_domain_invitation is None: logger.info( @@ -365,10 +347,7 @@ class Command(BaseCommand): domain_invitations_to_create.append(new_domain_invitation) # Check parse limit and exit loop if parse limit has been reached - if ( - debug_max_entries_to_parse > 0 - and total_rows_parsed >= debug_max_entries_to_parse - ): + if debug_max_entries_to_parse > 0 and total_rows_parsed >= debug_max_entries_to_parse: logger.info( f"""{TerminalColors.YELLOW} ----PARSE LIMIT REACHED. HALTING PARSER.---- diff --git a/src/registrar/management/commands/utility/terminal_helper.py b/src/registrar/management/commands/utility/terminal_helper.py index 7327ef3bd..fef3a2e4d 100644 --- a/src/registrar/management/commands/utility/terminal_helper.py +++ b/src/registrar/management/commands/utility/terminal_helper.py @@ -64,9 +64,7 @@ class TerminalHelper: logger.info(print_statement) @staticmethod - def prompt_for_execution( - system_exit_on_terminate: bool, info_to_inspect: str, prompt_title: str - ) -> bool: + def prompt_for_execution(system_exit_on_terminate: bool, info_to_inspect: str, prompt_title: str) -> bool: """Create to reduce code complexity. Prompts the user to inspect the given string and asks if they wish to proceed. diff --git a/src/registrar/migrations/0001_initial.py b/src/registrar/migrations/0001_initial.py index 78f0c5b66..8c50c750d 100644 --- a/src/registrar/migrations/0001_initial.py +++ b/src/registrar/migrations/0001_initial.py @@ -32,9 +32,7 @@ class Migration(migrations.Migration): ("password", models.CharField(max_length=128, verbose_name="password")), ( "last_login", - models.DateTimeField( - blank=True, null=True, verbose_name="last login" - ), + models.DateTimeField(blank=True, null=True, verbose_name="last login"), ), ( "is_superuser", @@ -47,35 +45,25 @@ class Migration(migrations.Migration): ( "username", models.CharField( - error_messages={ - "unique": "A user with that username already exists." - }, + error_messages={"unique": "A user with that username already exists."}, help_text="Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.", max_length=150, unique=True, - validators=[ - django.contrib.auth.validators.UnicodeUsernameValidator() - ], + validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name="username", ), ), ( "first_name", - models.CharField( - blank=True, max_length=150, verbose_name="first name" - ), + models.CharField(blank=True, max_length=150, verbose_name="first name"), ), ( "last_name", - models.CharField( - blank=True, max_length=150, verbose_name="last name" - ), + models.CharField(blank=True, max_length=150, verbose_name="last name"), ), ( "email", - models.EmailField( - blank=True, max_length=254, verbose_name="email address" - ), + models.EmailField(blank=True, max_length=254, verbose_name="email address"), ), ( "is_staff", @@ -95,9 +83,7 @@ class Migration(migrations.Migration): ), ( "date_joined", - models.DateTimeField( - default=django.utils.timezone.now, verbose_name="date joined" - ), + models.DateTimeField(default=django.utils.timezone.now, verbose_name="date joined"), ), ( "groups", @@ -145,9 +131,7 @@ class Migration(migrations.Migration): ), ( "first_name", - models.TextField( - blank=True, db_index=True, help_text="First name", null=True - ), + models.TextField(blank=True, db_index=True, help_text="First name", null=True), ), ( "middle_name", @@ -155,22 +139,16 @@ class Migration(migrations.Migration): ), ( "last_name", - models.TextField( - blank=True, db_index=True, help_text="Last name", null=True - ), + models.TextField(blank=True, db_index=True, help_text="Last name", null=True), ), ("title", models.TextField(blank=True, help_text="Title", null=True)), ( "email", - models.TextField( - blank=True, db_index=True, help_text="Email", null=True - ), + models.TextField(blank=True, db_index=True, help_text="Email", null=True), ), ( "phone", - models.TextField( - blank=True, db_index=True, help_text="Phone", null=True - ), + models.TextField(blank=True, db_index=True, help_text="Phone", null=True), ), ], ), @@ -280,21 +258,15 @@ class Migration(migrations.Migration): ), ( "unit_type", - models.CharField( - blank=True, help_text="Unit type", max_length=15, null=True - ), + models.CharField(blank=True, help_text="Unit type", max_length=15, null=True), ), ( "unit_number", - models.CharField( - blank=True, help_text="Unit number", max_length=255, null=True - ), + models.CharField(blank=True, help_text="Unit number", max_length=255, null=True), ), ( "state_territory", - models.CharField( - blank=True, help_text="State/Territory", max_length=2, null=True - ), + models.CharField(blank=True, help_text="State/Territory", max_length=2, null=True), ), ( "zip_code", @@ -308,9 +280,7 @@ class Migration(migrations.Migration): ), ( "purpose", - models.TextField( - blank=True, help_text="Purpose of the domain", null=True - ), + models.TextField(blank=True, help_text="Purpose of the domain", null=True), ), ( "security_email", @@ -323,9 +293,7 @@ class Migration(migrations.Migration): ), ( "anything_else", - models.TextField( - blank=True, help_text="Anything else we should know?", null=True - ), + models.TextField(blank=True, help_text="Anything else we should know?", null=True), ), ( "acknowledged_policy", @@ -337,9 +305,7 @@ class Migration(migrations.Migration): ), ( "alternative_domains", - models.ManyToManyField( - blank=True, related_name="alternatives+", to="registrar.website" - ), + models.ManyToManyField(blank=True, related_name="alternatives+", to="registrar.website"), ), ( "authorizing_official", @@ -361,9 +327,7 @@ class Migration(migrations.Migration): ), ( "current_websites", - models.ManyToManyField( - blank=True, related_name="current+", to="registrar.website" - ), + models.ManyToManyField(blank=True, related_name="current+", to="registrar.website"), ), ( "investigator", diff --git a/src/registrar/migrations/0003_rename_is_election_office_domainapplication_is_election_board_and_more.py b/src/registrar/migrations/0003_rename_is_election_office_domainapplication_is_election_board_and_more.py index c12ca9d34..a6844bfaf 100644 --- a/src/registrar/migrations/0003_rename_is_election_office_domainapplication_is_election_board_and_more.py +++ b/src/registrar/migrations/0003_rename_is_election_office_domainapplication_is_election_board_and_more.py @@ -48,9 +48,7 @@ class Migration(migrations.Migration): migrations.AddField( model_name="domainapplication", name="address_line2", - field=models.CharField( - blank=True, help_text="Address line 2", max_length=15, null=True - ), + field=models.CharField(blank=True, help_text="Address line 2", max_length=15, null=True), ), migrations.AddField( model_name="domainapplication", diff --git a/src/registrar/migrations/0005_domainapplication_city_and_more.py b/src/registrar/migrations/0005_domainapplication_city_and_more.py index 3d1fc1de1..bd4683b8c 100644 --- a/src/registrar/migrations/0005_domainapplication_city_and_more.py +++ b/src/registrar/migrations/0005_domainapplication_city_and_more.py @@ -22,8 +22,6 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="domainapplication", name="federal_agency", - field=models.TextField( - blank=True, help_text="Top level federal agency", null=True - ), + field=models.TextField(blank=True, help_text="Top level federal agency", null=True), ), ] diff --git a/src/registrar/migrations/0007_domainapplication_more_organization_information_and_more.py b/src/registrar/migrations/0007_domainapplication_more_organization_information_and_more.py index 909b301ab..49df16fbb 100644 --- a/src/registrar/migrations/0007_domainapplication_more_organization_information_and_more.py +++ b/src/registrar/migrations/0007_domainapplication_more_organization_information_and_more.py @@ -21,9 +21,7 @@ class Migration(migrations.Migration): migrations.AddField( model_name="domainapplication", name="type_of_work", - field=models.TextField( - blank=True, help_text="Type of work of the organization", null=True - ), + field=models.TextField(blank=True, help_text="Type of work of the organization", null=True), ), migrations.AlterField( model_name="domainapplication", @@ -33,9 +31,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="domainapplication", name="address_line2", - field=models.CharField( - blank=True, help_text="Street address line 2", max_length=15, null=True - ), + field=models.CharField(blank=True, help_text="Street address line 2", max_length=15, null=True), ), migrations.AlterField( model_name="domainapplication", @@ -95,9 +91,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="domainapplication", name="purpose", - field=models.TextField( - blank=True, help_text="Purpose of your domain", null=True - ), + field=models.TextField(blank=True, help_text="Purpose of your domain", null=True), ), migrations.AlterField( model_name="domainapplication", @@ -112,9 +106,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="domainapplication", name="urbanization", - field=models.TextField( - blank=True, help_text="Urbanization (Puerto Rico only)", null=True - ), + field=models.TextField(blank=True, help_text="Urbanization (Puerto Rico only)", null=True), ), migrations.AlterField( model_name="domainapplication", diff --git a/src/registrar/migrations/0008_remove_userprofile_created_at_and_more.py b/src/registrar/migrations/0008_remove_userprofile_created_at_and_more.py index bb52e4320..2e2dbed6c 100644 --- a/src/registrar/migrations/0008_remove_userprofile_created_at_and_more.py +++ b/src/registrar/migrations/0008_remove_userprofile_created_at_and_more.py @@ -21,9 +21,7 @@ class Migration(migrations.Migration): migrations.AddField( model_name="contact", name="created_at", - field=models.DateTimeField( - auto_now_add=True, default=django.utils.timezone.now - ), + field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False, ), migrations.AddField( @@ -34,9 +32,7 @@ class Migration(migrations.Migration): migrations.AddField( model_name="website", name="created_at", - field=models.DateTimeField( - auto_now_add=True, default=django.utils.timezone.now - ), + field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False, ), migrations.AddField( diff --git a/src/registrar/migrations/0009_domainapplication_federally_recognized_tribe_and_more.py b/src/registrar/migrations/0009_domainapplication_federally_recognized_tribe_and_more.py index 3010b6cd4..7fce25b9e 100644 --- a/src/registrar/migrations/0009_domainapplication_federally_recognized_tribe_and_more.py +++ b/src/registrar/migrations/0009_domainapplication_federally_recognized_tribe_and_more.py @@ -12,16 +12,12 @@ class Migration(migrations.Migration): migrations.AddField( model_name="domainapplication", name="federally_recognized_tribe", - field=models.BooleanField( - help_text="Is the tribe federally recognized", null=True - ), + field=models.BooleanField(help_text="Is the tribe federally recognized", null=True), ), migrations.AddField( model_name="domainapplication", name="state_recognized_tribe", - field=models.BooleanField( - help_text="Is the tribe recognized by a state", null=True - ), + field=models.BooleanField(help_text="Is the tribe recognized by a state", null=True), ), migrations.AddField( model_name="domainapplication", diff --git a/src/registrar/migrations/0015_remove_domain_owners_userdomainrole_user_domains_and_more.py b/src/registrar/migrations/0015_remove_domain_owners_userdomainrole_user_domains_and_more.py index 2fbc5ab19..91ac00ff9 100644 --- a/src/registrar/migrations/0015_remove_domain_owners_userdomainrole_user_domains_and_more.py +++ b/src/registrar/migrations/0015_remove_domain_owners_userdomainrole_user_domains_and_more.py @@ -59,8 +59,6 @@ class Migration(migrations.Migration): ), migrations.AddConstraint( model_name="userdomainrole", - constraint=models.UniqueConstraint( - fields=("user", "domain"), name="unique_user_domain_role" - ), + constraint=models.UniqueConstraint(fields=("user", "domain"), name="unique_user_domain_role"), ), ] diff --git a/src/registrar/migrations/0018_domaininformation.py b/src/registrar/migrations/0018_domaininformation.py index 408fa048b..582a6e244 100644 --- a/src/registrar/migrations/0018_domaininformation.py +++ b/src/registrar/migrations/0018_domaininformation.py @@ -64,15 +64,11 @@ class Migration(migrations.Migration): ), ( "federally_recognized_tribe", - models.BooleanField( - help_text="Is the tribe federally recognized", null=True - ), + models.BooleanField(help_text="Is the tribe federally recognized", null=True), ), ( "state_recognized_tribe", - models.BooleanField( - help_text="Is the tribe recognized by a state", null=True - ), + models.BooleanField(help_text="Is the tribe recognized by a state", null=True), ), ( "tribe_name", @@ -172,9 +168,7 @@ class Migration(migrations.Migration): ), ( "purpose", - models.TextField( - blank=True, help_text="Purpose of your domain", null=True - ), + models.TextField(blank=True, help_text="Purpose of your domain", null=True), ), ( "no_other_contacts_rationale", @@ -186,9 +180,7 @@ class Migration(migrations.Migration): ), ( "anything_else", - models.TextField( - blank=True, help_text="Anything else we should know?", null=True - ), + models.TextField(blank=True, help_text="Anything else we should know?", null=True), ), ( "is_policy_acknowledged", diff --git a/src/registrar/migrations/0021_publiccontact_domain_publiccontact_registry_id_and_more.py b/src/registrar/migrations/0021_publiccontact_domain_publiccontact_registry_id_and_more.py index 35e07fe71..89478f5d6 100644 --- a/src/registrar/migrations/0021_publiccontact_domain_publiccontact_registry_id_and_more.py +++ b/src/registrar/migrations/0021_publiccontact_domain_publiccontact_registry_id_and_more.py @@ -76,9 +76,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="publiccontact", name="org", - field=models.TextField( - help_text="Contact's organization (null ok)", null=True - ), + field=models.TextField(help_text="Contact's organization (null ok)", null=True), ), migrations.AlterField( model_name="publiccontact", @@ -88,9 +86,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="publiccontact", name="pw", - field=models.TextField( - help_text="Contact's authorization code. 16 characters minimum." - ), + field=models.TextField(help_text="Contact's authorization code. 16 characters minimum."), ), migrations.AlterField( model_name="publiccontact", @@ -115,8 +111,6 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="publiccontact", name="voice", - field=models.TextField( - help_text="Contact's phone number. Must be in ITU.E164.2005 format" - ), + field=models.TextField(help_text="Contact's phone number. Must be in ITU.E164.2005 format"), ), ] diff --git a/src/registrar/migrations/0024_alter_contact_email.py b/src/registrar/migrations/0024_alter_contact_email.py index f512d5d82..8dcfc4ffe 100644 --- a/src/registrar/migrations/0024_alter_contact_email.py +++ b/src/registrar/migrations/0024_alter_contact_email.py @@ -12,8 +12,6 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="contact", name="email", - field=models.EmailField( - blank=True, db_index=True, help_text="Email", max_length=254, null=True - ), + field=models.EmailField(blank=True, db_index=True, help_text="Email", max_length=254, null=True), ), ] diff --git a/src/registrar/migrations/0026_alter_domainapplication_address_line2_and_more.py b/src/registrar/migrations/0026_alter_domainapplication_address_line2_and_more.py index 6e28f5cbb..77da9e79c 100644 --- a/src/registrar/migrations/0026_alter_domainapplication_address_line2_and_more.py +++ b/src/registrar/migrations/0026_alter_domainapplication_address_line2_and_more.py @@ -12,15 +12,11 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="domainapplication", name="address_line2", - field=models.TextField( - blank=True, help_text="Street address line 2", null=True - ), + field=models.TextField(blank=True, help_text="Street address line 2", null=True), ), migrations.AlterField( model_name="domaininformation", name="address_line2", - field=models.TextField( - blank=True, help_text="Street address line 2", null=True - ), + field=models.TextField(blank=True, help_text="Street address line 2", null=True), ), ] diff --git a/src/registrar/migrations/0031_transitiondomain_and_more.py b/src/registrar/migrations/0031_transitiondomain_and_more.py index e378a33de..9d1153764 100644 --- a/src/registrar/migrations/0031_transitiondomain_and_more.py +++ b/src/registrar/migrations/0031_transitiondomain_and_more.py @@ -95,16 +95,12 @@ class Migration(migrations.Migration): migrations.AddField( model_name="domainapplication", name="about_your_organization", - field=models.TextField( - blank=True, help_text="Information about your organization", null=True - ), + field=models.TextField(blank=True, help_text="Information about your organization", null=True), ), migrations.AddField( model_name="domaininformation", name="about_your_organization", - field=models.TextField( - blank=True, help_text="Information about your organization", null=True - ), + field=models.TextField(blank=True, help_text="Information about your organization", null=True), ), migrations.AlterField( model_name="domainapplication", diff --git a/src/registrar/models/domain.py b/src/registrar/models/domain.py index c5c0b63f7..cb3b784e9 100644 --- a/src/registrar/models/domain.py +++ b/src/registrar/models/domain.py @@ -262,10 +262,7 @@ class Domain(TimeStampedModel, DomainHelper): doesn't add the created host to the domain returns ErrorCode (int)""" if addrs is not None and addrs != []: - addresses = [ - epp.Ip(addr=addr, ip="v6" if self.is_ipv6(addr) else None) - for addr in addrs - ] + addresses = [epp.Ip(addr=addr, ip="v6" if self.is_ipv6(addr) else None) for addr in addrs] request = commands.CreateHost(name=host, addrs=addresses) else: request = commands.CreateHost(name=host) @@ -358,15 +355,11 @@ class Domain(TimeStampedModel, DomainHelper): raise NameserverError(code=nsErrorCodes.MISSING_IP, nameserver=nameserver) elif not cls.isSubdomain(name, nameserver) and (ip is not None and ip != []): - raise NameserverError( - code=nsErrorCodes.GLUE_RECORD_NOT_ALLOWED, nameserver=nameserver, ip=ip - ) + raise NameserverError(code=nsErrorCodes.GLUE_RECORD_NOT_ALLOWED, nameserver=nameserver, ip=ip) elif ip is not None and ip != []: for addr in ip: if not cls._valid_ip_addr(addr): - raise NameserverError( - code=nsErrorCodes.INVALID_IP, nameserver=nameserver[:40], ip=ip - ) + raise NameserverError(code=nsErrorCodes.INVALID_IP, nameserver=nameserver[:40], ip=ip) return None @classmethod @@ -382,9 +375,7 @@ class Domain(TimeStampedModel, DomainHelper): except ValueError: return False - def getNameserverChanges( - self, hosts: list[tuple[str, list]] - ) -> tuple[list, list, dict, dict]: + def getNameserverChanges(self, hosts: list[tuple[str, list]]) -> tuple[list, list, dict, dict]: """ calls self.nameserver, it should pull from cache but may result in an epp call @@ -420,40 +411,27 @@ class Domain(TimeStampedModel, DomainHelper): else: # TODO - host is being updated when previous was None+new is empty list # add check here - if newHostDict[prevHost] is not None and set( - newHostDict[prevHost] - ) != set(addrs): - self.__class__.checkHostIPCombo( - name=self.name, nameserver=prevHost, ip=newHostDict[prevHost] - ) + if newHostDict[prevHost] is not None and set(newHostDict[prevHost]) != set(addrs): + self.__class__.checkHostIPCombo(name=self.name, nameserver=prevHost, ip=newHostDict[prevHost]) updated_values.append((prevHost, newHostDict[prevHost])) new_values = { - key: newHostDict.get(key) - for key in newHostDict - if key not in previousHostDict and key.strip() != "" + key: newHostDict.get(key) for key in newHostDict if key not in previousHostDict and key.strip() != "" } for nameserver, ip in new_values.items(): - self.__class__.checkHostIPCombo( - name=self.name, nameserver=nameserver, ip=ip - ) + self.__class__.checkHostIPCombo(name=self.name, nameserver=nameserver, ip=ip) return (deleted_values, updated_values, new_values, previousHostDict) def _update_host_values(self, updated_values, oldNameservers): for hostTuple in updated_values: - updated_response_code = self._update_host( - hostTuple[0], hostTuple[1], oldNameservers.get(hostTuple[0]) - ) + updated_response_code = self._update_host(hostTuple[0], hostTuple[1], oldNameservers.get(hostTuple[0])) if updated_response_code not in [ ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY, ErrorCode.OBJECT_EXISTS, ]: - logger.warning( - "Could not update host %s. Error code was: %s " - % (hostTuple[0], updated_response_code) - ) + logger.warning("Could not update host %s. Error code was: %s " % (hostTuple[0], updated_response_code)) def createNewHostList(self, new_values: dict): """convert the dictionary of new values to a list of HostObjSet @@ -469,13 +447,8 @@ class Domain(TimeStampedModel, DomainHelper): hostStringList = [] for key, value in new_values.items(): - createdCode = self._create_host( - host=key, addrs=value - ) # creates in registry - if ( - createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY - or createdCode == ErrorCode.OBJECT_EXISTS - ): + createdCode = self._create_host(host=key, addrs=value) # creates in registry + if createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY or createdCode == ErrorCode.OBJECT_EXISTS: hostStringList.append(key) if hostStringList == []: return [], 0 @@ -522,9 +495,7 @@ class Domain(TimeStampedModel, DomainHelper): logger.info("Domain does not have dnssec data defined %s" % err) return None - def getDnssecdataChanges( - self, _dnssecdata: Optional[extensions.DNSSECExtension] - ) -> tuple[dict, dict]: + def getDnssecdataChanges(self, _dnssecdata: Optional[extensions.DNSSECExtension]) -> tuple[dict, dict]: """ calls self.dnssecdata, it should pull from cache but may result in an epp call @@ -550,20 +521,12 @@ class Domain(TimeStampedModel, DomainHelper): if oldDnssecdata and len(oldDnssecdata.dsData) > 0: # if existing dsData not in new dsData, mark for removal - dsDataForRemoval = [ - dsData - for dsData in oldDnssecdata.dsData - if dsData not in _dnssecdata.dsData - ] + dsDataForRemoval = [dsData for dsData in oldDnssecdata.dsData if dsData not in _dnssecdata.dsData] if len(dsDataForRemoval) > 0: remDnssecdata["dsData"] = dsDataForRemoval # if new dsData not in existing dsData, mark for add - dsDataForAdd = [ - dsData - for dsData in _dnssecdata.dsData - if dsData not in oldDnssecdata.dsData - ] + dsDataForAdd = [dsData for dsData in _dnssecdata.dsData if dsData not in oldDnssecdata.dsData] if len(dsDataForAdd) > 0: addDnssecdata["dsData"] = dsDataForAdd else: @@ -598,9 +561,7 @@ class Domain(TimeStampedModel, DomainHelper): if "dsData" in _remDnssecdata and _remDnssecdata["dsData"] is not None: registry.send(remRequest, cleaned=True) except RegistryError as e: - logger.error( - "Error updating DNSSEC, code was %s error was %s" % (e.code, e) - ) + logger.error("Error updating DNSSEC, code was %s error was %s" % (e.code, e)) raise e @nameservers.setter # type: ignore @@ -630,14 +591,10 @@ class Domain(TimeStampedModel, DomainHelper): oldNameservers, ) = self.getNameserverChanges(hosts=hosts) - _ = self._update_host_values( - updated_values, oldNameservers - ) # returns nothing, just need to be run and errors + _ = self._update_host_values(updated_values, oldNameservers) # returns nothing, just need to be run and errors addToDomainList, addToDomainCount = self.createNewHostList(new_values) deleteHostList, deleteCount = self.createDeleteHostList(deleted_values) - responseCode = self.addAndRemoveHostsFromDomain( - hostsToAdd=addToDomainList, hostsToDelete=deleteHostList - ) + responseCode = self.addAndRemoveHostsFromDomain(hostsToAdd=addToDomainList, hostsToDelete=deleteHostList) # if unable to update domain raise error and stop if responseCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY: @@ -651,19 +608,13 @@ class Domain(TimeStampedModel, DomainHelper): self.dns_needed() self.save() except Exception as err: - logger.info( - "nameserver setter checked for dns_needed state " - "and it did not succeed. Warning: %s" % err - ) + logger.info("nameserver setter checked for dns_needed state and it did not succeed. Warning: %s" % err) elif successTotalNameservers >= 2 and successTotalNameservers <= 13: try: self.ready() self.save() except Exception as err: - logger.info( - "nameserver setter checked for create state " - "and it did not succeed. Warning: %s" % err - ) + logger.info("nameserver setter checked for create state and it did not succeed. Warning: %s" % err) @Cache def statuses(self) -> list[str]: @@ -698,9 +649,7 @@ class Domain(TimeStampedModel, DomainHelper): so follow on additions will update the current registrant""" logger.info("making registrant contact") - self._set_singleton_contact( - contact=contact, expectedType=contact.ContactTypeChoices.REGISTRANT - ) + self._set_singleton_contact(contact=contact, expectedType=contact.ContactTypeChoices.REGISTRANT) @Cache def administrative_contact(self) -> PublicContact | None: @@ -712,9 +661,7 @@ class Domain(TimeStampedModel, DomainHelper): def administrative_contact(self, contact: PublicContact): logger.info("making admin contact") if contact.contact_type != contact.ContactTypeChoices.ADMINISTRATIVE: - raise ValueError( - "Cannot set a registrant contact with a different contact type" - ) + raise ValueError("Cannot set a registrant contact with a different contact type") self._make_contact_in_registry(contact=contact) self._update_domain_with_contact(contact, rem=False) @@ -736,20 +683,14 @@ class Domain(TimeStampedModel, DomainHelper): try: registry.send(updateContact, cleaned=True) except RegistryError as e: - logger.error( - "Error updating contact, code was %s error was %s" % (e.code, e) - ) + logger.error("Error updating contact, code was %s error was %s" % (e.code, e)) # TODO - ticket 433 human readable error handling here def _update_domain_with_contact(self, contact: PublicContact, rem=False): """adds or removes a contact from a domain rem being true indicates the contact will be removed from registry""" - logger.info( - "_update_domain_with_contact() received type %s " % contact.contact_type - ) - domainContact = epp.DomainContact( - contact=contact.registry_id, type=contact.contact_type - ) + logger.info("_update_domain_with_contact() received type %s " % contact.contact_type) + domainContact = epp.DomainContact(contact=contact.registry_id, type=contact.contact_type) updateDomain = commands.UpdateDomain(name=self.name, add=[domainContact]) if rem: @@ -758,17 +699,12 @@ class Domain(TimeStampedModel, DomainHelper): try: registry.send(updateDomain, cleaned=True) except RegistryError as e: - logger.error( - "Error changing contact on a domain. Error code is %s error was %s" - % (e.code, e) - ) + logger.error("Error changing contact on a domain. Error code is %s error was %s" % (e.code, e)) action = "add" if rem: action = "remove" - raise Exception( - "Can't %s the contact of type %s" % (action, contact.contact_type) - ) + raise Exception("Can't %s the contact of type %s" % (action, contact.contact_type)) @Cache def security_contact(self) -> PublicContact | None: @@ -778,16 +714,11 @@ class Domain(TimeStampedModel, DomainHelper): def _add_registrant_to_existing_domain(self, contact: PublicContact): """Used to change the registrant contact on an existing domain""" - updateDomain = commands.UpdateDomain( - name=self.name, registrant=contact.registry_id - ) + updateDomain = commands.UpdateDomain(name=self.name, registrant=contact.registry_id) try: registry.send(updateDomain, cleaned=True) except RegistryError as e: - logger.error( - "Error changing to new registrant error code is %s, error is %s" - % (e.code, e) - ) + logger.error("Error changing to new registrant error code is %s, error is %s" % (e.code, e)) # TODO-error handling better here? def _set_singleton_contact(self, contact: PublicContact, expectedType: str): # noqa @@ -800,16 +731,10 @@ class Domain(TimeStampedModel, DomainHelper): Will throw error if contact type is not the same as expectType Raises ValueError if expected type doesn't match the contact type""" if expectedType != contact.contact_type: - raise ValueError( - "Cannot set a contact with a different contact type," - " expected type was %s" % expectedType - ) + raise ValueError("Cannot set a contact with a different contact type, expected type was %s" % expectedType) isRegistrant = contact.contact_type == contact.ContactTypeChoices.REGISTRANT - isEmptySecurity = ( - contact.contact_type == contact.ContactTypeChoices.SECURITY - and contact.email == "" - ) + isEmptySecurity = contact.contact_type == contact.ContactTypeChoices.SECURITY and contact.email == "" # get publicContact objects that have the matching # domain and type but a different id @@ -827,10 +752,7 @@ class Domain(TimeStampedModel, DomainHelper): # contact is already added to the domain, but something may have changed on it alreadyExistsInRegistry = errorCode == ErrorCode.OBJECT_EXISTS # if an error occured besides duplication, stop - if ( - not alreadyExistsInRegistry - and errorCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY - ): + if not alreadyExistsInRegistry and errorCode != ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY: # TODO- ticket #433 look here for error handling raise RegistryError(code=errorCode) @@ -839,9 +761,7 @@ class Domain(TimeStampedModel, DomainHelper): # if has conflicting contacts in our db remove them if hasOtherContact: - logger.info( - "_set_singleton_contact()-> updating domain, removing old contact" - ) + logger.info("_set_singleton_contact()-> updating domain, removing old contact") existing_contact = ( PublicContact.objects.exclude(registry_id=contact.registry_id) @@ -859,9 +779,7 @@ class Domain(TimeStampedModel, DomainHelper): self._update_domain_with_contact(contact=existing_contact, rem=True) existing_contact.delete() except Exception as err: - logger.error( - "Raising error after removing and adding a new contact" - ) + logger.error("Raising error after removing and adding a new contact") raise (err) # update domain with contact or update the contact itself @@ -870,9 +788,7 @@ class Domain(TimeStampedModel, DomainHelper): self._update_domain_with_contact(contact=contact, rem=False) # if already exists just update elif alreadyExistsInRegistry: - current_contact = PublicContact.objects.filter( - registry_id=contact.registry_id - ).get() + current_contact = PublicContact.objects.filter(registry_id=contact.registry_id).get() if current_contact.email != contact.email: self._update_epp_contact(contact=contact) @@ -880,9 +796,7 @@ class Domain(TimeStampedModel, DomainHelper): logger.info("removing security contact and setting default again") # get the current contact registry id for security - current_contact = PublicContact.objects.filter( - registry_id=contact.registry_id - ).get() + current_contact = PublicContact.objects.filter(registry_id=contact.registry_id).get() # don't let user delete the default without adding a new email if current_contact.email != PublicContact.get_default_security().email: @@ -900,9 +814,7 @@ class Domain(TimeStampedModel, DomainHelper): from domain information (not domain application) and should have the security email from DomainApplication""" logger.info("making security contact in registry") - self._set_singleton_contact( - contact, expectedType=contact.ContactTypeChoices.SECURITY - ) + self._set_singleton_contact(contact, expectedType=contact.ContactTypeChoices.SECURITY) @Cache def technical_contact(self) -> PublicContact | None: @@ -913,9 +825,7 @@ class Domain(TimeStampedModel, DomainHelper): @technical_contact.setter # type: ignore def technical_contact(self, contact: PublicContact): logger.info("making technical contact") - self._set_singleton_contact( - contact, expectedType=contact.ContactTypeChoices.TECHNICAL - ) + self._set_singleton_contact(contact, expectedType=contact.ContactTypeChoices.TECHNICAL) def is_active(self) -> bool: """Currently just returns if the state is created, @@ -996,17 +906,13 @@ class Domain(TimeStampedModel, DomainHelper): expiration_date = DateField( null=True, - help_text=( - "Duplication of registry's expiration" "date saved for ease of reporting" - ), + help_text=("Duplication of registry's expiration date saved for ease of reporting"), ) def isActive(self): return self.state == Domain.State.CREATED - def map_epp_contact_to_public_contact( - self, contact: eppInfo.InfoContactResultData, contact_id, contact_type - ): + def map_epp_contact_to_public_contact(self, contact: eppInfo.InfoContactResultData, contact_id, contact_type): """Maps the Epp contact representation to a PublicContact object. contact -> eppInfo.InfoContactResultData: The converted contact object @@ -1028,10 +934,7 @@ class Domain(TimeStampedModel, DomainHelper): # Since contact_id is registry_id, # check that its the right length contact_id_length = len(contact_id) - if ( - contact_id_length > PublicContact.get_max_id_length() - or contact_id_length < 1 - ): + if contact_id_length > PublicContact.get_max_id_length() or contact_id_length < 1: raise ContactError(code=ContactErrorCodes.CONTACT_ID_INVALID_LENGTH) if not isinstance(contact, eppInfo.InfoContactResultData): @@ -1114,9 +1017,7 @@ class Domain(TimeStampedModel, DomainHelper): ) raise error - def generic_contact_getter( - self, contact_type_choice: PublicContact.ContactTypeChoices - ) -> PublicContact | None: + def generic_contact_getter(self, contact_type_choice: PublicContact.ContactTypeChoices) -> PublicContact | None: """Retrieves the desired PublicContact from the registry. This abstracts the caching and EPP retrieval for all contact items and thus may result in EPP calls being sent. @@ -1187,9 +1088,7 @@ class Domain(TimeStampedModel, DomainHelper): if contact_type == PublicContact.ContactTypeChoices.REGISTRANT: desired_contact = None if isinstance(contacts, str): - desired_contact = self._registrant_to_public_contact( - self._cache["registrant"] - ) + desired_contact = self._registrant_to_public_contact(self._cache["registrant"]) # Set the cache with the updated object # for performance reasons. if "registrant" in self._cache: @@ -1203,9 +1102,7 @@ class Domain(TimeStampedModel, DomainHelper): if contacts is not None and contact_type in contacts: _registry_id = contacts.get(contact_type) - desired = PublicContact.objects.filter( - registry_id=_registry_id, domain=self, contact_type=contact_type - ) + desired = PublicContact.objects.filter(registry_id=_registry_id, domain=self, contact_type=contact_type) if desired.count() == 1: return desired.get() @@ -1214,10 +1111,7 @@ class Domain(TimeStampedModel, DomainHelper): return None def _handle_registrant_contact(self, contact): - if ( - contact.contact_type is not None - and contact.contact_type == PublicContact.ContactTypeChoices.REGISTRANT - ): + if contact.contact_type is not None and contact.contact_type == PublicContact.ContactTypeChoices.REGISTRANT: return contact else: raise ValueError("Invalid contact object for registrant_contact") @@ -1297,9 +1191,7 @@ class Domain(TimeStampedModel, DomainHelper): administrative_contact = self.get_default_administrative_contact() administrative_contact.save() - @transition( - field="state", source=[State.READY, State.ON_HOLD], target=State.ON_HOLD - ) + @transition(field="state", source=[State.READY, State.ON_HOLD], target=State.ON_HOLD) def place_client_hold(self, ignoreEPP=False): """place a clienthold on a domain (no longer should resolve) ignoreEPP (boolean) - set to true to by-pass EPP (used for transition domains) @@ -1325,9 +1217,7 @@ class Domain(TimeStampedModel, DomainHelper): self._remove_client_hold() # TODO -on the client hold ticket any additional error handling here - @transition( - field="state", source=[State.ON_HOLD, State.DNS_NEEDED], target=State.DELETED - ) + @transition(field="state", source=[State.ON_HOLD, State.DNS_NEEDED], target=State.DELETED) def deletedInEpp(self): """Domain is deleted in epp but is saved in our database. Error handling should be provided by the caller.""" @@ -1345,9 +1235,7 @@ class Domain(TimeStampedModel, DomainHelper): logger.error("Could not delete domain. FSM failure: {err}") raise err except Exception as err: - logger.error( - f"Could not delete domain. An unspecified error occured: {err}" - ) + logger.error(f"Could not delete domain. An unspecified error occured: {err}") raise err else: self._invalidate_cache() @@ -1407,9 +1295,7 @@ class Domain(TimeStampedModel, DomainHelper): is_security = contact.contact_type == contact.ContactTypeChoices.SECURITY DF = epp.DiscloseField fields = {DF.EMAIL} - disclose = ( - is_security and contact.email != PublicContact.get_default_security().email - ) + disclose = is_security and contact.email != PublicContact.get_default_security().email # Will only disclose DF.EMAIL if its not the default return epp.Disclose( flag=disclose, @@ -1421,9 +1307,7 @@ class Domain(TimeStampedModel, DomainHelper): name=contact.name, addr=epp.ContactAddr( street=[ - getattr(contact, street) - for street in ["street1", "street2", "street3"] - if hasattr(contact, street) + getattr(contact, street) for street in ["street1", "street2", "street3"] if hasattr(contact, street) ], # type: ignore city=contact.city, pc=contact.pc, @@ -1488,9 +1372,7 @@ class Domain(TimeStampedModel, DomainHelper): data = registry.send(req, cleaned=True).res_data[0] # Map the object we recieved from EPP to a PublicContact - mapped_object = self.map_epp_contact_to_public_contact( - data, domainContact.contact, domainContact.type - ) + mapped_object = self.map_epp_contact_to_public_contact(data, domainContact.contact, domainContact.type) # Find/create it in the DB in_db = self._get_or_create_public_contact(mapped_object) @@ -1503,9 +1385,7 @@ class Domain(TimeStampedModel, DomainHelper): return self._request_contact_info(contact) except RegistryError as e: if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST: - logger.info( - "_get_or_create_contact()-> contact doesn't exist so making it" - ) + logger.info("_get_or_create_contact()-> contact doesn't exist so making it") contact.domain = self contact.save() # this will call the function based on type of contact return self._request_contact_info(contact=contact) @@ -1560,9 +1440,7 @@ class Domain(TimeStampedModel, DomainHelper): return [] for ip_addr in ip_list: - edited_ip_list.append( - epp.Ip(addr=ip_addr, ip="v6" if self.is_ipv6(ip_addr) else None) - ) + edited_ip_list.append(epp.Ip(addr=ip_addr, ip="v6" if self.is_ipv6(ip_addr) else None)) return edited_ip_list @@ -1579,12 +1457,7 @@ class Domain(TimeStampedModel, DomainHelper): """ try: - if ( - ip_list is None - or len(ip_list) == 0 - and isinstance(old_ip_list, list) - and len(old_ip_list) != 0 - ): + if ip_list is None or len(ip_list) == 0 and isinstance(old_ip_list, list) and len(old_ip_list) != 0: return ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY added_ip_list = set(ip_list).difference(old_ip_list) @@ -1607,9 +1480,7 @@ class Domain(TimeStampedModel, DomainHelper): else: raise e - def addAndRemoveHostsFromDomain( - self, hostsToAdd: list[str], hostsToDelete: list[str] - ): + def addAndRemoveHostsFromDomain(self, hostsToAdd: list[str], hostsToDelete: list[str]): """sends an UpdateDomain message to the registry with the hosts provided Args: hostsToDelete (list[epp.HostObjSet])- list of host objects to delete @@ -1624,22 +1495,14 @@ class Domain(TimeStampedModel, DomainHelper): return ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY try: - updateReq = commands.UpdateDomain( - name=self.name, rem=hostsToDelete, add=hostsToAdd - ) + updateReq = commands.UpdateDomain(name=self.name, rem=hostsToDelete, add=hostsToAdd) - logger.info( - "addAndRemoveHostsFromDomain()-> sending update domain req as %s" - % updateReq - ) + logger.info("addAndRemoveHostsFromDomain()-> sending update domain req as %s" % updateReq) response = registry.send(updateReq, cleaned=True) return response.code except RegistryError as e: - logger.error( - "Error addAndRemoveHostsFromDomain, code was %s error was %s" - % (e.code, e) - ) + logger.error("Error addAndRemoveHostsFromDomain, code was %s error was %s" % (e.code, e)) return e.code def _delete_hosts_if_not_used(self, hostsToDelete: list[str]): @@ -1657,22 +1520,13 @@ class Domain(TimeStampedModel, DomainHelper): for nameserver in hostsToDelete: deleteHostReq = commands.DeleteHost(name=nameserver) registry.send(deleteHostReq, cleaned=True) - logger.info( - "_delete_hosts_if_not_used()-> sending delete host req as %s" - % deleteHostReq - ) + logger.info("_delete_hosts_if_not_used()-> sending delete host req as %s" % deleteHostReq) except RegistryError as e: if e.code == ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION: - logger.info( - "Did not remove host %s because it is in use on another domain." - % nameserver - ) + logger.info("Did not remove host %s because it is in use on another domain." % nameserver) else: - logger.error( - "Error _delete_hosts_if_not_used, code was %s error was %s" - % (e.code, e) - ) + logger.error("Error _delete_hosts_if_not_used, code was %s error was %s" % (e.code, e)) def _fetch_cache(self, fetch_hosts=False, fetch_contacts=False): """Contact registry for info about a domain.""" @@ -1768,9 +1622,7 @@ class Domain(TimeStampedModel, DomainHelper): # Raise an error if we find duplicates. # This should not occur if db_contact.count() > 1: - raise Exception( - f"Multiple contacts found for {public_contact.contact_type}" - ) + raise Exception(f"Multiple contacts found for {public_contact.contact_type}") # Save to DB if it doesn't exist already. if db_contact.count() == 0: @@ -1784,10 +1636,7 @@ class Domain(TimeStampedModel, DomainHelper): # Does the item we're grabbing match # what we have in our DB? - if ( - existing_contact.email != public_contact.email - or existing_contact.registry_id != public_contact.registry_id - ): + if existing_contact.email != public_contact.email or existing_contact.registry_id != public_contact.registry_id: existing_contact.delete() public_contact.save() logger.warning("Requested PublicContact is out of sync " "with DB.") @@ -1809,9 +1658,7 @@ class Domain(TimeStampedModel, DomainHelper): # Grabs the expanded contact full_object = self._request_contact_info(contact) # Maps it to type PublicContact - mapped_object = self.map_epp_contact_to_public_contact( - full_object, contact.registry_id, contact.contact_type - ) + mapped_object = self.map_epp_contact_to_public_contact(full_object, contact.registry_id, contact.contact_type) return self._get_or_create_public_contact(mapped_object) def _invalidate_cache(self): @@ -1829,6 +1676,4 @@ class Domain(TimeStampedModel, DomainHelper): if property in self._cache: return self._cache[property] else: - raise KeyError( - "Requested key %s was not found in registry cache." % str(property) - ) + raise KeyError("Requested key %s was not found in registry cache." % str(property)) diff --git a/src/registrar/models/domain_application.py b/src/registrar/models/domain_application.py index 68fbfab0d..72af63f81 100644 --- a/src/registrar/models/domain_application.py +++ b/src/registrar/models/domain_application.py @@ -131,8 +131,7 @@ class DomainApplication(TimeStampedModel): FEDERAL = ( "federal", - "Federal: an agency of the U.S. government's executive, " - "legislative, or judicial branches", + "Federal: an agency of the U.S. government's executive, legislative, or judicial branches", ) INTERSTATE = "interstate", "Interstate: an organization of two or more states" STATE_OR_TERRITORY = ( @@ -143,8 +142,7 @@ class DomainApplication(TimeStampedModel): ) TRIBAL = ( "tribal", - "Tribal: a tribal government recognized by the federal or a state " - "government", + "Tribal: a tribal government recognized by the federal or a state government", ) COUNTY = "county", "County: a county, parish, or borough" CITY = "city", "City: a city, town, township, village, etc." @@ -154,8 +152,7 @@ class DomainApplication(TimeStampedModel): ) SCHOOL_DISTRICT = ( "school_district", - "School district: a school district that is not part of a local " - "government", + "School district: a school district that is not part of a local government", ) class BranchChoices(models.TextChoices): @@ -168,10 +165,7 @@ class DomainApplication(TimeStampedModel): "Advisory Council on Historic Preservation", "American Battle Monuments Commission", "Appalachian Regional Commission", - ( - "Appraisal Subcommittee of the Federal Financial " - "Institutions Examination Council" - ), + ("Appraisal Subcommittee of the Federal Financial Institutions Examination Council"), "Armed Forces Retirement Home", "Barry Goldwater Scholarship and Excellence in Education Program", "Central Intelligence Agency", @@ -507,9 +501,7 @@ class DomainApplication(TimeStampedModel): return not self.approved_domain.is_active() return True - def _send_status_update_email( - self, new_status, email_template, email_template_subject - ): + def _send_status_update_email(self, new_status, email_template, email_template_subject): """Send a atatus update email to the submitter. The email goes to the email address that the submitter gave as their @@ -518,9 +510,7 @@ class DomainApplication(TimeStampedModel): """ if self.submitter is None or self.submitter.email is None: - logger.warning( - f"Cannot send {new_status} email, no submitter email address." - ) + logger.warning(f"Cannot send {new_status} email, no submitter email address.") return try: send_templated_email( @@ -533,9 +523,7 @@ class DomainApplication(TimeStampedModel): except EmailSendingError: logger.warning("Failed to send confirmation email", exc_info=True) - @transition( - field="status", source=[STARTED, ACTION_NEEDED, WITHDRAWN], target=SUBMITTED - ) + @transition(field="status", source=[STARTED, ACTION_NEEDED, WITHDRAWN], target=SUBMITTED) def submit(self): """Submit an application that is started. diff --git a/src/registrar/models/domain_invitation.py b/src/registrar/models/domain_invitation.py index dff03fb87..1e0b7fec8 100644 --- a/src/registrar/models/domain_invitation.py +++ b/src/registrar/models/domain_invitation.py @@ -57,9 +57,7 @@ class DomainInvitation(TimeStampedModel): except User.DoesNotExist: # should not happen because a matching user should exist before # we retrieve this invitation - raise RuntimeError( - "Cannot find the user to retrieve this domain invitation." - ) + raise RuntimeError("Cannot find the user to retrieve this domain invitation.") # and create a role for that user on this domain _, created = UserDomainRole.objects.get_or_create( @@ -68,6 +66,4 @@ class DomainInvitation(TimeStampedModel): if not created: # something strange happened and this role already existed when # the invitation was retrieved. Log that this occurred. - logger.warn( - "Invitation %s was retrieved for a role that already exists.", self - ) + logger.warn("Invitation %s was retrieved for a role that already exists.", self) diff --git a/src/registrar/models/public_contact.py b/src/registrar/models/public_contact.py index 4afe3c467..c49a66260 100644 --- a/src/registrar/models/public_contact.py +++ b/src/registrar/models/public_contact.py @@ -10,9 +10,7 @@ from .utility.time_stamped_model import TimeStampedModel def get_id(): """Generate a 16 character registry ID with a low probability of collision.""" day = datetime.today().strftime("%A")[:2] - rand = "".join( - choices(ascii_uppercase + ascii_lowercase + digits, k=14) # nosec B311 - ) + rand = "".join(choices(ascii_uppercase + ascii_lowercase + digits, k=14)) # nosec B311 return f"{day}{rand}" @@ -69,16 +67,12 @@ class PublicContact(TimeStampedModel): pc = models.TextField(null=False, help_text="Contact's postal code") cc = models.TextField(null=False, help_text="Contact's country code") email = models.TextField(null=False, help_text="Contact's email address") - voice = models.TextField( - null=False, help_text="Contact's phone number. Must be in ITU.E164.2005 format" - ) + voice = models.TextField(null=False, help_text="Contact's phone number. Must be in ITU.E164.2005 format") fax = models.TextField( null=True, help_text="Contact's fax number (null ok). Must be in ITU.E164.2005 format.", ) - pw = models.TextField( - null=False, help_text="Contact's authorization code. 16 characters minimum." - ) + pw = models.TextField(null=False, help_text="Contact's authorization code. 16 characters minimum.") @classmethod def get_default_registrant(cls): @@ -154,8 +148,4 @@ class PublicContact(TimeStampedModel): return cls._meta.get_field("registry_id").max_length def __str__(self): - return ( - f"{self.name} <{self.email}>" - f"id: {self.registry_id} " - f"type: {self.contact_type}" - ) + return f"{self.name} <{self.email}>" f"id: {self.registry_id} " f"type: {self.contact_type}" diff --git a/src/registrar/models/user.py b/src/registrar/models/user.py index 51ac799a2..bde73d3cb 100644 --- a/src/registrar/models/user.py +++ b/src/registrar/models/user.py @@ -68,9 +68,7 @@ class User(AbstractUser): def check_domain_invitations_on_login(self): """When a user first arrives on the site, we need to retrieve any domain invitations that match their email address.""" - for invitation in DomainInvitation.objects.filter( - email=self.email, status=DomainInvitation.INVITED - ): + for invitation in DomainInvitation.objects.filter(email=self.email, status=DomainInvitation.INVITED): try: invitation.retrieve() invitation.save() @@ -78,9 +76,7 @@ class User(AbstractUser): # retrieving should not fail because of a missing user, but # if it does fail, log the error so a new user can continue # logging in - logger.warn( - "Failed to retrieve invitation %s", invitation, exc_info=True - ) + logger.warn("Failed to retrieve invitation %s", invitation, exc_info=True) def create_domain_and_invite(self, transition_domain: TransitionDomain): transition_domain_name = transition_domain.domain_name @@ -89,9 +85,7 @@ class User(AbstractUser): # type safety check. name should never be none if transition_domain_name is not None: - new_domain = Domain( - name=transition_domain_name, state=transition_domain_status - ) + new_domain = Domain(name=transition_domain_name, state=transition_domain_status) new_domain.save() # check that a domain invitation doesn't already # exist for this e-mail / Domain pair @@ -100,9 +94,7 @@ class User(AbstractUser): ).exists() if not domain_email_already_in_domain_invites: # Create new domain invitation - new_domain_invitation = DomainInvitation( - email=transition_domain_email.lower(), domain=new_domain - ) + new_domain_invitation = DomainInvitation(email=transition_domain_email.lower(), domain=new_domain) new_domain_invitation.save() def check_transition_domains_on_login(self): @@ -129,9 +121,7 @@ class User(AbstractUser): # with our data and migrations need to be run again. # Get the domain that corresponds with this transition domain - domain_exists = Domain.objects.filter( - name=transition_domain.domain_name - ).exists() + domain_exists = Domain.objects.filter(name=transition_domain.domain_name).exists() if not domain_exists: logger.warn( """There are transition domains without @@ -147,9 +137,7 @@ class User(AbstractUser): # Create a domain information object, if one doesn't # already exist - domain_info_exists = DomainInformation.objects.filter( - domain=domain - ).exists() + domain_info_exists = DomainInformation.objects.filter(domain=domain).exists() if not domain_info_exists: new_domain_info = DomainInformation(creator=self, domain=domain) new_domain_info.save() diff --git a/src/registrar/models/user_domain_role.py b/src/registrar/models/user_domain_role.py index 7b1f550d3..479f75089 100644 --- a/src/registrar/models/user_domain_role.py +++ b/src/registrar/models/user_domain_role.py @@ -44,7 +44,5 @@ class UserDomainRole(TimeStampedModel): constraints = [ # a user can have only one role on a given domain, that is, there can # be only a single row with a certain (user, domain) pair. - models.UniqueConstraint( - fields=["user", "domain"], name="unique_user_domain_role" - ) + models.UniqueConstraint(fields=["user", "domain"], name="unique_user_domain_role") ] diff --git a/src/registrar/models/user_group.py b/src/registrar/models/user_group.py index 568741786..cf261286e 100644 --- a/src/registrar/models/user_group.py +++ b/src/registrar/models/user_group.py @@ -87,14 +87,10 @@ class UserGroup(Group): permissions = permission["permissions"] # Retrieve the content type for the app and model - content_type = ContentType.objects.get( - app_label=app_label, model=model_name - ) + content_type = ContentType.objects.get(app_label=app_label, model=model_name) # Retrieve the permissions based on their codenames - permissions = Permission.objects.filter( - content_type=content_type, codename__in=permissions - ) + permissions = Permission.objects.filter(content_type=content_type, codename__in=permissions) # Assign the permissions to the group cisa_analysts_group.permissions.add(*permissions) @@ -113,9 +109,7 @@ class UserGroup(Group): ) cisa_analysts_group.save() - logger.debug( - "CISA Analyt permissions added to group " + cisa_analysts_group.name - ) + logger.debug("CISA Analyt permissions added to group " + cisa_analysts_group.name) except Exception as e: logger.error(f"Error creating analyst permissions group: {e}") diff --git a/src/registrar/templatetags/custom_filters.py b/src/registrar/templatetags/custom_filters.py index 14e2c9e3e..de2051989 100644 --- a/src/registrar/templatetags/custom_filters.py +++ b/src/registrar/templatetags/custom_filters.py @@ -55,9 +55,7 @@ def contains_checkbox(html_list): @register.filter def get_organization_long_name(organization_type): - organization_choices_dict = dict( - DomainApplication.OrganizationChoicesVerbose.choices - ) + organization_choices_dict = dict(DomainApplication.OrganizationChoicesVerbose.choices) long_form_type = organization_choices_dict[organization_type] if long_form_type is None: logger.error("Organization type error, triggered by a template's custom filter") diff --git a/src/registrar/tests/common.py b/src/registrar/tests/common.py index ed6ea3366..531bd562e 100644 --- a/src/registrar/tests/common.py +++ b/src/registrar/tests/common.py @@ -317,9 +317,7 @@ class AuditedAdminMockData: of either DomainApplication, DomainInvitation, or DomainInformation based on the 'domain_type' field. """ # noqa - common_args = self.get_common_domain_arg_dictionary( - item_name, org_type, federal_type, purpose - ) + common_args = self.get_common_domain_arg_dictionary(item_name, org_type, federal_type, purpose) full_arg_dict = None match domain_type: case self.APPLICATION: @@ -344,40 +342,22 @@ class AuditedAdminMockData: ) return full_arg_dict - def create_full_dummy_domain_application( - self, item_name, status=DomainApplication.STARTED - ): + def create_full_dummy_domain_application(self, item_name, status=DomainApplication.STARTED): """Creates a dummy domain application object""" - domain_application_kwargs = self.dummy_kwarg_boilerplate( - self.APPLICATION, item_name, status - ) - application = DomainApplication.objects.get_or_create( - **domain_application_kwargs - )[0] + domain_application_kwargs = self.dummy_kwarg_boilerplate(self.APPLICATION, item_name, status) + application = DomainApplication.objects.get_or_create(**domain_application_kwargs)[0] return application - def create_full_dummy_domain_information( - self, item_name, status=DomainApplication.STARTED - ): + def create_full_dummy_domain_information(self, item_name, status=DomainApplication.STARTED): """Creates a dummy domain information object""" - domain_application_kwargs = self.dummy_kwarg_boilerplate( - self.INFORMATION, item_name, status - ) - application = DomainInformation.objects.get_or_create( - **domain_application_kwargs - )[0] + domain_application_kwargs = self.dummy_kwarg_boilerplate(self.INFORMATION, item_name, status) + application = DomainInformation.objects.get_or_create(**domain_application_kwargs)[0] return application - def create_full_dummy_domain_invitation( - self, item_name, status=DomainApplication.STARTED - ): + def create_full_dummy_domain_invitation(self, item_name, status=DomainApplication.STARTED): """Creates a dummy domain invitation object""" - domain_application_kwargs = self.dummy_kwarg_boilerplate( - self.INVITATION, item_name, status - ) - application = DomainInvitation.objects.get_or_create( - **domain_application_kwargs - )[0] + domain_application_kwargs = self.dummy_kwarg_boilerplate(self.INVITATION, item_name, status) + application = DomainInvitation.objects.get_or_create(**domain_application_kwargs)[0] return application @@ -394,17 +374,11 @@ class AuditedAdminMockData: application = None match domain_type: case self.APPLICATION: - application = self.create_full_dummy_domain_application( - item_name, status - ) + application = self.create_full_dummy_domain_application(item_name, status) case self.INVITATION: - application = self.create_full_dummy_domain_invitation( - item_name, status - ) + application = self.create_full_dummy_domain_invitation(item_name, status) case self.INFORMATION: - application = self.create_full_dummy_domain_information( - item_name, status - ) + application = self.create_full_dummy_domain_information(item_name, status) case _: raise ValueError("Invalid domain_type, must conform to given constants") @@ -527,9 +501,7 @@ def completed_application( if has_anything_else: domain_application_kwargs["anything_else"] = "There is more" - application, _ = DomainApplication.objects.get_or_create( - **domain_application_kwargs - ) + application, _ = DomainApplication.objects.get_or_create(**domain_application_kwargs) if has_other_contacts: application.other_contacts.add(other) @@ -631,11 +603,7 @@ class MockEppLib(TestCase): mockDataInfoDomain = fakedEppObject( "fakePw", cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35), - contacts=[ - common.DomainContact( - contact="123", type=PublicContact.ContactTypeChoices.SECURITY - ) - ], + contacts=[common.DomainContact(contact="123", type=PublicContact.ContactTypeChoices.SECURITY)], hosts=["fake.host.com"], statuses=[ common.Status(state="serverTransferProhibited", description="", lang="en"), @@ -705,21 +673,11 @@ class MockEppLib(TestCase): mockDefaultTechnicalContact = InfoDomainWithContacts.dummyInfoContactResultData( "defaultTech", "dotgov@cisa.dhs.gov" ) - mockDefaultSecurityContact = InfoDomainWithContacts.dummyInfoContactResultData( - "defaultSec", "dotgov@cisa.dhs.gov" - ) - mockSecurityContact = InfoDomainWithContacts.dummyInfoContactResultData( - "securityContact", "security@mail.gov" - ) - mockTechnicalContact = InfoDomainWithContacts.dummyInfoContactResultData( - "technicalContact", "tech@mail.gov" - ) - mockAdministrativeContact = InfoDomainWithContacts.dummyInfoContactResultData( - "adminContact", "admin@mail.gov" - ) - mockRegistrantContact = InfoDomainWithContacts.dummyInfoContactResultData( - "regContact", "registrant@mail.gov" - ) + mockDefaultSecurityContact = InfoDomainWithContacts.dummyInfoContactResultData("defaultSec", "dotgov@cisa.dhs.gov") + mockSecurityContact = InfoDomainWithContacts.dummyInfoContactResultData("securityContact", "security@mail.gov") + mockTechnicalContact = InfoDomainWithContacts.dummyInfoContactResultData("technicalContact", "tech@mail.gov") + mockAdministrativeContact = InfoDomainWithContacts.dummyInfoContactResultData("adminContact", "admin@mail.gov") + mockRegistrantContact = InfoDomainWithContacts.dummyInfoContactResultData("regContact", "registrant@mail.gov") infoDomainNoContact = fakedEppObject( "security", @@ -759,9 +717,7 @@ class MockEppLib(TestCase): addrs=[common.Ip(addr="1.2.3.4"), common.Ip(addr="2.3.4.5")], ) - mockDataHostChange = fakedEppObject( - "lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35) - ) + mockDataHostChange = fakedEppObject("lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35)) addDsData1 = { "keyTag": 1234, "alg": 3, @@ -852,9 +808,7 @@ class MockEppLib(TestCase): def _mockDomainName(self, _name, _avail=False): return MagicMock( res_data=[ - responses.check.CheckDomainResultData( - name=_name, avail=_avail, reason=None - ), + responses.check.CheckDomainResultData(name=_name, avail=_avail, reason=None), ] ) @@ -927,9 +881,7 @@ class MockEppLib(TestCase): name = getattr(_request, "name", None) fake_nameserver = "ns1.failDelete.gov" if name in fake_nameserver: - raise RegistryError( - code=ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION - ) + raise RegistryError(code=ErrorCode.OBJECT_ASSOCIATION_PROHIBITS_OPERATION) return None def mockInfoDomainCommands(self, _request, cleaned): @@ -948,9 +900,7 @@ class MockEppLib(TestCase): ), "dnssec-none.gov": (self.mockDataInfoDomain, None), "my-nameserver.gov": ( - self.infoDomainTwoHosts - if self.mockedSendFunction.call_count == 5 - else self.infoDomainNoHost, + self.infoDomainTwoHosts if self.mockedSendFunction.call_count == 5 else self.infoDomainNoHost, None, ), "nameserverwithip.gov": (self.infoDomainHasIP, None), @@ -963,9 +913,7 @@ class MockEppLib(TestCase): } # Retrieve the corresponding values from the dictionary - res_data, extensions = request_mappings.get( - request_name, (self.mockDataInfoDomain, None) - ) + res_data, extensions = request_mappings.get(request_name, (self.mockDataInfoDomain, None)) return MagicMock( res_data=[res_data], @@ -996,10 +944,7 @@ class MockEppLib(TestCase): return MagicMock(res_data=[mocked_result]) def mockCreateContactCommands(self, _request, cleaned): - if ( - getattr(_request, "id", None) == "fail" - and self.mockedSendFunction.call_count == 3 - ): + if getattr(_request, "id", None) == "fail" and self.mockedSendFunction.call_count == 3: # use this for when a contact is being updated # sets the second send() to fail raise RegistryError(code=ErrorCode.OBJECT_EXISTS) @@ -1019,9 +964,7 @@ class MockEppLib(TestCase): self.mockedSendFunction = self.mockSendPatch.start() self.mockedSendFunction.side_effect = self.mockSend - def _convertPublicContactToEpp( - self, contact: PublicContact, disclose_email=False, createContact=True - ): + def _convertPublicContactToEpp(self, contact: PublicContact, disclose_email=False, createContact=True): DF = common.DiscloseField fields = {DF.EMAIL} @@ -1033,9 +976,7 @@ class MockEppLib(TestCase): # check docs here looks like we may have more than one address field but addr = common.ContactAddr( [ - getattr(contact, street) - for street in ["street1", "street2", "street3"] - if hasattr(contact, street) + getattr(contact, street) for street in ["street1", "street2", "street3"] if hasattr(contact, street) ], # type: ignore city=contact.city, pc=contact.pc, diff --git a/src/registrar/tests/test_admin.py b/src/registrar/tests/test_admin.py index f2db9d5ee..7e1c3301d 100644 --- a/src/registrar/tests/test_admin.py +++ b/src/registrar/tests/test_admin.py @@ -67,9 +67,7 @@ class TestDomainAdmin(MockEppLib): # for our actual application self.assertContains(response, "Federal", count=4) # This may be a bit more robust - self.assertContains( - response, '