Merge pull request #1419 from cisagov/za/1403-add-aws-bucket

Ticket #1075/#1403: Add AWS Bucket for generating current-full.csv and current-federal.csv
zandercymatics 2023-12-05 12:41:58 -07:00 committed by GitHub
commit 23e94c7606
13 changed files with 618 additions and 7 deletions

.github/workflows/daily-csv-upload.yaml

@ -0,0 +1,33 @@
name: Upload current-full.csv and current-federal.csv
run-name: Upload current-full.csv and current-federal.csv

on:
  schedule:
    # Runs every day at 5 AM UTC.
    - cron: "0 5 * * *"

jobs:
  upload-reports:
    runs-on: ubuntu-latest
    env:
      CF_USERNAME: CF_${{ secrets.CF_REPORT_ENV }}_USERNAME
      CF_PASSWORD: CF_${{ secrets.CF_REPORT_ENV }}_PASSWORD
    steps:
      - name: Generate current-federal.csv
        uses: cloud-gov/cg-cli-tools@main
        with:
          cf_username: ${{ secrets[env.CF_USERNAME] }}
          cf_password: ${{ secrets[env.CF_PASSWORD] }}
          cf_org: cisa-dotgov
          cf_space: ${{ secrets.CF_REPORT_ENV }}
          cf_command: "run-task getgov-${{ secrets.CF_REPORT_ENV }} --command 'python manage.py generate_current_federal_report' --name federal"

      - name: Generate current-full.csv
        uses: cloud-gov/cg-cli-tools@main
        with:
          cf_username: ${{ secrets[env.CF_USERNAME] }}
          cf_password: ${{ secrets[env.CF_PASSWORD] }}
          cf_org: cisa-dotgov
          cf_space: ${{ secrets.CF_REPORT_ENV }}
          cf_command: "run-task getgov-${{ secrets.CF_REPORT_ENV }} --command 'python manage.py generate_current_full_report' --name full"
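Note the indirection in the workflow's `env` block: `CF_USERNAME` holds the *name* of a secret (e.g. `CF_<env>_USERNAME`), and `${{ secrets[env.CF_USERNAME] }}` then looks that name up. The same two-step lookup in plain Python (values here are illustrative stand-ins, not real secret names):

```python
# Stand-in for the repository's secrets store; keys and values are invented
# for illustration only.
secrets = {
    "CF_REPORT_ENV": "stable",
    "CF_stable_USERNAME": "svc-account",
    "CF_stable_PASSWORD": "hunter2",
}

# Step 1: build the secret's name from the environment selector
# (mirrors env.CF_USERNAME = "CF_${{ secrets.CF_REPORT_ENV }}_USERNAME").
cf_username_key = f"CF_{secrets['CF_REPORT_ENV']}_USERNAME"

# Step 2: indirect lookup (mirrors ${{ secrets[env.CF_USERNAME] }}).
cf_username = secrets[cf_username_key]

print(cf_username_key)  # CF_stable_USERNAME
print(cf_username)      # svc-account
```

This lets one workflow serve multiple deployment environments by changing only `CF_REPORT_ENV`.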


@ -295,7 +295,7 @@ sudo sntp -sS time.nist.gov
```
## Connection pool
To handle our connection to the registry, we utilize a connection pool to keep a socket open to increase responsiveness. In order to accomplish this, we are utilizing a heavily modified version of the [geventconnpool](https://github.com/rasky/geventconnpool) library.
### Settings
The config for the connection pool exists inside the `settings.py` file.
@ -319,4 +319,36 @@ Our connection pool has a built-in `pool_status` object which you can call at an
5. `print(registry.pool_status.connection_success)`
* Should return true
If you have multiple instances (staging for example), then repeat commands 1-5 for each instance you want to test.
## Adding an S3 instance to your sandbox
This can be done either through the CLI or through the cloud.gov dashboard. Generally, the dashboard is the better option, as it handles app binding for you.
To associate an S3 instance with your sandbox, follow these steps:
1. Navigate to https://dashboard.fr.cloud.gov/login
2. Select your sandbox from the `Applications` tab
3. Click `Services` on the application nav bar
4. Add a new service (plus symbol)
5. Click `Marketplace Service`
6. On the `Select the service` dropdown, select `s3`
7. Under the dropdown on `Select Plan`, select `basic-sandbox`
8. Under `Service Instance` enter `getgov-s3` for the name
See this [resource](https://cloud.gov/docs/services/s3/) for information on associating an S3 instance with your sandbox through the CLI.
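If you prefer the CLI, the equivalent steps look roughly like the following. This is a sketch based on cloud.gov's S3 documentation, not a tested transcript; `<app>` stands for your sandbox app name:

```
cf login -a api.fr.cloud.gov --sso
cf create-service s3 basic-sandbox getgov-s3
cf bind-service getgov-<app> getgov-s3
cf restage getgov-<app>
```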
### Testing your S3 instance locally
To test the S3 bucket associated with your sandbox, you will need to add four additional variables to your `.env` file. These are as follows:
```
AWS_S3_ACCESS_KEY_ID = "{string value of `access_key_id` in getgov-s3}"
AWS_S3_SECRET_ACCESS_KEY = "{string value of `secret_access_key` in getgov-s3}"
AWS_S3_REGION = "{string value of `region` in getgov-s3}"
AWS_S3_BUCKET_NAME = "{string value of `bucket` in getgov-s3}"
```
You can view these variables by running the following command:
```
cf env getgov-{app name}
```
Then, copy the variables under the section labeled `s3`.


@ -1,7 +1,7 @@
"""Internal API views"""
from django.apps import apps
from django.views.decorators.http import require_http_methods
from django.http import HttpResponse, JsonResponse
from django.utils.safestring import mark_safe
from registrar.templatetags.url_helpers import public_site_url
@ -13,6 +13,8 @@ from login_required import login_not_required
from cachetools.func import ttl_cache
from registrar.utility.s3_bucket import S3ClientError, S3ClientHelper
DOMAIN_FILE_URL = "https://raw.githubusercontent.com/cisagov/dotgov-data/main/current-full.csv"
@ -95,3 +97,36 @@ def available(request, domain=""):
        return JsonResponse({"available": False, "message": DOMAIN_API_MESSAGES["unavailable"]})
    except Exception:
        return JsonResponse({"available": False, "message": DOMAIN_API_MESSAGES["error"]})
@require_http_methods(["GET"])
@login_not_required
def get_current_full(request, file_name="current-full.csv"):
    """Returns the content of current-full.csv, the output of the
    generate_current_full_report command, which iterates through each Domain
    and writes a CSV representation."""
    return serve_file(file_name)


@require_http_methods(["GET"])
@login_not_required
def get_current_federal(request, file_name="current-federal.csv"):
    """Returns the content of current-federal.csv, the output of the
    generate_current_federal_report command, which iterates through each
    federal Domain and writes a CSV representation."""
    return serve_file(file_name)


def serve_file(file_name):
    """Serves the named file from S3. Raises S3ClientError (a 500) if it cannot be retrieved."""
    s3_client = S3ClientHelper()
    # Serve the CSV file. If it is missing, get_file raises S3ClientError,
    # which we re-raise rather than returning stale or empty content.
    try:
        file = s3_client.get_file(file_name, decode_to_utf=True)
    except S3ClientError as err:
        # TODO - #1317: Notify operations when auto report generation fails
        raise err

    response = HttpResponse(file)
    return response
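Since both views are decorated with `login_not_required`, the reports can be fetched anonymously once deployed; for example, against a local dev server (host and port are assumptions):

```
curl -sS http://localhost:8080/api/v1/get-report/current-federal -o current-federal.csv
curl -sS http://localhost:8080/api/v1/get-report/current-full -o current-full.csv
```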


@ -51,6 +51,11 @@ services:
      # AWS credentials
      - AWS_ACCESS_KEY_ID
      - AWS_SECRET_ACCESS_KEY
      # AWS S3 bucket credentials
      - AWS_S3_ACCESS_KEY_ID
      - AWS_S3_SECRET_ACCESS_KEY
      - AWS_S3_REGION
      - AWS_S3_BUCKET_NAME
    stdin_open: true
    tty: true
    ports:


@ -33,11 +33,20 @@ env = environs.Env()
# Get secrets from Cloud.gov user provided service, if exists
# If not, get secrets from environment variables
key_service = AppEnv().get_service(name="getgov-credentials")
# Get secrets from Cloud.gov user provided s3 service, if it exists
s3_key_service = AppEnv().get_service(name="getgov-s3")
if key_service and key_service.credentials:
    if s3_key_service and s3_key_service.credentials:
        # Concatenate the credentials from our S3 service into our secret service
        key_service.credentials.update(s3_key_service.credentials)
    secret = key_service.credentials.get
else:
    secret = env
# # # ###
# Values obtained externally #
# # # ###
@ -58,6 +67,12 @@ secret_key = secret("DJANGO_SECRET_KEY")
secret_aws_ses_key_id = secret("AWS_ACCESS_KEY_ID", None)
secret_aws_ses_key = secret("AWS_SECRET_ACCESS_KEY", None)
# These keys are present in a getgov-s3 instance, or they can be defined locally
aws_s3_region_name = secret("region", None) or secret("AWS_S3_REGION", None)
secret_aws_s3_key_id = secret("access_key_id", None) or secret("AWS_S3_ACCESS_KEY_ID", None)
secret_aws_s3_key = secret("secret_access_key", None) or secret("AWS_S3_SECRET_ACCESS_KEY", None)
secret_aws_s3_bucket_name = secret("bucket", None) or secret("AWS_S3_BUCKET_NAME", None)
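The fallback chain above (service-provided short names like `region` first, then the local `AWS_S3_*` names) can be sketched with plain dictionaries standing in for the credential sources; the dict contents are illustrative only:

```python
# Case 1: a bound getgov-s3 service supplies short key names ("region", "bucket", ...)
service_credentials = {"region": "us-gov-west-1", "bucket": "cg-12345"}
secret = service_credentials.get
region_1 = secret("region", None) or secret("AWS_S3_REGION", None)

# Case 2: no service bound; a local .env supplies the AWS_S3_* names instead
local_env = {"AWS_S3_REGION": "us-east-1"}
secret = local_env.get
region_2 = secret("region", None) or secret("AWS_S3_REGION", None)

print(region_1)  # us-gov-west-1
print(region_2)  # us-east-1
```

Either source satisfies the same `secret(...) or secret(...)` expression, so the settings code stays identical in a sandbox and on a developer laptop.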
secret_registry_cl_id = secret("REGISTRY_CL_ID")
secret_registry_password = secret("REGISTRY_PASSWORD")
secret_registry_cert = b64decode(secret("REGISTRY_CERT", ""))
@ -257,7 +272,14 @@ AUTH_USER_MODEL = "registrar.User"
AWS_ACCESS_KEY_ID = secret_aws_ses_key_id
AWS_SECRET_ACCESS_KEY = secret_aws_ses_key
AWS_REGION = "us-gov-west-1"
# Configuration for accessing AWS S3
AWS_S3_ACCESS_KEY_ID = secret_aws_s3_key_id
AWS_S3_SECRET_ACCESS_KEY = secret_aws_s3_key
AWS_S3_REGION = aws_s3_region_name
AWS_S3_BUCKET_NAME = secret_aws_s3_bucket_name
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#standard-retry-mode
AWS_RETRY_MODE: Final = "standard"
# base 2 exponential backoff with max of 20 seconds:
AWS_MAX_ATTEMPTS = 3


@ -11,7 +11,8 @@ from django.views.generic import RedirectView
from registrar import views
from registrar.views.application import Step
from registrar.views.utility import always_404
from api.views import available, get_current_federal, get_current_full
APPLICATION_NAMESPACE = views.ApplicationWizard.URL_NAMESPACE
application_urls = [
@ -73,6 +74,8 @@ urlpatterns = [
    path("openid/", include("djangooidc.urls")),
    path("register/", include((application_urls, APPLICATION_NAMESPACE))),
    path("api/v1/available/<domain>", available, name="available"),
    path("api/v1/get-report/current-federal", get_current_federal, name="get-current-federal"),
    path("api/v1/get-report/current-full", get_current_full, name="get-current-full"),
    path(
        "todo",
        lambda r: always_404(r, "We forgot to include this link, sorry."),


@ -0,0 +1,58 @@
"""Generates current-federal.csv and uploads it to our S3 bucket."""

import logging
import os

from django.core.management import BaseCommand
from registrar.utility import csv_export
from registrar.utility.s3_bucket import S3ClientHelper


logger = logging.getLogger(__name__)


class Command(BaseCommand):
    help = (
        "Generates and uploads a current-federal.csv file to our S3 bucket, "
        "based on all existing federal Domains."
    )

    def add_arguments(self, parser):
        """Add our two filename arguments."""
        parser.add_argument("--directory", default="migrationdata", help="Desired directory")
        parser.add_argument(
            "--checkpath",
            default=True,
            help="Flag that determines if we do a check for os.path.exists. Used for test cases",
        )

    def handle(self, **options):
        """Grabs the directory then creates current-federal.csv in that directory"""
        file_name = "current-federal.csv"
        # Ensures a slash is added
        directory = os.path.join(options.get("directory"), "")
        check_path = options.get("checkpath")

        logger.info("Generating report...")
        try:
            self.generate_current_federal_report(directory, file_name, check_path)
        except Exception as err:
            # TODO - #1317: Notify operations when auto report generation fails
            raise err
        else:
            logger.info(f"Success! Created {file_name}")

    def generate_current_federal_report(self, directory, file_name, check_path):
        """Creates a current-federal.csv file under the specified directory,
        then uploads it to an AWS S3 bucket"""
        s3_client = S3ClientHelper()
        file_path = os.path.join(directory, file_name)

        # Generate a file locally for upload
        with open(file_path, "w") as file:
            csv_export.export_data_federal_to_csv(file)

        if check_path and not os.path.exists(file_path):
            raise FileNotFoundError(f"Could not find newly created file at '{file_path}'")

        # Upload this generated file for our S3 instance
        s3_client.upload_file(file_path, file_name)
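The `os.path.join(options.get("directory"), "")` call in `handle` is a small idiom worth noting: joining with an empty final component guarantees a trailing separator without doubling one that is already there. A quick illustration:

```python
import os

# Joining with "" appends the platform separator when it is missing...
with_slash = os.path.join("migrationdata", "")

# ...and leaves an existing trailing separator alone (no doubling).
already = os.path.join("migrationdata" + os.sep, "")

print(with_slash)  # migrationdata/ (on POSIX)
print(already)     # migrationdata/
```

This keeps the later `os.path.join(directory, file_name)` and any logged paths unambiguous regardless of how the `--directory` argument was written.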


@ -0,0 +1,57 @@
"""Generates current-full.csv and uploads it to our S3 bucket."""

import logging
import os

from django.core.management import BaseCommand
from registrar.utility import csv_export
from registrar.utility.s3_bucket import S3ClientHelper


logger = logging.getLogger(__name__)


class Command(BaseCommand):
    help = "Generates and uploads a current-full.csv file to our S3 bucket, based on all existing Domains."

    def add_arguments(self, parser):
        """Add our two filename arguments."""
        parser.add_argument("--directory", default="migrationdata", help="Desired directory")
        parser.add_argument(
            "--checkpath",
            default=True,
            help="Flag that determines if we do a check for os.path.exists. Used for test cases",
        )

    def handle(self, **options):
        """Grabs the directory then creates current-full.csv in that directory"""
        file_name = "current-full.csv"
        # Ensures a slash is added
        directory = os.path.join(options.get("directory"), "")
        check_path = options.get("checkpath")

        logger.info("Generating report...")
        try:
            self.generate_current_full_report(directory, file_name, check_path)
        except Exception as err:
            # TODO - #1317: Notify operations when auto report generation fails
            raise err
        else:
            logger.info(f"Success! Created {file_name}")

    def generate_current_full_report(self, directory, file_name, check_path):
        """Creates a current-full.csv file under the specified directory,
        then uploads it to an AWS S3 bucket"""
        s3_client = S3ClientHelper()
        file_path = os.path.join(directory, file_name)

        # Generate a file locally for upload
        with open(file_path, "w") as file:
            csv_export.export_data_full_to_csv(file)

        if check_path and not os.path.exists(file_path):
            raise FileNotFoundError(f"Could not find newly created file at '{file_path}'")

        # Upload this generated file for our S3 instance
        s3_client.upload_file(file_path, file_name)


@ -0,0 +1,3 @@
Domain name,Domain type,Agency,Organization name,City,State,Security Contact Email
cdomain1.gov,Federal - Executive,World War I Centennial Commission,,,,
ddomain3.gov,Federal,Armed Forces Retirement Home,,,,


@ -0,0 +1,4 @@
Domain name,Domain type,Agency,Organization name,City,State,Security Contact Email
cdomain1.gov,Federal - Executive,World War I Centennial Commission,,,,
ddomain3.gov,Federal,Armed Forces Retirement Home,,,,
adomain2.gov,Interstate,,,,,


@ -1,11 +1,219 @@
import csv
import io
from django.test import Client, RequestFactory, TestCase
from io import StringIO
from registrar.models.domain_information import DomainInformation
from registrar.models.domain import Domain
from registrar.models.user import User
from django.contrib.auth import get_user_model
from registrar.utility.csv_export import export_domains_to_writer
from django.core.management import call_command
from unittest.mock import MagicMock, call, mock_open, patch
from api.views import get_current_federal, get_current_full
from django.conf import settings
from botocore.exceptions import ClientError
import boto3_mocking
from registrar.utility.s3_bucket import S3ClientError, S3ClientErrorCodes # type: ignore
class CsvReportsTest(TestCase):
    """Tests to determine if we are uploading our reports correctly"""

    def setUp(self):
        """Create fake domain data"""
        self.client = Client(HTTP_HOST="localhost:8080")
        self.factory = RequestFactory()
        username = "test_user"
        first_name = "First"
        last_name = "Last"
        email = "info@example.com"
        self.user = get_user_model().objects.create(
            username=username, first_name=first_name, last_name=last_name, email=email
        )

        self.domain_1, _ = Domain.objects.get_or_create(name="cdomain1.gov", state=Domain.State.READY)
        self.domain_2, _ = Domain.objects.get_or_create(name="adomain2.gov", state=Domain.State.DNS_NEEDED)
        self.domain_3, _ = Domain.objects.get_or_create(name="ddomain3.gov", state=Domain.State.ON_HOLD)
        self.domain_4, _ = Domain.objects.get_or_create(name="bdomain4.gov", state=Domain.State.UNKNOWN)

        self.domain_information_1, _ = DomainInformation.objects.get_or_create(
            creator=self.user,
            domain=self.domain_1,
            organization_type="federal",
            federal_agency="World War I Centennial Commission",
            federal_type="executive",
        )
        self.domain_information_2, _ = DomainInformation.objects.get_or_create(
            creator=self.user,
            domain=self.domain_2,
            organization_type="interstate",
        )
        self.domain_information_3, _ = DomainInformation.objects.get_or_create(
            creator=self.user,
            domain=self.domain_3,
            organization_type="federal",
            federal_agency="Armed Forces Retirement Home",
        )
        self.domain_information_4, _ = DomainInformation.objects.get_or_create(
            creator=self.user,
            domain=self.domain_4,
            organization_type="federal",
            federal_agency="Armed Forces Retirement Home",
        )

    def tearDown(self):
        """Delete all faked data"""
        Domain.objects.all().delete()
        DomainInformation.objects.all().delete()
        User.objects.all().delete()
        super().tearDown()
    @boto3_mocking.patching
    def test_generate_federal_report(self):
        """Ensures that we correctly generate current-federal.csv"""
        mock_client = MagicMock()
        fake_open = mock_open()
        expected_file_content = [
            call("Domain name,Domain type,Agency,Organization name,City,State,Security Contact Email\r\n"),
            call("cdomain1.gov,Federal - Executive,World War I Centennial Commission,,,, \r\n"),
            call("ddomain3.gov,Federal,Armed Forces Retirement Home,,,, \r\n"),
        ]
        # We don't actually want to write anything for a test case,
        # we just want to verify what is being written.
        with boto3_mocking.clients.handler_for("s3", mock_client):
            with patch("builtins.open", fake_open):
                call_command("generate_current_federal_report", checkpath=False)
                content = fake_open()
                content.write.assert_has_calls(expected_file_content)
    @boto3_mocking.patching
    def test_generate_full_report(self):
        """Ensures that we correctly generate current-full.csv"""
        mock_client = MagicMock()
        fake_open = mock_open()
        expected_file_content = [
            call("Domain name,Domain type,Agency,Organization name,City,State,Security Contact Email\r\n"),
            call("cdomain1.gov,Federal - Executive,World War I Centennial Commission,,,, \r\n"),
            call("ddomain3.gov,Federal,Armed Forces Retirement Home,,,, \r\n"),
            call("adomain2.gov,Interstate,,,,, \r\n"),
        ]
        # We don't actually want to write anything for a test case,
        # we just want to verify what is being written.
        with boto3_mocking.clients.handler_for("s3", mock_client):
            with patch("builtins.open", fake_open):
                call_command("generate_current_full_report", checkpath=False)
                content = fake_open()
                content.write.assert_has_calls(expected_file_content)
    @boto3_mocking.patching
    def test_not_found_full_report(self):
        """Ensures that we get a not found when the report doesn't exist"""

        def side_effect(Bucket, Key):
            raise ClientError({"Error": {"Code": "NoSuchKey", "Message": "No such key"}}, "get_object")

        mock_client = MagicMock()
        mock_client.get_object.side_effect = side_effect

        response = None
        with boto3_mocking.clients.handler_for("s3", mock_client):
            with patch("boto3.client", return_value=mock_client):
                with self.assertRaises(S3ClientError) as context:
                    response = self.client.get("/api/v1/get-report/current-full")
                    # Check that the response has status code 500
                    self.assertEqual(response.status_code, 500)

        # Check that we get the right error back from the page
        self.assertEqual(context.exception.code, S3ClientErrorCodes.FILE_NOT_FOUND_ERROR)
    @boto3_mocking.patching
    def test_not_found_federal_report(self):
        """Ensures that we get a not found when the report doesn't exist"""

        def side_effect(Bucket, Key):
            raise ClientError({"Error": {"Code": "NoSuchKey", "Message": "No such key"}}, "get_object")

        mock_client = MagicMock()
        mock_client.get_object.side_effect = side_effect

        with boto3_mocking.clients.handler_for("s3", mock_client):
            with patch("boto3.client", return_value=mock_client):
                with self.assertRaises(S3ClientError) as context:
                    response = self.client.get("/api/v1/get-report/current-federal")
                    # Check that the response has status code 500
                    self.assertEqual(response.status_code, 500)

        # Check that we get the right error back from the page
        self.assertEqual(context.exception.code, S3ClientErrorCodes.FILE_NOT_FOUND_ERROR)
    @boto3_mocking.patching
    def test_load_federal_report(self):
        """Tests the get_current_federal api endpoint"""
        self.maxDiff = None
        mock_client = MagicMock()
        mock_client_instance = mock_client.return_value

        with open("registrar/tests/data/fake_current_federal.csv", "r") as file:
            file_content = file.read()

        # Mock a received file
        mock_client_instance.get_object.return_value = {"Body": io.BytesIO(file_content.encode())}

        with boto3_mocking.clients.handler_for("s3", mock_client):
            request = self.factory.get("/fake-path")
            response = get_current_federal(request)

        # Check that we are sending the correct calls.
        # Ensures that we are decoding the file content received from AWS.
        expected_call = [call.get_object(Bucket=settings.AWS_S3_BUCKET_NAME, Key="current-federal.csv")]
        mock_client_instance.assert_has_calls(expected_call)

        # Check that the response has status code 200
        self.assertEqual(response.status_code, 200)

        # Check that the response contains what we expect
        expected_file_content = (
            "Domain name,Domain type,Agency,Organization name,City,State,Security Contact Email\n"
            "cdomain1.gov,Federal - Executive,World War I Centennial Commission,,,,\n"
            "ddomain3.gov,Federal,Armed Forces Retirement Home,,,,"
        ).encode()
        self.assertEqual(expected_file_content, response.content)
    @boto3_mocking.patching
    def test_load_full_report(self):
        """Tests the get_current_full api endpoint"""
        mock_client = MagicMock()
        mock_client_instance = mock_client.return_value

        with open("registrar/tests/data/fake_current_full.csv", "r") as file:
            file_content = file.read()

        # Mock a received file
        mock_client_instance.get_object.return_value = {"Body": io.BytesIO(file_content.encode())}

        with boto3_mocking.clients.handler_for("s3", mock_client):
            request = self.factory.get("/fake-path")
            response = get_current_full(request)

        # Check that we are sending the correct calls.
        # Ensures that we are decoding the file content received from AWS.
        expected_call = [call.get_object(Bucket=settings.AWS_S3_BUCKET_NAME, Key="current-full.csv")]
        mock_client_instance.assert_has_calls(expected_call)

        # Check that the response has status code 200
        self.assertEqual(response.status_code, 200)

        # Check that the response contains what we expect
        expected_file_content = (
            "Domain name,Domain type,Agency,Organization name,City,State,Security Contact Email\n"
            "cdomain1.gov,Federal - Executive,World War I Centennial Commission,,,,\n"
            "ddomain3.gov,Federal,Armed Forces Retirement Home,,,,\n"
            "adomain2.gov,Interstate,,,,,"
        ).encode()
        self.assertEqual(expected_file_content, response.content)
class ExportDataTest(TestCase):


@ -111,6 +111,8 @@ class TestURLAuth(TestCase):
        "/openid/callback/login/",
        "/openid/callback/logout/",
        "/api/v1/available/whitehouse.gov",
        "/api/v1/get-report/current-federal",
        "/api/v1/get-report/current-full",
    ]

    def assertURLIsProtectedByAuth(self, url):


@ -0,0 +1,149 @@
"""Utilities for accessing an AWS S3 bucket"""
from enum import IntEnum
import boto3
from botocore.exceptions import ClientError
from django.conf import settings
class S3ClientErrorCodes(IntEnum):
    """Used for S3ClientError

    Error code overview:
        - 1 ACCESS_S3_CLIENT_ERROR
        - 2 UPLOAD_FILE_ERROR
        - 3 FILE_NOT_FOUND_ERROR
        - 4 GET_FILE_ERROR
    """

    ACCESS_S3_CLIENT_ERROR = 1
    UPLOAD_FILE_ERROR = 2
    FILE_NOT_FOUND_ERROR = 3
    GET_FILE_ERROR = 4
class S3ClientError(RuntimeError):
    """
    Custom exception class for handling errors related to interactions with the S3 storage service via boto3.client.

    This class maps error codes to human-readable error messages. When an instance of S3ClientError is created,
    an error code can be passed in to set the error message for that instance.

    Attributes:
        _error_mapping: A dictionary mapping error codes to error messages.
        code: The error code for a specific instance of S3ClientError.
        message: The error message for a specific instance of S3ClientError, determined by the error code.
    """

    _error_mapping = {
        S3ClientErrorCodes.ACCESS_S3_CLIENT_ERROR: "Failed to establish a connection with the storage service.",
        S3ClientErrorCodes.UPLOAD_FILE_ERROR: "File upload to the storage service failed.",
        S3ClientErrorCodes.FILE_NOT_FOUND_ERROR: "Requested file not found in the storage service.",
        S3ClientErrorCodes.GET_FILE_ERROR: (
            "Retrieval of the requested file from the storage service failed due to an unspecified error."
        ),
    }

    def __init__(self, *args, code=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.code = code
        # Fall back to a generic message so __str__ never references an unset attribute
        self.message = self._error_mapping.get(self.code, "An unspecified storage service error occurred.")

    def __str__(self):
        return f"{self.message}"
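The mapping above means an S3ClientError's string form is its human-readable message. A condensed, self-contained sketch of the same pattern (class and code names here are stand-ins, not the real ones; the sketch also defaults the message for unknown codes):

```python
from enum import IntEnum


class Codes(IntEnum):  # condensed stand-in for S3ClientErrorCodes
    FILE_NOT_FOUND_ERROR = 3


class StorageError(RuntimeError):  # condensed stand-in for S3ClientError
    _error_mapping = {Codes.FILE_NOT_FOUND_ERROR: "Requested file not found in the storage service."}

    def __init__(self, *args, code=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.code = code
        # Unknown codes fall back to a generic message instead of an unset attribute
        self.message = self._error_mapping.get(code, "Unknown storage error.")

    def __str__(self):
        return self.message


try:
    raise StorageError(code=Codes.FILE_NOT_FOUND_ERROR)
except StorageError as err:
    message = str(err)

print(message)  # Requested file not found in the storage service.
```

Callers can therefore log `str(err)` directly, while `err.code` remains available for programmatic branching.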
class S3ClientHelper:
    """
    A helper class for interacting with Amazon S3 via the boto3 client.

    This class simplifies the process of initializing the boto3 client,
    uploading files to S3, and retrieving files from S3.

    Attributes:
        boto_client: The boto3 client used to interact with S3.
    """

    def __init__(self):
        try:
            self.boto_client = boto3.client(
                "s3",
                region_name=settings.AWS_S3_REGION,
                aws_access_key_id=settings.AWS_S3_ACCESS_KEY_ID,
                aws_secret_access_key=settings.AWS_S3_SECRET_ACCESS_KEY,
                config=settings.BOTO_CONFIG,
            )
        except Exception as exc:
            raise S3ClientError(code=S3ClientErrorCodes.ACCESS_S3_CLIENT_ERROR) from exc

    def get_bucket_name(self):
        """
        Retrieves the name of the S3 bucket.

        This method returns the name of the S3 bucket as defined in the application's settings.

        Returns:
            str: The name of the S3 bucket.
        """
        return settings.AWS_S3_BUCKET_NAME
    def upload_file(self, file_path, file_name):
        """
        Uploads a file to the S3 bucket.

        This method attempts to upload a file to the S3 bucket using the boto3 client.
        If an exception occurs during the upload process, it raises an S3ClientError with an UPLOAD_FILE_ERROR code.

        Args:
            file_path (str): The path of the file to upload.
            file_name (str): The name to give to the file in the S3 bucket.

        Returns:
            None: boto3's upload_file returns None on success.

        Raises:
            S3ClientError: If the file cannot be uploaded to the S3 bucket.
        """
        try:
            response = self.boto_client.upload_file(file_path, self.get_bucket_name(), file_name)
        except Exception as exc:
            raise S3ClientError(code=S3ClientErrorCodes.UPLOAD_FILE_ERROR) from exc
        return response
    def get_file(self, file_name, decode_to_utf=False):
        """
        Retrieves a file from the S3 bucket and returns its content.

        This method attempts to retrieve a file from the S3 bucket using the boto3 client.
        If the file is not found, it raises an S3ClientError with a FILE_NOT_FOUND_ERROR code.
        For any other exceptions during the retrieval process, it raises an S3ClientError with a GET_FILE_ERROR code.

        Args:
            file_name (str): The name of the file to retrieve from the S3 bucket.
            decode_to_utf (bool, optional): If True, the file content is decoded from bytes to a UTF-8 string.
                Defaults to False.

        Returns:
            bytes or str: The content of the file. If decode_to_utf is True, this is a string. Otherwise, it is bytes.

        Raises:
            S3ClientError: If the file cannot be retrieved from the S3 bucket.
        """
        try:
            response = self.boto_client.get_object(Bucket=self.get_bucket_name(), Key=file_name)
        except ClientError as exc:
            if exc.response["Error"]["Code"] == "NoSuchKey":
                raise S3ClientError(code=S3ClientErrorCodes.FILE_NOT_FOUND_ERROR) from exc
            else:
                raise S3ClientError(code=S3ClientErrorCodes.GET_FILE_ERROR) from exc
        except Exception as exc:
            raise S3ClientError(code=S3ClientErrorCodes.GET_FILE_ERROR) from exc

        file_content = response["Body"].read()
        if decode_to_utf:
            return file_content.decode("utf-8")
        else:
            return file_content
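`get_file` distinguishes a missing key from any other S3 failure by inspecting the `response` payload on botocore's `ClientError`. That branching can be exercised without AWS (or botocore installed) using a stub exception shaped the same way; the stub and helper names below are illustrative, not part of the module:

```python
class FakeClientError(Exception):
    """Stub with the same .response shape botocore's ClientError exposes."""

    def __init__(self, code):
        super().__init__(code)
        self.response = {"Error": {"Code": code, "Message": code}}


def classify(exc):
    # The same decision get_file makes: a missing key maps to FILE_NOT_FOUND_ERROR,
    # while any other client error maps to the generic GET_FILE_ERROR.
    if exc.response["Error"]["Code"] == "NoSuchKey":
        return "FILE_NOT_FOUND_ERROR"
    return "GET_FILE_ERROR"


print(classify(FakeClientError("NoSuchKey")))     # FILE_NOT_FOUND_ERROR
print(classify(FakeClientError("AccessDenied")))  # GET_FILE_ERROR
```

This is also why the tests above raise `ClientError` with code `"NoSuchKey"` and then assert that the wrapped exception carries `S3ClientErrorCodes.FILE_NOT_FOUND_ERROR`.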