# ptchecker / awsfunctions.py
# viboognesh-doaz
# fixed aws errors
# 5e32aa7
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
import os
def _build_s3_key(prefix, relative_path):
    """Join *prefix* and a local relative path into a '/'-separated S3 key."""
    # os.path.relpath returns os.sep-separated paths (backslashes on Windows),
    # but S3 keys use '/' as the delimiter on every platform.
    relative_key = relative_path.replace(os.sep, '/')
    if not prefix or prefix.endswith('/'):
        return prefix + relative_key
    return f"{prefix}/{relative_key}"


def upload_folder_to_s3(local_dir, prefix=''):
    """Upload the contents of a local directory tree to S3.

    The target bucket is read from the AWS_BUCKET_NAME environment variable.
    Every sub-directory gets an empty marker object and every file is
    uploaded under ``prefix`` joined with its path relative to ``local_dir``.

    Args:
        local_dir: Local directory to walk.
        prefix: Key prefix prepended to every uploaded object (default '').

    Raises:
        ValueError: If AWS_BUCKET_NAME is not set.
        Exception: Any error raised by boto3 while uploading is propagated.
    """
    s3_bucket = os.getenv("AWS_BUCKET_NAME")
    if not s3_bucket:
        raise ValueError("AWS_BUCKET_NAME environment variable is not set")
    s3_client = boto3.client('s3')
    for root, dirs, files in os.walk(local_dir):
        for dir_name in dirs:
            relative_path = os.path.relpath(os.path.join(root, dir_name), local_dir)
            # put_object overwrites an existing key, so no "already exists"
            # handling is needed; real failures propagate to the caller.
            # NOTE(review): the marker key has no trailing '/', so it is a
            # zero-byte object named like the directory - confirm this is the
            # intended layout before changing it.
            s3_client.put_object(Bucket=s3_bucket, Key=_build_s3_key(prefix, relative_path))
        for file_name in files:
            file_path = os.path.join(root, file_name)
            key = _build_s3_key(prefix, os.path.relpath(file_path, local_dir))
            s3_client.upload_file(file_path, s3_bucket, key)
            print(f"Uploaded: {file_path} -> s3://{s3_bucket}/{key}")
def check_file_exists_in_s3(file_path):
    """Return whether an object with key ``file_path`` exists in the bucket.

    The bucket is read from the AWS_BUCKET_NAME environment variable and the
    check is done with a HEAD request on the key.

    Args:
        file_path: Full S3 key to look up.

    Returns:
        True if the HEAD request succeeds, False if S3 answers with a 404.

    Raises:
        ValueError: If AWS_BUCKET_NAME is not set.
        ClientError: For any S3 error other than a 404.
    """
    bucket_name = os.getenv("AWS_BUCKET_NAME")
    if not bucket_name:
        raise ValueError("AWS_BUCKET_NAME environment variable is not set")
    s3_client = boto3.client('s3')
    try:
        s3_client.head_object(Bucket=bucket_name, Key=file_path)
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            return False
        # Anything else (permissions, throttling, ...) is a real failure.
        raise
    return True
def download_files_from_s3(local_folder, file_path_list):
    """Download the listed S3 keys into ``local_folder``.

    The bucket is read from the AWS_BUCKET_NAME environment variable. The
    bucket is listed and every object whose key appears in ``file_path_list``
    is saved to ``local_folder`` joined with its key. Keys that are not found
    in the bucket are skipped silently. Errors are printed, not raised.

    Args:
        local_folder: Local directory that receives the files.
        file_path_list: Iterable of exact S3 keys to download. A single
            string is treated as one key.
    """
    if isinstance(file_path_list, str):
        # A bare string would otherwise be split into single characters.
        file_path_list = [file_path_list]
    # Build the lookup set once; membership is tested for every listed object.
    pending_keys = set(file_path_list or ())
    if not pending_keys:
        # Nothing requested: avoid listing the bucket at all.
        return
    s3 = boto3.client('s3')
    bucket_name = os.getenv("AWS_BUCKET_NAME")
    # S3 prefixes are plain string prefixes, so the character-wise common
    # prefix of the wanted keys narrows the listing without excluding any.
    folder_prefix = os.path.commonprefix(list(pending_keys))
    try:
        # List objects in the S3 bucket
        paginator = s3.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=folder_prefix)
        # Download filtered files
        for page in page_iterator:
            for obj in page.get('Contents', []):
                key = obj['Key']
                if key not in pending_keys:
                    continue
                pending_keys.discard(key)
                # Construct local file path
                local_path = os.path.join(local_folder, key)
                parent_dir = os.path.dirname(local_path)
                if parent_dir:
                    # os.makedirs('') raises, so only create a real parent.
                    os.makedirs(parent_dir, exist_ok=True)
                try:
                    print(f"Downloading: {key} -> {local_path}")
                    s3.download_file(bucket_name, key, local_path)
                    print(f"Downloaded: {local_path}")
                except Exception as e:
                    # Best effort: report and carry on with the other keys.
                    print(f"Error downloading {key}: {e}")
            if not pending_keys:
                # Every requested key has been seen; skip the remaining pages.
                break
    except NoCredentialsError:
        print("No AWS credentials found.")
    except Exception as e:
        print(f"An error occurred: {e}")
def _is_inside_directory(directory, candidate):
    """Return True if *candidate* resolves to *directory* or a path below it."""
    directory = os.path.abspath(directory)
    candidate = os.path.abspath(candidate)
    try:
        return os.path.commonpath([directory, candidate]) == directory
    except ValueError:
        # commonpath raises when the paths cannot be compared
        # (e.g. different drives on Windows).
        return False


