Merge pull request #1560 from cisagov/dk/1352-nameservers

Issue #1352 - Show saved nameservers if registry is down
This commit is contained in:
dave-kennedy-ecs 2023-12-29 16:19:11 -05:00 committed by GitHub
commit 1ce7f39dab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 157 additions and 34 deletions

View file

@ -1246,8 +1246,9 @@ admin.site.register(models.DomainInvitation, DomainInvitationAdmin)
admin.site.register(models.DomainInformation, DomainInformationAdmin)
admin.site.register(models.Domain, DomainAdmin)
admin.site.register(models.DraftDomain, DraftDomainAdmin)
admin.site.register(models.Host, MyHostAdmin)
admin.site.register(models.Nameserver, MyHostAdmin)
# Host and HostIP removed from django admin because changes in admin
# do not propogate to registry and logic not applied
# admin.site.register(models.Host, MyHostAdmin)
admin.site.register(models.Website, WebsiteAdmin)
admin.site.register(models.PublicContact, AuditedAdmin)
admin.site.register(models.DomainApplication, DomainApplicationAdmin)

View file

@ -0,0 +1,15 @@
# Generated by Django 4.2.7 on 2023-12-21 11:07
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("registrar", "0058_alter_domaininformation_options"),
]
operations = [
migrations.DeleteModel(
name="Nameserver",
),
]

View file

