# TODO: Quran results have numbers
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

import gradio as gr
import torah
import bible
import quran
import hindu
import tripitaka
from utils import number_to_ordinal_word, custom_normalize, date_to_words, translate_date_to_words
from gematria import calculate_gematria, strip_diacritics
import pandas as pd
from deep_translator import GoogleTranslator
from gradio_calendar import Calendar
from datetime import datetime, timedelta
import math
import json
import re
import sqlite3
from collections import defaultdict
from typing import List, Tuple
import rich
from fuzzywuzzy import fuzz
import calendar
import translation_utils
import hashlib
import time

translation_utils.create_translation_table()
# Create a translator instance *once* globally. It is only used to list the
# supported languages, so the target setting is never used for translation.
translator = GoogleTranslator(source='auto', target='auto')
LANGUAGES_SUPPORTED = translator.get_supported_languages(as_dict=True)  # language name -> ISO code
LANGUAGE_CODE_MAP = LANGUAGES_SUPPORTED  # use deep_translator's mapping directly
# --- Constants ---
DATABASE_FILE = 'gematria.db'
MAX_PHRASE_LENGTH_LIMIT = 20
ELS_CACHE_DB = "els_cache.db"
DATABASE_TIMEOUT = 60
# --- Database Initialization ---
def initialize_database():
    """Create the gematria results database and its index if they do not exist."""
    global conn
    conn = sqlite3.connect(DATABASE_FILE)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS results (
            gematria_sum INTEGER,
            words TEXT,
            translation TEXT,
            book TEXT,
            chapter INTEGER,
            verse INTEGER,
            phrase_length INTEGER,
            word_position TEXT,
            PRIMARY KEY (gematria_sum, words, book, chapter, verse, word_position)
        )
    ''')
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_results_gematria
        ON results (gematria_sum)
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS processed_books (
            book TEXT PRIMARY KEY,
            max_phrase_length INTEGER
        )
    ''')
    conn.commit()


# --- Initialize Database ---
initialize_database()
# --- ELS Cache Functions ---
def create_els_cache_table():
    """Create the ELS query cache table if it does not exist."""
    with sqlite3.connect(ELS_CACHE_DB) as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS els_cache (
                query_hash TEXT PRIMARY KEY,
                results TEXT
            )
        ''')


# The original never called this, so cache reads/writes failed silently;
# create the table up front, mirroring translation_utils.create_translation_table().
create_els_cache_table()


def get_query_hash(func, *args, **kwargs):
    """Build a stable cache key from the function name and its arguments.

    All positional and keyword arguments must be JSON-serializable.
    """
    key = (func.__name__, args, tuple(sorted(kwargs.items())))
    return hashlib.sha256(json.dumps(key).encode()).hexdigest()
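
# Illustrative sketch (hypothetical argument values): a call like
#   get_query_hash(torah.process_json_files, 1, 39, 713)
# serializes the key to '["process_json_files", [1, 39, 713], []]' before
# hashing, so two calls with identical arguments map to the same cache row.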
def cached_process_json_files(func, *args, **kwargs):
    """Memoize expensive ELS searches in the els_cache SQLite database."""
    query_hash = get_query_hash(func, *args, **kwargs)

    try:
        with sqlite3.connect(ELS_CACHE_DB, timeout=DATABASE_TIMEOUT) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT results FROM els_cache WHERE query_hash = ?", (query_hash,))
            result = cursor.fetchone()
            if result:
                logger.info(f"Cache hit for query: {query_hash}")
                return json.loads(result[0])
    except sqlite3.Error as e:
        logger.error(f"Database error checking cache: {e}")

    logger.info(f"Cache miss for query: {query_hash}")
    results = func(*args, **kwargs)

    try:
        with sqlite3.connect(ELS_CACHE_DB, timeout=DATABASE_TIMEOUT) as conn:
            cursor = conn.cursor()
            cursor.execute("INSERT INTO els_cache (query_hash, results) VALUES (?, ?)",
                           (query_hash, json.dumps(results)))
            conn.commit()
    except sqlite3.Error as e:
        logger.error(f"Database error caching results: {e}")

    return results
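
# Illustrative usage (mirrors the calls in perform_search below). Results are
# round-tripped through json.dumps/json.loads, so the wrapped function must
# return JSON-serializable data:
#   results = cached_process_json_files(torah.process_json_files,
#                                       1, 39, step, rounds_combination, 0,
#                                       tlang, strip_spaces, strip_in_braces,
#                                       strip_diacritics_chk)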
# --- Helper Functions (from Network app.py) ---
def flatten_text(text: List) -> str:
    """Recursively join nested lists of strings into one space-separated string."""
    if isinstance(text, list):
        return " ".join(flatten_text(item) if isinstance(item, list) else item for item in text)
    return text


def search_gematria_in_db(gematria_sum: int, max_words: int) -> List[Tuple[str, str, int, int, int, str]]:
    """Return all stored phrases with the given gematria sum and at most max_words words."""
    # Use a local connection; the original declared `global conn` and then
    # rebound it here, clobbering the module-level connection.
    with sqlite3.connect(DATABASE_FILE) as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT words, book, chapter, verse, phrase_length, word_position
            FROM results
            WHERE gematria_sum = ? AND phrase_length <= ?
        ''', (gematria_sum, max_words))
        results = cursor.fetchall()
    return results


def get_most_frequent_phrase(results):
    """Return the phrase occurring most often in the result rows, or None if empty."""
    phrase_counts = defaultdict(int)
    for words, book, chapter, verse, phrase_length, word_position in results:
        phrase_counts[words] += 1
    return max(phrase_counts, key=phrase_counts.get) if phrase_counts else None
# --- Functions from BOS app.py ---
def create_language_dropdown(label, default_value='english', show_label=True):
    """Build a language dropdown; default_value must be a key of LANGUAGE_CODE_MAP
    (deep_translator uses lowercase names such as 'english')."""
    return gr.Dropdown(
        choices=list(LANGUAGE_CODE_MAP.keys()),
        label=label,
        value=default_value,
        show_label=show_label
    )


def calculate_gematria_sum(text, date_words):
    """Sum the gematria of the combined text plus any literal digits it contains."""
    if text or date_words:
        combined_input = f"{text} {date_words}"
        logger.info(f"searching for input: {combined_input}")
        numbers = re.findall(r'\d+', combined_input)
        text_without_numbers = re.sub(r'\d+', '', combined_input)
        number_sum = sum(int(number) for number in numbers)
        text_gematria = calculate_gematria(strip_diacritics(text_without_numbers))
        return text_gematria + number_sum
    else:
        return None
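
# Illustrative sketch (hypothetical input): for text "Albert 1955" the digits
# are stripped out and summed as plain numbers, so the result is
#   calculate_gematria(strip_diacritics("Albert  ")) + 1955.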
def add_24h_projection(results_dict, date_str):  # date_str is currently unused but kept for the call sites
    """Spread all results for one date evenly across a 24-hour window."""
    combined_results = []
    for book_name, results in results_dict.items():
        combined_results.extend(results)
    num_results = len(combined_results)
    if num_results > 0:
        time_interval = timedelta(minutes=24 * 60 / num_results)
        current_datetime = datetime.combine(datetime.today(), datetime.min.time())
        for i in range(num_results):
            next_datetime = current_datetime + time_interval
            time_range_str = f"{current_datetime.strftime('%H:%M')}-{next_datetime.strftime('%H:%M')}"
            combined_results[i]['24h Projection'] = time_range_str
            current_datetime = next_datetime
    # Re-organize results back into their book dictionaries
    reorganized_results = defaultdict(list)
    for result in combined_results:
        book_name = result.get('book', 'Unknown')  # get book name to reorganize
        reorganized_results[book_name].append(result)
    return reorganized_results
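
# Illustrative sketch: with four combined results the slot width is
# 24 * 60 / 4 = 360 minutes, yielding the projections "00:00-06:00",
# "06:00-12:00", "12:00-18:00" and "18:00-00:00" (the last end time crosses
# midnight, so strftime renders it as "00:00").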
def sort_results(results):
    """Sort results chronologically by their 24h projection time range."""
    def parse_time(time_str):
        try:
            hours, minutes = map(int, time_str.split(':'))
            return hours * 60 + minutes  # convert to total minutes
        except ValueError:
            return 24 * 60  # sort invalid times to the end

    # The fallback must contain a '-'; the original default '23:59' made
    # split('-')[1] raise IndexError for results without a projection.
    return sorted(results, key=lambda x: (
        parse_time(x.get('24h Projection', '23:59-23:59').split('-')[0]),  # sort by start time first
        parse_time(x.get('24h Projection', '23:59-23:59').split('-')[1])   # then by end time
    ))
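
# Illustrative sketch: parse_time("18:00") returns 18 * 60 = 1080, so a result
# projected at "00:00-06:00" sorts before one at "18:00-00:00"; entries without
# a projection fall back to "23:59-23:59" and land at the end.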
# --- Main Gradio App ---
with gr.Blocks() as app:
    with gr.Column():
        with gr.Row():
            tlang = create_language_dropdown("Target Language for Result Translation", default_value='english')
            start_date_range = Calendar(type="datetime", label="Start Date for ELS")
            end_date_range = Calendar(type="datetime", label="End Date for ELS")
            use_day = gr.Checkbox(label="Use Day", info="Check to include day in search", value=True)
            use_month = gr.Checkbox(label="Use Month", info="Check to include month in search", value=True)
            use_year = gr.Checkbox(label="Use Year", info="Check to include year in search", value=True)
            date_language_input = create_language_dropdown("Language of the person/topic (optional) (Date Word Language)", default_value='english')
        with gr.Row():
            gematria_text = gr.Textbox(label="Name and/or Topic (required)", value="Hans Albert Einstein Mileva Marity-Einstein")
        with gr.Row():
            with gr.Column():
                round_x = gr.Number(label="Round (1)", value=1)
                round_y = gr.Number(label="Round (2)", value=-1)
            rounds_combination = gr.Textbox(label="Combined Rounds", value="1,-1")
        with gr.Row():
            include_torah_chk = gr.Checkbox(label="Include Torah", value=True)
            include_bible_chk = gr.Checkbox(label="Include Bible", value=True)
            include_quran_chk = gr.Checkbox(label="Include Quran", value=True)
            include_hindu_chk = gr.Checkbox(label="Include Rigveda", value=True)
            include_tripitaka_chk = gr.Checkbox(label="Include Tripitaka", value=True)
            merge_results_chk = gr.Checkbox(label="Merge Results (Torah-Bible-Quran)", value=True)
            strip_spaces = gr.Checkbox(label="Strip Spaces from Books", value=True)
            strip_in_braces = gr.Checkbox(label="Strip Text in Braces from Books", value=True)
            strip_diacritics_chk = gr.Checkbox(label="Strip Diacritics from Books", value=True)
        translate_btn = gr.Button("Search with ELS")

    # --- Output Components ---
    markdown_output = gr.Dataframe(label="ELS Results")
    most_frequent_phrase_output = gr.Textbox(label="Most Frequent Phrase in Network Search")
    json_output = gr.JSON(label="JSON Output")
    # --- Event Handlers ---
    def update_rounds_combination(round_x, round_y):
        return f"{int(round_x)},{int(round_y)}"

    def find_closest_phrase(target_phrase, phrases):
        """Return the phrase most similar to target_phrase, penalizing word-count differences."""
        best_match = None
        best_score = 0
        logger.debug(f"Target phrase for similarity search: {target_phrase}")
        for phrase, _, _, _, _, _ in phrases:
            word_length_diff = abs(len(target_phrase.split()) - len(phrase.split()))
            similarity_score = fuzz.ratio(target_phrase, phrase)
            combined_score = similarity_score - word_length_diff
            logger.debug(f"Comparing with phrase: {phrase}")
            logger.debug(f"Word Length Difference: {word_length_diff}, Similarity Score: {similarity_score}, Combined Score: {combined_score}")
            if combined_score > best_score:
                best_score = combined_score
                best_match = phrase
        logger.debug(f"Closest phrase found: {best_match} with score: {best_score}")
        return best_match
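
    # Illustrative sketch (hypothetical scores): a candidate with fuzz.ratio
    # similarity 80 and a word-count difference of 2 scores 80 - 2 = 78, so it
    # loses to a candidate scoring 85 - 5 = 80 despite the larger length gap.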
    def perform_search(rounds_combination, tlang, strip_spaces, strip_in_braces, strip_diacritics_chk,
                       include_torah, include_bible, include_quran, include_hindu, include_tripitaka,
                       gematria_text, start_date, end_date, date_language_input):
        overall_start_time = time.time()
        combined_and_sorted_results = []
        most_frequent_phrases = {}

        current_date = start_date
        while current_date <= end_date:
            date_str = current_date.strftime("%Y-%m-%d")
            date_words = translate_date_to_words(current_date, date_language_input)
            step = calculate_gematria_sum(gematria_text, date_words)
            logger.debug(f"Calculated step for {date_str}: {step}")

            # calculate_gematria_sum may return None for empty input, so guard
            # against both None and 0 (the original `step != 0` let None through).
            if step and rounds_combination != "0,0":
                # Process for the current date
                els_results_single_date = {}
                if include_torah:
                    els_results_single_date["Torah"] = cached_process_json_files(
                        torah.process_json_files, 1, 39, step, rounds_combination, 0,
                        tlang, strip_spaces, strip_in_braces, strip_diacritics_chk)
                if include_bible:
                    els_results_single_date["Bible"] = cached_process_json_files(
                        bible.process_json_files, 40, 66, step, rounds_combination, 0,
                        tlang, strip_spaces, strip_in_braces, strip_diacritics_chk)
                if include_quran:
                    els_results_single_date["Quran"] = cached_process_json_files(
                        quran.process_json_files, 1, 114, step, rounds_combination, 0,
                        tlang, strip_spaces, strip_in_braces, strip_diacritics_chk)
                if include_hindu:
                    els_results_single_date["Rig Veda"] = cached_process_json_files(
                        hindu.process_json_files, 1, 10, step, rounds_combination, 0,
                        tlang, False, strip_in_braces, strip_diacritics_chk)
                if include_tripitaka:
                    els_results_single_date["Tripitaka"] = cached_process_json_files(
                        tripitaka.process_json_files, 1, 52, step, rounds_combination, 0,
                        tlang, strip_spaces, strip_in_braces, strip_diacritics_chk)

                # Add 24h projection *before* iterating through books
                els_results_single_date = add_24h_projection(els_results_single_date, date_str)

                for book_name, book_results in els_results_single_date.items():
                    logger.debug(f"Processing results for book: {book_name}")
                    if book_results:
                        most_frequent_phrases[book_name] = ""
                        for result in book_results:
                            try:
                                gematria_sum = calculate_gematria(result['result_text'])
                                max_words = len(result['result_text'].split())
                                matching_phrases = search_gematria_in_db(gematria_sum, max_words)

                                # Widen the phrase-length window until something matches.
                                max_words_limit = 20
                                while not matching_phrases and max_words < max_words_limit:
                                    max_words += 1
                                    matching_phrases = search_gematria_in_db(gematria_sum, max_words)

                                if matching_phrases:
                                    most_frequent_phrases[book_name] = get_most_frequent_phrase(matching_phrases)
                                else:
                                    closest_phrase = find_closest_phrase(
                                        result['result_text'],
                                        search_gematria_in_db(gematria_sum, max_words_limit))
                                    most_frequent_phrases[book_name] = closest_phrase or ""

                                result['Most Frequent Phrase'] = most_frequent_phrases[book_name]
                                result['date'] = date_str
                                if 'book' in result and isinstance(result['book'], int):
                                    result['book'] = f"{book_name} {result['book']}."
                            except KeyError as e:
                                logger.debug(f"KeyError - Key '{e.args[0]}' not found in result. Skipping this result.")
                                continue
                        combined_and_sorted_results.extend(book_results)
            current_date += timedelta(days=1)
        # --- Batch Translation ---
        translation_start_time = time.time()
        selected_language_long = tlang
        tlang_short = LANGUAGES_SUPPORTED.get(selected_language_long)
        if tlang_short is None:
            tlang_short = "en"
            logger.warning(f"Unsupported language selected: {selected_language_long}. Defaulting to English (en).")

        phrases_to_translate = []
        phrases_source_langs = []
        results_to_translate = []
        results_source_langs = []
        for result in combined_and_sorted_results:
            phrases_to_translate.append(result.get('Most Frequent Phrase', ''))
            phrases_source_langs.append(result.get("source_language", "auto"))
            results_to_translate.append(result.get('result_text', ''))
            results_source_langs.append(result.get("source_language", "auto"))

        # batch_translate returns a dict keyed by the original text.
        translated_phrases = translation_utils.batch_translate(phrases_to_translate, tlang_short, phrases_source_langs)
        translated_result_texts = translation_utils.batch_translate(results_to_translate, tlang_short, results_source_langs)

        for i, result in enumerate(combined_and_sorted_results):
            result['translated_text'] = translated_result_texts.get(results_to_translate[i], result.get('result_text', ''))
            result['Translated Most Frequent Phrase'] = translated_phrases.get(phrases_to_translate[i], result.get('Most Frequent Phrase', ''))

        translation_end_time = time.time()
        logger.debug(f"Batch translation took: {translation_end_time - translation_start_time} seconds")
        # --- Time projections ---
        time_projections_start_time = time.time()
        for result in combined_and_sorted_results:
            book_name = result.get('book', 'Unknown')
            projection_input = {book_name: [result]}
            updated_date_results = add_24h_projection(projection_input, result['date'])
            result.update(updated_date_results[book_name][0])
        combined_and_sorted_results = sort_results(combined_and_sorted_results)
        time_projections_end_time = time.time()
        logger.debug(f"Time projections took: {time_projections_end_time - time_projections_start_time} seconds")
        # --- Dataframe and JSON creation ---
        dataframe_json_start_time = time.time()
        df = pd.DataFrame(combined_and_sorted_results)
        df.index = range(1, len(df) + 1)
        df.reset_index(inplace=True)
        df.rename(columns={'index': 'Result Number'}, inplace=True)

        search_config = {
            "rounds_combination": rounds_combination,  # no single 'step': it varies per date
            "target_language": tlang,
            "strip_spaces": strip_spaces,
            "strip_in_braces": strip_in_braces,
            "strip_diacritics": strip_diacritics_chk,
            "include_torah": include_torah,
            "include_bible": include_bible,
            "include_quran": include_quran,
            "include_hindu": include_hindu,
            "include_tripitaka": include_tripitaka,
            "gematria_text": gematria_text,
            "start_date": start_date.strftime("%Y-%m-%d"),
            "end_date": end_date.strftime("%Y-%m-%d")
        }
        output_data = {
            "search_configuration": search_config,
            "results": combined_and_sorted_results
        }
        json_data = output_data

        combined_most_frequent = "\n".join(
            f"{book}: {phrase}" for book, phrase in most_frequent_phrases.items() if phrase)

        dataframe_json_end_time = time.time()
        logger.debug(f"Dataframe and JSON creation took: {dataframe_json_end_time - dataframe_json_start_time} seconds")

        overall_end_time = time.time()
        logger.debug(f"Overall process took: {overall_end_time - overall_start_time} seconds")
        return df, combined_most_frequent, json_data
    # --- Event Triggers ---
    round_x.change(update_rounds_combination, inputs=[round_x, round_y], outputs=rounds_combination)
    round_y.change(update_rounds_combination, inputs=[round_x, round_y], outputs=rounds_combination)

    translate_btn.click(
        perform_search,
        inputs=[rounds_combination, tlang, strip_spaces, strip_in_braces, strip_diacritics_chk,
                include_torah_chk, include_bible_chk, include_quran_chk, include_hindu_chk,
                include_tripitaka_chk, gematria_text, start_date_range, end_date_range,
                date_language_input],
        outputs=[markdown_output, most_frequent_phrase_output, json_output]
    )
if __name__ == "__main__":
    app.launch(share=False)