File size: 6,086 Bytes
bcfd8ed
 
 
 
 
 
 
 
0fe3450
bcfd8ed
 
 
 
 
 
 
 
 
 
 
5ecc89c
 
 
e65910f
 
 
 
 
5ecc89c
 
bcfd8ed
 
cb50fdb
 
 
 
bcfd8ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cb50fdb
 
 
bd3c7b9
bcfd8ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
'''
        Created By Lewis Kamau Kimaru 
        Sema translator api backend
        January 2024
        Docker deployment
'''

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
import ctranslate2
import sentencepiece as spm
import fasttext
import uvicorn
import pytz
from datetime import datetime
import os

# ASGI application object; the route decorators further down in this file
# register their handlers on it.
app = FastAPI()

# Wildcard origin list: every origin is accepted by the CORS middleware.
origins = ["*"]

# CORS is fully open (any origin, any method, any header), while
# credentialed cross-origin requests are disabled (allow_credentials=False).
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Replace fastText's `eprint` hook with a no-op.
# NOTE(review): presumably this silences the warning fastText prints from
# load_model -- confirm against the installed fasttext version.
fasttext.FastText.eprint = lambda x: None

# User interface
# Directory next to this file that holds the HTML page served by the root
# route ("translator.html" is read from here).
templates_folder = os.path.join(os.path.dirname(__file__), "templates")


# Get time of request

def get_time():
    """Return the current Africa/Nairobi time in two string forms.

    Returns:
        A ``(full_date, curr_time)`` tuple. ``full_date`` has the layout
        ``"<weekday name> | YYYY-MM-DD | HH:MM:SS"`` and ``curr_time`` is
        the ``HH:MM:SS`` part on its own.
    """
    # Take a single timezone-aware snapshot so both strings describe the
    # same instant.
    now = datetime.now(pytz.timezone('Africa/Nairobi'))

    clock = now.strftime('%H:%M:%S')
    stamp = " | ".join((now.strftime('%A'), now.strftime('%Y-%m-%d'), clock))
    return stamp, clock
    
# Load the model and tokenizer ..... only once!
# Everything in this section runs at import time; the resulting objects are
# module-level globals that the translate_* functions below read.
beam_size = 1  # beam size handed to translate_batch; 1 is already the minimum, raise it to widen the search at the cost of speed
device = "cpu"  # or "cuda"

# Language Prediction model
# fastText model file, resolved relative to this file's directory.
print("\nimporting Language Prediction model")
lang_model_file = "lid218e.bin"
lang_model_full_path = os.path.join(os.path.dirname(__file__), lang_model_file)
lang_model = fasttext.load_model(lang_model_full_path)


# Load the source SentencePiece model
# The same processor is used for both encoding the input and decoding the
# translated tokens.
print("\nimporting SentencePiece model")
sp_model_file = "spm.model"
sp_model_full_path = os.path.join(os.path.dirname(__file__), sp_model_file)
sp = spm.SentencePieceProcessor()
sp.load(sp_model_full_path)

# Import The Translator model
# CTranslate2 model directory, resolved relative to this file's directory;
# `device` is passed positionally as the second argument.
print("\nimporting Translator model")
ct_model_file = "sematrans-3.3B"
ct_model_full_path = os.path.join(os.path.dirname(__file__), ct_model_file)
translator = ctranslate2.Translator(ct_model_full_path, device)

print('\nDone importing models\n')

    
def translate_detect(userinput: str, target_lang: str):
    """Detect the language of ``userinput`` and translate it to ``target_lang``.

    Args:
        userinput: Text to translate. Leading/trailing whitespace is ignored.
        target_lang: Target-language token used as the decoder prefix. The
            value is passed through unchecked (presumably an NLLB-style
            code such as ``eng_Latn`` -- confirm against the model).

    Returns:
        A ``(source_lang, translations)`` tuple: the top fastText label with
        its ``__label__`` prefix removed, and a one-element list containing
        the translated text.
    """
    text = userinput.strip()

    # Predict the source language.
    # fastText's predict() processes one line at a time and raises
    # ValueError when the text contains "\n"; strip() only removes line
    # breaks at the ends, so flatten interior ones for detection only.
    predictions = lang_model.predict(text.replace("\n", " "), k=1)
    source_lang = predictions[0][0].replace('__label__', '')

    # The subword -> translate -> desubword pipeline is the same as for an
    # explicitly supplied source language, so reuse it instead of keeping a
    # second copy in sync.
    translated_text = translate_enter(text, source_lang, target_lang)

    # Return the source language and the translated text (as a list, which
    # is the shape callers index with [0]).
    return source_lang, [translated_text]

