fixed error with save inside of transition

This commit is contained in:
Alysia Broddrick 2023-09-10 14:22:43 -07:00
parent 53e9d090d9
commit a9f608e353
No known key found for this signature in database
GPG key ID: 03917052CD0F06B7
5 changed files with 209 additions and 79 deletions

View file

@ -4,6 +4,7 @@ from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from django.contrib.contenttypes.models import ContentType
from django.http.response import HttpResponseRedirect
from django.urls import reverse
from registrar.models.public_contact import PublicContact
from registrar.models.utility.admin_sort_fields import AdminSortFields
from . import models
@ -150,6 +151,12 @@ class DomainAdmin(ListHeaderAdmin):
GET_SECURITY_EMAIL = "_get_security_email"
SET_SECURITY_CONTACT = "_set_security_contact"
MAKE_DOMAIN = "_make_domain_in_registry"
MAKE_NAMESERVERS = "_make_nameservers"
GET_NAMESERVERS="_get_nameservers"
GET_STATUS = "_get_status"
SET_CLIENT_HOLD="_set_client_hold"
REMOVE_CLIENT_HOLD="_rem_client_hold"
DELETE_DOMAIN="_delete_domain"
logger.info("in response")
if ACTION_BUTTON in request.POST:
logger.info("in action button")
@ -171,29 +178,39 @@ class DomainAdmin(ListHeaderAdmin):
if GET_SECURITY_EMAIL in request.POST:
try:
security_email = obj.get_security_email()
contacts=obj._get_property("contacts")
email=None
for contact in contacts:
if ["type","email"] in contact.keys() and contact["type"]=="security":
email=contact["email"]
if email is None:
raise ValueError("Security contact type is not available on this domain")
except Exception as err:
self.message_user(request, err, messages.ERROR)
else:
self.message_user(
request,
("The security email is %" ". Thanks!") % security_email,
("The security email is %s" ". Thanks!") % email,
)
return HttpResponseRedirect(".")
if SET_SECURITY_CONTACT in request.POST:
try:
security_contact = obj.get_default_security_contact()
security_contact.email = "ab@test.gov"
fake_email="manuallyEnteredEmail@test.gov"
if PublicContact.objects.filter(domain=obj, contact_type="security").exists():
sec_contact=PublicContact.objects.filter(domain=obj, contact_type="security").get()
else:
sec_contact=obj.get_default_security_contact()
sec_contact.email=fake_email
sec_contact.save()
obj.security_contact = security_contact
except Exception as err:
self.message_user(request, err, messages.ERROR)
else:
self.message_user(
request,
("The security email is %" ". Thanks!") % security_email,
("The security email is %" ". Thanks!") % fake_email,
)
print("above make domain")
@ -207,31 +224,99 @@ class DomainAdmin(ListHeaderAdmin):
else:
self.message_user(
request,
("Domain created with %" ". Thanks!") % obj.name,
("Domain created with %s" ". Thanks!") % obj.name,
)
return HttpResponseRedirect(".")
#make nameservers here
if MAKE_NAMESERVERS in request.POST:
print("in make domain")
try:
hosts=[("ns1.example.com",None),("ns2.example.com",None) ]
obj.nameservers=hosts
except Exception as err:
self.message_user(request, err, messages.ERROR)
else:
self.message_user(
request,
("Hosts set to be %s" ". Thanks!") % hosts,
)
return HttpResponseRedirect(".")
if GET_NAMESERVERS in request.POST:
print("in make domain")
try:
nameservers=obj.nameservers
except Exception as err:
self.message_user(request, err, messages.ERROR)
else:
self.message_user(
request,
("Nameservers are %s" ". Thanks!") % nameservers,
)
return HttpResponseRedirect(".")
if GET_STATUS in request.POST:
print("in make domain")
try:
statuses=obj.statuses
except Exception as err:
self.message_user(request, err, messages.ERROR)
else:
self.message_user(
request,
("Domain statuses are %s" ". Thanks!") % statuses,
)
return HttpResponseRedirect(".")
if SET_CLIENT_HOLD in request.POST:
print("in make domain")
try:
obj.clientHold()
obj.save()
except Exception as err:
self.message_user(request, err, messages.ERROR)
else:
self.message_user(
request,
("Domain %s is now in clientHold") % obj.name,
)
return HttpResponseRedirect(".")
if REMOVE_CLIENT_HOLD in request.POST:
print("in make domain")
try:
obj.revertClientHold()
obj.save()
except Exception as err:
self.message_user(request, err, messages.ERROR)
else:
self.message_user(
request,
("Domain %s will now have client hold removed") % obj.name,
)
return HttpResponseRedirect(".")
if DELETE_DOMAIN in request.POST:
print("in make domain")
try:
obj.deleted()
obj.save()
except Exception as err:
self.message_user(request, err, messages.ERROR)
else:
self.message_user(
request,
("Domain %s Should now be deleted " ". Thanks!") % obj.name,
)
return HttpResponseRedirect(".")
return super().response_change(request, obj)
# def response_change(self, request, obj):
# ACTION_BUTTON = "_get_security_email"
# if ACTION_BUTTON in request.POST:
# try:
# obj.security
# except Exception as err:
# self.message_user(request, err, messages.ERROR)
# else:
# self.message_user(
# request,
# (
# "%s is in client hold. This domain is no longer accessible on"
# " the public internet."
# )
# % obj.name,
# )
# return HttpResponseRedirect(".")
# return super().response_change(request, obj)
class ContactAdmin(ListHeaderAdmin):

