working through being Created

This commit is contained in:
Alysia Broddrick 2023-08-23 06:23:11 -07:00
parent 099adb3dc2
commit c0969739b5
No known key found for this signature in database
GPG key ID: 03917052CD0F06B7
3 changed files with 253 additions and 70 deletions

View file

@ -371,4 +371,5 @@ admin.site.register(models.Domain, DomainAdmin)
admin.site.register(models.Host, MyHostAdmin)
admin.site.register(models.Nameserver, MyHostAdmin)
admin.site.register(models.Website, AuditedAdmin)
admin.site.register(models.PublicContact, AuditedAdmin)
admin.site.register(models.DomainApplication, DomainApplicationAdmin)

View file

@ -235,14 +235,77 @@ class Domain(TimeStampedModel, DomainHelper):
("ns2.example.com",),
("ns3.example.com",),
]
def _check_host(self,hostnames:list[str]):
""" check if host is available, True if available
returns boolean"""
checkCommand=commands.CheckHost(hostnames)
try:
response=registry.send(checkCommand,cleaned=True)
return response.res_data[0].avail
except RegistryError as err:
logger.warning("Couldn't check hosts %. Errorcode was %s, error was %s"%(hostnames),err.code, err)
return False
def _create_host(self, host,addrs):
"""Call _check_host first before using this function,
This creates the host object in the registry
doesn't add the created host to the domain
returns int response code"""
logger.info("_create_host()->addresses is NONE")
if not addrs is None:
logger.info("addresses is not None %s"%addrs)
addresses=[epp.Ip(addr=addr) for addr in addrs]
request = commands.CreateHost(name=host, addrs=addresses)
else:
logger.info("_create_host()-> address IS None")
request = commands.CreateHost(name=host)
#[epp.Ip(addr="127.0.0.1"), epp.Ip(addr="0:0:0:0:0:0:0:1", ip="v6")]
try:
logger.info("_create_host()-> sending req as %s"%request)
response=registry.send(request, cleaned=True)
return response.code
except RegistryError as e:
logger.error("Error _create_host, code was %s error was %s" % (e.code, e))
return e.code
@nameservers.setter # type: ignore
def nameservers(self, hosts: list[tuple[str]]):
"""host should be a tuple of type str, str,... where the elements are
Fully qualified host name, addresses associated with the host
example: [(ns1.okay.gov, 127.0.0.1, others ips)]"""
# TODO: call EPP to set this info.
# if two nameservers change state to created, don't do it automatically
hostSuccessCount=0
if len(hosts)>13:
raise ValueError("Too many hosts provided, you may not have more than 13 nameservers.")
logger.info("hosts will follow")
logger.info(hosts)
for hostTuple in hosts:
print("hostTuple is %s"% str(hostTuple))
host=hostTuple[0]
addrs=None
if len(hostTuple)>1:
addrs=hostTuple[1:]
avail=self._check_host([host])
if avail:
createdCode=self._create_host(host=host, addrs=addrs)
if createdCode==ErrorCode.OBJECT_EXISTS:
hostSuccessCount+=1
#update the object instead
elif createdCode==ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
#add host to domain
request = commands.UpdateDomain(name=self.name, add=[epp.HostObjSet([host])])
self.state=Domain.State.CREATED
pass
try:
registry.send(request, cleaned=True)
hostSuccessCount+=1
except RegistryError as e:
logger.error("Error adding nameserver, code was %s error was %s" % (e.code, e))
if self.state==self.State.PENDING_CREATE and hostSuccessCount>=2:
self.created()
##TODO - handle removed nameservers here will need to change the state go back to pending_create
@Cache
def statuses(self) -> list[str]:
@ -253,7 +316,12 @@ class Domain(TimeStampedModel, DomainHelper):
"""
# implementation note: the Status object from EPP stores the string in
# a dataclass property `state`, not to be confused with the `state` field here
raise NotImplementedError()
if not "statuses" in self._cache:
self._fetch_cache()
if not "statuses"in self._cache:
raise Exception("Can't retreive status from domain info")
else:
return self._cache["statuses"]
@statuses.setter # type: ignore
def statuses(self, statuses: list[str]):
@ -261,7 +329,20 @@ class Domain(TimeStampedModel, DomainHelper):
# can be combined; check that here and raise errors for invalid combinations -
# some statuses cannot be set by the client at all
raise NotImplementedError()
# ### implement get status which checks the status of the domain object on error it logs but goes with whatever the status is
# def get_status(self):
# try:
# DomainInfoReq
# response=send
# response.statuses
# for status in status:
# if status==serverhold and self.state!=serverhld
# transition to serverhold
# if status ==client & self.state!=clientHold:
# transition to clienthold
# except:
# logger
# return self.state
@Cache
def registrant_contact(self) -> PublicContact:
"""Get or set the registrant for this domain."""
@ -272,13 +353,9 @@ class Domain(TimeStampedModel, DomainHelper):
"""Registrant is set when a domain is created, so follow on additions will update the current registrant"""
###incorrect should update an existing registrant
logger.info("making registrant contact")
# if contact.contact_type!=contact.ContactTypeChoices.REGISTRANT:
# raise ValueError("Cannot set a registrant contact with a different contact type")
# logger.info("registrant_contact()-> update domain with registrant contact")
# self._update_domain_with_contact(contact, rem=False)
#req= updated contact
#send req
#handle error poorly
self._set_singleton_contact(contact=contact, expectedType=contact.ContactTypeChoices.REGISTRANT)
@Cache
def administrative_contact(self) -> PublicContact:
"""Get or set the admin contact for this domain."""
@ -294,6 +371,7 @@ class Domain(TimeStampedModel, DomainHelper):
if contact.contact_type!=contact.ContactTypeChoices.ADMINISTRATIVE:
raise ValueError("Cannot set a registrant contact with a different contact type")
logger.info("administrative_contact()-> update domain with admin contact")
self._make_contact_in_registry(contact=contact)
self._update_domain_with_contact(contact, rem=False)
@ -302,6 +380,22 @@ class Domain(TimeStampedModel, DomainHelper):
contact = PublicContact.get_default_security()
contact.domain = self
return contact
def _update_epp_contact(self, contact:PublicContact):
"""Sends UpdateContact to update the actual contact object, domain object remains unaffected
should be used when changing email address or other contact infor on an existing domain"""
updateContact=commands.UpdateContact(id=contact.registry_id, postal_info=self._make_epp_contact_postal_info(contact=contact),
email=contact.email,
voice=contact.voice,
fax=contact.fax)
try:
registry.send(updateContact, cleaned=True)
except RegistryError as e:
logger.error("Error updating contact, code was %s error was %s" % (e.code, e))
#add more error handling here
#ticket for error handling in epp
def _update_domain_with_contact(self, contact:PublicContact,rem=False):
logger.info("received type %s " % contact.contact_type)
domainContact=epp.DomainContact(contact=contact.registry_id,type=contact.contact_type)
@ -314,7 +408,12 @@ class Domain(TimeStampedModel, DomainHelper):
try:
registry.send(updateDomain, cleaned=True)
except RegistryError as e:
logger.error("Error removing old security contact code was %s error was %s" % (e.code, e))
logger.error("Error changing contact on a domain. Error code is %s error was %s" % (e.code, e))
action="add"
if rem:
action="remove"
raise Exception("Can't %s the contact of type %s"%( action, contact.contact_type))
@Cache
@ -357,6 +456,86 @@ class Domain(TimeStampedModel, DomainHelper):
#TODO - below line never executes with current logic
return self.get_default_security_contact()
def _add_registrant_to_existing_domain(self, contact: PublicContact):
self._update_epp_contact(contact=contact)
updateDomain=commands.UpdateDomain(name=self.name, registrant=contact.registry_id )
try:
registry.send(updateDomain, cleaned=True)
except RegistryError as e:
logger.error("Error changing to new registrant error code is %s, error is %s" % (e.code, e))
#TODO-error handling better here?
def _set_singleton_contact(self, contact: PublicContact, expectedType:str):
""""""
logger.info("_set_singleton_contact()-> contactype type being set: %s expected type is: %s"%(contact, expectedType))
if expectedType!=contact.contact_type:
raise ValueError("Cannot set a contact with a different contact type, expected type was %s"% expectedType)
isRegistrant=contact.contact_type==contact.ContactTypeChoices.REGISTRANT
domainContactExists = PublicContact.objects.filter(registry_id=contact.registry_id).exists()
contactIsAlreadyOnDomain = PublicContact.objects.filter(domain=self,registry_id=contact.registry_id,contact_type=contact.contact_type ).exists()
contactOfTypeExists = PublicContact.objects.filter(domain=self,contact_type=contact.contact_type ).exists()
#get publicContact objects that have the matching domain and type but a different id, should be only one
hasOtherContact = PublicContact.objects.exclude(registry_id=contact.registry_id).filter(domain=self,contact_type=contact.contact_type ).exists()
logger.info("has other contact %s"%hasOtherContact)
##if no record exists with this contact type
logger.info("_set_singleton_contact()-> adding contact that shouldn't exist already")
#make contact in registry, duplicate and errors handled there
errorCode= self._make_contact_in_registry(contact)
# if contact.contact_type==contact.ContactTypeChoices.REGISTRANT:
# logger.info("_set_singleton_contact()-> creating the registrant")
# self._make_contact_in_registry(contact)
# else:
# logger.info("_set_singleton_contact()-> updating domain with the new contact")
# self._update_domain_with_contact(contact, rem=False)
#contact is already added to the domain, but something has changed on it
#TODO - check here if contact already exists on domain in registry
#if domain has registrant and type is registrant this will be true,
#if type is anything else it should be in the contact list
alreadyExistsInRegistry=errorCode==ErrorCode.OBJECT_EXISTS
#if an error occured besides duplication, stop
if not alreadyExistsInRegistry and errorCode!= ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
raise Exception("Unable to add contact to registry")
#contact doesn't exist on the domain yet
logger.info("_set_singleton_contact()-> contact has been added to the registry")
#if has conflicting contacts in our db remove them
if hasOtherContact:
logger.info("_set_singleton_contact()-> updating domain by removing old contact and adding new one")
existing_contact=PublicContact.objects.exclude(registry_id=contact.registry_id).filter(domain=self,contact_type=contact.contact_type ).get()
if isRegistrant:
#send update domain only for registant contacts
existing_contact.delete()
self._add_registrant_to_existing_domain(contact)
else:
#remove the old contact and add a new one
try:
self._update_domain_with_contact(contact=existing_contact, rem=True)
existing_contact.delete()
except Exception as err:
logger.error("Raising error after removing and adding a new contact")
raise(err)
#if just added to registry and not a registrant add contact to domain
if not alreadyExistsInRegistry and not isRegistrant:
self._update_domain_with_contact(contact=contact, rem=False)
#if already exists just update
elif alreadyExistsInRegistry:
self._update_epp_contact(contact=contact)
@security_contact.setter # type: ignore
def security_contact(self, contact: PublicContact):
"""makes the contact in the registry,
@ -364,14 +543,8 @@ class Domain(TimeStampedModel, DomainHelper):
from domain information (not domain application)
and should have the security email from DomainApplication"""
logger.info("making security contact in registry")
if contact.contact_type!=contact.ContactTypeChoices.SECURITY:
raise ValueError("Cannot set a security contact with a different contact type")
logger.info("security_contact()-> update domain with secutity contact")
self._update_domain_with_contact(contact, rem=False)
##TODO- delete old security contact if one exists??
self._set_singleton_contact(contact, expectedType=contact.ContactTypeChoices.SECURITY)
@Cache
def technical_contact(self) -> PublicContact:
@ -381,17 +554,15 @@ class Domain(TimeStampedModel, DomainHelper):
@technical_contact.setter # type: ignore
def technical_contact(self, contact: PublicContact):
logger.info("making technical contact")
if contact.contact_type!=contact.ContactTypeChoices.TECHNICAL:
raise ValueError("Cannot set a technical contact with a different contact type")
logger.info("technical_contact()-> update domain with technical contact")
self._update_domain_with_contact(contact, rem=False)
self._set_singleton_contact(contact, expectedType=contact.ContactTypeChoices.TECHNICAL)
def is_active(self) -> bool:
"""Is the domain live on the inter webs?"""
# TODO: implement a check -- should be performant so it can be called for
# any number of domains on a status page
# this is NOT as simple as checking if Domain.Status.OK is in self.statuses
return False
"""Currently just returns if the state is created, because then it should be live, theoretically.
Post mvp this should indicate
Is the domain live on the inter webs?
could be replaced with request to see if ok status is set
"""
return self.state==self.State.CREATED
def transfer(self):
"""Going somewhere. Not implemented."""
@ -493,7 +664,7 @@ class Domain(TimeStampedModel, DomainHelper):
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
# avoid infinite loop
already_tried_to_create = True
self._make_domain_in_registry()
self.pendingCreate()
else:
logger.error(e)
logger.error(e.code)
@ -503,11 +674,12 @@ class Domain(TimeStampedModel, DomainHelper):
def pendingCreate(self):
logger.info("In make domain in registry ")
registrant = PublicContact.get_default_registrant()
self._make_contact_in_registry(registrant)
registrant.domain=self
registrant.save() ##calls the registrant_contact.setter
logger.info("registrant is %s" % registrant)
#TODO-notes no chg item for registrant in the epplib should
security_contact=PublicContact.get_default_security()
security_contact=self.get_default_security_contact()
req = commands.CreateDomain(
name=self.name,
@ -521,19 +693,18 @@ class Domain(TimeStampedModel, DomainHelper):
try:
response=registry.send(req, cleaned=True)
logger.info(response)
except RegistryError as err:
if err.code!=ErrorCode.OBJECT_EXISTS:
raise err
logger.info("_get_or_create_domain()-> registry received create for "+self.name)
logger.info(response)
# no error, so go ahead and add a security contact
self.security_contact=security_contact
def testSettingAllContacts(self):
security_contact.save()
self.save()
def testSettingOtherContacts(self):
##delete this funciton
logger.info("testSettingAllContacts")
security_contact=PublicContact.get_default_security()
security_contact.domain=self
technical_contact=PublicContact.get_default_technical()
technical_contact.domain=self
administrative_contact=PublicContact.get_default_administrative()
@ -543,12 +714,6 @@ class Domain(TimeStampedModel, DomainHelper):
technical_contact.save()
administrative_contact.save()
try:
logger.info("setting registrant")
self.registrant_contact=PublicContact.get_default_registrant()
except Exception as err:
logger.info(err.code)
logger.info(err)
@transition(field="state", source=State.PENDING_CREATE, target=State.CLIENT_HOLD)
def clientHold(self):
@ -563,14 +728,27 @@ class Domain(TimeStampedModel, DomainHelper):
logger.info("pendingCreate()-> inside pending create")
pass
#TODO - send delete here
@transition(field="state", source=[State.PENDING_CREATE, State.SERVER_HOLD, State.CLIENT_HOLD], target=State.CREATED)
def created(self):
logger.info("created()-> inside setting create")
def _make_contact_in_registry(self, contact: PublicContact):
"""Create the contact in the registry, ignore duplicate contact errors"""
logger.info(contact)
logger.info(contact.registry_id)
create = commands.CreateContact(
id=contact.registry_id,
postal_info=epp.PostalInfo( # type: ignore
#TODO - do anything else here?
def _disclose_fields(self,isSecurity=False):
"""creates a disclose object that can be added to a contact Create using
.disclose= <this function> on the command before sending.
if item is security email then make sure email is visable"""
DF = epp.DiscloseField
fields={DF.FAX, DF.VOICE, DF.ADDR}
if not isSecurity:
fields.add(DF.EMAIL)
return epp.Disclose(
flag=False,
fields={DF.FAX, DF.VOICE, DF.ADDR},
types={DF.ADDR: "loc"},
)
def _make_epp_contact_postal_info(self, contact:PublicContact):
return epp.PostalInfo( # type: ignore
name=contact.name,
addr=epp.ContactAddr(
street=[
@ -585,36 +763,38 @@ class Domain(TimeStampedModel, DomainHelper):
),
org=contact.org,
type="loc",
),
)
def _make_contact_in_registry(self, contact: PublicContact):
"""Create the contact in the registry, ignore duplicate contact errors
returns int corresponding to ErrorCode values"""
logger.info(contact)
logger.info(contact.registry_id)
create = commands.CreateContact(
id=contact.registry_id,
postal_info=self._make_epp_contact_postal_info(contact=contact),
email=contact.email,
voice=contact.voice,
fax=contact.fax,
auth_info=epp.ContactAuthInfo(pw="2fooBAR123fooBaz"),
)
# security contacts should only show email addresses, for now
if (
contact.contact_type
== PublicContact.ContactTypeChoices.SECURITY
):
DF = epp.DiscloseField
create.disclose = epp.Disclose(
flag=False,
fields={DF.FAX, DF.VOICE, DF.ADDR},
types={DF.ADDR: "loc"},
)
create.disclose=self._disclose_fields(isSecurity=contact.contact_type==contact.ContactTypeChoices.SECURITY)
try:
logger.info("sending contact")
registry.send(create, cleaned=True)
return contact
return ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY
except RegistryError as err:
#don't throw an error if it is just saying this is a duplicate contact
if err.code!=ErrorCode.OBJECT_EXISTS:
logger.error("Registry threw error for contact id %s contact type is %s, error code is\n %s full error is %s",contact.registry_id, contact.contact_type, err.code, err)
#TODO - Error handling here
else:
logger.warning("Registrar tried to create duplicate contact for id %s",contact.registry_id)
return err.code
def _request_contact_info(self, contact: PublicContact):
req = commands.InfoContact(id=contact.registry_id)
return registry.send(req, cleaned=True).res_data[0]
@ -629,7 +809,9 @@ class Domain(TimeStampedModel, DomainHelper):
if e.code == ErrorCode.OBJECT_DOES_NOT_EXIST:
logger.info("_get_or_create_contact()-> contact doesn't exist so making it")
return self._make_contact_in_registry(contact=contact)
contact.domain=self
contact.save()#this will call the function based on type of contact
return self._request_contact_info(contact=contact)
else:
logger.error("Registry threw error for contact id %s contact type is %s, error code is\n %s full error is %s",contact.registry_id, contact.contact_type, err.code, err)

View file

@ -23,8 +23,8 @@ class PublicContact(TimeStampedModel):
"""These are the types of contacts accepted by the registry."""
REGISTRANT = "registrant", "Registrant"
ADMINISTRATIVE = "administrative", "Administrative"
TECHNICAL = "technical", "Technical"
ADMINISTRATIVE = "admin", "Administrative"
TECHNICAL = "tech", "Technical"
SECURITY = "security", "Security"
def save(self, *args, **kwargs):