ran linter

This commit is contained in:
Alysia Broddrick 2023-09-10 15:43:29 -07:00
parent a9f608e353
commit 6a3a5534db
No known key found for this signature in database
GPG key ID: 03917052CD0F06B7
5 changed files with 116 additions and 111 deletions

View file

@ -146,21 +146,18 @@ class DomainAdmin(ListHeaderAdmin):
readonly_fields = ["state"]
def response_change(self, request, obj):
print(request.POST)
ACTION_BUTTON = "_place_client_hold"
GET_SECURITY_EMAIL = "_get_security_email"
SET_SECURITY_CONTACT = "_set_security_contact"
MAKE_DOMAIN = "_make_domain_in_registry"
MAKE_NAMESERVERS = "_make_nameservers"
GET_NAMESERVERS="_get_nameservers"
GET_NAMESERVERS = "_get_nameservers"
GET_STATUS = "_get_status"
SET_CLIENT_HOLD="_set_client_hold"
REMOVE_CLIENT_HOLD="_rem_client_hold"
DELETE_DOMAIN="_delete_domain"
logger.info("in response")
SET_CLIENT_HOLD = "_set_client_hold"
REMOVE_CLIENT_HOLD = "_rem_client_hold"
DELETE_DOMAIN = "_delete_domain"
if ACTION_BUTTON in request.POST:
logger.info("in action button")
print("in action button")
try:
obj.place_client_hold()
except Exception as err:
@ -178,13 +175,17 @@ class DomainAdmin(ListHeaderAdmin):
if GET_SECURITY_EMAIL in request.POST:
try:
contacts=obj._get_property("contacts")
email=None
contacts = obj._get_property("contacts")
email = None
for contact in contacts:
if ["type","email"] in contact.keys() and contact["type"]=="security":
email=contact["email"]
if ["type", "email"] in contact.keys() and contact[
"type"
] == "security":
email = contact["email"]
if email is None:
raise ValueError("Security contact type is not available on this domain")
raise ValueError(
"Security contact type is not available on this domain"
)
except Exception as err:
self.message_user(request, err, messages.ERROR)
else:
@ -196,13 +197,17 @@ class DomainAdmin(ListHeaderAdmin):
if SET_SECURITY_CONTACT in request.POST:
try:
fake_email="manuallyEnteredEmail@test.gov"
if PublicContact.objects.filter(domain=obj, contact_type="security").exists():
sec_contact=PublicContact.objects.filter(domain=obj, contact_type="security").get()
fake_email = "manuallyEnteredEmail@test.gov"
if PublicContact.objects.filter(
domain=obj, contact_type="security"
).exists():
sec_contact = PublicContact.objects.filter(
domain=obj, contact_type="security"
).get()
else:
sec_contact=obj.get_default_security_contact()
sec_contact.email=fake_email
sec_contact = obj.get_default_security_contact()
sec_contact.email = fake_email
sec_contact.save()
except Exception as err:
@ -212,11 +217,8 @@ class DomainAdmin(ListHeaderAdmin):
request,
("The security email is %" ". Thanks!") % fake_email,
)
print("above make domain")
if MAKE_DOMAIN in request.POST:
print("in make domain")
try:
obj._get_or_create_domain()
except Exception as err:
@ -228,14 +230,12 @@ class DomainAdmin(ListHeaderAdmin):
)
return HttpResponseRedirect(".")
#make nameservers here
if MAKE_NAMESERVERS in request.POST:
print("in make domain")
# make nameservers here
if MAKE_NAMESERVERS in request.POST:
try:
hosts=[("ns1.example.com",None),("ns2.example.com",None) ]
obj.nameservers=hosts
hosts = [("ns1.example.com", None), ("ns2.example.com", None)]
obj.nameservers = hosts
except Exception as err:
self.message_user(request, err, messages.ERROR)
else:
@ -245,10 +245,8 @@ class DomainAdmin(ListHeaderAdmin):
)
return HttpResponseRedirect(".")
if GET_NAMESERVERS in request.POST:
print("in make domain")
try:
nameservers=obj.nameservers
nameservers = obj.nameservers
except Exception as err:
self.message_user(request, err, messages.ERROR)
else:
@ -257,12 +255,10 @@ class DomainAdmin(ListHeaderAdmin):
("Nameservers are %s" ". Thanks!") % nameservers,
)
return HttpResponseRedirect(".")
if GET_STATUS in request.POST:
print("in make domain")
if GET_STATUS in request.POST:
try:
statuses=obj.statuses
statuses = obj.statuses
except Exception as err:
self.message_user(request, err, messages.ERROR)
else:
@ -271,10 +267,8 @@ class DomainAdmin(ListHeaderAdmin):
("Domain statuses are %s" ". Thanks!") % statuses,
)
return HttpResponseRedirect(".")
if SET_CLIENT_HOLD in request.POST:
print("in make domain")
if SET_CLIENT_HOLD in request.POST:
try:
obj.clientHold()
obj.save()
@ -286,10 +280,8 @@ class DomainAdmin(ListHeaderAdmin):
("Domain %s is now in clientHold") % obj.name,
)
return HttpResponseRedirect(".")
if REMOVE_CLIENT_HOLD in request.POST:
print("in make domain")
if REMOVE_CLIENT_HOLD in request.POST:
try:
obj.revertClientHold()
obj.save()
@ -302,8 +294,6 @@ class DomainAdmin(ListHeaderAdmin):
)
return HttpResponseRedirect(".")
if DELETE_DOMAIN in request.POST:
print("in make domain")
try:
obj.deleted()
obj.save()
@ -318,7 +308,6 @@ class DomainAdmin(ListHeaderAdmin):
return super().response_change(request, obj)
class ContactAdmin(ListHeaderAdmin):
"""Custom contact admin class to add search."""

