creating / getting accounts

This commit is contained in:
zandercymatics 2024-12-05 09:41:57 -07:00
parent 24feb00327
commit 751ce24378
No known key found for this signature in database
GPG key ID: FF4636ABEC9682B7
5 changed files with 125 additions and 111 deletions

View file

@ -59,6 +59,9 @@ services:
- AWS_S3_BUCKET_NAME - AWS_S3_BUCKET_NAME
# File encryption credentials # File encryption credentials
- SECRET_ENCRYPT_METADATA - SECRET_ENCRYPT_METADATA
- REGISTRY_TENANT_KEY
- REGISTRY_SERVICE_EMAIL
- REGISTRY_TENANT_NAME
stdin_open: true stdin_open: true
tty: true tty: true
ports: ports:

View file

@ -88,7 +88,8 @@ secret_registry_hostname = secret("REGISTRY_HOSTNAME")
# PROTOTYPE: Used for DNS hosting # PROTOTYPE: Used for DNS hosting
secret_registry_tenant_key = secret("REGISTRY_TENANT_KEY", None) secret_registry_tenant_key = secret("REGISTRY_TENANT_KEY", None)
secret_registry_tenant_id = secret("REGISTRY_TENANT_ID", None) secret_registry_tenant_name = secret("REGISTRY_TENANT_NAME", None)
secret_registry_service_email = secret("REGISTRY_SERVICE_EMAIL", None)
# region: Basic Django Config-----------------------------------------------### # region: Basic Django Config-----------------------------------------------###
@ -690,7 +691,8 @@ SECRET_REGISTRY_KEY = secret_registry_key
SECRET_REGISTRY_KEY_PASSPHRASE = secret_registry_key_passphrase SECRET_REGISTRY_KEY_PASSPHRASE = secret_registry_key_passphrase
SECRET_REGISTRY_HOSTNAME = secret_registry_hostname SECRET_REGISTRY_HOSTNAME = secret_registry_hostname
SECRET_REGISTRY_TENANT_KEY = secret_registry_tenant_key SECRET_REGISTRY_TENANT_KEY = secret_registry_tenant_key
SECRET_REGISTRY_TENANT_ID = secret_registry_tenant_id SECRET_REGISTRY_TENANT_NAME = secret_registry_tenant_name
SECRET_REGISTRY_SERVICE_EMAIL = secret_registry_service_email
# endregion # endregion
# region: Security and Privacy----------------------------------------------### # region: Security and Privacy----------------------------------------------###

View file