def download_folder_from_s3(local_folder, aws_folder_prefix):
    """Download every non-empty object under an S3 prefix into a local folder.

    The bucket is read from the AWS_BUCKET_NAME environment variable. Each
    object is saved at ``local_folder`` joined with its key relative to
    ``aws_folder_prefix``. Zero-byte objects are skipped. Errors during the
    transfer are printed rather than raised.

    Args:
        local_folder: Local directory that receives the files (created if
            missing).
        aws_folder_prefix: S3 key prefix to download.

    Raises:
        ValueError: If AWS_BUCKET_NAME is not set.
    """
    bucket_name = os.getenv("AWS_BUCKET_NAME")
    if not bucket_name:
        raise ValueError("AWS_BUCKET_NAME environment variable is not set")
    s3 = boto3.client('s3')
    try:
        # Create the local folder if it doesn't exist
        os.makedirs(local_folder, exist_ok=True)
        # List objects in the S3 bucket
        paginator = s3.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=aws_folder_prefix)
        for page in page_iterator:
            for obj in page.get('Contents', []):
                key = obj['Key']
                # Zero-byte objects are treated as directory markers.
                if obj['Size'] == 0:
                    continue
                relative_path = os.path.relpath(key, aws_folder_prefix)
                if relative_path == os.curdir:
                    # The prefix names the object itself; save it under its
                    # own file name instead of targeting the folder.
                    relative_path = os.path.basename(key)
                local_path = os.path.join(local_folder, relative_path)
                # S3 prefixes match as plain strings, so prefix "foo" also
                # lists "foobar/x", whose relative path would be "../foobar/x".
                # Never write outside the destination folder.
                if not _is_inside_directory(local_folder, local_path):
                    print(f"Skipping {key}: it would be written outside {local_folder}")
                    continue
                os.makedirs(os.path.dirname(local_path), exist_ok=True)
                try:
                    print(f"Downloading: {key} -> {local_path}")
                    s3.download_file(bucket_name, key, local_path)
                    print(f"Downloaded: {local_path}")
                except ClientError as e:
                    error_code = e.response['Error']['Code']
                    if error_code == 'AccessDenied':
                        print(f"Permission denied when trying to download {key}: {e}")
                    elif error_code == 'NoSuchKey':
                        print(f"The object {key} does not exist in the bucket.")
                    elif error_code == "":
                        # Still skipped, but no longer silently.
                        print(f"An error occurred while downloading {key}: {e}")
                    else:
                        print(f"An error occurred while downloading {key}: {e}")
                        # Unknown failure: stop the transfer; the outer
                        # handler reports it.
                        raise
    except Exception as e:
        print(f"An unexpected error occurred : {e}")
def delete_s3_folder(folder_path):
    """Delete every object whose key starts with ``folder_path``.

    The bucket is read from the AWS_BUCKET_NAME environment variable. Objects
    are deleted one listing page at a time because a single delete_objects
    request accepts at most 1000 keys. S3 client errors are printed rather
    than raised.

    Args:
        folder_path: Non-empty S3 key prefix to delete.

    Raises:
        ValueError: If ``folder_path`` is empty (that prefix would match the
            whole bucket) or AWS_BUCKET_NAME is not set.
    """
    if not folder_path:
        raise ValueError("folder_path must be a non-empty prefix")
    bucket_name = os.getenv("AWS_BUCKET_NAME")
    if not bucket_name:
        raise ValueError("AWS_BUCKET_NAME environment variable is not set")
    s3_client = boto3.client('s3')
    found_count = 0
    failed_count = 0
    try:
        # List objects in the S3 bucket
        paginator = s3_client.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=folder_path)
        for page in page_iterator:
            batch = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
            if not batch:
                continue
            for entry in batch:
                print(f"Deleting: {entry['Key']}")
            # A listing page holds at most 1000 keys, which is also the
            # per-request limit of delete_objects.
            response = s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': batch})
            # delete_objects can succeed overall while individual keys fail.
            failures = response.get('Errors', [])
            for failure in failures:
                print(f"Failed to delete {failure.get('Key')}: {failure.get('Message')}")
            found_count += len(batch)
            failed_count += len(failures)
        if found_count > 0:
            print(f"Deleted {found_count - failed_count} objects in folder '{folder_path}'")
        else:
            print(f"No objects found in folder '{folder_path}'")
    except ClientError as e:
        print(f"An error occurred: {e}")
def list_s3_objects(prefix=''):
    """Print details of every object under ``prefix``.

    The bucket is read from the AWS_BUCKET_NAME environment variable. For
    each listed object the key, size, last-modified value, ETag and file
    extension are printed, followed by a separator line. S3 client errors
    are printed rather than raised.

    Args:
        prefix: S3 key prefix to list (default '' lists the whole bucket).
    """
    bucket = os.getenv("AWS_BUCKET_NAME")
    client = boto3.client('s3')
    try:
        pages = client.get_paginator('list_objects_v2').paginate(
            Bucket=bucket, Prefix=prefix
        )
        for listing in pages:
            entries = listing.get('Contents', [])
            for entry in entries:
                print(f"Key: {entry['Key']}")
                print(f"Size: {entry['Size']} bytes")
                print(f"Last Modified: {entry['LastModified']}")
                print(f"ETag: {entry['ETag']}")
                # splitext returns (root, ext); the last item is the extension.
                print(f"File Extension: {os.path.splitext(entry['Key'])[-1]}")
                print("---")
    except ClientError as err:
        print(f"An error occurred: {err}")