File size: 1,786 Bytes
7781557 902845d 7781557 902845d 7781557 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
import datetime
import logging
from typing import Optional, List

import boto3
from botocore.exceptions import ClientError

from .models import User, FileUpload
# Shared boto3 DynamoDB service resource, created once when this module is
# imported. No region, endpoint, or credentials are passed here, so boto3's
# default configuration resolution applies.
dynamodb = boto3.resource('dynamodb')
# Handle for the 'Users' table; this file reads and writes it by 'username'.
users_table = dynamodb.Table('Users')
# Handle for the 'Files' table; this file writes items carrying 'username'
# and 'filename' and queries them by 'username'.
# NOTE(review): presumably keyed on username (partition) + filename (sort) —
# confirm against the table definition.
files_table = dynamodb.Table('Files')
async def get_user_by_username(username: str) -> Optional[User]:
    """Fetch one user record from the Users table by its ``username`` key.

    Args:
        username: Value looked up under the ``username`` key.

    Returns:
        A ``User`` built from the stored item's attributes, or ``None`` when
        the response contains no item or the DynamoDB call raises
        ``ClientError``.

    Note:
        ``users_table.get_item`` is a synchronous boto3 call, so it blocks
        the event loop for its duration even though this is a coroutine.
    """
    # Only the network call can raise ClientError; keep the try body minimal.
    try:
        response = users_table.get_item(Key={'username': username})
    except ClientError:
        # Preserve the "None on failure" contract, but leave a trace so a
        # DynamoDB failure can be told apart from a missing user.
        logging.getLogger(__name__).exception(
            "Failed to fetch user %r from the Users table", username
        )
        return None

    if 'Item' not in response:
        return None
    return User(**response['Item'])
async def create_user(user: User) -> bool:
    """Insert a new user into the Users table, refusing to overwrite.

    The write is conditional on ``attribute_not_exists(username)``, so an
    existing item with the same key is left untouched.

    Args:
        user: Source of the ``username``, ``email`` and ``password``
            attributes that are stored.

    Returns:
        ``True`` when the item was written; ``False`` when the condition
        failed (the username is already taken) or any other ``ClientError``
        occurred.

    Note:
        ``users_table.put_item`` is a synchronous boto3 call and blocks the
        event loop while it runs.
    """
    try:
        users_table.put_item(
            Item={
                'username': user.username,
                'email': user.email,
                # Stored exactly as provided on the model.
                # NOTE(review): confirm callers pass a password hash here,
                # never plaintext.
                'password': user.password,
            },
            ConditionExpression='attribute_not_exists(username)',
        )
    except ClientError as err:
        error_code = err.response.get('Error', {}).get('Code')
        # A failed condition is the expected "already exists" outcome; any
        # other code is an operational failure worth recording.
        if error_code != 'ConditionalCheckFailedException':
            logging.getLogger(__name__).exception(
                "Failed to create user %r in the Users table", user.username
            )
        return False
    return True
async def save_file(username: str, file_upload: FileUpload) -> bool:
    """Write a file record for ``username`` to the Files table.

    Args:
        username: Owner stored under the ``username`` attribute.
        file_upload: Source of the ``filename`` and ``content`` attributes.

    Returns:
        ``True`` when the item was written, ``False`` when the DynamoDB call
        raises ``ClientError``.

    Note:
        ``files_table.put_item`` is a synchronous boto3 call and blocks the
        event loop while it runs.
    """
    # Take the clock reading once so created_at and updated_at are identical
    # on a fresh write instead of differing by a few microseconds.
    timestamp = datetime.datetime.now(datetime.UTC).isoformat()
    try:
        files_table.put_item(
            Item={
                'username': username,
                'filename': file_upload.filename,
                'content': file_upload.content,
                # NOTE(review): put_item replaces any existing item with the
                # same primary key, so created_at is reset when a file is
                # saved again — confirm whether that is intended.
                'created_at': timestamp,
                'updated_at': timestamp,
            }
        )
    except ClientError:
        # Keep the boolean contract, but record why the write failed.
        logging.getLogger(__name__).exception(
            "Failed to save file %r for user %r",
            file_upload.filename,
            username,
        )
        return False
    return True
async def get_user_files(username: str) -> List[FileUpload]:
    """Return every file record stored for ``username`` in the Files table.

    A single DynamoDB query returns at most one page of results, so the
    query is repeated with ``ExclusiveStartKey`` until the response no longer
    carries a ``LastEvaluatedKey``.

    Args:
        username: Value matched against the ``username`` key.

    Returns:
        One ``FileUpload`` per stored item, built from the item's attributes.
        An empty list is returned when there are no items or when any page
        of the query raises ``ClientError``.

    Note:
        ``files_table.query`` is a synchronous boto3 call and blocks the
        event loop while it runs.
    """
    query_kwargs = {
        'KeyConditionExpression': 'username = :username',
        'ExpressionAttributeValues': {':username': username},
    }
    items = []
    try:
        while True:
            response = files_table.query(**query_kwargs)
            items.extend(response.get('Items', []))
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            # Resume the next page where this one stopped.
            query_kwargs['ExclusiveStartKey'] = last_key
    except ClientError:
        # Same all-or-nothing contract as before: any failure yields [].
        logging.getLogger(__name__).exception(
            "Failed to query files for user %r", username
        )
        return []

    # NOTE(review): items written by save_file also carry 'username',
    # 'created_at' and 'updated_at'; FileUpload must accept those keys —
    # confirm against the model definition.
    return [FileUpload(**item) for item in items]