@ -239,6 +239,7 @@ class Domain(TimeStampedModel, DomainHelper):
is called in the validate function on the request/domain page is called in the validate function on the request/domain page
throws- RegistryError or InvalidDomainError""" throws- RegistryError or InvalidDomainError"""
return True
if not cls.string_could_be_domain(domain): if not cls.string_could_be_domain(domain):
logger.warning("Not a valid domain: %s" % str(domain)) logger.warning("Not a valid domain: %s" % str(domain))
# throw invalid domain error so that it can be caught in # throw invalid domain error so that it can be caught in
@ -308,97 +309,6 @@ class Domain(TimeStampedModel, DomainHelper):
To update the expiration date, use renew_domain method.""" To update the expiration date, use renew_domain method."""
raise NotImplementedError() raise NotImplementedError()
def create_prototype_account(self, base_url, headers, tenant_id):
account_response = requests.post(
f"{base_url}/accounts",
headers=headers,
json={
"name": f"account-{self.name}",
"type": "enterprise",
"unit": {"id": tenant_id}
}
)
account_response.raise_for_status()
account_response_json = account_response.json()
account_id = account_response_json["result"]["id"]
logger.info(f"Created account: {account_response_json}")
return account_id
def create_prototype_zone(self, base_url, headers, account_id):
zone_response = requests.post(
f"{base_url}/zones",
headers=headers,
json={
"name": self.name,
"account": {"id": account_id},
"type": "full"
}
)
zone_response.raise_for_status()
zone_response_json = zone_response.json()
zone_id = zone_response_json["result"]["id"]
logger.info(f"Created zone: {zone_response_json}")
return zone_id
def create_prototype_subscription(self, base_url, headers, zone_id):
subscription_response = requests.post(
f"{base_url}/zones/{zone_id}/subscription",
headers=headers,
json={
"rate_plan": {"id": "PARTNERS_ENT"},
"frequency": "annual"
}
)
subscription_response.raise_for_status()
subscription_response_json = subscription_response.json()
logger.info(f"Created subscription: {subscription_response_json}")
def create_prototype_dns_record(self, dns_record_dict):
print(f"what is the key? {settings.SECRET_REGISTRY_TENANT_KEY}")
# Don't execute this function on any other domain
if settings.IS_PRODUCTION and self.name != "igorville.gov":
logger.warning(f"create_dns_record was called for domain {self.name}")
return None
# Cloudflare API endpoints
base_url = "https://api.cloudflare.com/client/v4"
headers = {
"Authorization": f"Bearer {settings.SECRET_REGISTRY_TENANT_KEY}",
"Content-Type": "application/json"
}
# TODO - check if these things exist before doing stuff
# 1. Get tenant details
# Note: we can grab this more generally but lets be specific to keep things safe.
tenant_id = settings.SECRET_REGISTRY_TENANT_ID
# 2. Create account under tenant
account_id = self.create_prototype_account(base_url, headers, tenant_id)
# 3. Create zone under account
zone_id = self.create_prototype_zone(base_url, headers, account_id)
# 4. Add zone subscription
self.create_prototype_subscription(base_url, headers, zone_id)
# 5. Create DNS record
dns_response = requests.post(
f"{base_url}/zones/{zone_id}/dns_records",
headers=headers,
json=dns_record_dict
)
dns_response.raise_for_status()
dns_response_json = dns_response.json()
logger.info(f"Created DNS record: {dns_response_json}")
return {
"tenant_id": tenant_id,
"account_id": account_id,
"zone_id": zone_id,
"dns_record_id": dns_response_json["result"]["id"]
}
def renew_domain(self, length: int = 1, unit: epp.Unit = epp.Unit.YEAR): def renew_domain(self, length: int = 1, unit: epp.Unit = epp.Unit.YEAR):
""" """
Renew the domain to a length and unit of time relative to the current Renew the domain to a length and unit of time relative to the current

View file

@ -12,6 +12,7 @@
<p> <p>
This is a prototype that demonstrates adding an 'A' record to igorville.gov. This is a prototype that demonstrates adding an 'A' record to igorville.gov.
Do note that this just adds records, but does not update or delete existing ones. Do note that this just adds records, but does not update or delete existing ones.
<strong>You can only use this on igorville.gov, domainops.gov, and dns.gov.</strong>
</p> </p>
<form class="usa-form usa-form--large" method="post" novalidate id="form-container"> <form class="usa-form usa-form--large" method="post" novalidate id="form-container">

View file

