Spaces:
Runtime error
Runtime error
""" | |
Utilities that use boto3 to create and manage S3 buckets.

This work is not our own; it was written entirely by https://github.com/full-stack-deep-learning.
""" | |
import hashlib | |
import json | |
import boto3 | |
import botocore | |
# Template for the virtual-hosted-style HTTPS URL of a bucket (empty key) or of an object in it.
S3_URL_FORMAT = "https://{bucket}.s3.{region}.amazonaws.com/{key}"
# Template for the s3:// URI of a bucket (empty key) or of an object in it.
S3_URI_FORMAT = "s3://{bucket}/{key}"
# Module-level boto3 S3 resource shared by the helpers below; it is created at import time.
s3 = boto3.resource("s3")
def get_or_create_bucket(name):
    """Gets an S3 bucket with boto3 or creates it if it doesn't exist.

    Args:
        name: Name of the bucket to fetch or create.

    Returns:
        The ``s3.Bucket`` resource for ``name``.

    Raises:
        botocore.exceptions.ClientError: If creation fails with any HTTP
            status other than 409.
    """
    try:  # try to create a bucket
        _create_bucket(name)
    except botocore.exceptions.ClientError as err:
        # error handling from https://github.com/boto/boto3/issues/1195#issuecomment-495842252
        # status codes identify particular errors
        status = err.response["ResponseMetadata"]["HTTPStatusCode"]
        # 409 means the bucket exists already, so we don't need to make it
        # -- we presume we have the right permissions
        if status != 409:
            raise  # bare raise keeps the original traceback intact
    return s3.Bucket(name)
def _create_bucket(name):
    """Creates a bucket with the provided name in the session's default region.

    Args:
        name: Name of the bucket to create.

    Returns:
        A ``(name, response)`` tuple, where ``response`` is whatever
        ``s3.create_bucket`` returned.

    Raises:
        botocore.exceptions.ClientError: If S3 rejects the request.
    """
    session = boto3.session.Session()  # sessions hold on to credentials and config
    current_region = session.region_name  # so we can pull the default region
    create_kwargs = {"Bucket": name}
    # S3 rejects an explicit LocationConstraint of "us-east-1" (it is the
    # implicit default), and an unset region (None) is not a valid value for
    # the parameter, so only send the configuration for other regions.
    if current_region not in (None, "us-east-1"):
        create_kwargs["CreateBucketConfiguration"] = {
            "LocationConstraint": current_region
        }
    bucket_response = s3.create_bucket(**create_kwargs)
    return name, bucket_response
def make_key(fileobj, filetype=None):
    """Builds a key from the hash of fileobj, with ".<filetype>" appended when given.

    Args:
        fileobj: Data passed straight to ``make_identifier`` for hashing.
        filetype: Optional extension (without the dot) to append to the key.

    Returns:
        The identifier, optionally followed by "." and ``filetype``.
    """
    key = make_identifier(fileobj)
    if filetype is not None:
        key = key + "." + filetype
    return key
def make_unique_bucket_name(prefix, seed):
    """Derives a bucket name by joining prefix to a short hash of seed.

    Args:
        prefix: Leading part of the bucket name.
        seed: String whose UTF-8 bytes are hashed with SHA-256.

    Returns:
        ``prefix``, a hyphen, and the first 10 hex characters of the digest.
    """
    seed_bytes = seed.encode("utf-8")
    digest = hashlib.sha256(seed_bytes).hexdigest()
    return prefix + "-" + digest[:10]
def get_url_of(bucket, key=None):
    """Returns the https URL of a bucket, or of an object in it when key is given.

    Args:
        bucket: Either a bucket name string or an object exposing ``.name``.
        key: Optional object key; a falsy key is treated as the empty string.

    Returns:
        The URL built by ``_format_url`` from the bucket name, its region and the key.
    """
    bucket_name = bucket if isinstance(bucket, str) else bucket.name
    # the region is looked up first, matching the order of the original steps
    return _format_url(bucket_name, _get_region(bucket_name), key or "")
