Looping through changes made by user and deleting and creating and updating plus adding in ip addr work

This commit is contained in:
Rebecca Hsieh 2023-09-26 16:54:01 -07:00
parent 5523d06e59
commit d5dce308bb
No known key found for this signature in database
GPG key ID: 644527A2F375A379
3 changed files with 114 additions and 60 deletions

View file

@ -282,6 +282,7 @@ class Domain(TimeStampedModel, DomainHelper):
except RegistryError as e: except RegistryError as e:
logger.error("Error _create_host, code was %s error was %s" % (e.code, e)) logger.error("Error _create_host, code was %s error was %s" % (e.code, e))
return e.code return e.code
def _convert_list_to_dict(self, listToConvert: list[tuple[str]]): def _convert_list_to_dict(self, listToConvert: list[tuple[str]]):
newDict={} newDict={}
for tup in listToConvert: for tup in listToConvert:
@ -298,7 +299,7 @@ class Domain(TimeStampedModel, DomainHelper):
returns tuple of four values as follows: returns tuple of four values as follows:
deleted_values: deleted_values:
updated_values: updated_values:
new_values: new_values: dict
oldNameservers:""" oldNameservers:"""
oldNameservers=self.nameservers oldNameservers=self.nameservers
@ -320,8 +321,10 @@ class Domain(TimeStampedModel, DomainHelper):
if newHostDict[prevHost] != addrs: if newHostDict[prevHost] != addrs:
updated_values.append((prevHost,newHostDict[prevHost])) updated_values.append((prevHost,newHostDict[prevHost]))
new_values=set(newHostDict)-set(previousHostDict) new_values=set(newHostDict)-set(previousHostDict) #returns actually a set
return (deleted_values,updated_values,new_values, oldNameservers)
final_new_values = dict.fromkeys(new_values, None)
return (deleted_values,updated_values,final_new_values, previousHostDict)
@nameservers.setter # type: ignore @nameservers.setter # type: ignore
def nameservers(self, hosts: list[tuple[str]]): def nameservers(self, hosts: list[tuple[str]]):
@ -344,47 +347,58 @@ class Domain(TimeStampedModel, DomainHelper):
logger.info(hosts) logger.info(hosts)
#get the changes made by user and old nameserver values #get the changes made by user and old nameserver values
deleted_values,updated_values,new_values, oldNameservers=self.getNameserverChanges(hosts=hosts) deleted_values, updated_values, new_values, oldNameservers=self.getNameserverChanges(hosts=hosts)
successDeletedCount = 0
successCreatedCount = 0
count = 0 for hostTuple in deleted_values:
for hostTuple in hosts: deleted_response_code = self._delete_host(hostTuple[0])
addrs = None if deleted_response_code == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
host=hostTuple[0] successDeletedCount += 1
if len(hostTuple) > 1:
addrs = hostTuple[1:] # list of all the ip address
# TODO-848: Check if the host a .gov (do .split on the last item), isdotgov can be a boolean function for hostTuple in updated_values:
# TODO-848: if you are dotgov and don't have an IP address then raise error updated_response_code = self._updated_host(hostTuple[0], hostTuple[1], oldNameservers.get(hostTuple[0]))
# NOTE-848: TRY logger.info() or print()
for key, value in new_values.items():
createdCode = self._create_host(host=host, addrs=addrs) # creates in registry print("HELLO THERE KEY, VALUE PAIR")
print(key)
print(value)
createdCode = self._create_host(host=key, addrs=value) # creates in registry
# TODO-848: Double check if _create_host should handle duplicates + update domain obj? # TODO-848: Double check if _create_host should handle duplicates + update domain obj?
# NOTE-848: if createdCode == ErrorCode.OBJECT_EXISTS: --> self.nameservers # NOTE-848: if createdCode == ErrorCode.OBJECT_EXISTS: --> self.nameservers
count += 1
# NOTE-848: Host can be used by multiple domains # NOTE-848: Host can be used by multiple domains
if createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY: if createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY or createdCode == ErrorCode.OBJECT_EXISTS:
# NOTE-848: Add host to domain (domain already created, just adding to it)
request = commands.UpdateDomain( request = commands.UpdateDomain(
name=self.name, add=[epp.HostObjSet([host])] name=self.name, add=[epp.HostObjSet([key])]
) )
try: try:
registry.send(request, cleaned=True) registry.send(request, cleaned=True)
# count += 1 successCreatedCount += 1
except RegistryError as e: except RegistryError as e:
logger.error( logger.error(
"Error adding nameserver, code was %s error was %s" "Error adding nameserver, code was %s error was %s"
% (e.code, e) % (e.code, e)
) )
# elif createdCode == ErrorCode.OBJECT_EXISTS:
# count += 1 successTotalNameservers = len(oldNameservers) - successDeletedCount+ successCreatedCount
# unchangedValuesCount=len(oldNameservers)-len(deleted_values)+addedNameservers
print("SUCCESSTOTALNAMESERVERS IS ")
print(successTotalNameservers)
if successTotalNameservers < 2:
try: try:
print("COUNT IS ") print("DNS_NEEDED: We have less than 2 nameservers")
print(count) self.dns_needed()
if count >= 2 and count <= 13: self.save()
except Exception as err:
logger.info(
"nameserver setter checked for dns_needed state "
"and it did not succeed. Error: %s" % err
)
elif successTotalNameservers >= 2 and successTotalNameservers <= 13:
try:
print("READY/SAVE: We are in happy path where btwen 2 and 13 inclusive ns")
self.ready() self.ready()
self.save() self.save()
except Exception as err: except Exception as err:
@ -792,7 +806,7 @@ class Domain(TimeStampedModel, DomainHelper):
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST: if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
# avoid infinite loop # avoid infinite loop
already_tried_to_create = True already_tried_to_create = True
self.pendingCreate() self.dns_needed_from_unknown()
self.save() self.save()
else: else:
logger.error(e) logger.error(e)
@ -806,7 +820,7 @@ class Domain(TimeStampedModel, DomainHelper):
return registrant.registry_id return registrant.registry_id
@transition(field="state", source=State.UNKNOWN, target=State.DNS_NEEDED) @transition(field="state", source=State.UNKNOWN, target=State.DNS_NEEDED)
def pendingCreate(self): def dns_needed_from_unknown(self):
logger.info("Changing to dns_needed") logger.info("Changing to dns_needed")
registrantID = self.addRegistrant() registrantID = self.addRegistrant()
@ -862,32 +876,45 @@ class Domain(TimeStampedModel, DomainHelper):
# a child host is being used by # a child host is being used by
# another .gov domains. The host must be first removed # another .gov domains. The host must be first removed
# and/or renamed before the parent domain may be deleted. # and/or renamed before the parent domain may be deleted.
logger.info("pendingCreate()-> inside pending create") logger.info("dns_needed_from_unknown()-> inside pending create")
self._delete_domain() self._delete_domain()
# TODO - delete ticket any additional error handling here # TODO - delete ticket any additional error handling here
def is_dns_needed(self):
self._invalidate_cache()
nameserverList = self.nameservers
return len(nameserverList) < 2
@transition( @transition(
field="state", field="state",
source=[State.DNS_NEEDED], source=[State.DNS_NEEDED],
target=State.READY, target=State.READY,
conditions=[lambda x : not is_dns_needed]
) )
# 811 -- Rachid look at constraint on a transition, could just be a function
def ready(self): def ready(self):
"""Transition to the ready state """Transition to the ready state
domain should have nameservers and all contacts domain should have nameservers and all contacts
and now should be considered live on a domain and now should be considered live on a domain
""" """
# TODO - in nameservers tickets 848 and 562
# check here if updates need to be made
# consider adding these checks as constraints
# within the transistion itself
nameserverList = self.nameservers
logger.info("Changing to ready state") logger.info("Changing to ready state")
# TEST THIS -- assertValue or print (trigger this)
# if len(nameserverList) < 2 or len(nameserverList) > 13:
# raise ValueError("Not ready to become created, cannot transition yet")
logger.info("able to transition to ready state") logger.info("able to transition to ready state")
@transition(
field="state",
source=[State.READY],
target=State.DNS_NEEDED,
conditions=[is_dns_needed]
)
def dns_needed(self):
"""Transition to the DNS_NEEDED state
domain should NOT have nameservers but
SHOULD have all contacts
Going to check nameservers and will
result in an EPP call
"""
logger.info("Changing to DNS_NEEDED state")
logger.info("able to transition to DNS_NEEDED state")
def _disclose_fields(self, contact: PublicContact): def _disclose_fields(self, contact: PublicContact):
"""creates a disclose object that can be added to a contact Create using """creates a disclose object that can be added to a contact Create using
.disclose= <this function> on the command before sending. .disclose= <this function> on the command before sending.
@ -995,16 +1022,42 @@ class Domain(TimeStampedModel, DomainHelper):
raise e raise e
def _update_or_create_host(self, host): # TODO: Need to implement this
# maybe take out current code and put here def is_ipv6(self, ip: str):
raise NotImplementedError() return True
def _delete_host(self, host): def _convert_ips(self, ip_list: list[str]):
# if len(nameserver_list) < 2: edited_ip_list = []
# change from READY to DNS_NEEDED state for ip_addr in ip_list:
if is_ipv6:
edited_ip_list.append(command.Ip(addr=ip_addr, ip="v6"))
else: # default ip addr is v4
edited_ip_list.append(command.Ip(addr=ip_addr))
return edited_ip_list
# Check host to nameserver list, and then use delete command? def _update_host(self, nameserver: str, ip_list: list[str], old_ip_list: list[str]):
raise NotImplementedError() try:
if len(ip_list) == 0:
return ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY
request = commands.UpdateHost(name=nameserver, add=self._convert_ips(ip_list), rem=self._convert_ips(old_ip_list))
response = registry.send(request, cleaned=True)
logger.info("_update_host()-> sending req as %s" % request)
return response.code
except RegistryError as e:
logger.error("Error _delete_host, code was %s error was %s" % (e.code, e))
return e.code
def _delete_host(self, nameserver: str):
try:
request = commands.DeleteHost(name=nameserver)
response = registry.send(request, cleaned=True)
logger.info("_delete_host()-> sending req as %s" % request)
return response.code
except RegistryError as e:
logger.error("Error _delete_host, code was %s error was %s" % (e.code, e))
return e.code
def _fetch_cache(self, fetch_hosts=False, fetch_contacts=False): def _fetch_cache(self, fetch_hosts=False, fetch_contacts=False):
"""Contact registry for info about a domain.""" """Contact registry for info about a domain."""