View file

@ -62,7 +62,7 @@ class Domain(TimeStampedModel, DomainHelper):
SERVER_DELETE_PROHIBITED = "serverDeleteProhibited"
# DNS delegation information MUST NOT be published for the object.
CLIENT_HOLD = "clientHold"
ON_HOLD = "clientHold"
SERVER_HOLD = "serverHold"
# Requests to renew the object MUST be rejected.
@ -96,7 +96,7 @@ class Domain(TimeStampedModel, DomainHelper):
# for human review or third-party action. A transform command that
# is processed, but whose requested action is pending, is noted with
# response code 1001.
PENDING_CREATE = "pendingCreate"
DNS_NEEDED = "pendingCreate"
PENDING_DELETE = "pendingDelete"
PENDING_RENEW = "pendingRenew"
PENDING_TRANSFER = "pendingTransfer"
@ -109,16 +109,14 @@ class Domain(TimeStampedModel, DomainHelper):
UNKNOWN = "unknown"
# The domain object exists in the registry but nameservers don't exist for it yet
PENDING_CREATE = "pending create"
DNS_NEEDED = "dns needed"
# Domain has had nameservers set, may or may not be active
CREATED = "created"
READY = "ready"
# Registrar manually changed state to client hold
CLIENT_HOLD = "client hold"
ON_HOLD = "client hold"
# Registry
SERVER_HOLD = "server hold"
# previously existed but has been deleted from the registry
DELETED = "deleted"
@ -227,21 +225,21 @@ class Domain(TimeStampedModel, DomainHelper):
Subordinate hosts (something.your-domain.gov) MUST have IP addresses,
while non-subordinate hosts MUST NOT.
"""
try:
hosts = self._get_property("hosts")
except KeyError as err:
logger.info("Domain is missing nameservers")
return None
hostList = []
for host in hosts:
logger.info(host)
# TODO - this should actually have a second tuple value with the ip address
# ignored because uncertain if we will even have a way to display mult.
# and adresses can be a list of mult address
hostList.append((host.name,))
hostList.append((host["name"],))
print(hostList)
return [
("ns1.example.com",),
("ns2.example.com",),
("ns3.example.com",),
]
return hostList
def _check_host(self, hostnames: list[str]):
"""check if host is available, True if available
@ -265,7 +263,7 @@ class Domain(TimeStampedModel, DomainHelper):
returns int response code"""
logger.info("_create_host()->addresses is NONE")
if not addrs is None:
if not addrs is None and addrs!=[]:
logger.info("addresses is not None %s" % addrs)
addresses = [epp.Ip(addr=addr) for addr in addrs]
request = commands.CreateHost(name=host, addrs=addresses)
@ -323,10 +321,10 @@ class Domain(TimeStampedModel, DomainHelper):
% (e.code, e)
)
if self.state == self.State.PENDING_CREATE and hostSuccessCount >= 2:
if self.state == self.State.DNS_NEEDED and hostSuccessCount >= 2:
self.created()
self.save()
##TODO - handle removed nameservers here will need to change the state go back to pending_create
##TODO - handle removed nameservers here will need to change the state go back to DNS_NEEDED
@Cache
def statuses(self) -> list[str]:
@ -634,7 +632,7 @@ class Domain(TimeStampedModel, DomainHelper):
# add new contact
security_contact = self.get_default_security_contact()
security_contact.save()
logger.info("done with contacts")
@security_contact.setter # type: ignore
def security_contact(self, contact: PublicContact):
"""makes the contact in the registry,
@ -665,7 +663,7 @@ class Domain(TimeStampedModel, DomainHelper):
Is the domain live on the inter webs?
could be replaced with request to see if ok status is set
"""
return self.state == self.State.CREATED
return self.state == self.State.READY
def transfer(self):
"""Going somewhere. Not implemented."""
@ -675,18 +673,31 @@ class Domain(TimeStampedModel, DomainHelper):
"""Time to renew. Not implemented."""
raise NotImplementedError()
def place_client_hold(self):
"""This domain should not be active."""
raise NotImplementedError("This is not implemented yet.")
def get_security_email(self):
logger.info("get_security_email-> getting the contact ")
secContact = self.security_contact
return secContact.email
def remove_client_hold(self):
"""This domain is okay to be active."""
raise NotImplementedError()
def clientHoldStatus(self):
return epp.Status(state=self.Status.ON_HOLD, description="", lang="en")
def _place_client_hold(self):
"""This domain should not be active.
may raises RegistryError, should be caught or handled correctly by caller """
request=commands.UpdateDomain(name=self.name,add=[self.clientHoldStatus()])
registry.send(request)
def _remove_client_hold(self):
"""This domain is okay to be active.
may raises RegistryError, should be caught or handled correctly by caller"""
request=commands.UpdateDomain(name=self.name,rem=[self.clientHoldStatus() ])
registry.send(request)
def _delete_domain(self):
"""This domain should be deleted from the registry
may raises RegistryError, should be caught or handled correctly by caller"""
request=commands.DeleteDomain(name=self.name)
registry.send(request)
def __str__(self) -> str:
return self.name
@ -747,8 +758,9 @@ class Domain(TimeStampedModel, DomainHelper):
def _get_or_create_domain(self):
"""Try to fetch info about this domain. Create it if it does not exist."""
already_tried_to_create = False
exitEarly=False
count = 0
while not already_tried_to_create and count < 3:
while not exitEarly and count < 3:
try:
logger.info(
"_get_or_create_domain()-> getting info on the domain, should hit an error"
@ -756,7 +768,7 @@ class Domain(TimeStampedModel, DomainHelper):
req = commands.InfoDomain(name=self.name)
domainInfo = registry.send(req, cleaned=True).res_data[0]
already_tried_to_create = True
exitEarly=True
return domainInfo
except RegistryError as e:
count += 1
@ -775,20 +787,24 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error(e)
logger.error(e.code)
raise e
def addRegistrant(self):
@transition(field="state", source=State.UNKNOWN, target=State.PENDING_CREATE)
def pendingCreate(self):
logger.info("In make domain in registry ")
registrant = PublicContact.get_default_registrant()
registrant.domain = self
registrant.save() ##calls the registrant_contact.setter
logger.info("registrant is %s" % registrant)
return registrant.registry_id
@transition(field="state", source=State.UNKNOWN, target=State.DNS_NEEDED)
def pendingCreate(self):
logger.info("In make domain in registry ")
registrantID=self.addRegistrant()
# TODO-notes no chg item for registrant in the epplib should
req = commands.CreateDomain(
name=self.name,
registrant=registrant.registry_id,
registrant=registrantID,
auth_info=epp.DomainAuthInfo(pw="2fooBAR123fooBaz"), # not a password
)
logger.info("_get_or_create_domain()-> about to send domain request")
@ -833,29 +849,42 @@ class Domain(TimeStampedModel, DomainHelper):
# technical_contact.save()
# administrative_contact.save()
@transition(field="state", source=State.PENDING_CREATE, target=State.CLIENT_HOLD)
@transition(field="state", source=State.DNS_NEEDED, target=State.ON_HOLD)
def clientHold(self):
##TODO - check to see if client hold is allowed should happen outside of this function
# (check prohibited statuses)
logger.info("clientHold()-> inside clientHold")
pass
# TODO -send clientHold here
self._place_client_hold()
# TODO -on the client hold ticket any additional error handling here
@transition(field="state", source=State.CLIENT_HOLD, target=State.DELETED)
@transition(field="state", source=State.ON_HOLD, target=State.DNS_NEEDED)
def revertClientHold(self):
##TODO - check to see if client hold is allowed should happen outside of this function
# (check prohibited statuses)
logger.info("clientHold()-> inside clientHold")
self._remove_client_hold()
# TODO -on the client hold ticket any additional error handling here
@transition(field="state", source=State.ON_HOLD, target=State.DELETED)
def deleted(self):
logger.info("pendingCreate()-> inside pending create")
pass
# TODO - send delete here
self._delete_domain()
# TODO - delete ticket any additional error handling here
@transition(
field="state",
source=[State.PENDING_CREATE, State.SERVER_HOLD, State.CLIENT_HOLD],
target=State.CREATED,
source=[State.DNS_NEEDED],
target=State.READY,
)
def created(self):
logger.info("created()-> inside setting create")
# TODO - do anything else here?
# TODO - in nameservers ticket check if has everything for creation
#admin, tech and security
#2 or more (but less than 13) nameservers
#if any of the above is violated raise the user raise a human readable error
#not there is another ticket for error handling keep a todo here if
#user friendly error portion is postponed
def _disclose_fields(self, contact: PublicContact):
"""creates a disclose object that can be added to a contact Create using
@ -966,8 +995,8 @@ class Domain(TimeStampedModel, DomainHelper):
"Registry threw error for contact id %s contact type is %s, error code is\n %s full error is %s",
contact.registry_id,
contact.contact_type,
err.code,
err,
e.code,
e,
)
raise e
@ -1003,6 +1032,9 @@ class Domain(TimeStampedModel, DomainHelper):
cleaned = {k: v for k, v in cache.items() if v is not ...}
logger.info("_fetch_cache()-> cleaned is " + str(cleaned))
#statuses can just be a list no need to keep the epp object
if "statuses" in cleaned.keys():
cleaned["statuses"]=[status.state for status in cleaned["statuses"]]
# get contact info, if there are any
if (
# fetch_contacts and
@ -1011,11 +1043,14 @@ class Domain(TimeStampedModel, DomainHelper):
and len(cleaned["_contacts"])
):
cleaned["contacts"] = []
for id in cleaned["_contacts"]:
for domainContact in cleaned["_contacts"]:
# we do not use _get_or_create_* because we expect the object we
# just asked the registry for still exists --
# if not, that's a problem
req = commands.InfoContact(id=id)
#TODO- discuss-should we check if contact is in public contacts
#and add it if not- this is really to keep in mine the transisiton
req = commands.InfoContact(id=domainContact.contact)
data = registry.send(req, cleaned=True).res_data[0]
# extract properties from response
@ -1024,6 +1059,7 @@ class Domain(TimeStampedModel, DomainHelper):
logger.info(data)
contact = {
"id": id,
"type":domainContact.type,
"auth_info": getattr(data, "auth_info", ...),
"cr_date": getattr(data, "cr_date", ...),
"disclose": getattr(data, "disclose", ...),

View file

@ -4,9 +4,14 @@
<div class="submit-row">
<input type="submit" value="Place hold" name="_place_client_hold">
<input type="submit" value="Get the email" name="_get_security_email">
<input type="submit" value="Set Security Contact" name="_set_security_contact">
<input type="submit" value="Create the domain obj" name="_make_domain_in_registry">
<input type="submit" value="Set New Security Contact" name="_set_security_contact">
<input type="submit" value="Create the domain obj" name="_make_domain_in_registry">
<input type="submit" value="add nameservers" name="_make_nameservers">
<input type="submit" value="get nameservers" name="_get_nameservers">
<input type="submit" value="get status" name="_get_status">
<input type="submit" value="add nameservers" name="_set_client_hold">
<input type="submit" value="get nameservers" name="_rem_client_hold">
<input type="submit" value="get status" name="_delete_domain">
</div>
{{ block.super }}
{% endblock %}

View file

@ -63,7 +63,7 @@ class MockEppLib(TestCase):
and self.mockedSendFunction.call_count == 3
):
print("raising error")
print()
raise RegistryError(code=ErrorCode.OBJECT_EXISTS)
return MagicMock(res_data=[self.mockDataInfoHosts])

View file

@ -135,6 +135,10 @@ class DomainNameserversView(DomainPermissionView, FormMixin):
def get_initial(self):
"""The initial value for the form (which is a formset here)."""
domain = self.get_object()
nameservers=domain.nameservers
if nameservers is None:
return []
return [{"server": name} for name, *ip in domain.nameservers]
def get_success_url(self):