View file

@ -227,10 +227,10 @@ class Domain(TimeStampedModel, DomainHelper):
"""
try:
hosts = self._get_property("hosts")
except KeyError as err:
except Exception as err:
logger.info("Domain is missing nameservers")
return None
hostList = []
for host in hosts:
logger.info(host)
@ -260,10 +260,10 @@ class Domain(TimeStampedModel, DomainHelper):
"""Call _check_host first before using this function,
This creates the host object in the registry
doesn't add the created host to the domain
returns int response code"""
returns ErrorCode (int)"""
logger.info("_create_host()->addresses is NONE")
if not addrs is None and addrs!=[]:
# TODO - # [epp.Ip(addr="127.0.0.1"), epp.Ip(addr="0:0:0:0:0:0:0:1", ip="v6")]
if not addrs is None:
logger.info("addresses is not None %s" % addrs)
addresses = [epp.Ip(addr=addr) for addr in addrs]
request = commands.CreateHost(name=host, addrs=addresses)
@ -271,7 +271,7 @@ class Domain(TimeStampedModel, DomainHelper):
logger.info("_create_host()-> address IS None")
request = commands.CreateHost(name=host)
# [epp.Ip(addr="127.0.0.1"), epp.Ip(addr="0:0:0:0:0:0:0:1", ip="v6")]
try:
logger.info("_create_host()-> sending req as %s" % request)
response = registry.send(request, cleaned=True)
@ -287,7 +287,7 @@ class Domain(TimeStampedModel, DomainHelper):
example: [(ns1.okay.gov, 127.0.0.1, others ips)]"""
# TODO: call EPP to set this info.
# if two nameservers change state to created, don't do it automatically
hostSuccessCount = 0
if len(hosts) > 13:
raise ValueError(
"Too many hosts provided, you may not have more than 13 nameservers."
@ -303,10 +303,9 @@ class Domain(TimeStampedModel, DomainHelper):
avail = self._check_host([host])
if avail:
createdCode = self._create_host(host=host, addrs=addrs)
if createdCode == ErrorCode.OBJECT_EXISTS:
hostSuccessCount += 1
# update the object instead
elif createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
# update the domain obj
if createdCode == ErrorCode.COMMAND_COMPLETED_SUCCESSFULLY:
# add host to domain
request = commands.UpdateDomain(
name=self.name, add=[epp.HostObjSet([host])]
@ -314,16 +313,19 @@ class Domain(TimeStampedModel, DomainHelper):
try:
registry.send(request, cleaned=True)
hostSuccessCount += 1
except RegistryError as e:
logger.error(
"Error adding nameserver, code was %s error was %s"
% (e.code, e)
)
if self.state == self.State.DNS_NEEDED and hostSuccessCount >= 2:
try:
self.created()
self.save()
except:
logger.info(
"nameserver setter checked for create state and it did not succeed"
)
##TODO - handle removed nameservers here will need to change the state go back to DNS_NEEDED
@Cache
@ -592,7 +594,7 @@ class Domain(TimeStampedModel, DomainHelper):
# remove the old contact and add a new one
try:
self._update_domain_with_contact(contact=existing_contact, rem=True)
print("deleting %s "%existing_contact)
print("deleting %s " % existing_contact)
existing_contact.delete()
print("after deleting")
if isEmptySecurity:
@ -633,6 +635,7 @@ class Domain(TimeStampedModel, DomainHelper):
security_contact = self.get_default_security_contact()
security_contact.save()
logger.info("done with contacts")
@security_contact.setter # type: ignore
def security_contact(self, contact: PublicContact):
"""makes the contact in the registry,
@ -677,26 +680,26 @@ class Domain(TimeStampedModel, DomainHelper):
logger.info("get_security_email-> getting the contact ")
secContact = self.security_contact
return secContact.email
def clientHoldStatus(self):
return epp.Status(state=self.Status.ON_HOLD, description="", lang="en")
return epp.Status(state=self.Status.ON_HOLD, description="", lang="en")
def _place_client_hold(self):
"""This domain should not be active.
may raises RegistryError, should be caught or handled correctly by caller """
request=commands.UpdateDomain(name=self.name,add=[self.clientHoldStatus()])
may raises RegistryError, should be caught or handled correctly by caller"""
request = commands.UpdateDomain(name=self.name, add=[self.clientHoldStatus()])
registry.send(request)
def _remove_client_hold(self):
"""This domain is okay to be active.
may raises RegistryError, should be caught or handled correctly by caller"""
request=commands.UpdateDomain(name=self.name,rem=[self.clientHoldStatus() ])
request = commands.UpdateDomain(name=self.name, rem=[self.clientHoldStatus()])
registry.send(request)
def _delete_domain(self):
"""This domain should be deleted from the registry
may raises RegistryError, should be caught or handled correctly by caller"""
request=commands.DeleteDomain(name=self.name)
request = commands.DeleteDomain(name=self.name)
registry.send(request)
def __str__(self) -> str:
@ -758,7 +761,7 @@ class Domain(TimeStampedModel, DomainHelper):
def _get_or_create_domain(self):
"""Try to fetch info about this domain. Create it if it does not exist."""
already_tried_to_create = False
exitEarly=False
exitEarly = False
count = 0
while not exitEarly and count < 3:
try:
@ -768,7 +771,7 @@ class Domain(TimeStampedModel, DomainHelper):
req = commands.InfoDomain(name=self.name)
domainInfo = registry.send(req, cleaned=True).res_data[0]
exitEarly=True
exitEarly = True
return domainInfo
except RegistryError as e:
count += 1
@ -787,18 +790,19 @@ class Domain(TimeStampedModel, DomainHelper):
logger.error(e)
logger.error(e.code)
raise e
def addRegistrant(self):
def addRegistrant(self):
registrant = PublicContact.get_default_registrant()
registrant.domain = self
registrant.save() ##calls the registrant_contact.setter
logger.info("registrant is %s" % registrant)
return registrant.registry_id
@transition(field="state", source=State.UNKNOWN, target=State.DNS_NEEDED)
def pendingCreate(self):
logger.info("In make domain in registry ")
registrantID=self.addRegistrant()
registrantID = self.addRegistrant()
# TODO-notes no chg item for registrant in the epplib should
@ -856,7 +860,7 @@ class Domain(TimeStampedModel, DomainHelper):
logger.info("clientHold()-> inside clientHold")
self._place_client_hold()
# TODO -on the client hold ticket any additional error handling here
@transition(field="state", source=State.ON_HOLD, target=State.DNS_NEEDED)
def revertClientHold(self):
##TODO - check to see if client hold is allowed should happen outside of this function
@ -877,14 +881,18 @@ class Domain(TimeStampedModel, DomainHelper):
target=State.READY,
)
def created(self):
nameserverList = self.nameservers
logger.info("created()-> inside setting create")
if len(nameserverList) < 2 or len(nameserverList) > 13:
raise ValueError("Not ready to become created, cannot transition yet")
logger.info("able to transition to created state")
# TODO - in nameservers ticket check if has everything for creation
#admin, tech and security
#2 or more (but less than 13) nameservers
#if any of the above is violated raise the user raise a human readable error
#not there is another ticket for error handling keep a todo here if
#user friendly error portion is postponed
# admin, tech and security
# 2 or more (but less than 13) nameservers
# if any of the above is violated raise the user raise a human readable error
# not there is another ticket for error handling keep a todo here if
# user friendly error portion is postponed
def _disclose_fields(self, contact: PublicContact):
"""creates a disclose object that can be added to a contact Create using
@ -1006,7 +1014,7 @@ class Domain(TimeStampedModel, DomainHelper):
def _delete_host(self, host):
raise NotImplementedError()
def _fetch_cache(self, fetch_hosts=False, fetch_contacts=False):
"""Contact registry for info about a domain."""
try:
@ -1032,9 +1040,9 @@ class Domain(TimeStampedModel, DomainHelper):
cleaned = {k: v for k, v in cache.items() if v is not ...}
logger.info("_fetch_cache()-> cleaned is " + str(cleaned))
#statuses can just be a list no need to keep the epp object
# statuses can just be a list no need to keep the epp object
if "statuses" in cleaned.keys():
cleaned["statuses"]=[status.state for status in cleaned["statuses"]]
cleaned["statuses"] = [status.state for status in cleaned["statuses"]]
# get contact info, if there are any
if (
# fetch_contacts and
@ -1048,8 +1056,8 @@ class Domain(TimeStampedModel, DomainHelper):
# just asked the registry for still exists --
# if not, that's a problem
#TODO- discuss-should we check if contact is in public contacts
#and add it if not- this is really to keep in mine the transisiton
# TODO- discuss-should we check if contact is in public contacts
# and add it if not- this is really to keep in mine the transisiton
req = commands.InfoContact(id=domainContact.contact)
data = registry.send(req, cleaned=True).res_data[0]
@ -1059,7 +1067,7 @@ class Domain(TimeStampedModel, DomainHelper):
logger.info(data)
contact = {
"id": id,
"type":domainContact.type,
"type": domainContact.type,
"auth_info": getattr(data, "auth_info", ...),
"cr_date": getattr(data, "cr_date", ...),
"disclose": getattr(data, "disclose", ...),
@ -1071,7 +1079,7 @@ class Domain(TimeStampedModel, DomainHelper):
"up_date": getattr(data, "up_date", ...),
"voice": getattr(data, "voice", ...),
}
cleaned["contacts"].append(
{k: v for k, v in contact.items() if v is not ...}
)

View file

@ -9,9 +9,9 @@
<input type="submit" value="add nameservers" name="_make_nameservers">
<input type="submit" value="get nameservers" name="_get_nameservers">
<input type="submit" value="get status" name="_get_status">
<input type="submit" value="add nameservers" name="_set_client_hold">
<input type="submit" value="get nameservers" name="_rem_client_hold">
<input type="submit" value="get status" name="_delete_domain">
<input type="submit" value="Actual Client Hold Set" name="_set_client_hold">
<input type="submit" value="remove Client Hold" name="_rem_client_hold">
<input type="submit" value="EPP Delete Domain" name="_delete_domain">
</div>
{{ block.super }}
{% endblock %}

View file

@ -31,7 +31,7 @@ class MockEppLib(TestCase):
mockDataInfoDomain = fakedEppObject(
"fakepw",
cr_date=datetime.datetime(2023, 5, 25, 19, 45, 35),
contacts=["123"],
contacts=[common.DomainContact(contact="123", type="security")],
hosts=["fake.host.com"],
)
infoDomainNoContact = fakedEppObject(
@ -72,8 +72,10 @@ class MockEppLib(TestCase):
self.mockSendPatch = patch("registrar.models.domain.registry.send")
self.mockedSendFunction = self.mockSendPatch.start()
self.mockedSendFunction.side_effect = self.mockSend
def _convertPublicContactToEpp(self, contact: PublicContact, disclose_email=False, createContact=True):
def _convertPublicContactToEpp(
self, contact: PublicContact, disclose_email=False, createContact=True
):
DF = common.DiscloseField
fields = {DF.FAX, DF.VOICE, DF.ADDR}
@ -120,12 +122,12 @@ class MockEppLib(TestCase):
)
else:
return commands.UpdateContact(
id=contact.registry_id,
postal_info=pi,
email=contact.email,
voice=contact.voice,
fax=contact.fax,
)
id=contact.registry_id,
postal_info=pi,
email=contact.email,
voice=contact.voice,
fax=contact.fax,
)
def tearDown(self):
self.mockSendPatch.stop()
@ -174,6 +176,7 @@ class TestDomainCache(MockEppLib):
# send was only called once & not on the second getter call
self.mockedSendFunction.assert_called_once()
# @skip("BROKEN by newest changes-fix in getter ticket")
def test_cache_nested_elements(self):
"""Cache works correctly with the nested objects cache and hosts"""
domain, _ = Domain.objects.get_or_create(name="igorville.gov")
@ -276,8 +279,8 @@ class TestDomainCreation(TestCase):
self.assertIn("ok", domain.status)
def tearDown(self) -> None:
Domain.objects.delete()
# User.objects.delete()
Domain.objects.filter(name="igorville.gov").delete()
Domain.objects.all().delete()
class TestRegistrantContacts(MockEppLib):
@ -484,9 +487,11 @@ class TestRegistrantContacts(MockEppLib):
new_contact = self.domain.get_default_security_contact()
new_contact.registry_id = "fail"
new_contact.email = ""
self.domain.security_contact=new_contact
self.domain.security_contact = new_contact
print("old contact %s email is %s" % (str(old_contact), str(old_contact.email)))
print(
"old contact %s email is %s" % (str(old_contact), str(old_contact.email))
)
print("new contact %s " % new_contact)
firstCreateContactCall = self._convertPublicContactToEpp(
old_contact, disclose_email=True
@ -497,9 +502,9 @@ class TestRegistrantContacts(MockEppLib):
common.DomainContact(contact=old_contact.registry_id, type="security")
],
)
print( PublicContact.objects.filter(domain=self.domain))
print(PublicContact.objects.filter(domain=self.domain))
print("just printed the objects for public contact!!")
assert (
PublicContact.objects.filter(domain=self.domain).get().email
== PublicContact.get_default_security().email
@ -550,7 +555,6 @@ class TestRegistrantContacts(MockEppLib):
print(expected_calls)
self.mockedSendFunction.assert_has_calls(expected_calls, any_order=True)
def test_updates_security_email(self):
"""
Scenario: Registrant replaces one valid security contact email with another
@ -560,13 +564,13 @@ class TestRegistrantContacts(MockEppLib):
Then Domain sends `commands.UpdateContact` to the registry
"""
security_contact = self.domain.get_default_security_contact()
security_contact.email="originalUserEmail@gmail.com"
security_contact.email = "originalUserEmail@gmail.com"
security_contact.registry_id = "fail"
security_contact.save()
expectedCreateCommand = self._convertPublicContactToEpp(
security_contact, disclose_email=True
)
print(expectedCreateCommand)
expectedUpdateDomain = commands.UpdateDomain(
name=self.domain.name,
add=[
@ -575,24 +579,28 @@ class TestRegistrantContacts(MockEppLib):
)
],
)
security_contact.email="changedEmail@email.com"
security_contact.email = "changedEmail@email.com"
print("\n\n\n***********\n\n")
security_contact.save()
expectedSecondCreateCommand = self._convertPublicContactToEpp(
security_contact, disclose_email=True
)
updateContact=self._convertPublicContactToEpp(security_contact,disclose_email=True,createContact=False)
updateContact = self._convertPublicContactToEpp(
security_contact, disclose_email=True, createContact=False
)
print("SECOND EXPECTED CREATE")
print(expectedSecondCreateCommand)
print(self.mockedSendFunction.call_args_list)
expected_calls = [
call(expectedCreateCommand, cleaned=True),
call(expectedUpdateDomain, cleaned=True),
call(expectedSecondCreateCommand,cleaned=True),
call(expectedSecondCreateCommand, cleaned=True),
call(updateContact, cleaned=True),
]
self.mockedSendFunction.assert_has_calls(expected_calls, any_order=True)
assert PublicContact.objects.filter(domain=self.domain).count() == 1
@skip("not implemented yet")
def test_update_is_unsuccessful(self):

View file

@ -135,10 +135,10 @@ class DomainNameserversView(DomainPermissionView, FormMixin):
def get_initial(self):
"""The initial value for the form (which is a formset here)."""
domain = self.get_object()
nameservers=domain.nameservers
nameservers = domain.nameservers
if nameservers is None:
return []
return [{"server": name} for name, *ip in domain.nameservers]
def get_success_url(self):