import os

# Install the xclip system package at import time (needs root/sudo privileges;
# in most deployments this would live in a Dockerfile or setup script instead).
os.system("sudo apt-get install xclip")

import nltk
from fastapi import FastAPI, File, Request, UploadFile, Body, Depends, HTTPException
from fastapi.security.api_key import APIKeyHeader
from typing import Optional, Annotated
from fastapi.encoders import jsonable_encoder
from PIL import Image
from io import BytesIO
import pytesseract
from nltk.tokenize import sent_tokenize
from transformers import MarianMTModel, MarianTokenizer

# Sentence tokenizer data required by sent_tokenize() below.
nltk.download('punkt')

API_KEY = os.environ.get("API_KEY")

app = FastAPI()
api_key_header = APIKeyHeader(name="api_key", auto_error=False)

def get_api_key(api_key: Optional[str] = Depends(api_key_header)):
    # Reject requests whose `api_key` header is missing or does not match API_KEY.
    if api_key is None or api_key != API_KEY:
        raise HTTPException(status_code=401, detail="Unauthorized access")
    return api_key
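
# Note: both endpoints below depend on get_api_key, so clients must send the key
# in an `api_key` request header (the header name configured by APIKeyHeader above).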

img_dir = "./data"

# Languages installed for Tesseract; `tesseract --list-langs` prints a header line
# first and ends with a newline, both of which the slice [1:-1] strips away.
choices = os.popen('tesseract --list-langs').read().split('\n')[1:-1]

def ocr_lang(lang_list):
    # Build the multi-language string Tesseract expects: language codes joined with "+".
    return "+".join(lang_list)
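
# Example (illustrative): ocr_lang(["eng", "chi_sim"]) returns "eng+chi_sim",
# which pytesseract passes through to Tesseract's -l option.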

def ocr_tesseract(img, languages):
    print("[img]", img)
    print("[languages]", languages)
    # Run Tesseract OCR over the PIL image with the requested language(s).
    ocr_str = pytesseract.image_to_string(img, lang=ocr_lang(languages))
    return ocr_str

@app.post("/api/ocr", response_model=dict)
async def ocr(
    api_key: str = Depends(get_api_key),
    image: UploadFile = File(...),
):
    try:
        content = await image.read()
        img = Image.open(BytesIO(content))
        print("[image]", img)
        if hasattr(pytesseract, "image_to_string"):
            print("Image to string function is available")
            text = ocr_tesseract(img, ['eng'])
        else:
            print("Image to string function is not available")
            raise HTTPException(status_code=500, detail="pytesseract.image_to_string is not available")
    except HTTPException:
        raise
    except Exception as e:
        # Returning a (dict, status_code) tuple does not set the HTTP status in FastAPI,
        # so raise an HTTPException instead.
        raise HTTPException(status_code=500, detail=str(e))

    return {"ImageText": text}

@app.post("/api/translate", response_model=dict)
async def translate(
    api_key: str = Depends(get_api_key),
    text: str = Body(...),
    src: str = "en",
    trg: str = "zh",
):
    # get_api_key has already validated the key; this check is kept as an extra safeguard.
    if api_key != API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API key")

    tokenizer, model = get_model(src, trg)

    # Translate sentence by sentence to keep each input within the model's length limit.
    translated_text = ""
    for sentence in sent_tokenize(text):
        translated_sub = model.generate(**tokenizer(sentence, return_tensors="pt"))[0]
        translated_text += tokenizer.decode(translated_sub, skip_special_tokens=True) + "\n"

    return jsonable_encoder({"translated_text": translated_text})
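
# Illustrative client call (not executed here; assumes a local server and the
# `requests` package; depending on the FastAPI version, the single scalar Body
# parameter is read as a raw JSON string, or as {"text": "..."} if Body(embed=True)):
#
#     resp = requests.post(
#         "http://localhost:8000/api/translate",
#         headers={"api_key": "<your-key>"},
#         params={"src": "en", "trg": "zh"},
#         json="Hello world. How are you?",
#     )
#     print(resp.json())  # e.g. {"translated_text": "..."}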

def get_model(src: str, trg: str):
    # Load the MarianMT tokenizer/model for the requested language pair,
    # e.g. Helsinki-NLP/opus-mt-en-zh (downloaded from the Hugging Face Hub on first use).
    model_name = f"Helsinki-NLP/opus-mt-{src}-{trg}"
    tokenizer = MarianTokenizer.from_pretrained(model_name)
    model = MarianMTModel.from_pretrained(model_name)
    return tokenizer, model
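
# Optional launcher sketch (assumes `uvicorn` is installed); the app can equally
# be served with `uvicorn <module_name>:app`.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)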