def get_uri_of(bucket, key=None):
    """Returns the s3:// URI of a bucket, or of an object in it when key is given.

    Args:
        bucket: Either a bucket name string or an object exposing ``.name``.
        key: Optional object key; a falsy key is treated as the empty string.

    Returns:
        The URI built by ``_format_uri`` from the bucket name and the key.
    """
    bucket_name = bucket if isinstance(bucket, str) else bucket.name
    return _format_uri(bucket_name, key or "")
def enable_bucket_versioning(bucket):
    """Turns on versioning for the contents of a bucket.

    Args:
        bucket: Either a bucket name string or an object exposing ``.name``.

    Returns:
        Whatever ``BucketVersioning.enable()`` returns.
    """
    bucket_name = bucket if isinstance(bucket, str) else bucket.name
    return s3.BucketVersioning(bucket_name).enable()
def add_access_policy(bucket):
    """Adds a policy to our bucket that allows the Gantry app to access data.

    Args:
        bucket: Either a bucket name string or an object exposing ``.name``,
            matching what the other bucket helpers in this module accept.
    """
    # accept a plain name as well as a Bucket resource, like the sibling helpers
    if not isinstance(bucket, str):
        bucket = bucket.name
    access_policy = json.dumps(_get_policy(bucket))
    s3.meta.client.put_bucket_policy(Bucket=bucket, Policy=access_policy)
def _get_policy(bucket_name): | |
"""Returns a bucket policy allowing Gantry app access as a JSON-compatible dictionary.""" | |
return { | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Effect": "Allow", | |
"Principal": { | |
"AWS": [ | |
"arn:aws:iam::848836713690:root", | |
"arn:aws:iam::339325199688:root", | |
"arn:aws:iam::665957668247:root", | |
] | |
}, | |
"Action": ["s3:GetObject", "s3:GetObjectVersion"], | |
"Resource": f"arn:aws:s3:::{bucket_name}/*", | |
}, | |
{ | |
"Effect": "Allow", | |
"Principal": { | |
"AWS": [ | |
"arn:aws:iam::848836713690:root", | |
"arn:aws:iam::339325199688:root", | |
"arn:aws:iam::665957668247:root", | |
] | |
}, | |
"Action": "s3:ListBucketVersions", | |
"Resource": f"arn:aws:s3:::{bucket_name}", | |
}, | |
], | |
} | |
def make_identifier(byte_data):
    """Hashes a collection of bytes into a hexadecimal identifier.

    Args:
        byte_data: Bytes-like data to hash.

    Returns:
        The SHA-1 digest of ``byte_data`` as a hexadecimal string.
    """
    # SHA-1 is fine here: the hash only names the data, it is not a security control
    return hashlib.sha1(byte_data).hexdigest()  # noqa: S3
def _get_region(bucket):
    """Determine the region of an s3 bucket.

    Args:
        bucket: Either a bucket name string or an object exposing ``.name``.

    Returns:
        The bucket's region name as a string usable in an S3 URL.
    """
    if not isinstance(bucket, str):
        bucket = bucket.name
    s3_client = boto3.client("s3")
    bucket_location_response = s3_client.get_bucket_location(Bucket=bucket)
    bucket_location = bucket_location_response.get("LocationConstraint")
    # S3 reports buckets in us-east-1 with a null LocationConstraint; passing
    # that through would put the literal text "None" into formatted URLs.
    if not bucket_location:
        return "us-east-1"
    # "EU" is the legacy LocationConstraint value for eu-west-1
    if bucket_location == "EU":
        return "eu-west-1"
    return bucket_location
def _format_url(bucket_name, region, key=None):
    """Fills S3_URL_FORMAT with the bucket name, region and key (falsy key becomes "")."""
    object_key = key if key else ""
    return S3_URL_FORMAT.format(bucket=bucket_name, region=region, key=object_key)
def _format_uri(bucket_name, key=None):
    """Fills S3_URI_FORMAT with the bucket name and key (falsy key becomes "")."""
    object_key = key if key else ""
    return S3_URI_FORMAT.format(bucket=bucket_name, key=object_key)