@ -7,7 +7,7 @@ inherit from `DomainPermissionView` (or DomainInvitationPermissionCancelView).
from datetime import date from datetime import date
import logging import logging
import requests
from django.contrib import messages from django.contrib import messages
from django.contrib.messages.views import SuccessMessageMixin from django.contrib.messages.views import SuccessMessageMixin
from django.db import IntegrityError from django.db import IntegrityError
@ -517,27 +517,125 @@ class PrototypeDomainDNSRecordView(DomainFormBaseView):
form = self.get_form() form = self.get_form()
if form.is_valid(): if form.is_valid():
try: try:
# Format the DNS record according to Cloudflare's API requirements if settings.IS_PRODUCTION and self.object.name != "igorville.gov":
dns_record = { raise Exception(f"create dns record was called for domain {self.name}")
"type": "A",
"name": form.cleaned_data["name"],
"content": form.cleaned_data["content"],
"ttl": int(form.cleaned_data["ttl"]),
"comment": f"Test record (will eventually need to clean up)"
}
result = self.object.create_prototype_dns_record(dns_record) valid_domains = ["igorville.gov", "domainops.gov", "dns.gov"]
if not settings.IS_PRODUCTION and self.object.name not in valid_domains:
if result: # Assuming create_prototype_dns_record returns the response data
messages.success(
request,
f"DNS A record '{form.cleaned_data['name']}' created successfully."
)
else:
messages.error( messages.error(
request, request,
"Failed to create DNS A record. Please try again." f"Can only create DNS records for: {valid_domains}."
" Create one in a test environment if it doesn't already exist."
) )
return super().post(request)
base_url = "https://api.cloudflare.com/client/v4"
headers = {
"X-Auth-Email": settings.SECRET_REGISTRY_SERVICE_EMAIL,
"X-Auth-Key": settings.SECRET_REGISTRY_TENANT_KEY,
"Content-Type": "application/json"
}
params = {"tenant_name": settings.SECRET_REGISTRY_TENANT_NAME}
# 1. Get tenant details
tenant_response = requests.get(f"{base_url}/user/tenants", headers=headers, params=params)
tenant_response.raise_for_status()
tenant_response_json = tenant_response.json()
logger.info(f"Found tenant: {tenant_response_json}")
tenant_id = tenant_response_json["result"][0]["tenant_tag"]
# 2. Create account under tenant
# Check to see if the account already exists. Filters accounts by tenant_id.
account_name = f"account-{self.object.name}"
params = {"tenant_id": tenant_id}
account_response = requests.get(f"{base_url}/accounts", headers=headers, params=params)
account_response.raise_for_status()
account_response_json = account_response.json()
print(f"account stuff: {account_response_json}")
# See if we already made an account
account_id = None
accounts = account_response_json.get("result", [])
for account in accounts:
if account.get("name") == account_name:
account_id = account.get("id")
print(f"Found it! Account: {account_name} (ID: {account_id})")
break
# If we didn't, create one
if not account_id:
account_response = requests.post(
f"{base_url}/accounts",
headers=headers,
json={
"name": account_name,
"type": "enterprise",
"unit": {"id": tenant_id}
}
)
account_response.raise_for_status()
account_response_json = account_response.json()
logger.info(f"Created account: {account_response_json}")
account_id = account_response_json["result"]["id"]
# # 3. Create zone under account
# zone_response = requests.post(
# f"{base_url}/zones",
# headers=headers,
# json={
# "name": self.name,
# "account": {"id": account_id},
# "type": "full"
# }
# )
# zone_response.raise_for_status()
# zone_response_json = zone_response.json()
# zone_id = zone_response_json["result"]["id"]
# logger.info(f"Created zone: {zone_response_json}")
# # 4. Add zone subscription
# subscription_response = requests.post(
# f"{base_url}/zones/{zone_id}/subscription",
# headers=headers,
# json={
# "rate_plan": {"id": "PARTNERS_ENT"},
# "frequency": "annual"
# }
# )
# subscription_response.raise_for_status()
# subscription_response_json = subscription_response.json()
# logger.info(f"Created subscription: {subscription_response_json}")
# # 5. Create DNS record
# # Format the DNS record according to Cloudflare's API requirements
# dns_response = requests.post(
# f"{base_url}/zones/{zone_id}/dns_records",
# headers=headers,
# json={
# "type": "A",
# "name": form.cleaned_data["name"],
# "content": form.cleaned_data["content"],
# "ttl": int(form.cleaned_data["ttl"]),
# "comment": "Test record (will need clean up)"
# }
# )
# dns_response.raise_for_status()
# dns_response_json = dns_response.json()
# logger.info(f"Created DNS record: {dns_response_json}")
# if dns_response_json and "name" in dns_response_json:
# messages.success(
# request,
# f"DNS A record '{form.cleaned_data['name']}' created successfully."
# )
# else:
# messages.error(
# request,
# "Failed to create DNS A record. Please try again."
# )
except Exception as err: except Exception as err:
logger.error(f"Error creating DNS A record for {self.object.name}: {err}") logger.error(f"Error creating DNS A record for {self.object.name}: {err}")