@ -7,7 +7,6 @@ from .draft_domain import DraftDomain
from .host_ip import HostIP
from .host import Host
from .domain_invitation import DomainInvitation
from .nameserver import Nameserver
from .user_domain_role import UserDomainRole
from .public_contact import PublicContact
from .user import User
@ -24,7 +23,6 @@ __all__ = [
"DomainInvitation",
"HostIP",
"Host",
"Nameserver",
"UserDomainRole",
"PublicContact",
"User",
@ -41,7 +39,6 @@ auditlog.register(DomainInvitation)
auditlog.register(DomainInformation)
auditlog.register(HostIP)
auditlog.register(Host)
auditlog.register(Nameserver)
auditlog.register(UserDomainRole)
auditlog.register(PublicContact)
auditlog.register(User, m2m_fields=["user_permissions", "groups"])

View file

@ -10,7 +10,8 @@ from django_fsm import FSMField, transition, TransitionNotAllowed # type: ignor
from django.db import models
from django.utils import timezone
from typing import Any
from registrar.models.host import Host
from registrar.models.host_ip import HostIP
from registrar.utility.errors import (
ActionNotAllowed,
@ -295,13 +296,15 @@ class Domain(TimeStampedModel, DomainHelper):
while non-subordinate hosts MUST NOT.
"""
try:
# attempt to retrieve hosts from registry and store in cache and db
hosts = self._get_property("hosts")
except Exception as err:
# Do not raise error when missing nameservers
# this is a standard occurence when a domain
# is first created
logger.info("Domain is missing nameservers %s" % err)
return []
except Exception:
# If exception raised returning hosts from registry, get from db
hosts = []
for hostobj in self.host.all():
host_name = hostobj.name
ips = [ip.address for ip in hostobj.ip.all()]
hosts.append({"name": host_name, "addrs": ips})
# TODO-687 fix this return value
hostList = []
@ -1607,6 +1610,8 @@ class Domain(TimeStampedModel, DomainHelper):
cache = self._extract_data_from_response(data_response)
cleaned = self._clean_cache(cache, data_response)
self._update_hosts_and_contacts(cleaned, fetch_hosts, fetch_contacts)
if fetch_hosts:
self._update_hosts_and_ips_in_db(cleaned)
self._update_dates(cleaned)
self._cache = cleaned
@ -1653,7 +1658,11 @@ class Domain(TimeStampedModel, DomainHelper):
return dnssec_data
def _update_hosts_and_contacts(self, cleaned, fetch_hosts, fetch_contacts):
"""Capture and store old hosts and contacts from cache if they don't exist"""
"""
Update hosts and contacts if fetch_hosts and/or fetch_contacts.
Additionally, capture and cache old hosts and contacts from cache if they
don't exist in cleaned
"""
old_cache_hosts = self._cache.get("hosts")
old_cache_contacts = self._cache.get("contacts")
@ -1668,6 +1677,50 @@ class Domain(TimeStampedModel, DomainHelper):
if old_cache_contacts is not None:
cleaned["contacts"] = old_cache_contacts
def _update_hosts_and_ips_in_db(self, cleaned):
"""Update hosts and host_ips in database if retrieved from registry.
Only called when fetch_hosts is True.
Parameters:
self: the domain to be updated with hosts and ips from cleaned
cleaned: dict containing hosts. Hosts are provided as a list of dicts, e.g.
[{"name": "ns1.example.com",}, {"name": "ns1.example.gov"}, "addrs": ["0.0.0.0"])]
"""
cleaned_hosts = cleaned["hosts"]
# Get all existing hosts from the database for this domain
existing_hosts_in_db = Host.objects.filter(domain=self)
# Identify hosts to delete
cleaned_host_names = set(cleaned_host["name"] for cleaned_host in cleaned_hosts)
hosts_to_delete_from_db = [
existing_host for existing_host in existing_hosts_in_db if existing_host.name not in cleaned_host_names
]
# Delete hosts and their associated HostIP instances
for host_to_delete in hosts_to_delete_from_db:
# Delete associated HostIP instances
HostIP.objects.filter(host=host_to_delete).delete()
# Delete the host itself
host_to_delete.delete()
# Update or create Hosts and HostIPs
for cleaned_host in cleaned_hosts:
# Check if the cleaned_host already exists
host_in_db, host_created = Host.objects.get_or_create(domain=self, name=cleaned_host["name"])
# Get cleaned list of ips for update
cleaned_ips = cleaned_host["addrs"]
if not host_created:
# Get all existing ips from the database for this host
existing_ips_in_db = HostIP.objects.filter(host=host_in_db)
# Identify IPs to delete
ips_to_delete_from_db = [
existing_ip for existing_ip in existing_ips_in_db if existing_ip.address not in cleaned_ips
]
# Delete IPs
for ip_to_delete in ips_to_delete_from_db:
# Delete the ip
ip_to_delete.delete()
# Update or create HostIP instances
for ip_address in cleaned_ips:
HostIP.objects.get_or_create(address=ip_address, host=host_in_db)
def _update_dates(self, cleaned):
"""Update dates (expiration and creation) from cleaned"""
requires_save = False

View file

@ -11,8 +11,8 @@ class Host(TimeStampedModel):
The registry is the source of truth for this data.
This model exists ONLY to allow a new registrant to draft DNS entries
before their application is approved.
This model exists to make hosts/nameservers and ip addresses
available when registry is not available.
"""
name = models.CharField(

View file

@ -10,8 +10,8 @@ class HostIP(TimeStampedModel):
The registry is the source of truth for this data.
This model exists ONLY to allow a new registrant to draft DNS entries
before their application is approved.
This model exists to make hosts/nameservers and ip addresses
available when registry is not available.
"""
address = models.CharField(

View file

@ -1,16 +0,0 @@
from .host import Host
class Nameserver(Host):
"""
A nameserver is a host which has been delegated to respond to DNS queries.
The registry is the source of truth for this data.
This model exists ONLY to allow a new registrant to draft DNS entries
before their application is approved.
"""
# there is nothing here because all of the fields are
# defined over there on the Host class
pass

View file

@ -47,6 +47,15 @@
{% if value|length == 1 %}
{% if users %}
<p class="margin-top-0">{{ value.0.user.email }} </p>
{% elif domains %}
{{ value.0.0 }}
{% if value.0.1 %}
({% spaceless %}
{% for addr in value.0.1 %}
{{addr}}{% if not forloop.last %}, {% endif %}
{% endfor %}
{% endspaceless %})
{% endif %}
{% else %}
<p class="margin-top-0">{{ value | first }} </p>
{% endif %}

View file

@ -7,7 +7,7 @@ from django.test import TestCase
from django.db.utils import IntegrityError
from unittest.mock import MagicMock, patch, call
import datetime
from registrar.models import Domain
from registrar.models import Domain, Host, HostIP
from unittest import skip
from registrar.models.domain_application import DomainApplication
@ -38,6 +38,8 @@ logger = logging.getLogger(__name__)
class TestDomainCache(MockEppLib):
def tearDown(self):
PublicContact.objects.all().delete()
HostIP.objects.all().delete()
Host.objects.all().delete()
Domain.objects.all().delete()
super().tearDown()
@ -1511,6 +1513,62 @@ class TestRegistrantNameservers(MockEppLib):
with self.assertRaises(ActionNotAllowed):
domain.nameservers = [self.nameserver1, self.nameserver2]
def test_nameserver_returns_on_registry_error(self):
"""
Scenario: Nameservers previously set through EPP and stored in registrar's database.
Registry is unavailable and throws exception when attempting to build cache from
registry. Nameservers retrieved from database.
"""
domain, _ = Domain.objects.get_or_create(name="fake.gov", state=Domain.State.READY)
# set the host and host_ips directly in the database; this is normally handled through
# fetch_cache
host, _ = Host.objects.get_or_create(domain=domain, name="ns1.fake.gov")
host_ip, _ = HostIP.objects.get_or_create(host=host, address="1.1.1.1")
# mock that registry throws an error on the InfoHost send
def side_effect(_request, cleaned):
raise RegistryError(code=ErrorCode.COMMAND_FAILED)
patcher = patch("registrar.models.domain.registry.send")
mocked_send = patcher.start()
mocked_send.side_effect = side_effect
nameservers = domain.nameservers
self.assertEqual(len(nameservers), 1)
self.assertEqual(nameservers[0][0], "ns1.fake.gov")
self.assertEqual(nameservers[0][1], ["1.1.1.1"])
patcher.stop()
def test_nameservers_stored_on_fetch_cache(self):
"""
Scenario: Nameservers are stored in db when they are retrieved from fetch_cache.
Verify the success of this by asserting get_or_create calls to db.
The mocked data for the EPP calls returns a host name
of 'fake.host.com' from InfoDomain and an array of 2 IPs: 1.2.3.4 and 2.3.4.5
from InfoHost
"""
domain, _ = Domain.objects.get_or_create(name="fake.gov", state=Domain.State.READY)
# mock the get_or_create methods for Host and HostIP
with patch.object(Host.objects, "get_or_create") as mock_host_get_or_create, patch.object(
HostIP.objects, "get_or_create"
) as mock_host_ip_get_or_create:
# Set the return value for the mocks
mock_host_get_or_create.return_value = (Host(), True)
mock_host_ip_get_or_create.return_value = (HostIP(), True)
# force fetch_cache to be called, which will return above documented mocked hosts
domain.nameservers
# assert that the mocks are called
mock_host_get_or_create.assert_called_once_with(domain=domain, name="fake.host.com")
# Retrieve the mocked_host from the return value of the mock
actual_mocked_host, _ = mock_host_get_or_create.return_value
mock_host_ip_get_or_create.assert_called_with(address="2.3.4.5", host=actual_mocked_host)
self.assertEqual(mock_host_ip_get_or_create.call_count, 2)
@skip("not implemented yet")
def test_update_is_unsuccessful(self):
"""
@ -1529,6 +1587,8 @@ class TestRegistrantNameservers(MockEppLib):
domain.nameservers = [("ns1.failednameserver.gov", ["4.5.6"])]
def tearDown(self):
HostIP.objects.all().delete()
Host.objects.all().delete()
Domain.objects.all().delete()
return super().tearDown()

View file

@ -28,6 +28,8 @@ from registrar.models import (
DomainInvitation,
Contact,
PublicContact,
Host,
HostIP,
Website,
UserDomainRole,
User,
@ -1173,6 +1175,8 @@ class TestWithDomainPermissions(TestWithUser):
DomainApplication.objects.all().delete()
DomainInformation.objects.all().delete()
PublicContact.objects.all().delete()
HostIP.objects.all().delete()
Host.objects.all().delete()
Domain.objects.all().delete()
UserDomainRole.objects.all().delete()
except ValueError: # pass if already deleted