View file

@ -555,6 +555,7 @@ class MockEppLib(TestCase):
hosts=..., hosts=...,
statuses=..., statuses=...,
avail=..., avail=...,
addrs=...,
): ):
self.auth_info = auth_info self.auth_info = auth_info
self.cr_date = cr_date self.cr_date = cr_date
@ -562,6 +563,7 @@ class MockEppLib(TestCase):
self.hosts = hosts self.hosts = hosts
self.statuses = statuses self.statuses = statuses
self.avail = avail #use for CheckDomain self.avail = avail #use for CheckDomain
self.addrs = addrs
mockDataInfoDomain = fakedEppObject( mockDataInfoDomain = fakedEppObject(
"fakepw", "fakepw",
@ -583,7 +585,7 @@ class MockEppLib(TestCase):
"anotherPw", cr_date=datetime.datetime(2023, 7, 25, 19, 45, 35) "anotherPw", cr_date=datetime.datetime(2023, 7, 25, 19, 45, 35)
) )
mockDataInfoHosts = fakedEppObject( mockDataInfoHosts = fakedEppObject(
"lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35) "lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35), addrs=["1.2.3", "2.3.4"]
) )
mockDataCreateHost =fakedEppObject( mockDataCreateHost =fakedEppObject(
"lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35) "lastPw", cr_date=datetime.datetime(2023, 8, 25, 19, 45, 35)

View file

@ -291,7 +291,7 @@ class TestRegistrantContacts(MockEppLib):
expectedSecContact = PublicContact.get_default_security() expectedSecContact = PublicContact.get_default_security()
expectedSecContact.domain = self.domain expectedSecContact.domain = self.domain
self.domain.pendingCreate() self.domain.dns_needed_from_unknown()
self.assertEqual(self.mockedSendFunction.call_count, 8) self.assertEqual(self.mockedSendFunction.call_count, 8)
self.assertEqual(PublicContact.objects.filter(domain=self.domain).count(), 4) self.assertEqual(PublicContact.objects.filter(domain=self.domain).count(), 4)
@ -334,7 +334,7 @@ class TestRegistrantContacts(MockEppLib):
created contact of type 'security' created contact of type 'security'
""" """
# make a security contact that is a PublicContact # make a security contact that is a PublicContact
self.domain.pendingCreate() # make sure a security email already exists self.domain.dns_needed_from_unknown() # make sure a security email already exists
expectedSecContact = PublicContact.get_default_security() expectedSecContact = PublicContact.get_default_security()
expectedSecContact.domain = self.domain expectedSecContact.domain = self.domain
expectedSecContact.email = "newEmail@fake.com" expectedSecContact.email = "newEmail@fake.com"
@ -553,8 +553,7 @@ class TestRegistrantNameservers(MockEppLib):
self.assertEqual(deleted_values, [('ns2.example.com', ['1.2.3'])]) self.assertEqual(deleted_values, [('ns2.example.com', ['1.2.3'])])
self.assertEqual(updated_values, [('ns3.example.com', ['1.2.4'])]) self.assertEqual(updated_values, [('ns3.example.com', ['1.2.4'])])
self.assertEqual(new_values, {'ns4.example.com'}) self.assertEqual(new_values, {'ns4.example.com'})
self.assertEqual(oldNameservers, [('ns1.example.com', None), ('ns2.example.com', ['1.2.3']), ('ns3.example.com', ['1.2.3'])]) self.assertEqual(oldNameservers, {'ns1.example.com': None, 'ns2.example.com': ['1.2.3'], 'ns3.example.com': ['1.2.3']})
def test_user_adds_one_nameserver(self): def test_user_adds_one_nameserver(self):
""" """