def translate_enter(userinput: str, source_lang: str, target_lang: str):
    """Translate ``userinput`` from ``source_lang`` to ``target_lang``.

    Args:
        userinput: Text to translate. Leading/trailing whitespace is ignored.
        source_lang: Source-language token placed in front of the subworded
            input.
        target_lang: Target-language token used as the decoder prefix.

    Returns:
        The translated text as a single string, with its first
        ``len(target_lang)`` characters removed.
    """
    sentences = [userinput.strip()]

    # Subword the input and frame it as: <source_lang> pieces... </s>
    model_inputs = []
    for pieces in sp.encode(sentences, out_type=str):
        model_inputs.append([source_lang] + pieces + ["</s>"])

    # Translate, forcing every hypothesis to begin with the target token.
    results = translator.translate_batch(
        model_inputs,
        batch_type="tokens",
        max_batch_size=2024,
        beam_size=beam_size,
        target_prefix=[[target_lang] for _ in sentences],
    )
    best_tokens = [result[0]['tokens'] for result in results]

    # Desubword, then cut off the leading characters where the target
    # prefix sits in the decoded text.
    prefix_width = len(target_lang)
    decoded = [sentence[prefix_width:] for sentence in sp.decode(best_tokens)]

    # Only one sentence goes in, so only the first result is returned.
    return decoded[0]

    
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Serve the translator web page.

    Reads ``translator.html`` from ``templates_folder`` on every request and
    returns it with status 200. ``request`` is accepted but not used.
    """
    page_path = os.path.join(templates_folder, "translator.html")
    # Use a context manager so the file handle is closed deterministically
    # instead of being left to the garbage collector, and decode as UTF-8
    # explicitly so the result does not depend on the host's locale.
    with open(page_path, "r", encoding="utf-8") as page:
        html = page.read()
    return HTMLResponse(content=html, status_code=200)

@app.post("/translate_detect/")
async def translate_detect_endpoint(request: Request):
    """Translate text after auto-detecting its source language.

    Expects a JSON object body with non-empty string fields ``userinput``
    and ``target_lang``.

    Returns:
        A dict with ``source_language`` (detected language label) and
        ``translated_text``.

    Raises:
        HTTPException: 422 when the body is not valid JSON, is not a JSON
            object, or a required field is missing, blank or not a string.
    """
    # Malformed bodies used to escape as unhandled exceptions (HTTP 500);
    # report them as client errors instead.
    try:
        datad = await request.json()
    except ValueError as exc:  # covers json.JSONDecodeError and bad encodings
        raise HTTPException(status_code=422, detail="Request body must be valid JSON.") from exc
    if not isinstance(datad, dict):
        raise HTTPException(status_code=422, detail="Request body must be a JSON object.")

    userinputd = datad.get("userinput")
    target_langd = datad.get("target_lang")
    dfull_date = get_time()[0]
    print(f"\nrequest: {dfull_date}\nTarget Language; {target_langd}, User Input: {userinputd}\n")

    # Both fields must be real, non-blank strings: the translation code
    # calls str methods on them, so numbers/lists/None would crash later.
    required_fields = (userinputd, target_langd)
    if not all(isinstance(value, str) and value.strip() for value in required_fields):
        raise HTTPException(status_code=422, detail="Both 'userinput' and 'target_lang' are required.")

    source_langd, translated_text_d = translate_detect(userinputd, target_langd)
    dcurrent_time = get_time()[1]
    print(f"\nresponse: {dcurrent_time}; ... Source_language: {source_langd}, Translated Text: {translated_text_d}\n\n")
    return {
        "source_language": source_langd,
        "translated_text": translated_text_d[0],
    }


@app.post("/translate_enter/")
async def translate_enter_endpoint(request: Request):
    """Translate text between two caller-specified languages.

    Expects a JSON object body with non-empty string fields ``userinput``,
    ``source_lang`` and ``target_lang``.

    Returns:
        A dict with the single key ``translated_text``.

    Raises:
        HTTPException: 422 when the body is not valid JSON, is not a JSON
            object, or a required field is missing, blank or not a string.
    """
    # Malformed bodies used to escape as unhandled exceptions (HTTP 500);
    # report them as client errors instead.
    try:
        datae = await request.json()
    except ValueError as exc:  # covers json.JSONDecodeError and bad encodings
        raise HTTPException(status_code=422, detail="Request body must be valid JSON.") from exc
    if not isinstance(datae, dict):
        raise HTTPException(status_code=422, detail="Request body must be a JSON object.")

    userinpute = datae.get("userinput")
    source_lange = datae.get("source_lang")
    target_lange = datae.get("target_lang")
    efull_date = get_time()[0]
    print(f"\nrequest: {efull_date}\nSource_language; {source_lange}, Target Language; {target_lange}, User Input: {userinpute}\n")

    # 'source_lang' was previously not checked even though the error message
    # listed it as required; a missing value reached the translator as None.
    required_fields = (userinpute, source_lange, target_lange)
    if not all(isinstance(value, str) and value.strip() for value in required_fields):
        raise HTTPException(status_code=422, detail="'userinput', 'source_lang' and 'target_lang' are required.")

    translated_text_e = translate_enter(userinpute, source_lange, target_lange)
    ecurrent_time = get_time()[1]
    print(f"\nresponse: {ecurrent_time}; ... Translated Text: {translated_text_e}\n\n")
    return {
        "translated_text": translated_text_e,
    }


# Printed once at import time, after the models are loaded and the routes
# are registered.
# NOTE(review): no uvicorn.run() call is visible in this file, so the server
# is presumably launched externally (e.g. by the container command) -- confirm.
print("\nAPI starting .......\n")