Merge MAIN into nl/926-transition-domain-object-creation

Signed-off-by: CocoByte <nicolle.leclair@gmail.com>
This commit is contained in:
CocoByte 2023-10-10 11:08:54 -06:00
commit 87d4fa8f78
No known key found for this signature in database
GPG key ID: BBFAA2526384C97F
34 changed files with 1789 additions and 474 deletions

View file

@ -1,34 +1,35 @@
name: Issue name: Issue
description: Capture uncategorized work or content description: Describe an idea, feature, content, or non-bug finding
body: body:
- type: markdown - type: markdown
id: help id: title-help
attributes: attributes:
value: | value: |
> **Note** > Titles should be short, descriptive, and compelling.
> GitHub Issues use [GitHub Flavored Markdown](https://docs.github.com/en/get-started/writing-on-github/getting-started-with-writing-and-formatting-on-github/basic-writing-and-formatting-syntax) for formatting.
- type: textarea - type: textarea
id: issue id: issue-description
attributes: attributes:
label: Issue Description label: Issue description and context
description: | description: |
Describe the issue you are adding or content you are suggesting. Describe the issue so that someone who wasn't present for its discovery can understand the problem and why it matters. Use full sentences, plain language, and good [formatting](https://docs.github.com/en/get-started/writing-on-github/getting-started-with-writing-and-formatting-on-github/basic-writing-and-formatting-syntax). Share desired outcomes or potential next steps. Images or links to other content/context (like documents or Slack discussions) are welcome.
Share any next steps that should be taken our outcomes that would be beneficial.
validations: validations:
required: true required: true
- type: textarea - type: textarea
id: additional-context id: acceptance-criteria
attributes: attributes:
label: Additional Context (optional) label: Acceptance criteria
description: "Include additional references (screenshots, design links, documentation, etc.) that are relevant" description: "If known, share 1-3 statements that would need to be true for this issue to be considered resolved. Use a [task list](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/about-task-lists#creating-task-lists) if appropriate."
placeholder: "- [ ] The button does the thing."
- type: textarea - type: textarea
id: issue-links id: links-to-other-issues
attributes: attributes:
label: Issue Links label: Links to other issues
description: | description: |
What other issues does this story relate to and how? Add the issue #number of other issues this relates to and how (e.g., 🚧 Blocks, ⛔️ Is blocked by, 🔄 Relates to).
placeholder: 🔄 Relates to...
Example: - type: markdown
- 🚧 Blocked by: #123 id: note
- 🔄 Relates to: #234 attributes:
value: |
> We may edit this issue's text to document our understanding and clarify the product work.

View file

@ -80,7 +80,7 @@ The endpoint /admin can be used to view and manage site content, including but n
1. Login via login.gov 1. Login via login.gov
2. Go to the home page and make sure you can see the part where you can submit an application 2. Go to the home page and make sure you can see the part where you can submit an application
3. Go to /admin and it will tell you that UUID is not authorized, copy that UUID for use in 4 3. Go to /admin and it will tell you that UUID is not authorized, copy that UUID for use in 4
4. in src/registrar/fixtures.py add to the `ADMINS` list in that file by adding your UUID as your username along with your first and last name. See below: 4. in src/registrar/fixtures_users.py add to the `ADMINS` list in that file by adding your UUID as your username along with your first and last name. See below:
``` ```
ADMINS = [ ADMINS = [
@ -102,7 +102,7 @@ Analysts are a variant of the admin role with limited permissions. The process f
1. Login via login.gov (if you already exist as an admin, you will need to create a separate login.gov account for this: i.e. first.last+1@email.com) 1. Login via login.gov (if you already exist as an admin, you will need to create a separate login.gov account for this: i.e. first.last+1@email.com)
2. Go to the home page and make sure you can see the part where you can submit an application 2. Go to the home page and make sure you can see the part where you can submit an application
3. Go to /admin and it will tell you that UUID is not authorized, copy that UUID for use in 4 (this will be a different UUID than the one obtained from creating an admin) 3. Go to /admin and it will tell you that UUID is not authorized, copy that UUID for use in 4 (this will be a different UUID than the one obtained from creating an admin)
4. in src/registrar/fixtures.py add to the `STAFF` list in that file by adding your UUID as your username along with your first and last name. See below: 4. in src/registrar/fixtures_users.py add to the `STAFF` list in that file by adding your UUID as your username along with your first and last name. See below:
``` ```
STAFF = [ STAFF = [
@ -145,7 +145,7 @@ You can change the logging verbosity, if needed. Do a web search for "django log
## Mock data ## Mock data
There is a `post_migrate` signal in [signals.py](../../src/registrar/signals.py) that will load the fixtures from [fixtures.py](../../src/registrar/fixtures.py), giving you some test data to play with while developing. There is a `post_migrate` signal in [signals.py](../../src/registrar/signals.py) that will load the fixtures from [fixtures_user.py](../../src/registrar/fixtures_users.py) and [fixtures_applications.py](../../src/registrar/fixtures_applications.py), giving you some test data to play with while developing.
See the [database-access README](./database-access.md) for information on how to pull data to update these fixtures. See the [database-access README](./database-access.md) for information on how to pull data to update these fixtures.

View file

@ -48,3 +48,7 @@ future, as we add additional roles that our product vision calls for
(read-only? editing only some information?), we need to add conditional (read-only? editing only some information?), we need to add conditional
behavior in the permission mixin, or additional mixins that more clearly behavior in the permission mixin, or additional mixins that more clearly
express what is allowed for those new roles. express what is allowed for those new roles.
# Admin User Permissions
Refer to [Django Admin Roles](../django-admin/roles.md)

View file

@ -1,21 +1,19 @@
# Django admin user roles # Django admin user roles
Roles other than superuser should be defined in authentication and authorization groups in django admin For our MVP, we create and maintain 2 admin roles:
Full access and CISA analyst. Both have the role `staff`.
Permissions on these roles are set through groups:
`full_access_group` and `cisa_analysts_group`. These
groups and the methods to create them are defined in
our `user_group` model and run in a migration.
## Superuser For more details, refer to the [user group model](../../src/registrar/models/user_group.py).
Full access ## Editing group permissions through code
## CISA analyst We can edit and deploy new group permissions by:
### Basic permission level 1. editing `user_group` then:
2. Duplicating migration `0036_create_groups_01`
Staff and running migrations (append the name with a version number
to help django detect the migration eg 0037_create_groups_02)
### Additional group permissions
auditlog | log entry | can view log entry
registrar | contact | can view contact
registrar | domain application | can change domain application
registrar | domain | can view domain
registrar | user | can view user

View file

@ -89,7 +89,8 @@ command in the running Cloud.gov container. For example, to run our Django
admin command that loads test fixture data: admin command that loads test fixture data:
``` ```
cf run-task getgov-{environment} --command "./manage.py load" --name fixtures cf run-task getgov-{environment} --command "./manage.py load" --name fixtures--users
cf run-task getgov-{environment} --command "./manage.py load" --name fixtures--applications
``` ```
However, this task runs asynchronously in the background without any command However, this task runs asynchronously in the background without any command

2
src/Pipfile.lock generated
View file

@ -353,7 +353,7 @@
}, },
"fred-epplib": { "fred-epplib": {
"git": "https://github.com/cisagov/epplib.git", "git": "https://github.com/cisagov/epplib.git",
"ref": "f818cbf0b069a12f03e1d72e4b9f4900924b832d" "ref": "d56d183f1664f34c40ca9716a3a9a345f0ef561c"
}, },
"furl": { "furl": {
"hashes": [ "hashes": [

View file

@ -39,7 +39,7 @@ class AvailableViewTest(TestCase):
self.assertIn("gsa.gov", domains) self.assertIn("gsa.gov", domains)
# entries are all lowercase so GSA.GOV is not in the set # entries are all lowercase so GSA.GOV is not in the set
self.assertNotIn("GSA.GOV", domains) self.assertNotIn("GSA.GOV", domains)
self.assertNotIn("igorville.gov", domains) self.assertNotIn("igorvilleremixed.gov", domains)
# all the entries have dots # all the entries have dots
self.assertNotIn("gsa", domains) self.assertNotIn("gsa", domains)
@ -48,7 +48,7 @@ class AvailableViewTest(TestCase):
# input is lowercased so GSA.GOV should be found # input is lowercased so GSA.GOV should be found
self.assertTrue(in_domains("GSA.GOV")) self.assertTrue(in_domains("GSA.GOV"))
# This domain should not have been registered # This domain should not have been registered
self.assertFalse(in_domains("igorville.gov")) self.assertFalse(in_domains("igorvilleremixed.gov"))
def test_in_domains_dotgov(self): def test_in_domains_dotgov(self):
"""Domain searches work without trailing .gov""" """Domain searches work without trailing .gov"""
@ -56,7 +56,7 @@ class AvailableViewTest(TestCase):
# input is lowercased so GSA.GOV should be found # input is lowercased so GSA.GOV should be found
self.assertTrue(in_domains("GSA")) self.assertTrue(in_domains("GSA"))
# This domain should not have been registered # This domain should not have been registered
self.assertFalse(in_domains("igorville")) self.assertFalse(in_domains("igorvilleremixed"))
def test_not_available_domain(self): def test_not_available_domain(self):
"""gsa.gov is not available""" """gsa.gov is not available"""
@ -66,17 +66,17 @@ class AvailableViewTest(TestCase):
self.assertFalse(json.loads(response.content)["available"]) self.assertFalse(json.loads(response.content)["available"])
def test_available_domain(self): def test_available_domain(self):
"""igorville.gov is still available""" """igorvilleremixed.gov is still available"""
request = self.factory.get(API_BASE_PATH + "igorville.gov") request = self.factory.get(API_BASE_PATH + "igorvilleremixed.gov")
request.user = self.user request.user = self.user
response = available(request, domain="igorville.gov") response = available(request, domain="igorvilleremixed.gov")
self.assertTrue(json.loads(response.content)["available"]) self.assertTrue(json.loads(response.content)["available"])
def test_available_domain_dotgov(self): def test_available_domain_dotgov(self):
"""igorville.gov is still available even without the .gov suffix""" """igorvilleremixed.gov is still available even without the .gov suffix"""
request = self.factory.get(API_BASE_PATH + "igorville") request = self.factory.get(API_BASE_PATH + "igorvilleremixed")
request.user = self.user request.user = self.user
response = available(request, domain="igorville") response = available(request, domain="igorvilleremixed")
self.assertTrue(json.loads(response.content)["available"]) self.assertTrue(json.loads(response.content)["available"])
def test_error_handling(self): def test_error_handling(self):

View file

@ -11,6 +11,7 @@ logger = logging.getLogger(__name__)
NAMESPACE = SimpleNamespace( NAMESPACE = SimpleNamespace(
EPP="urn:ietf:params:xml:ns:epp-1.0", EPP="urn:ietf:params:xml:ns:epp-1.0",
SEC_DNS="urn:ietf:params:xml:ns:secDNS-1.1",
XSI="http://www.w3.org/2001/XMLSchema-instance", XSI="http://www.w3.org/2001/XMLSchema-instance",
FRED="noop", FRED="noop",
NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0", NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0",
@ -25,6 +26,7 @@ NAMESPACE = SimpleNamespace(
SCHEMA_LOCATION = SimpleNamespace( SCHEMA_LOCATION = SimpleNamespace(
XSI="urn:ietf:params:xml:ns:epp-1.0 epp-1.0.xsd", XSI="urn:ietf:params:xml:ns:epp-1.0 epp-1.0.xsd",
FRED="noop fred-1.5.0.xsd", FRED="noop fred-1.5.0.xsd",
SEC_DNS="urn:ietf:params:xml:ns:secDNS-1.1 secDNS-1.1.xsd",
NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0 contact-1.0.xsd", NIC_CONTACT="urn:ietf:params:xml:ns:contact-1.0 contact-1.0.xsd",
NIC_DOMAIN="urn:ietf:params:xml:ns:domain-1.0 domain-1.0.xsd", NIC_DOMAIN="urn:ietf:params:xml:ns:domain-1.0 domain-1.0.xsd",
NIC_ENUMVAL="noop enumval-1.2.0.xsd", NIC_ENUMVAL="noop enumval-1.2.0.xsd",
@ -44,7 +46,8 @@ except NameError:
try: try:
from .client import CLIENT, commands from .client import CLIENT, commands
from .errors import RegistryError, ErrorCode from .errors import RegistryError, ErrorCode
from epplib.models import common from epplib.models import common, info
from epplib.responses import extensions
from epplib import responses from epplib import responses
except ImportError: except ImportError:
pass pass
@ -53,7 +56,9 @@ __all__ = [
"CLIENT", "CLIENT",
"commands", "commands",
"common", "common",
"extensions",
"responses", "responses",
"info",
"ErrorCode", "ErrorCode",
"RegistryError", "RegistryError",
] ]

View file

@ -3,6 +3,7 @@ from django import forms
from django_fsm import get_available_FIELD_transitions from django_fsm import get_available_FIELD_transitions
from django.contrib import admin, messages from django.contrib import admin, messages
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from django.contrib.auth.models import Group
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.http.response import HttpResponseRedirect from django.http.response import HttpResponseRedirect
from django.urls import reverse from django.urls import reverse
@ -137,11 +138,24 @@ class MyUserAdmin(BaseUserAdmin):
"email", "email",
"first_name", "first_name",
"last_name", "last_name",
"is_staff", "group",
"is_superuser",
"status", "status",
) )
list_filter = (
"is_active",
"groups",
)
# Let's define First group
# (which should in theory be the ONLY group)
def group(self, obj):
if obj.groups.filter(name="full_access_group").exists():
return "Full access"
elif obj.groups.filter(name="cisa_analysts_group").exists():
return "Analyst"
return ""
fieldsets = ( fieldsets = (
( (
None, None,
@ -163,6 +177,9 @@ class MyUserAdmin(BaseUserAdmin):
("Important dates", {"fields": ("last_login", "date_joined")}), ("Important dates", {"fields": ("last_login", "date_joined")}),
) )
# Hide Username (uuid), Groups and Permissions
# Q: Now that we're using Groups and Permissions,
# do we expose those to analysts to view?
analyst_fieldsets = ( analyst_fieldsets = (
( (
None, None,
@ -174,14 +191,23 @@ class MyUserAdmin(BaseUserAdmin):
{ {
"fields": ( "fields": (
"is_active", "is_active",
"is_staff", "groups",
"is_superuser",
) )
}, },
), ),
("Important dates", {"fields": ("last_login", "date_joined")}), ("Important dates", {"fields": ("last_login", "date_joined")}),
) )
analyst_list_display = [
"email",
"first_name",
"last_name",
"group",
"status",
]
# NOT all fields are readonly for admin, otherwise we would have
# set this at the permissions level. The exception is 'status'
analyst_readonly_fields = [ analyst_readonly_fields = [
"password", "password",
"Personal Info", "Personal Info",
@ -190,43 +216,42 @@ class MyUserAdmin(BaseUserAdmin):
"email", "email",
"Permissions", "Permissions",
"is_active", "is_active",
"is_staff", "groups",
"is_superuser",
"Important dates", "Important dates",
"last_login", "last_login",
"date_joined", "date_joined",
] ]
def get_list_display(self, request): def get_list_display(self, request):
if not request.user.is_superuser: # The full_access_permission perm will load onto the full_access_group
# Customize the list display for staff users # which is equivalent to superuser. The other group we use to manage
return ( # perms is cisa_analysts_group. cisa_analysts_group will never contain
"email", # full_access_permission
"first_name", if request.user.has_perm("registrar.full_access_permission"):
"last_name", # Use the default list display for all access users
"is_staff",
"is_superuser",
"status",
)
# Use the default list display for non-staff users
return super().get_list_display(request) return super().get_list_display(request)
def get_fieldsets(self, request, obj=None): # Customize the list display for analysts
if not request.user.is_superuser: return self.analyst_list_display
# If the user doesn't have permission to change the model,
# show a read-only fieldset
return self.analyst_fieldsets
# If the user has permission to change the model, show all fields def get_fieldsets(self, request, obj=None):
if request.user.has_perm("registrar.full_access_permission"):
# Show all fields for all access users
return super().get_fieldsets(request, obj) return super().get_fieldsets(request, obj)
elif request.user.has_perm("registrar.analyst_access_permission"):
# show analyst_fieldsets for analysts
return self.analyst_fieldsets
else:
# any admin user should belong to either full_access_group
# or cisa_analyst_group
return []
def get_readonly_fields(self, request, obj=None): def get_readonly_fields(self, request, obj=None):
if request.user.is_superuser: if request.user.has_perm("registrar.full_access_permission"):
return () # No read-only fields for superusers return () # No read-only fields for all access users
elif request.user.is_staff: # Return restrictive Read-only fields for analysts and
return self.analyst_readonly_fields # Read-only fields for staff # users who might not belong to groups
return () # No read-only fields for other users return self.analyst_readonly_fields
class HostIPInline(admin.StackedInline): class HostIPInline(admin.StackedInline):
@ -405,11 +430,12 @@ class DomainInformationAdmin(ListHeaderAdmin):
readonly_fields = list(self.readonly_fields) readonly_fields = list(self.readonly_fields)
if request.user.is_superuser: if request.user.has_perm("registrar.full_access_permission"):
return readonly_fields return readonly_fields
else: # Return restrictive Read-only fields for analysts and
# users who might not belong to groups
readonly_fields.extend([field for field in self.analyst_readonly_fields]) readonly_fields.extend([field for field in self.analyst_readonly_fields])
return readonly_fields return readonly_fields # Read-only fields for analysts
class DomainApplicationAdminForm(forms.ModelForm): class DomainApplicationAdminForm(forms.ModelForm):
@ -623,9 +649,10 @@ class DomainApplicationAdmin(ListHeaderAdmin):
["current_websites", "other_contacts", "alternative_domains"] ["current_websites", "other_contacts", "alternative_domains"]
) )
if request.user.is_superuser: if request.user.has_perm("registrar.full_access_permission"):
return readonly_fields return readonly_fields
else: # Return restrictive Read-only fields for analysts and
# users who might not belong to groups
readonly_fields.extend([field for field in self.analyst_readonly_fields]) readonly_fields.extend([field for field in self.analyst_readonly_fields])
return readonly_fields return readonly_fields
@ -870,7 +897,9 @@ class DomainAdmin(ListHeaderAdmin):
# Fixes a bug wherein users which are only is_staff # Fixes a bug wherein users which are only is_staff
# can access 'change' when GET, # can access 'change' when GET,
# but cannot access this page when it is a request of type POST. # but cannot access this page when it is a request of type POST.
if request.user.is_staff: if request.user.has_perm(
"registrar.full_access_permission"
) or request.user.has_perm("registrar.analyst_access_permission"):
return True return True
return super().has_change_permission(request, obj) return super().has_change_permission(request, obj)
@ -885,6 +914,10 @@ class DraftDomainAdmin(ListHeaderAdmin):
admin.site.unregister(LogEntry) # Unregister the default registration admin.site.unregister(LogEntry) # Unregister the default registration
admin.site.register(LogEntry, CustomLogEntryAdmin) admin.site.register(LogEntry, CustomLogEntryAdmin)
admin.site.register(models.User, MyUserAdmin) admin.site.register(models.User, MyUserAdmin)
# Unregister the built-in Group model
admin.site.unregister(Group)
# Register UserGroup
admin.site.register(models.UserGroup)
admin.site.register(models.UserDomainRole, UserDomainRoleAdmin) admin.site.register(models.UserDomainRole, UserDomainRoleAdmin)
admin.site.register(models.Contact, ContactAdmin) admin.site.register(models.Contact, ContactAdmin)
admin.site.register(models.DomainInvitation, DomainInvitationAdmin) admin.site.register(models.DomainInvitation, DomainInvitationAdmin)

View file

@ -10,244 +10,10 @@ from registrar.models import (
Website, Website,
) )
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
fake = Faker() fake = Faker()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class UserFixture:
"""
Load users into the database.
Make sure this class' `load` method is called from `handle`
in management/commands/load.py, then use `./manage.py load`
to run this code.
"""
ADMINS = [
{
"username": "5f283494-31bd-49b5-b024-a7e7cae00848",
"first_name": "Rachid",
"last_name": "Mrad",
},
{
"username": "eb2214cd-fc0c-48c0-9dbd-bc4cd6820c74",
"first_name": "Alysia",
"last_name": "Broddrick",
},
{
"username": "8f8e7293-17f7-4716-889b-1990241cbd39",
"first_name": "Katherine",
"last_name": "Osos",
},
{
"username": "70488e0a-e937-4894-a28c-16f5949effd4",
"first_name": "Gaby",
"last_name": "DiSarli",
},
{
"username": "83c2b6dd-20a2-4cac-bb40-e22a72d2955c",
"first_name": "Cameron",
"last_name": "Dixon",
},
{
"username": "0353607a-cbba-47d2-98d7-e83dcd5b90ea",
"first_name": "Ryan",
"last_name": "Brooks",
},
{
"username": "30001ee7-0467-4df2-8db2-786e79606060",
"first_name": "Zander",
"last_name": "Adkinson",
},
{
"username": "2bf518c2-485a-4c42-ab1a-f5a8b0a08484",
"first_name": "Paul",
"last_name": "Kuykendall",
},
{
"username": "2a88a97b-be96-4aad-b99e-0b605b492c78",
"first_name": "Rebecca",
"last_name": "Hsieh",
},
{
"username": "fa69c8e8-da83-4798-a4f2-263c9ce93f52",
"first_name": "David",
"last_name": "Kennedy",
},
{
"username": "f14433d8-f0e9-41bf-9c72-b99b110e665d",
"first_name": "Nicolle",
"last_name": "LeClair",
},
{
"username": "24840450-bf47-4d89-8aa9-c612fe68f9da",
"first_name": "Erin",
"last_name": "Song",
},
]
STAFF = [
{
"username": "319c490d-453b-43d9-bc4d-7d6cd8ff6844",
"first_name": "Rachid-Analyst",
"last_name": "Mrad-Analyst",
"email": "rachid.mrad@gmail.com",
},
{
"username": "b6a15987-5c88-4e26-8de2-ca71a0bdb2cd",
"first_name": "Alysia-Analyst",
"last_name": "Alysia-Analyst",
},
{
"username": "91a9b97c-bd0a-458d-9823-babfde7ebf44",
"first_name": "Katherine-Analyst",
"last_name": "Osos-Analyst",
"email": "kosos@truss.works",
},
{
"username": "2cc0cde8-8313-4a50-99d8-5882e71443e8",
"first_name": "Zander-Analyst",
"last_name": "Adkinson-Analyst",
},
{
"username": "57ab5847-7789-49fe-a2f9-21d38076d699",
"first_name": "Paul-Analyst",
"last_name": "Kuykendall-Analyst",
},
{
"username": "e474e7a9-71ca-449d-833c-8a6e094dd117",
"first_name": "Rebecca-Analyst",
"last_name": "Hsieh-Analyst",
},
{
"username": "5dc6c9a6-61d9-42b4-ba54-4beff28bac3c",
"first_name": "David-Analyst",
"last_name": "Kennedy-Analyst",
},
{
"username": "0eb6f326-a3d4-410f-a521-aa4c1fad4e47",
"first_name": "Gaby-Analyst",
"last_name": "DiSarli-Analyst",
"email": "gaby@truss.works",
},
{
"username": "cfe7c2fc-e24a-480e-8b78-28645a1459b3",
"first_name": "Nicolle-Analyst",
"last_name": "LeClair-Analyst",
"email": "nicolle.leclair@ecstech.com",
},
{
"username": "378d0bc4-d5a7-461b-bd84-3ae6f6864af9",
"first_name": "Erin-Analyst",
"last_name": "Song-Analyst",
"email": "erin.song+1@gsa.gov",
},
]
STAFF_PERMISSIONS = [
{
"app_label": "auditlog",
"model": "logentry",
"permissions": ["view_logentry"],
},
{"app_label": "registrar", "model": "contact", "permissions": ["view_contact"]},
{
"app_label": "registrar",
"model": "domaininformation",
"permissions": ["change_domaininformation"],
},
{
"app_label": "registrar",
"model": "domainapplication",
"permissions": ["change_domainapplication"],
},
{"app_label": "registrar", "model": "domain", "permissions": ["view_domain"]},
{
"app_label": "registrar",
"model": "draftdomain",
"permissions": ["change_draftdomain"],
},
{"app_label": "registrar", "model": "user", "permissions": ["change_user"]},
]
@classmethod
def load(cls):
logger.info("Going to load %s superusers" % str(len(cls.ADMINS)))
for admin in cls.ADMINS:
try:
user, _ = User.objects.get_or_create(
username=admin["username"],
)
user.is_superuser = True
user.first_name = admin["first_name"]
user.last_name = admin["last_name"]
if "email" in admin.keys():
user.email = admin["email"]
user.is_staff = True
user.is_active = True
user.save()
logger.debug("User object created for %s" % admin["first_name"])
except Exception as e:
logger.warning(e)
logger.info("All superusers loaded.")
logger.info("Going to load %s CISA analysts (staff)" % str(len(cls.STAFF)))
for staff in cls.STAFF:
try:
user, _ = User.objects.get_or_create(
username=staff["username"],
)
user.is_superuser = False
user.first_name = staff["first_name"]
user.last_name = staff["last_name"]
if "email" in admin.keys():
user.email = admin["email"]
user.is_staff = True
user.is_active = True
for permission in cls.STAFF_PERMISSIONS:
app_label = permission["app_label"]
model_name = permission["model"]
permissions = permission["permissions"]
# Retrieve the content type for the app and model
content_type = ContentType.objects.get(
app_label=app_label, model=model_name
)
# Retrieve the permissions based on their codenames
permissions = Permission.objects.filter(
content_type=content_type, codename__in=permissions
)
# Assign the permissions to the user
user.user_permissions.add(*permissions)
# Convert the permissions QuerySet to a list of codenames
permission_list = list(
permissions.values_list("codename", flat=True)
)
logger.debug(
app_label
+ " | "
+ model_name
+ " | "
+ ", ".join(permission_list)
+ " added for user "
+ staff["first_name"]
)
user.save()
logger.debug("User object created for %s" % staff["first_name"])
except Exception as e:
logger.warning(e)
logger.info("All CISA analysts (staff) loaded.")
class DomainApplicationFixture: class DomainApplicationFixture:
""" """
Load domain applications into the database. Load domain applications into the database.

View file

@ -0,0 +1,177 @@
import logging
from faker import Faker
from registrar.models import (
User,
UserGroup,
)
fake = Faker()
logger = logging.getLogger(__name__)
class UserFixture:
"""
Load users into the database.
Make sure this class' `load` method is called from `handle`
in management/commands/load.py, then use `./manage.py load`
to run this code.
"""
ADMINS = [
{
"username": "5f283494-31bd-49b5-b024-a7e7cae00848",
"first_name": "Rachid",
"last_name": "Mrad",
},
{
"username": "eb2214cd-fc0c-48c0-9dbd-bc4cd6820c74",
"first_name": "Alysia",
"last_name": "Broddrick",
},
{
"username": "8f8e7293-17f7-4716-889b-1990241cbd39",
"first_name": "Katherine",
"last_name": "Osos",
},
{
"username": "70488e0a-e937-4894-a28c-16f5949effd4",
"first_name": "Gaby",
"last_name": "DiSarli",
},
{
"username": "83c2b6dd-20a2-4cac-bb40-e22a72d2955c",
"first_name": "Cameron",
"last_name": "Dixon",
},
{
"username": "0353607a-cbba-47d2-98d7-e83dcd5b90ea",
"first_name": "Ryan",
"last_name": "Brooks",
},
{
"username": "30001ee7-0467-4df2-8db2-786e79606060",
"first_name": "Zander",
"last_name": "Adkinson",
},
{
"username": "2bf518c2-485a-4c42-ab1a-f5a8b0a08484",
"first_name": "Paul",
"last_name": "Kuykendall",
},
{
"username": "2a88a97b-be96-4aad-b99e-0b605b492c78",
"first_name": "Rebecca",
"last_name": "Hsieh",
},
{
"username": "fa69c8e8-da83-4798-a4f2-263c9ce93f52",
"first_name": "David",
"last_name": "Kennedy",
},
{
"username": "f14433d8-f0e9-41bf-9c72-b99b110e665d",
"first_name": "Nicolle",
"last_name": "LeClair",
},
{
"username": "24840450-bf47-4d89-8aa9-c612fe68f9da",
"first_name": "Erin",
"last_name": "Song",
},
{
"username": "e0ea8b94-6e53-4430-814a-849a7ca45f21",
"first_name": "Kristina",
"last_name": "Yin",
},
]
STAFF = [
{
"username": "319c490d-453b-43d9-bc4d-7d6cd8ff6844",
"first_name": "Rachid-Analyst",
"last_name": "Mrad-Analyst",
"email": "rachid.mrad@gmail.com",
},
{
"username": "b6a15987-5c88-4e26-8de2-ca71a0bdb2cd",
"first_name": "Alysia-Analyst",
"last_name": "Alysia-Analyst",
},
{
"username": "91a9b97c-bd0a-458d-9823-babfde7ebf44",
"first_name": "Katherine-Analyst",
"last_name": "Osos-Analyst",
"email": "kosos@truss.works",
},
{
"username": "2cc0cde8-8313-4a50-99d8-5882e71443e8",
"first_name": "Zander-Analyst",
"last_name": "Adkinson-Analyst",
},
{
"username": "57ab5847-7789-49fe-a2f9-21d38076d699",
"first_name": "Paul-Analyst",
"last_name": "Kuykendall-Analyst",
},
{
"username": "e474e7a9-71ca-449d-833c-8a6e094dd117",
"first_name": "Rebecca-Analyst",
"last_name": "Hsieh-Analyst",
},
{
"username": "5dc6c9a6-61d9-42b4-ba54-4beff28bac3c",
"first_name": "David-Analyst",
"last_name": "Kennedy-Analyst",
},
{
"username": "0eb6f326-a3d4-410f-a521-aa4c1fad4e47",
"first_name": "Gaby-Analyst",
"last_name": "DiSarli-Analyst",
"email": "gaby@truss.works",
},
{
"username": "cfe7c2fc-e24a-480e-8b78-28645a1459b3",
"first_name": "Nicolle-Analyst",
"last_name": "LeClair-Analyst",
"email": "nicolle.leclair@ecstech.com",
},
{
"username": "378d0bc4-d5a7-461b-bd84-3ae6f6864af9",
"first_name": "Erin-Analyst",
"last_name": "Song-Analyst",
"email": "erin.song+1@gsa.gov",
},
{
"username": "9a98e4c9-9409-479d-964e-4aec7799107f",
"first_name": "Kristina-Analyst",
"last_name": "Yin-Analyst",
"email": "kristina.yin+1@gsa.gov",
},
]
def load_users(cls, users, group_name):
logger.info(f"Going to load {len(users)} users in group {group_name}")
for user_data in users:
try:
user, _ = User.objects.get_or_create(username=user_data["username"])
user.is_superuser = False
user.first_name = user_data["first_name"]
user.last_name = user_data["last_name"]
if "email" in user_data:
user.email = user_data["email"]
user.is_staff = True
user.is_active = True
group = UserGroup.objects.get(name=group_name)
user.groups.add(group)
user.save()
logger.debug(f"User object created for {user_data['first_name']}")
except Exception as e:
logger.warning(e)
logger.info(f"All users in group {group_name} loaded.")
@classmethod
def load(cls):
cls.load_users(cls, cls.ADMINS, "full_access_group")
cls.load_users(cls, cls.STAFF, "cisa_analysts_group")

View file

@ -4,7 +4,8 @@ from django.core.management.base import BaseCommand
from auditlog.context import disable_auditlog # type: ignore from auditlog.context import disable_auditlog # type: ignore
from registrar.fixtures import UserFixture, DomainApplicationFixture, DomainFixture from registrar.fixtures_users import UserFixture
from registrar.fixtures_applications import DomainApplicationFixture, DomainFixture
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View file

@ -0,0 +1,17 @@
# Generated by Django 4.2.1 on 2023-10-02 22:07
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("registrar", "0032_alter_transitiondomain_status"),
]
operations = [
migrations.AlterField(
model_name="userdomainrole",
name="role",
field=models.TextField(choices=[("manager", "Admin")]),
),
]

View file

@ -0,0 +1,39 @@
# Generated by Django 4.2.1 on 2023-09-20 19:04
import django.contrib.auth.models
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
("auth", "0012_alter_user_first_name_max_length"),
("registrar", "0033_alter_userdomainrole_role"),
]
operations = [
migrations.CreateModel(
name="UserGroup",
fields=[
(
"group_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="auth.group",
),
),
],
options={
"verbose_name": "User group",
"verbose_name_plural": "User groups",
},
bases=("auth.group",),
managers=[
("objects", django.contrib.auth.models.GroupManager()),
],
),
]

View file

@ -0,0 +1,21 @@
# Generated by Django 4.2.1 on 2023-09-27 18:53
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("registrar", "0034_usergroup"),
]
operations = [
migrations.AlterModelOptions(
name="user",
options={
"permissions": [
("analyst_access_permission", "Analyst Access Permission"),
("full_access_permission", "Full Access Permission"),
]
},
),
]

View file

@ -0,0 +1,43 @@
# From mithuntnt's answer on:
# https://stackoverflow.com/questions/26464838/getting-model-contenttype-in-migration-django-1-7
# The problem is that ContentType and Permission objects are not already created
# while we're still running migrations, so we'll go ahead and speed up that process
# a bit before we attempt to create groups which require Permissions and ContentType.
from django.conf import settings
from django.db import migrations
def create_all_contenttypes(**kwargs):
from django.apps import apps
from django.contrib.contenttypes.management import create_contenttypes
for app_config in apps.get_app_configs():
create_contenttypes(app_config, **kwargs)
def create_all_permissions(**kwargs):
from django.contrib.auth.management import create_permissions
from django.apps import apps
for app_config in apps.get_app_configs():
create_permissions(app_config, **kwargs)
def forward(apps, schema_editor):
create_all_contenttypes()
create_all_permissions()
def backward(apps, schema_editor):
pass
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
("contenttypes", "0002_remove_content_type_name"),
("registrar", "0035_alter_user_options"),
]
operations = [migrations.RunPython(forward, backward)]

View file

@ -0,0 +1,34 @@
# This migration creates the create_full_access_group and create_cisa_analyst_group groups
# It is dependent on 0035 (which populates ContentType and Permissions)
# If permissions on the groups need changing, edit CISA_ANALYST_GROUP_PERMISSIONS
# in the user_group model then:
# step 1: docker-compose exec app ./manage.py migrate --fake registrar 0035_contenttypes_permissions
# step 2: docker-compose exec app ./manage.py migrate registrar 0036_create_groups
# step 3: fake run the latest migration in the migrations list
# Alternatively:
# Only step: duplicate the migtation that loads data and run: docker-compose exec app ./manage.py migrate
from django.db import migrations
from registrar.models import UserGroup
from typing import Any
# For linting: RunPython expects a function reference,
# so let's give it one
def create_groups(apps, schema_editor) -> Any:
UserGroup.create_cisa_analyst_group(apps, schema_editor)
UserGroup.create_full_access_group(apps, schema_editor)
class Migration(migrations.Migration):
dependencies = [
("registrar", "0036_contenttypes_permissions"),
]
operations = [
migrations.RunPython(
create_groups,
reverse_code=migrations.RunPython.noop,
atomic=True,
),
]

View file

@ -12,6 +12,7 @@ from .nameserver import Nameserver
from .user_domain_role import UserDomainRole from .user_domain_role import UserDomainRole
from .public_contact import PublicContact from .public_contact import PublicContact
from .user import User from .user import User
from .user_group import UserGroup
from .website import Website from .website import Website
from .transition_domain import TransitionDomain from .transition_domain import TransitionDomain
@ -28,6 +29,7 @@ __all__ = [
"UserDomainRole", "UserDomainRole",
"PublicContact", "PublicContact",
"User", "User",
"UserGroup",
"Website", "Website",
"TransitionDomain", "TransitionDomain",
] ]
@ -42,6 +44,7 @@ auditlog.register(Host)
auditlog.register(Nameserver) auditlog.register(Nameserver)
auditlog.register(UserDomainRole) auditlog.register(UserDomainRole)
auditlog.register(PublicContact) auditlog.register(PublicContact)
auditlog.register(User) auditlog.register(User, m2m_fields=["user_permissions", "groups"])
auditlog.register(UserGroup, m2m_fields=["permissions"])
auditlog.register(Website) auditlog.register(Website)
auditlog.register(TransitionDomain) auditlog.register(TransitionDomain)

View file

@ -1,5 +1,5 @@
from itertools import zip_longest
import logging import logging
from datetime import date from datetime import date
from string import digits from string import digits
from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignore
@ -10,9 +10,12 @@ from epplibwrapper import (
CLIENT as registry, CLIENT as registry,
commands, commands,
common as epp, common as epp,
extensions,
info as eppInfo,
RegistryError, RegistryError,
ErrorCode, ErrorCode,
) )
from registrar.models.utility.contact_error import ContactError
from .utility.domain_field import DomainField from .utility.domain_field import DomainField
from .utility.domain_helper import DomainHelper from .utility.domain_helper import DomainHelper
@ -279,6 +282,27 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error("Error _create_host, code was %s error was %s" % (e.code, e)) logger.error("Error _create_host, code was %s error was %s" % (e.code, e))
return e.code return e.code
@Cache
def dnssecdata(self) -> extensions.DNSSECExtension:
return self._get_property("dnssecdata")
@dnssecdata.setter # type: ignore
def dnssecdata(self, _dnssecdata: extensions.DNSSECExtension):
updateParams = {
"maxSigLife": _dnssecdata.get("maxSigLife", None),
"dsData": _dnssecdata.get("dsData", None),
"keyData": _dnssecdata.get("keyData", None),
"remAllDsKeyData": True,
}
request = commands.UpdateDomain(name=self.name)
extension = commands.UpdateDomainDNSSECExtension(**updateParams)
request.add_extension(extension)
try:
registry.send(request, cleaned=True)
except RegistryError as e:
logger.error("Error adding DNSSEC, code was %s error was %s" % (e.code, e))
raise e
@nameservers.setter # type: ignore @nameservers.setter # type: ignore
def nameservers(self, hosts: list[tuple[str]]): def nameservers(self, hosts: list[tuple[str]]):
"""host should be a tuple of type str, str,... where the elements are """host should be a tuple of type str, str,... where the elements are
@ -352,9 +376,9 @@ class Domain(TimeStampedModel, DomainHelper):
raise NotImplementedError() raise NotImplementedError()
@Cache @Cache
def registrant_contact(self) -> PublicContact: def registrant_contact(self) -> PublicContact | None:
"""Get or set the registrant for this domain.""" registrant = PublicContact.ContactTypeChoices.REGISTRANT
raise NotImplementedError() return self.generic_contact_getter(registrant)
@registrant_contact.setter # type: ignore @registrant_contact.setter # type: ignore
def registrant_contact(self, contact: PublicContact): def registrant_contact(self, contact: PublicContact):
@ -367,9 +391,10 @@ class Domain(TimeStampedModel, DomainHelper):
) )
@Cache @Cache
def administrative_contact(self) -> PublicContact: def administrative_contact(self) -> PublicContact | None:
"""Get or set the admin contact for this domain.""" """Get the admin contact for this domain."""
raise NotImplementedError() admin = PublicContact.ContactTypeChoices.ADMINISTRATIVE
return self.generic_contact_getter(admin)
@administrative_contact.setter # type: ignore @administrative_contact.setter # type: ignore
def administrative_contact(self, contact: PublicContact): def administrative_contact(self, contact: PublicContact):
@ -381,12 +406,6 @@ class Domain(TimeStampedModel, DomainHelper):
self._make_contact_in_registry(contact=contact) self._make_contact_in_registry(contact=contact)
self._update_domain_with_contact(contact, rem=False) self._update_domain_with_contact(contact, rem=False)
def get_default_security_contact(self):
logger.info("getting default sec contact")
contact = PublicContact.get_default_security()
contact.domain = self
return contact
def _update_epp_contact(self, contact: PublicContact): def _update_epp_contact(self, contact: PublicContact):
"""Sends UpdateContact to update the actual contact object, """Sends UpdateContact to update the actual contact object,
domain object remains unaffected domain object remains unaffected
@ -440,26 +459,10 @@ class Domain(TimeStampedModel, DomainHelper):
) )
@Cache @Cache
def security_contact(self) -> PublicContact: def security_contact(self) -> PublicContact | None:
"""Get or set the security contact for this domain.""" """Get or set the security contact for this domain."""
try: security = PublicContact.ContactTypeChoices.SECURITY
contacts = self._get_property("contacts") return self.generic_contact_getter(security)
for contact in contacts:
if (
"type" in contact.keys()
and contact["type"] == PublicContact.ContactTypeChoices.SECURITY
):
tempContact = self.get_default_security_contact()
tempContact.email = contact["email"]
return tempContact
except Exception as err: # use better error handling
logger.info("Couldn't get contact %s" % err)
# TODO - remove this ideally it should return None,
# but error handling needs to be
# added on the security email page so that it can handle it being none
return self.get_default_security_contact()
def _add_registrant_to_existing_domain(self, contact: PublicContact): def _add_registrant_to_existing_domain(self, contact: PublicContact):
"""Used to change the registrant contact on an existing domain""" """Used to change the registrant contact on an existing domain"""
@ -533,6 +536,7 @@ class Domain(TimeStampedModel, DomainHelper):
.filter(domain=self, contact_type=contact.contact_type) .filter(domain=self, contact_type=contact.contact_type)
.get() .get()
) )
if isRegistrant: if isRegistrant:
# send update domain only for registant contacts # send update domain only for registant contacts
existing_contact.delete() existing_contact.delete()
@ -589,9 +593,10 @@ class Domain(TimeStampedModel, DomainHelper):
) )
@Cache @Cache
def technical_contact(self) -> PublicContact: def technical_contact(self) -> PublicContact | None:
"""Get or set the tech contact for this domain.""" """Get or set the tech contact for this domain."""
raise NotImplementedError() tech = PublicContact.ContactTypeChoices.TECHNICAL
return self.generic_contact_getter(tech)
@technical_contact.setter # type: ignore @technical_contact.setter # type: ignore
def technical_contact(self, contact: PublicContact): def technical_contact(self, contact: PublicContact):
@ -674,6 +679,231 @@ class Domain(TimeStampedModel, DomainHelper):
help_text="Very basic info about the lifecycle of this domain object", help_text="Very basic info about the lifecycle of this domain object",
) )
def isActive(self):
return self.state == Domain.State.CREATED
def map_epp_contact_to_public_contact(
self, contact: eppInfo.InfoContactResultData, contact_id, contact_type
):
"""Maps the Epp contact representation to a PublicContact object.
contact -> eppInfo.InfoContactResultData: The converted contact object
contact_id -> str: The given registry_id of the object (i.e "cheese@cia.gov")
contact_type -> str: The given contact type, (i.e. "tech" or "registrant")
"""
if contact is None:
return None
if contact_type is None:
raise ContactError("contact_type is None")
if contact_id is None:
raise ContactError("contact_id is None")
# Since contact_id is registry_id,
# check that its the right length
contact_id_length = len(contact_id)
if (
contact_id_length > PublicContact.get_max_id_length()
or contact_id_length < 1
):
raise ContactError(
"contact_id is of invalid length. "
"Cannot exceed 16 characters, "
f"got {contact_id} with a length of {contact_id_length}"
)
if not isinstance(contact, eppInfo.InfoContactResultData):
raise ContactError("Contact must be of type InfoContactResultData")
auth_info = contact.auth_info
postal_info = contact.postal_info
addr = postal_info.addr
streets = None
if addr is not None:
streets = addr.street
streets_kwargs = self._convert_streets_to_dict(streets)
desired_contact = PublicContact(
domain=self,
contact_type=contact_type,
registry_id=contact_id,
email=contact.email or "",
voice=contact.voice or "",
fax=contact.fax,
name=postal_info.name or "",
org=postal_info.org,
# For linter - default to "" instead of None
pw=getattr(auth_info, "pw", ""),
city=getattr(addr, "city", ""),
pc=getattr(addr, "pc", ""),
cc=getattr(addr, "cc", ""),
sp=getattr(addr, "sp", ""),
**streets_kwargs,
) # type: ignore
return desired_contact
def _convert_streets_to_dict(self, streets):
"""
Converts EPPLibs street representation
to PublicContacts.
Args:
streets (Sequence[str]): Streets from EPPLib.
Returns:
dict: {
"street1": str or "",
"street2": str or None,
"street3": str or None,
}
EPPLib returns 'street' as an sequence of strings.
Meanwhile, PublicContact has this split into three
seperate properties: street1, street2, street3.
Handles this disparity.
"""
# 'zips' two lists together.
# For instance, (('street1', 'some_value_here'),
# ('street2', 'some_value_here'))
# Dict then converts this to a useable kwarg which we can pass in
street_dict = dict(
zip_longest(
["street1", "street2", "street3"],
streets if streets is not None else [""],
fillvalue=None,
)
)
return street_dict
def _request_contact_info(self, contact: PublicContact):
try:
req = commands.InfoContact(id=contact.registry_id)
return registry.send(req, cleaned=True).res_data[0]
except RegistryError as error:
logger.error(
"Registry threw error for contact id %s contact type is %s, error code is\n %s full error is %s", # noqa
contact.registry_id,
contact.contact_type,
error.code,
error,
)
raise error
def generic_contact_getter(
self, contact_type_choice: PublicContact.ContactTypeChoices
) -> PublicContact | None:
"""Retrieves the desired PublicContact from the registry.
This abstracts the caching and EPP retrieval for
all contact items and thus may result in EPP calls being sent.
contact_type_choice is a literal in PublicContact.ContactTypeChoices,
for instance: PublicContact.ContactTypeChoices.SECURITY.
If you wanted to setup getter logic for Security, you would call:
cache_contact_helper(PublicContact.ContactTypeChoices.SECURITY),
or cache_contact_helper("security").
"""
# registrant_contact(s) are an edge case. They exist on
# the "registrant" property as opposed to contacts.
desired_property = "contacts"
if contact_type_choice == PublicContact.ContactTypeChoices.REGISTRANT:
desired_property = "registrant"
try:
# Grab from cache
contacts = self._get_property(desired_property)
except KeyError as error:
logger.error(f"Could not find {contact_type_choice}: {error}")
return None
else:
cached_contact = self.get_contact_in_keys(contacts, contact_type_choice)
if cached_contact is None:
# TODO - #1103
raise ContactError("No contact was found in cache or the registry")
return cached_contact
def get_default_security_contact(self):
"""Gets the default security contact."""
contact = PublicContact.get_default_security()
contact.domain = self
return contact
def get_default_administrative_contact(self):
"""Gets the default administrative contact."""
contact = PublicContact.get_default_administrative()
contact.domain = self
return contact
def get_default_technical_contact(self):
"""Gets the default technical contact."""
contact = PublicContact.get_default_technical()
contact.domain = self
return contact
def get_default_registrant_contact(self):
"""Gets the default registrant contact."""
contact = PublicContact.get_default_registrant()
contact.domain = self
return contact
def get_contact_in_keys(self, contacts, contact_type):
"""Gets a contact object.
Args:
contacts ([PublicContact]): List of PublicContacts
contact_type (literal): Which PublicContact to get
Returns:
PublicContact | None
"""
# Registrant doesn't exist as an array, and is of
# a special data type, so we need to handle that.
if contact_type == PublicContact.ContactTypeChoices.REGISTRANT:
desired_contact = None
if isinstance(contacts, str):
desired_contact = self._registrant_to_public_contact(
self._cache["registrant"]
)
# Set the cache with the updated object
# for performance reasons.
if "registrant" in self._cache:
self._cache["registrant"] = desired_contact
elif isinstance(contacts, PublicContact):
desired_contact = contacts
return self._handle_registrant_contact(desired_contact)
_registry_id: str
if contact_type in contacts:
_registry_id = contacts.get(contact_type)
desired = PublicContact.objects.filter(
registry_id=_registry_id, domain=self, contact_type=contact_type
)
if desired.count() == 1:
return desired.get()
logger.info(f"Requested contact {_registry_id} does not exist in cache.")
return None
def _handle_registrant_contact(self, contact):
if (
contact.contact_type is not None
and contact.contact_type == PublicContact.ContactTypeChoices.REGISTRANT
):
return contact
else:
raise ValueError("Invalid contact object for registrant_contact")
# ForeignKey on UserDomainRole creates a "permissions" member for # ForeignKey on UserDomainRole creates a "permissions" member for
# all of the user-roles that are in place for this domain # all of the user-roles that are in place for this domain
@ -720,9 +950,9 @@ class Domain(TimeStampedModel, DomainHelper):
try: try:
logger.info("Getting domain info from epp") logger.info("Getting domain info from epp")
req = commands.InfoDomain(name=self.name) req = commands.InfoDomain(name=self.name)
domainInfo = registry.send(req, cleaned=True).res_data[0] domainInfoResponse = registry.send(req, cleaned=True)
exitEarly = True exitEarly = True
return domainInfo return domainInfoResponse
except RegistryError as e: except RegistryError as e:
count += 1 count += 1
@ -772,12 +1002,10 @@ class Domain(TimeStampedModel, DomainHelper):
security_contact = self.get_default_security_contact() security_contact = self.get_default_security_contact()
security_contact.save() security_contact.save()
technical_contact = PublicContact.get_default_technical() technical_contact = self.get_default_technical_contact()
technical_contact.domain = self
technical_contact.save() technical_contact.save()
administrative_contact = PublicContact.get_default_administrative() administrative_contact = self.get_default_administrative_contact()
administrative_contact.domain = self
administrative_contact.save() administrative_contact.save()
@transition( @transition(
@ -930,16 +1158,34 @@ class Domain(TimeStampedModel, DomainHelper):
) )
return err.code return err.code
def _request_contact_info(self, contact: PublicContact): def _fetch_contacts(self, contact_data):
req = commands.InfoContact(id=contact.registry_id) """Fetch contact info."""
return registry.send(req, cleaned=True).res_data[0] choices = PublicContact.ContactTypeChoices
# We expect that all these fields get populated,
# so we can create these early, rather than waiting.
contacts_dict = {
choices.ADMINISTRATIVE: None,
choices.SECURITY: None,
choices.TECHNICAL: None,
}
for domainContact in contact_data:
req = commands.InfoContact(id=domainContact.contact)
data = registry.send(req, cleaned=True).res_data[0]
# Map the object we recieved from EPP to a PublicContact
mapped_object = self.map_epp_contact_to_public_contact(
data, domainContact.contact, domainContact.type
)
# Find/create it in the DB
in_db = self._get_or_create_public_contact(mapped_object)
contacts_dict[in_db.contact_type] = in_db.registry_id
return contacts_dict
def _get_or_create_contact(self, contact: PublicContact): def _get_or_create_contact(self, contact: PublicContact):
"""Try to fetch info about a contact. Create it if it does not exist.""" """Try to fetch info about a contact. Create it if it does not exist."""
try: try:
return self._request_contact_info(contact) return self._request_contact_info(contact)
except RegistryError as e: except RegistryError as e:
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST: if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
logger.info( logger.info(
@ -962,6 +1208,23 @@ class Domain(TimeStampedModel, DomainHelper):
raise e raise e
def _fetch_hosts(self, host_data):
"""Fetch host info."""
hosts = []
for name in host_data:
req = commands.InfoHost(name=name)
data = registry.send(req, cleaned=True).res_data[0]
host = {
"name": name,
"addrs": getattr(data, "addrs", ...),
"cr_date": getattr(data, "cr_date", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
}
hosts.append({k: v for k, v in host.items() if v is not ...})
return hosts
def _update_or_create_host(self, host): def _update_or_create_host(self, host):
raise NotImplementedError() raise NotImplementedError()
@ -972,7 +1235,8 @@ class Domain(TimeStampedModel, DomainHelper):
"""Contact registry for info about a domain.""" """Contact registry for info about a domain."""
try: try:
# get info from registry # get info from registry
data = self._get_or_create_domain() dataResponse = self._get_or_create_domain()
data = dataResponse.res_data[0]
# extract properties from response # extract properties from response
# (Ellipsis is used to mean "null") # (Ellipsis is used to mean "null")
cache = { cache = {
@ -987,14 +1251,21 @@ class Domain(TimeStampedModel, DomainHelper):
"tr_date": getattr(data, "tr_date", ...), "tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...), "up_date": getattr(data, "up_date", ...),
} }
# remove null properties (to distinguish between "a value of None" and null) # remove null properties (to distinguish between "a value of None" and null)
cleaned = {k: v for k, v in cache.items() if v is not ...} cleaned = {k: v for k, v in cache.items() if v is not ...}
# statuses can just be a list no need to keep the epp object # statuses can just be a list no need to keep the epp object
if "statuses" in cleaned.keys(): if "statuses" in cleaned:
cleaned["statuses"] = [status.state for status in cleaned["statuses"]] cleaned["statuses"] = [status.state for status in cleaned["statuses"]]
# get extensions info, if there is any
# DNSSECExtension is one possible extension, make sure to handle
# only DNSSECExtension and not other type extensions
returned_extensions = dataResponse.extensions
cleaned["dnssecdata"] = None
for extension in returned_extensions:
if isinstance(extension, extensions.DNSSECExtension):
cleaned["dnssecdata"] = extension
# Capture and store old hosts and contacts from cache if they exist # Capture and store old hosts and contacts from cache if they exist
old_cache_hosts = self._cache.get("hosts") old_cache_hosts = self._cache.get("hosts")
old_cache_contacts = self._cache.get("contacts") old_cache_contacts = self._cache.get("contacts")
@ -1004,7 +1275,7 @@ class Domain(TimeStampedModel, DomainHelper):
fetch_contacts fetch_contacts
and "_contacts" in cleaned and "_contacts" in cleaned
and isinstance(cleaned["_contacts"], list) and isinstance(cleaned["_contacts"], list)
and len(cleaned["_contacts"]) and len(cleaned["_contacts"]) > 0
): ):
cleaned["contacts"] = self._fetch_contacts(cleaned["_contacts"]) cleaned["contacts"] = self._fetch_contacts(cleaned["_contacts"])
# We're only getting contacts, so retain the old # We're only getting contacts, so retain the old
@ -1026,52 +1297,69 @@ class Domain(TimeStampedModel, DomainHelper):
# and pass them along. # and pass them along.
if old_cache_contacts is not None: if old_cache_contacts is not None:
cleaned["contacts"] = old_cache_contacts cleaned["contacts"] = old_cache_contacts
# replace the prior cache with new data # replace the prior cache with new data
self._cache = cleaned self._cache = cleaned
except RegistryError as e: except RegistryError as e:
logger.error(e) logger.error(e)
def _fetch_contacts(self, contact_data): def _get_or_create_public_contact(self, public_contact: PublicContact):
"""Fetch contact info.""" """Tries to find a PublicContact object in our DB.
contacts = [] If it can't, it'll create it. Returns PublicContact"""
for domainContact in contact_data: db_contact = PublicContact.objects.filter(
req = commands.InfoContact(id=domainContact.contact) registry_id=public_contact.registry_id,
data = registry.send(req, cleaned=True).res_data[0] contact_type=public_contact.contact_type,
contact = { domain=self,
"id": domainContact.contact, )
"type": domainContact.type,
"auth_info": getattr(data, "auth_info", ...),
"cr_date": getattr(data, "cr_date", ...),
"disclose": getattr(data, "disclose", ...),
"email": getattr(data, "email", ...),
"fax": getattr(data, "fax", ...),
"postal_info": getattr(data, "postal_info", ...),
"statuses": getattr(data, "statuses", ...),
"tr_date": getattr(data, "tr_date", ...),
"up_date": getattr(data, "up_date", ...),
"voice": getattr(data, "voice", ...),
}
contacts.append({k: v for k, v in contact.items() if v is not ...})
return contacts
def _fetch_hosts(self, host_data): # Raise an error if we find duplicates.
"""Fetch host info.""" # This should not occur
hosts = [] if db_contact.count() > 1:
for name in host_data: raise Exception(
req = commands.InfoHost(name=name) f"Multiple contacts found for {public_contact.contact_type}"
data = registry.send(req, cleaned=True).res_data[0] )
host = {
"name": name, # Save to DB if it doesn't exist already.
"addrs": getattr(data, "addrs", ...), if db_contact.count() == 0:
"cr_date": getattr(data, "cr_date", ...), # Doesn't run custom save logic, just saves to DB
"statuses": getattr(data, "statuses", ...), public_contact.save(skip_epp_save=True)
"tr_date": getattr(data, "tr_date", ...), logger.info(f"Created a new PublicContact: {public_contact}")
"up_date": getattr(data, "up_date", ...), # Append the item we just created
} return public_contact
hosts.append({k: v for k, v in host.items() if v is not ...})
return hosts existing_contact = db_contact.get()
# Does the item we're grabbing match
# what we have in our DB?
if (
existing_contact.email != public_contact.email
or existing_contact.registry_id != public_contact.registry_id
):
existing_contact.delete()
public_contact.save()
logger.warning("Requested PublicContact is out of sync " "with DB.")
return public_contact
# If it already exists, we can
# assume that the DB instance was updated
# during set, so we should just use that.
return existing_contact
def _registrant_to_public_contact(self, registry_id: str):
"""EPPLib returns the registrant as a string,
which is the registrants associated registry_id. This function is used to
convert that id to a useable object by calling commands.InfoContact
on that ID, then mapping that object to type PublicContact."""
contact = PublicContact(
registry_id=registry_id,
contact_type=PublicContact.ContactTypeChoices.REGISTRANT,
)
# Grabs the expanded contact
full_object = self._request_contact_info(contact)
# Maps it to type PublicContact
mapped_object = self.map_epp_contact_to_public_contact(
full_object, contact.registry_id, contact.contact_type
)
return self._get_or_create_public_contact(mapped_object)
def _invalidate_cache(self): def _invalidate_cache(self):
"""Remove cache data when updates are made.""" """Remove cache data when updates are made."""

View file

@ -29,7 +29,8 @@ class PublicContact(TimeStampedModel):
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
"""Save to the registry and also locally in the registrar database.""" """Save to the registry and also locally in the registrar database."""
if hasattr(self, "domain"): skip_epp_save = kwargs.pop("skip_epp_save", False)
if hasattr(self, "domain") and not skip_epp_save:
match self.contact_type: match self.contact_type:
case PublicContact.ContactTypeChoices.REGISTRANT: case PublicContact.ContactTypeChoices.REGISTRANT:
self.domain.registrant_contact = self self.domain.registrant_contact = self
@ -148,6 +149,10 @@ class PublicContact(TimeStampedModel):
pw="thisisnotapassword", pw="thisisnotapassword",
) )
@classmethod
def get_max_id_length(cls):
return cls._meta.get_field("registry_id").max_length
def __str__(self): def __str__(self):
return ( return (
f"{self.name} <{self.email}>" f"{self.name} <{self.email}>"

View file

@ -81,3 +81,9 @@ class User(AbstractUser):
logger.warn( logger.warn(
"Failed to retrieve invitation %s", invitation, exc_info=True "Failed to retrieve invitation %s", invitation, exc_info=True
) )
class Meta:
permissions = [
("analyst_access_permission", "Analyst Access Permission"),
("full_access_permission", "Full Access Permission"),
]

View file

@ -15,7 +15,7 @@ class UserDomainRole(TimeStampedModel):
elsewhere. elsewhere.
""" """
ADMIN = "admin" ADMIN = "manager"
user = models.ForeignKey( user = models.ForeignKey(
"registrar.User", "registrar.User",

View file

@ -0,0 +1,132 @@
from django.contrib.auth.models import Group
import logging
logger = logging.getLogger(__name__)
class UserGroup(Group):
class Meta:
verbose_name = "User group"
verbose_name_plural = "User groups"
def create_cisa_analyst_group(apps, schema_editor):
"""This method gets run from a data migration."""
# Hard to pass self to these methods as the calls from migrations
# are only expecting apps and schema_editor, so we'll just define
# apps, schema_editor in the local scope instead
CISA_ANALYST_GROUP_PERMISSIONS = [
{
"app_label": "auditlog",
"model": "logentry",
"permissions": ["view_logentry"],
},
{
"app_label": "registrar",
"model": "contact",
"permissions": ["view_contact"],
},
{
"app_label": "registrar",
"model": "domaininformation",
"permissions": ["change_domaininformation"],
},
{
"app_label": "registrar",
"model": "domainapplication",
"permissions": ["change_domainapplication"],
},
{
"app_label": "registrar",
"model": "domain",
"permissions": ["view_domain"],
},
{
"app_label": "registrar",
"model": "draftdomain",
"permissions": ["change_draftdomain"],
},
{
"app_label": "registrar",
"model": "user",
"permissions": ["analyst_access_permission", "change_user"],
},
]
# Avoid error: You can't execute queries until the end
# of the 'atomic' block.
# From django docs:
# https://docs.djangoproject.com/en/4.2/topics/migrations/#data-migrations
# We cant import the Person model directly as it may be a newer
# version than this migration expects. We use the historical version.
ContentType = apps.get_model("contenttypes", "ContentType")
Permission = apps.get_model("auth", "Permission")
UserGroup = apps.get_model("registrar", "UserGroup")
logger.info("Going to create the Analyst Group")
try:
cisa_analysts_group, _ = UserGroup.objects.get_or_create(
name="cisa_analysts_group",
)
cisa_analysts_group.permissions.clear()
for permission in CISA_ANALYST_GROUP_PERMISSIONS:
app_label = permission["app_label"]
model_name = permission["model"]
permissions = permission["permissions"]
# Retrieve the content type for the app and model
content_type = ContentType.objects.get(
app_label=app_label, model=model_name
)
# Retrieve the permissions based on their codenames
permissions = Permission.objects.filter(
content_type=content_type, codename__in=permissions
)
# Assign the permissions to the group
cisa_analysts_group.permissions.add(*permissions)
# Convert the permissions QuerySet to a list of codenames
permission_list = list(permissions.values_list("codename", flat=True))
logger.debug(
app_label
+ " | "
+ model_name
+ " | "
+ ", ".join(permission_list)
+ " added to group "
+ cisa_analysts_group.name
)
cisa_analysts_group.save()
logger.debug(
"CISA Analyt permissions added to group " + cisa_analysts_group.name
)
except Exception as e:
logger.error(f"Error creating analyst permissions group: {e}")
def create_full_access_group(apps, schema_editor):
"""This method gets run from a data migration."""
Permission = apps.get_model("auth", "Permission")
UserGroup = apps.get_model("registrar", "UserGroup")
logger.info("Going to create the Full Access Group")
try:
full_access_group, _ = UserGroup.objects.get_or_create(
name="full_access_group",
)
# Get all available permissions
all_permissions = Permission.objects.all()
# Assign all permissions to the group
full_access_group.permissions.add(*all_permissions)
full_access_group.save()
logger.debug("All permissions added to group " + full_access_group.name)
except Exception as e:
logger.error(f"Error creating full access group: {e}")

View file

@ -0,0 +1,2 @@
class ContactError(Exception):
...

View file

@ -21,7 +21,7 @@
<button <button
type="submit" type="submit"
class="usa-button" class="usa-button"
>Add security email</button> >{% if form.security_email.value is None or form.security_email.value == "dotgov@cisa.dhs.gov"%}Add security email{% else %}Save{% endif %}</button>
</form> </form>
{% endblock %} {# domain_content #} {% endblock %} {# domain_content #}

View file

@ -19,6 +19,7 @@ from registrar.models import (
DomainApplication, DomainApplication,
DomainInvitation, DomainInvitation,
User, User,
UserGroup,
DomainInformation, DomainInformation,
PublicContact, PublicContact,
Domain, Domain,
@ -26,6 +27,7 @@ from registrar.models import (
from epplibwrapper import ( from epplibwrapper import (
commands, commands,
common, common,
info,
RegistryError, RegistryError,
ErrorCode, ErrorCode,
) )
@ -94,7 +96,10 @@ class MockUserLogin:
} }
user, _ = UserModel.objects.get_or_create(**args) user, _ = UserModel.objects.get_or_create(**args)
user.is_staff = True user.is_staff = True
user.is_superuser = True # Create or retrieve the group
group, _ = UserGroup.objects.get_or_create(name="full_access_group")
# Add the user to the group
user.groups.set([group])
user.save() user.save()
backend = settings.AUTHENTICATION_BACKENDS[-1] backend = settings.AUTHENTICATION_BACKENDS[-1]
login(request, user, backend=backend) login(request, user, backend=backend)
@ -426,22 +431,33 @@ def mock_user():
def create_superuser(): def create_superuser():
User = get_user_model() User = get_user_model()
p = "adminpass" p = "adminpass"
return User.objects.create_superuser( user = User.objects.create_user(
username="superuser", username="superuser",
email="admin@example.com", email="admin@example.com",
is_staff=True,
password=p, password=p,
) )
# Retrieve the group or create it if it doesn't exist
group, _ = UserGroup.objects.get_or_create(name="full_access_group")
# Add the user to the group
user.groups.set([group])
return user
def create_user(): def create_user():
User = get_user_model() User = get_user_model()
p = "userpass" p = "userpass"
return User.objects.create_user( user = User.objects.create_user(
username="staffuser", username="staffuser",
email="user@example.com", email="user@example.com",
is_staff=True, is_staff=True,
password=p, password=p,
) )
# Retrieve the group or create it if it doesn't exist
group, _ = UserGroup.objects.get_or_create(name="cisa_analysts_group")
# Add the user to the group
user.groups.set([group])
return user
def create_ready_domain(): def create_ready_domain():
@ -555,32 +571,117 @@ class MockEppLib(TestCase):
contacts=..., contacts=...,
hosts=..., hosts=...,
statuses=..., statuses=...,
registrant=...,
): ):
self.auth_info = auth_info self.auth_info = auth_info
self.cr_date = cr_date self.cr_date = cr_date
self.contacts = contacts self.contacts = contacts
self.hosts = hosts self.hosts = hosts
self.statuses = statuses self.statuses = statuses
self.registrant = registrant
def dummyInfoContactResultData(
self,
id,
email,
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
pw="thisisnotapassword",
):
fake = info.InfoContactResultData(
id=id,
postal_info=common.PostalInfo(
name="Registry Customer Service",
addr=common.ContactAddr(
street=["4200 Wilson Blvd."],
city="Arlington",
pc="22201",
cc="US",
sp="VA",
),
org="Cybersecurity and Infrastructure Security Agency",
type="type",
),
voice="+1.8882820870",
fax="+1-212-9876543",
email=email,
auth_info=common.ContactAuthInfo(pw=pw),
roid=...,
statuses=[],
cl_id=...,
cr_id=...,
cr_date=cr_date,
up_id=...,
up_date=...,
tr_date=...,
disclose=...,
vat=...,
ident=...,
notify_email=...,
)
return fake
mockDataInfoDomain = fakedEppObject( mockDataInfoDomain = fakedEppObject(
"fakepw", "fakePw",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35), cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[common.DomainContact(contact="123", type="security")], contacts=[
common.DomainContact(
contact="123", type=PublicContact.ContactTypeChoices.SECURITY
)
],
hosts=["fake.host.com"], hosts=["fake.host.com"],
statuses=[ statuses=[
common.Status(state="serverTransferProhibited", description="", lang="en"), common.Status(state="serverTransferProhibited", description="", lang="en"),
common.Status(state="inactive", description="", lang="en"), common.Status(state="inactive", description="", lang="en"),
], ],
) )
mockDataInfoContact = mockDataInfoDomain.dummyInfoContactResultData(
"123", "123@mail.gov", datetime.datetime(2023, 5, 25, 19, 45, 35), "lastPw"
)
InfoDomainWithContacts = fakedEppObject(
"fakepw",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[
common.DomainContact(
contact="securityContact",
type=PublicContact.ContactTypeChoices.SECURITY,
),
common.DomainContact(
contact="technicalContact",
type=PublicContact.ContactTypeChoices.TECHNICAL,
),
common.DomainContact(
contact="adminContact",
type=PublicContact.ContactTypeChoices.ADMINISTRATIVE,
),
],
hosts=["fake.host.com"],
statuses=[
common.Status(state="serverTransferProhibited", description="", lang="en"),
common.Status(state="inactive", description="", lang="en"),
],
registrant="regContact",
)
mockSecurityContact = InfoDomainWithContacts.dummyInfoContactResultData(
"securityContact", "security@mail.gov"
)
mockTechnicalContact = InfoDomainWithContacts.dummyInfoContactResultData(
"technicalContact", "tech@mail.gov"
)
mockAdministrativeContact = InfoDomainWithContacts.dummyInfoContactResultData(
"adminContact", "admin@mail.gov"
)
mockRegistrantContact = InfoDomainWithContacts.dummyInfoContactResultData(
"regContact", "registrant@mail.gov"
)
infoDomainNoContact = fakedEppObject( infoDomainNoContact = fakedEppObject(
"security", "security",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35), cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=[], contacts=[],
hosts=["fake.host.com"], hosts=["fake.host.com"],
) )
mockDataInfoContact = fakedEppObject(
"anotherPw", cr_date=datetime.datetime(2023, 7, 25, 19, 45, 35)
)
mockDataInfoHosts = fakedEppObject( mockDataInfoHosts = fakedEppObject(
"lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35) "lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35)
) )
@ -593,9 +694,28 @@ class MockEppLib(TestCase):
if isinstance(_request, commands.InfoDomain): if isinstance(_request, commands.InfoDomain):
if getattr(_request, "name", None) == "security.gov": if getattr(_request, "name", None) == "security.gov":
return MagicMock(res_data=[self.infoDomainNoContact]) return MagicMock(res_data=[self.infoDomainNoContact])
elif getattr(_request, "name", None) == "freeman.gov":
return MagicMock(res_data=[self.InfoDomainWithContacts])
else:
return MagicMock(res_data=[self.mockDataInfoDomain]) return MagicMock(res_data=[self.mockDataInfoDomain])
elif isinstance(_request, commands.InfoContact): elif isinstance(_request, commands.InfoContact):
return MagicMock(res_data=[self.mockDataInfoContact]) mocked_result: info.InfoContactResultData
# For testing contact types
match getattr(_request, "id", None):
case "securityContact":
mocked_result = self.mockSecurityContact
case "technicalContact":
mocked_result = self.mockTechnicalContact
case "adminContact":
mocked_result = self.mockAdministrativeContact
case "regContact":
mocked_result = self.mockRegistrantContact
case _:
# Default contact return
mocked_result = self.mockDataInfoContact
return MagicMock(res_data=[mocked_result])
elif ( elif (
isinstance(_request, commands.CreateContact) isinstance(_request, commands.CreateContact)
and getattr(_request, "id", None) == "fail" and getattr(_request, "id", None) == "fail"

View file

@ -52,6 +52,7 @@ class TestDomainAdmin(MockEppLib):
self.factory = RequestFactory() self.factory = RequestFactory()
super().setUp() super().setUp()
@skip("Why did this test stop working, and is is a good test")
def test_place_and_remove_hold(self): def test_place_and_remove_hold(self):
domain = create_ready_domain() domain = create_ready_domain()
# get admin page and assert Place Hold button # get admin page and assert Place Hold button
@ -933,14 +934,13 @@ class MyUserAdminTest(TestCase):
request.user = create_user() request.user = create_user()
list_display = self.admin.get_list_display(request) list_display = self.admin.get_list_display(request)
expected_list_display = ( expected_list_display = [
"email", "email",
"first_name", "first_name",
"last_name", "last_name",
"is_staff", "group",
"is_superuser",
"status", "status",
) ]
self.assertEqual(list_display, expected_list_display) self.assertEqual(list_display, expected_list_display)
self.assertNotIn("username", list_display) self.assertNotIn("username", list_display)
@ -952,14 +952,14 @@ class MyUserAdminTest(TestCase):
expected_fieldsets = super(MyUserAdmin, self.admin).get_fieldsets(request) expected_fieldsets = super(MyUserAdmin, self.admin).get_fieldsets(request)
self.assertEqual(fieldsets, expected_fieldsets) self.assertEqual(fieldsets, expected_fieldsets)
def test_get_fieldsets_non_superuser(self): def test_get_fieldsets_cisa_analyst(self):
request = self.client.request().wsgi_request request = self.client.request().wsgi_request
request.user = create_user() request.user = create_user()
fieldsets = self.admin.get_fieldsets(request) fieldsets = self.admin.get_fieldsets(request)
expected_fieldsets = ( expected_fieldsets = (
(None, {"fields": ("password", "status")}), (None, {"fields": ("password", "status")}),
("Personal Info", {"fields": ("first_name", "last_name", "email")}), ("Personal Info", {"fields": ("first_name", "last_name", "email")}),
("Permissions", {"fields": ("is_active", "is_staff", "is_superuser")}), ("Permissions", {"fields": ("is_active", "groups")}),
("Important dates", {"fields": ("last_login", "date_joined")}), ("Important dates", {"fields": ("last_login", "date_joined")}),
) )
self.assertEqual(fieldsets, expected_fieldsets) self.assertEqual(fieldsets, expected_fieldsets)

View file

@ -0,0 +1,51 @@
from django.test import TestCase
from registrar.models import (
UserGroup,
)
import logging
logger = logging.getLogger(__name__)
class TestGroups(TestCase):
def test_groups_created(self):
"""The test enviroment contains data that was created in migration,
so we are able to test groups and permissions.
- Test cisa_analysts_group and full_access_group created
- Test permissions on full_access_group
"""
# Get the UserGroup objects
cisa_analysts_group = UserGroup.objects.get(name="cisa_analysts_group")
full_access_group = UserGroup.objects.get(name="full_access_group")
# Assert that the cisa_analysts_group exists in the database
self.assertQuerysetEqual(
UserGroup.objects.filter(name="cisa_analysts_group"), [cisa_analysts_group]
)
# Assert that the full_access_group exists in the database
self.assertQuerysetEqual(
UserGroup.objects.filter(name="full_access_group"), [full_access_group]
)
# Test permissions for cisa_analysts_group
# Define the expected permission codenames
expected_permissions = [
"view_logentry",
"view_contact",
"view_domain",
"change_domainapplication",
"change_domaininformation",
"change_draftdomain",
"analyst_access_permission",
"change_user",
]
# Get the codenames of actual permissions associated with the group
actual_permissions = [p.codename for p in cisa_analysts_group.permissions.all()]
# Assert that the actual permissions match the expected permissions
self.assertListEqual(actual_permissions, expected_permissions)

View file

@ -3,6 +3,7 @@ Feature being tested: Registry Integration
This file tests the various ways in which the registrar interacts with the registry. This file tests the various ways in which the registrar interacts with the registry.
""" """
from typing import Mapping, Any
from django.test import TestCase from django.test import TestCase
from django.db.utils import IntegrityError from django.db.utils import IntegrityError
from unittest.mock import MagicMock, patch, call from unittest.mock import MagicMock, patch, call
@ -20,19 +21,28 @@ from django_fsm import TransitionNotAllowed # type: ignore
from epplibwrapper import ( from epplibwrapper import (
commands, commands,
common, common,
extensions,
responses, responses,
RegistryError, RegistryError,
ErrorCode, ErrorCode,
) )
import logging
logger = logging.getLogger(__name__)
class TestDomainCache(MockEppLib): class TestDomainCache(MockEppLib):
def tearDown(self):
PublicContact.objects.all().delete()
Domain.objects.all().delete()
super().tearDown()
def test_cache_sets_resets(self): def test_cache_sets_resets(self):
"""Cache should be set on getter and reset on setter calls""" """Cache should be set on getter and reset on setter calls"""
domain, _ = Domain.objects.get_or_create(name="igorville.gov") domain, _ = Domain.objects.get_or_create(name="igorville.gov")
# trigger getter # trigger getter
_ = domain.creation_date _ = domain.creation_date
domain._get_property("contacts")
# getter should set the domain cache with a InfoDomain object # getter should set the domain cache with a InfoDomain object
# (see InfoDomainResult) # (see InfoDomainResult)
self.assertEquals(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info) self.assertEquals(domain._cache["auth_info"], self.mockDataInfoDomain.auth_info)
@ -80,13 +90,16 @@ class TestDomainCache(MockEppLib):
def test_cache_nested_elements(self): def test_cache_nested_elements(self):
"""Cache works correctly with the nested objects cache and hosts""" """Cache works correctly with the nested objects cache and hosts"""
domain, _ = Domain.objects.get_or_create(name="igorville.gov") domain, _ = Domain.objects.get_or_create(name="igorville.gov")
# The contact list will initially contain objects of type 'DomainContact'
# the cached contacts and hosts should be dictionaries of what is passed to them # this is then transformed into PublicContact, and cache should NOT
# hold onto the DomainContact object
expectedUnfurledContactsList = [
common.DomainContact(contact="123", type="security"),
]
expectedContactsDict = { expectedContactsDict = {
"id": self.mockDataInfoDomain.contacts[0].contact, PublicContact.ContactTypeChoices.ADMINISTRATIVE: None,
"type": self.mockDataInfoDomain.contacts[0].type, PublicContact.ContactTypeChoices.SECURITY: "123",
"auth_info": self.mockDataInfoContact.auth_info, PublicContact.ContactTypeChoices.TECHNICAL: None,
"cr_date": self.mockDataInfoContact.cr_date,
} }
expectedHostsDict = { expectedHostsDict = {
"name": self.mockDataInfoDomain.hosts[0], "name": self.mockDataInfoDomain.hosts[0],
@ -102,13 +115,15 @@ class TestDomainCache(MockEppLib):
# check contacts # check contacts
self.assertEqual(domain._cache["_contacts"], self.mockDataInfoDomain.contacts) self.assertEqual(domain._cache["_contacts"], self.mockDataInfoDomain.contacts)
self.assertEqual(domain._cache["contacts"], [expectedContactsDict]) # The contact list should not contain what is sent by the registry by default,
# as _fetch_cache will transform the type to PublicContact
self.assertNotEqual(domain._cache["contacts"], expectedUnfurledContactsList)
self.assertEqual(domain._cache["contacts"], expectedContactsDict)
# get and check hosts is set correctly # get and check hosts is set correctly
domain._get_property("hosts") domain._get_property("hosts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict]) self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
self.assertEqual(domain._cache["contacts"], [expectedContactsDict]) self.assertEqual(domain._cache["contacts"], expectedContactsDict)
# invalidate cache # invalidate cache
domain._cache = {} domain._cache = {}
@ -119,11 +134,64 @@ class TestDomainCache(MockEppLib):
# get contacts # get contacts
domain._get_property("contacts") domain._get_property("contacts")
self.assertEqual(domain._cache["hosts"], [expectedHostsDict]) self.assertEqual(domain._cache["hosts"], [expectedHostsDict])
self.assertEqual(domain._cache["contacts"], [expectedContactsDict]) self.assertEqual(domain._cache["contacts"], expectedContactsDict)
def tearDown(self) -> None: def test_map_epp_contact_to_public_contact(self):
Domain.objects.all().delete() # Tests that the mapper is working how we expect
super().tearDown() domain, _ = Domain.objects.get_or_create(name="registry.gov")
security = PublicContact.ContactTypeChoices.SECURITY
mapped = domain.map_epp_contact_to_public_contact(
self.mockDataInfoContact,
self.mockDataInfoContact.id,
security,
)
expected_contact = PublicContact(
domain=domain,
contact_type=security,
registry_id="123",
email="123@mail.gov",
voice="+1.8882820870",
fax="+1-212-9876543",
pw="lastPw",
name="Registry Customer Service",
org="Cybersecurity and Infrastructure Security Agency",
city="Arlington",
pc="22201",
cc="US",
sp="VA",
street1="4200 Wilson Blvd.",
)
# Test purposes only, since we're comparing
# two duplicate objects. We would expect
# these not to have the same state.
expected_contact._state = mapped._state
# Mapped object is what we expect
self.assertEqual(mapped.__dict__, expected_contact.__dict__)
# The mapped object should correctly translate to a DB
# object. If not, something else went wrong.
db_object = domain._get_or_create_public_contact(mapped)
in_db = PublicContact.objects.filter(
registry_id=domain.security_contact.registry_id,
contact_type=security,
).get()
# DB Object is the same as the mapped object
self.assertEqual(db_object, in_db)
domain.security_contact = in_db
# Trigger the getter
_ = domain.security_contact
# Check to see that changes made
# to DB objects persist in cache correctly
in_db.email = "123test@mail.gov"
in_db.save()
cached_contact = domain._cache["contacts"].get(security)
self.assertEqual(cached_contact, in_db.registry_id)
self.assertEqual(domain.security_contact.email, "123test@mail.gov")
class TestDomainCreation(MockEppLib): class TestDomainCreation(MockEppLib):
@ -207,7 +275,10 @@ class TestDomainCreation(MockEppLib):
def tearDown(self) -> None: def tearDown(self) -> None:
DomainInformation.objects.all().delete() DomainInformation.objects.all().delete()
DomainApplication.objects.all().delete() DomainApplication.objects.all().delete()
PublicContact.objects.all().delete()
Domain.objects.all().delete() Domain.objects.all().delete()
User.objects.all().delete()
DraftDomain.objects.all().delete()
super().tearDown() super().tearDown()
@ -221,7 +292,6 @@ class TestDomainStatuses(MockEppLib):
_ = domain.statuses _ = domain.statuses
status_list = [status.state for status in self.mockDataInfoDomain.statuses] status_list = [status.state for status in self.mockDataInfoDomain.statuses]
self.assertEquals(domain._cache["statuses"], status_list) self.assertEquals(domain._cache["statuses"], status_list)
# Called in _fetch_cache # Called in _fetch_cache
self.mockedSendFunction.assert_has_calls( self.mockedSendFunction.assert_has_calls(
[ [
@ -265,6 +335,7 @@ class TestDomainStatuses(MockEppLib):
raise raise
def tearDown(self) -> None: def tearDown(self) -> None:
PublicContact.objects.all().delete()
Domain.objects.all().delete() Domain.objects.all().delete()
super().tearDown() super().tearDown()
@ -387,12 +458,17 @@ class TestRegistrantContacts(MockEppLib):
And the registrant is the admin on a domain And the registrant is the admin on a domain
""" """
super().setUp() super().setUp()
# Creates a domain with no contact associated to it
self.domain, _ = Domain.objects.get_or_create(name="security.gov") self.domain, _ = Domain.objects.get_or_create(name="security.gov")
# Creates a domain with an associated contact
self.domain_contact, _ = Domain.objects.get_or_create(name="freeman.gov")
def tearDown(self): def tearDown(self):
super().tearDown() super().tearDown()
# self.contactMailingAddressPatch.stop() self.domain._invalidate_cache()
# self.createContactPatch.stop() self.domain_contact._invalidate_cache()
PublicContact.objects.all().delete()
Domain.objects.all().delete()
def test_no_security_email(self): def test_no_security_email(self):
""" """
@ -638,6 +714,133 @@ class TestRegistrantContacts(MockEppLib):
""" """
raise raise
def test_contact_getter_security(self):
security = PublicContact.ContactTypeChoices.SECURITY
# Create prexisting object
expected_contact = self.domain.map_epp_contact_to_public_contact(
self.mockSecurityContact,
contact_id="securityContact",
contact_type=security,
)
# Checks if we grabbed the correct PublicContact
self.assertEqual(
self.domain_contact.security_contact.email, expected_contact.email
)
expected_contact_db = PublicContact.objects.filter(
registry_id=self.domain_contact.security_contact.registry_id,
contact_type=security,
).get()
self.assertEqual(self.domain_contact.security_contact, expected_contact_db)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoContact(id="securityContact", auth_info=None),
cleaned=True,
),
]
)
# Checks if we are receiving the cache we expect
cache = self.domain_contact._cache["contacts"]
self.assertEqual(cache.get(security), "securityContact")
def test_contact_getter_technical(self):
technical = PublicContact.ContactTypeChoices.TECHNICAL
expected_contact = self.domain.map_epp_contact_to_public_contact(
self.mockTechnicalContact,
contact_id="technicalContact",
contact_type=technical,
)
self.assertEqual(
self.domain_contact.technical_contact.email, expected_contact.email
)
# Checks if we grab the correct PublicContact
expected_contact_db = PublicContact.objects.filter(
registry_id=self.domain_contact.technical_contact.registry_id,
contact_type=technical,
).get()
# Checks if we grab the correct PublicContact
self.assertEqual(self.domain_contact.technical_contact, expected_contact_db)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoContact(id="technicalContact", auth_info=None),
cleaned=True,
),
]
)
# Checks if we are receiving the cache we expect
cache = self.domain_contact._cache["contacts"]
self.assertEqual(cache.get(technical), "technicalContact")
def test_contact_getter_administrative(self):
administrative = PublicContact.ContactTypeChoices.ADMINISTRATIVE
expected_contact = self.domain.map_epp_contact_to_public_contact(
self.mockAdministrativeContact,
contact_id="adminContact",
contact_type=administrative,
)
self.assertEqual(
self.domain_contact.administrative_contact.email, expected_contact.email
)
expected_contact_db = PublicContact.objects.filter(
registry_id=self.domain_contact.administrative_contact.registry_id,
contact_type=administrative,
).get()
# Checks if we grab the correct PublicContact
self.assertEqual(
self.domain_contact.administrative_contact, expected_contact_db
)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoContact(id="adminContact", auth_info=None),
cleaned=True,
),
]
)
# Checks if we are receiving the cache we expect
cache = self.domain_contact._cache["contacts"]
self.assertEqual(cache.get(administrative), "adminContact")
def test_contact_getter_registrant(self):
expected_contact = self.domain.map_epp_contact_to_public_contact(
self.mockRegistrantContact,
contact_id="regContact",
contact_type=PublicContact.ContactTypeChoices.REGISTRANT,
)
self.assertEqual(
self.domain_contact.registrant_contact.email, expected_contact.email
)
expected_contact_db = PublicContact.objects.filter(
registry_id=self.domain_contact.registrant_contact.registry_id,
contact_type=PublicContact.ContactTypeChoices.REGISTRANT,
).get()
# Checks if we grab the correct PublicContact
self.assertEqual(self.domain_contact.registrant_contact, expected_contact_db)
self.mockedSendFunction.assert_has_calls(
[
call(
commands.InfoContact(id="regContact", auth_info=None),
cleaned=True,
),
]
)
# Checks if we are receiving the cache we expect.
self.assertEqual(self.domain_contact._cache["registrant"], expected_contact_db)
class TestRegistrantNameservers(TestCase): class TestRegistrantNameservers(TestCase):
"""Rule: Registrants may modify their nameservers""" """Rule: Registrants may modify their nameservers"""
@ -778,44 +981,372 @@ class TestRegistrantNameservers(TestCase):
raise raise
class TestRegistrantDNSSEC(TestCase): class TestRegistrantDNSSEC(MockEppLib):
"""Rule: Registrants may modify their secure DNS data""" """Rule: Registrants may modify their secure DNS data"""
# helper function to create UpdateDomainDNSSECExtention object for verification
def createUpdateExtension(self, dnssecdata: extensions.DNSSECExtension):
return commands.UpdateDomainDNSSECExtension(
maxSigLife=dnssecdata.maxSigLife,
dsData=dnssecdata.dsData,
keyData=dnssecdata.keyData,
remDsData=None,
remKeyData=None,
remAllDsKeyData=True,
)
def setUp(self): def setUp(self):
""" """
Background: Background:
Given the registrant is logged in Given the analyst is logged in
And the registrant is the admin on a domain And a domain exists in the registry
""" """
pass super().setUp()
# for the tests, need a domain in the unknown state
self.domain, _ = Domain.objects.get_or_create(name="fake.gov")
self.addDsData1 = {
"keyTag": 1234,
"alg": 3,
"digestType": 1,
"digest": "ec0bdd990b39feead889f0ba613db4adec0bdd99",
}
self.addDsData2 = {
"keyTag": 2345,
"alg": 3,
"digestType": 1,
"digest": "ec0bdd990b39feead889f0ba613db4adecb4adec",
}
self.keyDataDict = {
"flags": 257,
"protocol": 3,
"alg": 1,
"pubKey": "AQPJ////4Q==",
}
self.dnssecExtensionWithDsData: Mapping[str, Any] = {
"dsData": [common.DSData(**self.addDsData1)]
}
self.dnssecExtensionWithMultDsData: Mapping[str, Any] = {
"dsData": [
common.DSData(**self.addDsData1),
common.DSData(**self.addDsData2),
],
}
self.dnssecExtensionWithKeyData: Mapping[str, Any] = {
"maxSigLife": 3215,
"keyData": [common.DNSSECKeyData(**self.keyDataDict)],
}
@skip("not implemented yet") def tearDown(self):
def test_user_adds_dns_data(self): Domain.objects.all().delete()
super().tearDown()
def test_user_adds_dnssec_data(self):
""" """
Scenario: Registrant adds DNS data Scenario: Registrant adds DNSSEC data.
Verify that both the setter and getter are functioning properly
This test verifies:
1 - setter calls UpdateDomain command
2 - setter adds the UpdateDNSSECExtension extension to the command
3 - setter causes the getter to call info domain on next get from cache
4 - getter properly parses dnssecdata from InfoDomain response and sets to cache
""" """
raise
@skip("not implemented yet") # make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
return MagicMock(
res_data=[self.mockDataInfoDomain],
extensions=[
extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
self.domain.dnssecdata = self.dnssecExtensionWithDsData
# get the DNS SEC extension added to the UpdateDomain command and
# verify that it is properly sent
# args[0] is the _request sent to registry
args, _ = mocked_send.call_args
# assert that the extension matches
self.assertEquals(
args[0].extensions[0],
self.createUpdateExtension(
extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
),
)
# test that the dnssecdata getter is functioning properly
dnssecdata_get = self.domain.dnssecdata
mocked_send.assert_has_calls(
[
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.InfoDomain(
name="fake.gov",
),
cleaned=True,
),
]
)
self.assertEquals(
dnssecdata_get.dsData, self.dnssecExtensionWithDsData["dsData"]
)
patcher.stop()
def test_dnssec_is_idempotent(self): def test_dnssec_is_idempotent(self):
""" """
Scenario: Registrant adds DNS data twice, due to a UI glitch Scenario: Registrant adds DNS data twice, due to a UI glitch
"""
# implementation note: this requires seeing what happens when these are actually # implementation note: this requires seeing what happens when these are actually
# sent like this, and then implementing appropriate mocks for any errors the # sent like this, and then implementing appropriate mocks for any errors the
# registry normally sends in this case # registry normally sends in this case
raise
@skip("not implemented yet") This test verifies:
1 - UpdateDomain command called twice
2 - setter causes the getter to call info domain on next get from cache
3 - getter properly parses dnssecdata from InfoDomain response and sets to cache
"""
# make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
return MagicMock(
res_data=[self.mockDataInfoDomain],
extensions=[
extensions.DNSSECExtension(**self.dnssecExtensionWithDsData)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
# set the dnssecdata once
self.domain.dnssecdata = self.dnssecExtensionWithDsData
# set the dnssecdata again
self.domain.dnssecdata = self.dnssecExtensionWithDsData
# test that the dnssecdata getter is functioning properly
dnssecdata_get = self.domain.dnssecdata
mocked_send.assert_has_calls(
[
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.InfoDomain(
name="fake.gov",
),
cleaned=True,
),
]
)
self.assertEquals(
dnssecdata_get.dsData, self.dnssecExtensionWithDsData["dsData"]
)
patcher.stop()
def test_user_adds_dnssec_data_multiple_dsdata(self):
"""
Scenario: Registrant adds DNSSEC data with multiple DSData.
Verify that both the setter and getter are functioning properly
This test verifies:
1 - setter calls UpdateDomain command
2 - setter adds the UpdateDNSSECExtension extension to the command
3 - setter causes the getter to call info domain on next get from cache
4 - getter properly parses dnssecdata from InfoDomain response and sets to cache
"""
# make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
return MagicMock(
res_data=[self.mockDataInfoDomain],
extensions=[
extensions.DNSSECExtension(**self.dnssecExtensionWithMultDsData)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
self.domain.dnssecdata = self.dnssecExtensionWithMultDsData
# get the DNS SEC extension added to the UpdateDomain command
# and verify that it is properly sent
# args[0] is the _request sent to registry
args, _ = mocked_send.call_args
# assert that the extension matches
self.assertEquals(
args[0].extensions[0],
self.createUpdateExtension(
extensions.DNSSECExtension(**self.dnssecExtensionWithMultDsData)
),
)
# test that the dnssecdata getter is functioning properly
dnssecdata_get = self.domain.dnssecdata
mocked_send.assert_has_calls(
[
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.InfoDomain(
name="fake.gov",
),
cleaned=True,
),
]
)
self.assertEquals(
dnssecdata_get.dsData, self.dnssecExtensionWithMultDsData["dsData"]
)
patcher.stop()
def test_user_adds_dnssec_keydata(self):
"""
Scenario: Registrant adds DNSSEC data.
Verify that both the setter and getter are functioning properly
This test verifies:
1 - setter calls UpdateDomain command
2 - setter adds the UpdateDNSSECExtension extension to the command
3 - setter causes the getter to call info domain on next get from cache
4 - getter properly parses dnssecdata from InfoDomain response and sets to cache
"""
# make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
return MagicMock(
res_data=[self.mockDataInfoDomain],
extensions=[
extensions.DNSSECExtension(**self.dnssecExtensionWithKeyData)
],
)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
self.domain.dnssecdata = self.dnssecExtensionWithKeyData
# get the DNS SEC extension added to the UpdateDomain command
# and verify that it is properly sent
# args[0] is the _request sent to registry
args, _ = mocked_send.call_args
# assert that the extension matches
self.assertEquals(
args[0].extensions[0],
self.createUpdateExtension(
extensions.DNSSECExtension(**self.dnssecExtensionWithKeyData)
),
)
# test that the dnssecdata getter is functioning properly
dnssecdata_get = self.domain.dnssecdata
mocked_send.assert_has_calls(
[
call(
commands.UpdateDomain(
name="fake.gov",
nsset=None,
keyset=None,
registrant=None,
auth_info=None,
),
cleaned=True,
),
call(
commands.InfoDomain(
name="fake.gov",
),
cleaned=True,
),
]
)
self.assertEquals(
dnssecdata_get.keyData, self.dnssecExtensionWithKeyData["keyData"]
)
patcher.stop()
def test_update_is_unsuccessful(self): def test_update_is_unsuccessful(self):
""" """
Scenario: An update to the dns data is unsuccessful Scenario: An update to the dns data is unsuccessful
When an error is returned from epplibwrapper When an error is returned from epplibwrapper
Then a user-friendly error message is returned for displaying on the web Then a user-friendly error message is returned for displaying on the web
""" """
raise
# make sure to stop any other patcher so there are no conflicts
self.mockSendPatch.stop()
def side_effect(_request, cleaned):
raise RegistryError(code=ErrorCode.PARAMETER_VALUE_RANGE_ERROR)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
# if RegistryError is raised, view formats user-friendly
# error message if error is_client_error, is_session_error, or
# is_server_error; so test for those conditions
with self.assertRaises(RegistryError) as err:
self.domain.dnssecdata = self.dnssecExtensionWithDsData
self.assertTrue(
err.is_client_error() or err.is_session_error() or err.is_server_error()
)
patcher.stop()
class TestAnalystClientHold(MockEppLib): class TestAnalystClientHold(MockEppLib):

View file

@ -1,11 +1,11 @@
from unittest import skip from unittest import skip
from unittest.mock import MagicMock, ANY from unittest.mock import MagicMock, ANY, patch
from django.conf import settings from django.conf import settings
from django.test import Client, TestCase from django.test import Client, TestCase
from django.urls import reverse from django.urls import reverse
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
from .common import completed_application from .common import MockEppLib, completed_application # type: ignore
from django_webtest import WebTest # type: ignore from django_webtest import WebTest # type: ignore
import boto3_mocking # type: ignore import boto3_mocking # type: ignore
@ -25,7 +25,6 @@ from registrar.models import (
from registrar.views.application import ApplicationWizard, Step from registrar.views.application import ApplicationWizard, Step
from .common import less_console_noise from .common import less_console_noise
from .common import MockEppLib
class TestViews(TestCase): class TestViews(TestCase):
@ -1133,7 +1132,7 @@ class TestDomainPermissions(TestWithDomainPermissions):
self.assertEqual(response.status_code, 403) self.assertEqual(response.status_code, 403)
class TestDomainDetail(TestWithDomainPermissions, WebTest): class TestDomainDetail(TestWithDomainPermissions, WebTest, MockEppLib):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
self.app.set_user(self.user.username) self.app.set_user(self.user.username)
@ -1426,6 +1425,40 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest):
) )
self.assertContains(page, "Testy") self.assertContains(page, "Testy")
def test_domain_security_email_existing_security_contact(self):
"""Can load domain's security email page."""
self.mockSendPatch = patch("registrar.models.domain.registry.send")
self.mockedSendFunction = self.mockSendPatch.start()
self.mockedSendFunction.side_effect = self.mockSend
domain_contact, _ = Domain.objects.get_or_create(name="freeman.gov")
# Add current user to this domain
_ = UserDomainRole(user=self.user, domain=domain_contact, role="admin").save()
page = self.client.get(
reverse("domain-security-email", kwargs={"pk": domain_contact.id})
)
# Loads correctly
self.assertContains(page, "Domain security email")
self.assertContains(page, "security@mail.gov")
self.mockSendPatch.stop()
def test_domain_security_email_no_security_contact(self):
"""Loads a domain with no defined security email.
We should not show the default."""
self.mockSendPatch = patch("registrar.models.domain.registry.send")
self.mockedSendFunction = self.mockSendPatch.start()
self.mockedSendFunction.side_effect = self.mockSend
page = self.client.get(
reverse("domain-security-email", kwargs={"pk": self.domain.id})
)
# Loads correctly
self.assertContains(page, "Domain security email")
self.assertNotContains(page, "dotgov@cisa.dhs.gov")
self.mockSendPatch.stop()
def test_domain_security_email(self): def test_domain_security_email(self):
"""Can load domain's security email page.""" """Can load domain's security email page."""
page = self.client.get( page = self.client.get(
@ -1433,10 +1466,8 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest):
) )
self.assertContains(page, "Domain security email") self.assertContains(page, "Domain security email")
@skip("Ticket 912 needs to fix this one")
def test_domain_security_email_form(self): def test_domain_security_email_form(self):
"""Adding a security email works. """Adding a security email works.
Uses self.app WebTest because we need to interact with forms. Uses self.app WebTest because we need to interact with forms.
""" """
security_email_page = self.app.get( security_email_page = self.app.get(
@ -1456,7 +1487,7 @@ class TestDomainDetail(TestWithDomainPermissions, WebTest):
self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id) self.app.set_cookie(settings.SESSION_COOKIE_NAME, session_id)
success_page = result.follow() success_page = result.follow()
self.assertContains( self.assertContains(
success_page, "The security email for this domain have been updated" success_page, "The security email for this domain has been updated"
) )
def test_domain_overview_blocked_for_ineligible_user(self): def test_domain_overview_blocked_for_ineligible_user(self):

View file

@ -259,7 +259,11 @@ class DomainSecurityEmailView(DomainPermissionView, FormMixin):
"""The initial value for the form.""" """The initial value for the form."""
domain = self.get_object() domain = self.get_object()
initial = super().get_initial() initial = super().get_initial()
initial["security_email"] = domain.security_contact.email security_contact = domain.security_contact
if security_contact is None or security_contact.email == "dotgov@cisa.dhs.gov":
initial["security_email"] = None
return initial
initial["security_email"] = security_contact.email
return initial return initial
def get_success_url(self): def get_success_url(self):
@ -288,7 +292,7 @@ class DomainSecurityEmailView(DomainPermissionView, FormMixin):
contact.save() contact.save()
messages.success( messages.success(
self.request, "The security email for this domain have been updated." self.request, "The security email for this domain has been updated."
) )
# superclass has the redirect # superclass has the redirect

View file

@ -63,9 +63,9 @@ class DomainPermission(PermissionsLoginMixin):
""" """
# Check if the user is permissioned... # Check if the user is permissioned...
user_is_analyst_or_superuser = ( user_is_analyst_or_superuser = self.request.user.has_perm(
self.request.user.is_staff or self.request.user.is_superuser "registrar.analyst_access_permission"
) ) or self.request.user.has_perm("registrar.full_access_permission")
if not user_is_analyst_or_superuser: if not user_is_analyst_or_superuser:
return False return False

View file

@ -33,7 +33,9 @@ class DomainPermissionView(DomainPermission, DetailView, abc.ABC):
def get_context_data(self, **kwargs): def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs) context = super().get_context_data(**kwargs)
user = self.request.user user = self.request.user
context["is_analyst_or_superuser"] = user.is_staff or user.is_superuser context["is_analyst_or_superuser"] = user.has_perm(
"registrar.analyst_access_permission"
) or user.has_perm("registrar.full_access_permission")
# Stored in a variable for the linter # Stored in a variable for the linter
action = "analyst_action" action = "analyst_action"
action_location = "analyst_action_location" action_location = "analyst_action_location"

View file

@ -22,7 +22,7 @@ django-phonenumber-field[phonenumberslite]==7.1.0
django-widget-tweaks==1.4.12 django-widget-tweaks==1.4.12
environs[django]==9.5.0 environs[django]==9.5.0
faker==18.10.0 faker==18.10.0
git+https://github.com/cisagov/epplib.git@f818cbf0b069a12f03e1d72e4b9f4900924b832d#egg=fred-epplib git+https://github.com/cisagov/epplib.git@d56d183f1664f34c40ca9716a3a9a345f0ef561c#egg=fred-epplib
furl==2.1.3 furl==2.1.3
future==0.18.3 ; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3' future==0.18.3 ; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'
gunicorn==20.1.0 gunicorn==20.1.0
@ -49,5 +49,5 @@ setuptools==67.8.0 ; python_version >= '3.7'
six==1.16.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' six==1.16.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
sqlparse==0.4.4 ; python_version >= '3.5' sqlparse==0.4.4 ; python_version >= '3.5'
typing-extensions==4.6.3 typing-extensions==4.6.3
urllib3==1.26.16 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5' urllib3==1.26.17 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5'
whitenoise==6.4.0 whitenoise==6.4.0