import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import random
import sys
import os
import json
import enum
import traceback
import re

F_DIR = os.path.dirname(os.environ.get('translit_model_base_path', os.path.realpath(__file__)))


class XlitError(enum.Enum):
    lang_err = "Unsupported language ID requested ;( Please check available languages."
    string_err = "String passed is incompatible ;("
    internal_err = "Internal crash ;("
    unknown_err = "Unknown Failure"
    loading_err = "Loading failed ;( Check if metadata/paths are correctly configured."


##=================== Network ==================================================


class Encoder(nn.Module):
    def __init__(
        self,
        input_dim,
        embed_dim,
        hidden_dim,
        rnn_type="gru",
        layers=1,
        bidirectional=False,
        dropout=0,
        device="cpu",
    ):
        super(Encoder, self).__init__()

        self.input_dim = input_dim  # src_vocab_sz
        self.enc_embed_dim = embed_dim
        self.enc_hidden_dim = hidden_dim
        self.enc_rnn_type = rnn_type
        self.enc_layers = layers
        self.enc_directions = 2 if bidirectional else 1
        self.device = device

        self.embedding = nn.Embedding(self.input_dim, self.enc_embed_dim)

        if self.enc_rnn_type == "gru":
            self.enc_rnn = nn.GRU(
                input_size=self.enc_embed_dim,
                hidden_size=self.enc_hidden_dim,
                num_layers=self.enc_layers,
                bidirectional=bidirectional,
            )
        elif self.enc_rnn_type == "lstm":
            self.enc_rnn = nn.LSTM(
                input_size=self.enc_embed_dim,
                hidden_size=self.enc_hidden_dim,
                num_layers=self.enc_layers,
                bidirectional=bidirectional,
            )
        else:
            raise Exception("XlitError: unknown RNN type mentioned")
    def forward(self, x, x_sz, hidden=None):
        """
        x: (batch_size, max_length) - padded source index sequences
        x_sz: (batch_size, 1) - unpadded sequence lengths used for pack_pad
        """
        batch_sz = x.shape[0]
        # x: batch_size, max_length, enc_embed_dim
        x = self.embedding(x)

        ## pack the padded data
        # x: max_length, batch_size, enc_embed_dim -> for pack_pad
        x = x.permute(1, 0, 2)
        x = nn.utils.rnn.pack_padded_sequence(x, x_sz, enforce_sorted=False)  # pack

        # output: packed_size, batch_size, enc_embed_dim
        # hidden: n_layers * num_directions, batch_size, hidden_dim | if LSTM (h_n, c_n)
        output, hidden = self.enc_rnn(
            x
        )  # the RNN returns outputs for all timesteps plus the hidden state at the last timestep

        ## pad the sequence back to the max length in the batch
        # output: max_length, batch_size, enc_hidden_dim * directions
        output, _ = nn.utils.rnn.pad_packed_sequence(output)

        # output: batch_size, max_length, enc_hidden_dim * directions
        output = output.permute(1, 0, 2)

        return output, hidden
    def get_word_embedding(self, x):
        """Returns the encoder's final hidden state for a single unbatched
        sequence `x`, usable as a fixed-size word embedding.
        """
        x_sz = torch.tensor([len(x)])
        x_ = torch.tensor(x).unsqueeze(0).to(dtype=torch.long)
        # x: 1, max_length, enc_embed_dim
        x = self.embedding(x_)

        ## pack the padded data
        # x: max_length, 1, enc_embed_dim -> for pack_pad
        x = x.permute(1, 0, 2)
        x = nn.utils.rnn.pack_padded_sequence(x, x_sz, enforce_sorted=False)  # pack

        # output: packed_size, 1, enc_embed_dim
        # hidden: n_layers * num_directions, 1, hidden_dim | if LSTM (h_n, c_n)
        output, hidden = self.enc_rnn(
            x
        )  # the RNN returns outputs for all timesteps plus the hidden state at the last timestep

        out_embed = hidden[0].squeeze()

        return out_embed


class Decoder(nn.Module):
    def __init__(
        self,
        output_dim,
        embed_dim,
        hidden_dim,
        rnn_type="gru",
        layers=1,
        use_attention=True,
        enc_outstate_dim=None,  # enc_directions * enc_hidden_dim
        dropout=0,
        device="cpu",
    ):
        super(Decoder, self).__init__()

        self.output_dim = output_dim  # tgt_vocab_sz
        self.dec_hidden_dim = hidden_dim
        self.dec_embed_dim = embed_dim
        self.dec_rnn_type = rnn_type
        self.dec_layers = layers
        self.use_attention = use_attention
        self.device = device

        if self.use_attention:
            self.enc_outstate_dim = enc_outstate_dim if enc_outstate_dim else hidden_dim
        else:
            self.enc_outstate_dim = 0

        self.embedding = nn.Embedding(self.output_dim, self.dec_embed_dim)

        if self.dec_rnn_type == "gru":
            self.dec_rnn = nn.GRU(
                input_size=self.dec_embed_dim
                + self.enc_outstate_dim,  # to concat attention_output
                hidden_size=self.dec_hidden_dim,  # previous hidden
                num_layers=self.dec_layers,
                batch_first=True,
            )
        elif self.dec_rnn_type == "lstm":
            self.dec_rnn = nn.LSTM(
                input_size=self.dec_embed_dim
                + self.enc_outstate_dim,  # to concat attention_output
                hidden_size=self.dec_hidden_dim,  # previous hidden
                num_layers=self.dec_layers,
                batch_first=True,
            )
        else:
            raise Exception("XlitError: unknown RNN type mentioned")

        self.fc = nn.Sequential(
            nn.Linear(self.dec_hidden_dim, self.dec_embed_dim),
            nn.LeakyReLU(),
            # nn.Linear(self.dec_embed_dim, self.dec_embed_dim), nn.LeakyReLU(),  # removed to reduce size
            nn.Linear(self.dec_embed_dim, self.output_dim),
        )

        ##----- Attention ----------
        if self.use_attention:
            self.W1 = nn.Linear(self.enc_outstate_dim, self.dec_hidden_dim)
            self.W2 = nn.Linear(self.dec_hidden_dim, self.dec_hidden_dim)
            self.V = nn.Linear(self.dec_hidden_dim, 1)
    def attention(self, x, hidden, enc_output):
        """
        x: (batch_size, 1, dec_embed_dim) -> after Embedding
        enc_output: batch_size, max_length, enc_hidden_dim * num_directions
        hidden: n_layers, batch_size, hidden_size | if LSTM (h_n, c_n)
        """
        ## perform addition to calculate the score

        # hidden_with_time_axis: batch_size, 1, hidden_dim
        ## hidden_with_time_axis = hidden.permute(1, 0, 2)  ## replaced with the two lines below
        hidden_with_time_axis = (
            torch.sum(hidden, axis=0)
            if self.dec_rnn_type != "lstm"
            else torch.sum(hidden[0], axis=0)
        )  # h_n

        hidden_with_time_axis = hidden_with_time_axis.unsqueeze(1)

        # score: batch_size, max_length, hidden_dim
        score = torch.tanh(self.W1(enc_output) + self.W2(hidden_with_time_axis))

        # attention_weights: batch_size, max_length, 1
        # we get 1 at the last axis because we apply the score to self.V
        attention_weights = torch.softmax(self.V(score), dim=1)

        # context_vector shape after sum: (batch_size, hidden_dim)
        context_vector = attention_weights * enc_output
        context_vector = torch.sum(context_vector, dim=1)

        # context_vector: batch_size, 1, hidden_dim
        context_vector = context_vector.unsqueeze(1)

        # attend_out: (batch_size, 1, dec_embed_dim + hidden_size)
        attend_out = torch.cat((context_vector, x), -1)

        return attend_out, attention_weights
    def forward(self, x, hidden, enc_output):
        """
        x: (batch_size, 1)
        enc_output: batch_size, max_length, enc_hidden_dim * num_directions
        hidden: n_layers, batch_size, hidden_size | lstm: (h_n, c_n)
        """
        if (hidden is None) and (self.use_attention is False):
            raise Exception(
                "XlitError: No use of a decoder with No attention and No Hidden"
            )

        batch_sz = x.shape[0]

        if hidden is None:
            # hidden: n_layers, batch_size, hidden_dim
            hid_for_att = torch.zeros(
                (self.dec_layers, batch_sz, self.dec_hidden_dim)
            ).to(self.device)
        elif self.dec_rnn_type == "lstm":
            hid_for_att = hidden[1]  # c_n

        # x: (batch_size, 1, dec_embed_dim) -> after embedding
        x = self.embedding(x)

        if self.use_attention:
            # x: (batch_size, 1, dec_embed_dim + hidden_size) -> after attention
            # aw: (batch_size, max_length, 1)
            x, aw = self.attention(x, hidden, enc_output)
        else:
            x, aw = x, 0

        # passing the concatenated vector to the GRU/LSTM
        # output: (batch_size, 1, hidden_size)
        # hidden: n_layers, batch_size, hidden_size | if LSTM (h_n, c_n)
        output, hidden = (
            self.dec_rnn(x, hidden) if hidden is not None else self.dec_rnn(x)
        )

        # output :shp: (batch_size * 1, hidden_size)
        output = output.view(-1, output.size(2))

        # output :shp: (batch_size * 1, output_dim)
        output = self.fc(output)

        return output, hidden, aw


class Seq2Seq(nn.Module):
    """
    Class dependency: Encoder, Decoder
    """

    def __init__(
        self, encoder, decoder, pass_enc2dec_hid=False, dropout=0, device="cpu"
    ):
        super(Seq2Seq, self).__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        self.pass_enc2dec_hid = pass_enc2dec_hid
        _force_en2dec_hid_conv = False

        if self.pass_enc2dec_hid:
            assert (
                decoder.dec_hidden_dim == encoder.enc_hidden_dim
            ), "Hidden dimensions of encoder and decoder must be the same, or unset `pass_enc2dec_hid`"
        if decoder.use_attention:
            assert (
                decoder.enc_outstate_dim
                == encoder.enc_directions * encoder.enc_hidden_dim
            ), "Set `enc_outstate_dim` correctly in decoder"
        assert (
            self.pass_enc2dec_hid or decoder.use_attention
        ), "No use of a decoder with No attention and No Hidden from Encoder"

        self.use_conv_4_enc2dec_hid = False
        if (
            self.pass_enc2dec_hid
            and (encoder.enc_directions * encoder.enc_layers != decoder.dec_layers)
        ) or _force_en2dec_hid_conv:
            if encoder.enc_rnn_type == "lstm" or decoder.dec_rnn_type == "lstm":
                raise Exception(
                    "XlitError: conv for enc2dec_hid not implemented; change the layer numbers appropriately"
                )
            self.use_conv_4_enc2dec_hid = True
            self.enc_hid_1ax = encoder.enc_directions * encoder.enc_layers
            self.dec_hid_1ax = decoder.dec_layers
            self.e2d_hidden_conv = nn.Conv1d(self.enc_hid_1ax, self.dec_hid_1ax, 1)
    def enc2dec_hidden(self, enc_hidden):
        """
        enc_hidden: n_layers * num_directions, batch_size, hidden_dim
        TODO: implement the logic for LSTM based models
        """
        # hidden: batch_size, enc_layers * num_directions, enc_hidden_dim
        hidden = enc_hidden.permute(1, 0, 2).contiguous()
        # hidden: batch_size, dec_layers, dec_hidden_dim -> [N, C, Tstep]
        hidden = self.e2d_hidden_conv(hidden)

        # hidden_for_dec: dec_layers, batch_size, dec_hidden_dim
        hidden_for_dec = hidden.permute(1, 0, 2).contiguous()

        return hidden_for_dec
    def active_beam_inference(self, src, beam_width=3, max_tgt_sz=50):
        """Beam-search based decoding
        src: (sequence_len)
        """
        def _avg_score(p_tup):
            """Used for sorting
            TODO: divide by sequence length raised to a power alpha (hyperparameter)
            """
            return p_tup[0]

        batch_size = 1
        start_tok = src[0]
        end_tok = src[-1]
        src_sz = torch.tensor([len(src)])
        src_ = src.unsqueeze(0)

        # enc_output: (batch_size, padded_seq_length, enc_hidden_dim * num_directions)
        # enc_hidden: (enc_layers * num_directions, batch_size, hidden_dim)
        enc_output, enc_hidden = self.encoder(src_, src_sz)

        if self.pass_enc2dec_hid:
            # dec_hidden: dec_layers, batch_size, dec_hidden_dim
            if self.use_conv_4_enc2dec_hid:
                init_dec_hidden = self.enc2dec_hidden(enc_hidden)
            else:
                init_dec_hidden = enc_hidden
        else:
            # dec_hidden -> will be initialized to zeros internally
            init_dec_hidden = None

        # top_pred[][0] = Σ log_softmax
        # top_pred[][1] = sequence torch.tensor shape: (1)
        # top_pred[][2] = dec_hidden
        top_pred_list = [(0, start_tok.unsqueeze(0), init_dec_hidden)]

        for t in range(max_tgt_sz):
            cur_pred_list = []

            for p_tup in top_pred_list:
                if p_tup[1][-1] == end_tok:
                    cur_pred_list.append(p_tup)
                    continue

                # dec_hidden: dec_layers, 1, hidden_dim
                # dec_output: 1, output_dim
                dec_output, dec_hidden, _ = self.decoder(
                    x=p_tup[1][-1].view(1, 1),  # dec_input: (1, 1)
                    hidden=p_tup[2],
                    enc_output=enc_output,
                )

                ## Π{prob} is accumulated as Σ{log(prob)} to prevent underflow
                # dec_output: (1, output_dim)
                dec_output = nn.functional.log_softmax(dec_output, dim=1)
                # pred_topk.values & pred_topk.indices: (1, beam_width)
                pred_topk = torch.topk(dec_output, k=beam_width, dim=1)

                for i in range(beam_width):
                    sig_logsmx_ = p_tup[0] + pred_topk.values[0][i]
                    # seq_tensor_: (seq_len)
                    seq_tensor_ = torch.cat((p_tup[1], pred_topk.indices[0][i].view(1)))

                    cur_pred_list.append((sig_logsmx_, seq_tensor_, dec_hidden))

            cur_pred_list.sort(key=_avg_score, reverse=True)  # highest score first
            top_pred_list = cur_pred_list[:beam_width]

            # stop if all top-k hypotheses end with end_tok
            end_flags_ = [1 if t[1][-1] == end_tok else 0 for t in top_pred_list]
            if beam_width == sum(end_flags_):
                break

        pred_tnsr_list = [t[1] for t in top_pred_list]

        return pred_tnsr_list


##===================== Glyph handlers =======================================


class GlyphStrawboss:
    def __init__(self, glyphs="en"):
        """List of letters of a language in Unicode
        lang: ISO language code
        glyphs: json file with script information
        """
        if glyphs == "en":
            # lowercase letters only
            self.glyphs = [chr(alpha) for alpha in range(97, 122 + 1)]
        else:
            self.dossier = json.load(open(glyphs, encoding="utf-8"))
            self.glyphs = self.dossier["glyphs"]
            self.numsym_map = self.dossier["numsym_map"]

        self.char2idx = {}
        self.idx2char = {}
        self._create_index()

    def _create_index(self):
        self.char2idx["_"] = 0  # pad
        self.char2idx["$"] = 1  # start
        self.char2idx["#"] = 2  # end
        self.char2idx["*"] = 3  # mask
        self.char2idx["'"] = 4  # apostrophe U+0027
        self.char2idx["%"] = 5  # unused
        self.char2idx["!"] = 6  # unused

        # letter to index mapping
        for idx, char in enumerate(self.glyphs):
            self.char2idx[char] = idx + 7  # +7 to skip the special tokens above

        # index to letter mapping
        for char, idx in self.char2idx.items():
            self.idx2char[idx] = char

    def size(self):
        return len(self.char2idx)
    def word2xlitvec(self, word):
        """Converts a given string of glyphs (word) to a numpy vector
        and adds the start and end tokens.
        """
        try:
            vec = [self.char2idx["$"]]  # start token
            for i in list(word):
                vec.append(self.char2idx[i])
            vec.append(self.char2idx["#"])  # end token

            vec = np.asarray(vec, dtype=np.int64)
            return vec

        except Exception as error:
            print("XlitError: In word:", word, "Error Char not in Token:", error)
            sys.exit()

    def xlitvec2word(self, vector):
        """Converts a vector (numpy) back to a string of glyphs (word)"""
        char_list = []
        for i in vector:
            char_list.append(self.idx2char[i])

        word = "".join(char_list).replace("$", "").replace("#", "")  # remove tokens
        word = word.replace("_", "").replace("*", "")  # remove tokens
        return word


class VocabSanitizer:
    def __init__(self, data_file):
        """
        data_file: path to a file containing the vocabulary list
        """
        extension = os.path.splitext(data_file)[-1]
        if extension == ".json":
            self.vocab_set = set(json.load(open(data_file, encoding="utf-8")))
        elif extension == ".csv":
            self.vocab_df = pd.read_csv(data_file).set_index("WORD")
            self.vocab_set = set(self.vocab_df.index)
        else:
            print("XlitError: Only JSON/CSV file extensions are supported")

    def reposition(self, word_list):
        """Reorder the words in the list so that in-vocabulary words come first"""
        new_list = []
        temp_ = word_list.copy()
        for v in word_list:
            if v in self.vocab_set:
                new_list.append(v)
                temp_.remove(v)
        new_list.extend(temp_)

        return new_list


##=============== INSTANTIATION ================================================


class XlitPiston:
    """
    For handling prediction & post-processing of transliteration for a single language
    Class dependency: Seq2Seq, GlyphStrawboss, VocabSanitizer
    Global Variables: F_DIR
    """
    def __init__(
        self,
        weight_path,
        vocab_file,
        tglyph_cfg_file,
        iglyph_cfg_file="en",
        device="cpu",
    ):
        self.device = device
        self.in_glyph_obj = GlyphStrawboss(iglyph_cfg_file)
        self.tgt_glyph_obj = GlyphStrawboss(glyphs=tglyph_cfg_file)
        self.voc_sanity = VocabSanitizer(vocab_file)

        self._numsym_set = set(
            json.load(open(tglyph_cfg_file, encoding="utf-8"))["numsym_map"].keys()
        )
        self._inchar_set = set("abcdefghijklmnopqrstuvwxyz")
        self._natscr_set = set().union(
            self.tgt_glyph_obj.glyphs, sum(self.tgt_glyph_obj.numsym_map.values(), [])
        )

        ## Static model configuration. TODO: support defining this in the JSON config
        input_dim = self.in_glyph_obj.size()
        output_dim = self.tgt_glyph_obj.size()
        enc_emb_dim = 300
        dec_emb_dim = 300
        enc_hidden_dim = 512
        dec_hidden_dim = 512
        rnn_type = "lstm"
        enc2dec_hid = True
        attention = True
        enc_layers = 1
        dec_layers = 2
        m_dropout = 0
        enc_bidirect = True
        enc_outstate_dim = enc_hidden_dim * (2 if enc_bidirect else 1)

        enc = Encoder(
            input_dim=input_dim,
            embed_dim=enc_emb_dim,
            hidden_dim=enc_hidden_dim,
            rnn_type=rnn_type,
            layers=enc_layers,
            dropout=m_dropout,
            device=self.device,
            bidirectional=enc_bidirect,
        )
        dec = Decoder(
            output_dim=output_dim,
            embed_dim=dec_emb_dim,
            hidden_dim=dec_hidden_dim,
            rnn_type=rnn_type,
            layers=dec_layers,
            dropout=m_dropout,
            use_attention=attention,
            enc_outstate_dim=enc_outstate_dim,
            device=self.device,
        )
        self.model = Seq2Seq(enc, dec, pass_enc2dec_hid=enc2dec_hid, device=self.device)
        self.model = self.model.to(self.device)
        weights = torch.load(weight_path, map_location=torch.device(self.device))

        self.model.load_state_dict(weights)
        self.model.eval()
    def character_model(self, word, beam_width=1):
        in_vec = torch.from_numpy(self.in_glyph_obj.word2xlitvec(word)).to(self.device)
        ## switch between active or passive beam search here
        p_out_list = self.model.active_beam_inference(in_vec, beam_width=beam_width)
        p_result = [
            self.tgt_glyph_obj.xlitvec2word(out.cpu().numpy()) for out in p_out_list
        ]

        result = self.voc_sanity.reposition(p_result)

        # List type
        return result
def numsym_model(self, seg): | |
"""tgt_glyph_obj.numsym_map[x] returns a list object""" | |
        if len(seg) == 1:
            return [seg] + self.tgt_glyph_obj.numsym_map[seg]

        a = [self.tgt_glyph_obj.numsym_map[n][0] for n in seg]
        return [seg] + ["".join(a)]
    def _word_segementer(self, sequence):
        """Splits the input into contiguous runs of number/symbol characters,
        native-script characters, Roman characters, and everything else.
        """
        sequence = sequence.lower()
        accepted = set().union(self._numsym_set, self._inchar_set, self._natscr_set)
        # sequence = ''.join([i for i in sequence if i in accepted])

        segment = []
        idx = 0
        seq_ = list(sequence)
        while len(seq_):
            # for Number-Symbol characters
            temp = ""
            while len(seq_) and seq_[0] in self._numsym_set:
                temp += seq_[0]
                seq_.pop(0)
            if temp != "":
                segment.append(temp)

            # for Target (native-script) characters
            temp = ""
            while len(seq_) and seq_[0] in self._natscr_set:
                temp += seq_[0]
                seq_.pop(0)
            if temp != "":
                segment.append(temp)

            # for Input Roman characters
            temp = ""
            while len(seq_) and seq_[0] in self._inchar_set:
                temp += seq_[0]
                seq_.pop(0)
            if temp != "":
                segment.append(temp)

            # for everything else
            temp = ""
            while len(seq_) and seq_[0] not in accepted:
                temp += seq_[0]
                seq_.pop(0)
            if temp != "":
                segment.append(temp)

        return segment
    def inferencer(self, sequence, beam_width=10):
        seg = self._word_segementer(sequence[:120])
        lit_seg = []

        p = 0
        while p < len(seg):
            if seg[p][0] in self._natscr_set:
                lit_seg.append([seg[p]])
                p += 1

            elif seg[p][0] in self._inchar_set:
                lit_seg.append(self.character_model(seg[p], beam_width=beam_width))
                p += 1

            elif seg[p][0] in self._numsym_set:  # numbers & punctuation
                lit_seg.append(self.numsym_model(seg[p]))
                p += 1
            else:
                lit_seg.append([seg[p]])
                p += 1

        ## If there are at most 2 segments, return their combinatorial product
        ## (example below); otherwise concatenate only the top-1 candidate of each segment.
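        # Hypothetical example: lit_seg = [["क", "ka"], ["123", "१२३"]]
        #   -> ["क123", "ka123", "क१२३", "ka१२३"]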
        if len(lit_seg) == 1:
            final_result = lit_seg[0]

        elif len(lit_seg) == 2:
            final_result = [""]
            for seg in lit_seg:
                new_result = []
                for s in seg:
                    for f in final_result:
                        new_result.append(f + s)
                final_result = new_result

        else:
            new_result = []
            for seg in lit_seg:
                new_result.append(seg[0])
            final_result = ["".join(new_result)]

        return final_result


from collections.abc import Iterable
from pydload import dload
import zipfile

MODEL_DOWNLOAD_URL_PREFIX = "https://github.com/AI4Bharat/IndianNLP-Transliteration/releases/download/xlit_v0.5.0/"


def is_folder_writable(folder):
    try:
        os.makedirs(folder, exist_ok=True)
        tmp_file = os.path.join(folder, ".write_test")
        with open(tmp_file, "w") as f:
            f.write("Permission Check")
        os.remove(tmp_file)
        return True
    except Exception:
        return False


def is_directory_writable(path):
    if os.name == "nt":
        return is_folder_writable(path)
    return os.access(path, os.W_OK | os.X_OK)


class XlitEngine:
    """
    Manages the top-level tasks and applications of transliteration
    Global Variables: F_DIR
    """
    def __init__(
        self, lang2use="all", config_path="translit_models/default_lineup.json"
    ):
        lineup = json.load(open(os.path.join(F_DIR, config_path), encoding="utf-8"))
        self.lang_config = {}
        if isinstance(lang2use, str):
            if lang2use == "all":
                self.lang_config = lineup
            elif lang2use in lineup:
                self.lang_config[lang2use] = lineup[lang2use]
            else:
                raise Exception(
                    "XlitError: The entered language code was not found. Available are {}".format(
                        lineup.keys()
                    )
                )
        elif isinstance(lang2use, Iterable):
            for l in lang2use:
                try:
                    self.lang_config[l] = lineup[l]
                except KeyError:
                    print(
                        "XlitError: Language code {} not found, skipping...".format(l)
                    )
        else:
            raise Exception(
                "XlitError: lang2use must be a list of language codes or a string with a single language code"
            )

        if is_directory_writable(F_DIR):
            models_path = os.path.join(F_DIR, "translit_models")
        else:
            user_home = os.path.expanduser("~")
            models_path = os.path.join(user_home, ".AI4Bharat_Xlit_Models")
        os.makedirs(models_path, exist_ok=True)
        self.download_models(models_path)

        self.langs = {}
        self.lang_model = {}
        for la in self.lang_config:
            try:
                print("Loading {}...".format(la))
                self.lang_model[la] = XlitPiston(
                    weight_path=os.path.join(
                        models_path, self.lang_config[la]["weight"]
                    ),
                    vocab_file=os.path.join(models_path, self.lang_config[la]["vocab"]),
                    tglyph_cfg_file=os.path.join(
                        models_path, self.lang_config[la]["script"]
                    ),
                    iglyph_cfg_file="en",
                )
                self.langs[la] = self.lang_config[la]["name"]
            except Exception as error:
                print("XlitError: Failure in loading {} \n".format(la), error)
                print(XlitError.loading_err.value)
    def download_models(self, models_path):
        """
        Download models from GitHub Releases if they do not already exist
        """
        for l in self.lang_config:
            lang_name = self.lang_config[l]["eng_name"]
            lang_model_path = os.path.join(models_path, lang_name)
            if not os.path.isdir(lang_model_path):
                print("Downloading model for language: %s" % lang_name)
                remote_url = MODEL_DOWNLOAD_URL_PREFIX + lang_name + ".zip"
                downloaded_zip_path = os.path.join(models_path, lang_name + ".zip")
                dload(url=remote_url, save_to_path=downloaded_zip_path, max_time=None)

                if not os.path.isfile(downloaded_zip_path):
                    sys.exit(
                        f"ERROR: Unable to download model from {remote_url} into {models_path}"
                    )

                with zipfile.ZipFile(downloaded_zip_path, "r") as zip_ref:
                    zip_ref.extractall(models_path)

                if os.path.isdir(lang_model_path):
                    os.remove(downloaded_zip_path)
                else:
                    sys.exit(
                        f"ERROR: Unable to find models in {lang_model_path} after download"
                    )
        return
    def translit_word(self, eng_word, lang_code="default", topk=7, beam_width=10):
        if eng_word == "":
            return []

        if lang_code in self.langs:
            try:
                res_list = self.lang_model[lang_code].inferencer(
                    eng_word, beam_width=beam_width
                )
                return res_list[:topk]

            except Exception as error:
                print("XlitError:", traceback.format_exc())
                print(XlitError.internal_err.value)
                return XlitError.internal_err

        elif lang_code == "default":
            try:
                res_dict = {}
                for la in self.lang_model:
                    res = self.lang_model[la].inferencer(
                        eng_word, beam_width=beam_width
                    )
                    res_dict[la] = res[:topk]
                return res_dict

            except Exception as error:
                print("XlitError:", traceback.format_exc())
                print(XlitError.internal_err.value)
                return XlitError.internal_err

        else:
            print("XlitError: Unknown language requested", lang_code)
            print(XlitError.lang_err.value)
            return XlitError.lang_err

    def translit_sentence(self, eng_sentence, lang_code="default", beam_width=10):
        if eng_sentence == "":
            return []

        if lang_code in self.langs:
            try:
                out_str = ""
                for word in eng_sentence.split():
                    res_ = self.lang_model[lang_code].inferencer(
                        word, beam_width=beam_width
                    )
                    out_str = out_str + res_[0] + " "
                return out_str[:-1]

            except Exception as error:
                print("XlitError:", traceback.format_exc())
                print(XlitError.internal_err.value)
                return XlitError.internal_err

        elif lang_code == "default":
            try:
                res_dict = {}
                for la in self.lang_model:
                    out_str = ""
                    for word in eng_sentence.split():
                        res_ = self.lang_model[la].inferencer(
                            word, beam_width=beam_width
                        )
                        out_str = out_str + res_[0] + " "
                    res_dict[la] = out_str[:-1]
                return res_dict

            except Exception as error:
                print("XlitError:", traceback.format_exc())
                print(XlitError.internal_err.value)
                return XlitError.internal_err

        else:
            print("XlitError: Unknown language requested", lang_code)
            print(XlitError.lang_err.value)
            return XlitError.lang_err


if __name__ == "__main__":
    available_lang = [
        "bn",
        "gu",
        "hi",
        "kn",
        "gom",
        "mai",
        "ml",
        "mr",
        "pa",
        "sd",
        "si",
        "ta",
        "te",
        "ur",
    ]

    reg = re.compile(r"[a-zA-Z]")

    lang = "hi"
    engine = XlitEngine(
        lang
    )  # if no language code is specified here, results are produced for all available languages

    sent = "Hello World! ABCD क्या हाल है आपका?"
    words = [
        engine.translit_word(word, topk=1)[lang][0] if reg.match(word) else word
        for word in sent.split()
    ]  # transliterate only the English words; leave the rest as-is
    updated_sent = " ".join(words)

    print(updated_sent)
    # output: हेलो वर्ल्ड! क्या हाल है आपका?

    # y = engine.translit_sentence("Hello World !")['hi']
    # print(y)
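
    # A further usage sketch (kept commented; assumes the 'hi' model loaded above
    # and hypothetical input words):
    # sentence_hi = engine.translit_sentence("namaste duniya", lang_code=lang)  # -> single best sentence
    # print(sentence_hi)
    # candidates = engine.translit_word("bharat", lang_code=lang, topk=5)  # -> list of top-5 candidates
    # print(candidates)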