|
import functions as funky |
|
import pandas as pd |
|
import gradio as gr |
|
import os |
|
from datasets import load_dataset |
|
from huggingface_hub import login |
|
import numpy as np |
|
from fastapi import FastAPI, Request |
|
import uvicorn |
|
from starlette.middleware.sessions import SessionMiddleware |
|
import fastapi |
|
from datetime import datetime |
|
import re |
|
|
|
login(token = os.environ['HUB_TOKEN']) |
|
|
|
|
|
logger = gr.HuggingFaceDatasetSaver(os.environ['HUB_TOKEN'], dataset_name='illustration_gdrive_logging_main', organization=None, private=True) |
|
logger.setup([gr.Text(label="clicked_url"), gr.Text(label="seach_term"), gr.Text(label = 'sessionhash'), gr.Text(label = 'datetime')], './flagged_data_points') |
|
|
|
logging_js = ''' |
|
function magicFunc(x){ |
|
let script = document.createElement('script'); |
|
script.innerHTML = "async function magicFunc(x){let z = document.getElementById('search_term').getElementsByTagName('textarea')[0].value; await fetch('/track?url=' + x + '&q=' + z)}"; |
|
document.head.appendChild(script); |
|
} |
|
''' |
|
|
|
dataset = load_dataset("bradley6597/illustration-test") |
|
df = pd.DataFrame(dataset['train']).drop_duplicates() |
|
|
|
ill_links = df.copy() |
|
ill_links = ill_links[ill_links['Description'] != 'Moved'].copy() |
|
ill_links['code'] = ill_links['link'].str.replace("https://drive.google.com/file/d/", "", regex = False) |
|
ill_links['code'] = ill_links['code'].str.replace("/view?usp=drivesdk", "", regex = False) |
|
ill_links['filename'] = ill_links['file'].str.replace(".*\\/", "", regex = True) |
|
|
|
ill_links['image_code'] = 'https://lh3.google.com/u/0/d/' + ill_links['code'] + '=w320-h304' |
|
ill_links['image_code'] = '<center><a href="' + ill_links['link'] + '" target="_blank" onclick="magicFunc(\'' + ill_links['code'] + '\')"><img src="' + ill_links['image_code'] + '" style="max-height:400px; max-width:200px"> ' + ill_links['filename'] + '</a></center>' |
|
ill_links['shared_drive'] = ill_links['file'].str.replace("/content/drive/Shareddrives/", "", regex = False) |
|
ill_links['shared_drive'] = ill_links['shared_drive'].str.replace("(.*?)\\/.*", "\\1", regex = True) |
|
ill_links['Description'] = ill_links['Description'].str.replace("No Description", "", regex = False) |
|
|
|
ill_links_title = ill_links.copy() |
|
|
|
ill_links['ID'] = ill_links.index |
|
ill_links_title['ID'] = ill_links_title.index |
|
ill_links['title'] = ill_links['filename'] |
|
ill_links_title['title'] = ill_links_title['filename'] |
|
ill_links['url'] = ill_links['image_code'] |
|
ill_links_title['url'] = ill_links_title['image_code'] |
|
ill_links['abstract'] = ill_links['filename'].str.replace("\\-|\\_", " ", regex = True) + ' ' + ill_links['Description'].str.replace(",", " ", regex = False).astype(str) |
|
ill_links_title['abstract'] = ill_links_title['filename'].str.replace('\\-|\\_', " ", regex = True) |
|
ill_links['filepath'] = ill_links['file'] |
|
ill_links_title['filepath'] = ill_links_title['file'] |
|
ill_links['post_filepath'] = ill_links['filepath'].str.replace(".*?\\/KS1 EYFS\\/", "", regex = True) |
|
ill_links_title['post_filepath'] = ill_links_title['filepath'].str.replace(".*?\\/KS1 EYFS\\/", "", regex = True) |
|
ill_links = ill_links[['ID', 'title', 'url', 'abstract', 'filepath', 'Date Created', 'post_filepath']] |
|
ill_links_title = ill_links_title[['ID', 'title', 'url', 'abstract', 'filepath', 'Date Created', 'Description', 'post_filepath']] |
|
|
|
ill_check_lst = [] |
|
for i in range(0, 5): |
|
tmp_links = ill_links['url'].iloc[0].replace("/u/0/", f"/u/{i}/") |
|
tmp_links = tmp_links.replace('max-width:200px', 'max-width:25%') |
|
tmp_links = re.sub("(.*)>.*?<\\/a>", "\\1></a>", tmp_links) |
|
tmp_links = tmp_links.replace("<center>", "") |
|
tmp_links = tmp_links.replace("</center>", "") |
|
tmp_links = f'<p>{i}</p>' + tmp_links |
|
ill_check_lst.append(tmp_links) |
|
ill_check_df = pd.DataFrame(ill_check_lst).T |
|
ill_check_html = ill_check_df.to_html(escape = False, render_links = True, index = False, header = False) |
|
|
|
ind_main, doc_main, tf_main = funky.index_documents(ill_links) |
|
ind_title, doc_title, tf_title = funky.index_documents(ill_links_title) |
|
|
|
|
|
def same_auth(username, password): |
|
return(username == os.environ['username']) & (password == os.environ['password']) |
|
|
|
|
|
def search_index(search_text, sd, ks, sort_by, max_results, user_num, search_title): |
|
if search_title: |
|
output = funky.search(tf_title, doc_title, ind_title, search_text, search_type = 'AND', ranking = True) |
|
else: |
|
output = funky.search(tf_main, doc_main, ind_main, search_text, search_type='AND', ranking = True) |
|
output = [x for o in output for x in o if type(x) is not float] |
|
output_df = pd.DataFrame(output).reset_index(drop = True) |
|
|
|
if output_df.shape[0] > 0: |
|
|
|
output_df['url'] = output_df['url'].str.replace("/u/0/", f"/u/{int(user_num)}/", regex = False) |
|
if len(sd) == 1: |
|
output_df = output_df[(output_df['filepath'].str.contains(str(sd[0]), regex = False))] |
|
if len(ks) > 0: |
|
keystage_filter = '|'.join(ks).lower() |
|
if search_title: |
|
output_df['abstract'] = output_df['abstract'] + ' ' + output_df['Description'] |
|
|
|
output_df['abstract'] = output_df['abstract'].str.lower() |
|
output_df['post_filepath'] = output_df['post_filepath'].str.lower() |
|
output_df['missing_desc'] = np.where(output_df['abstract'].str.contains('eyfs|ks1|ks2', regex = True), 0, 1) |
|
output_df2 = output_df[(output_df['abstract'].str.contains(keystage_filter, regex = True) | (output_df['missing_desc'] == 1))].copy() |
|
output_df2 = output_df2[(output_df2['post_filepath'].str.contains(keystage_filter, regex = True))] |
|
if output_df2.shape[0] == 0: |
|
output_df2 = output_df[(output_df['post_filepath'].str.contains(keystage_filter, regex = True))] |
|
|
|
output_df2['ind'] = output_df2.index |
|
if sort_by == 'Relevance': |
|
output_df2 = output_df2.sort_values(by = ['missing_desc', 'ind'], ascending = [True, True]) |
|
elif sort_by == 'Date Created': |
|
output_df2 = output_df2.sort_values(by = ['Date Created'], ascending = False) |
|
elif sort_by == 'A-Z': |
|
output_df2 = output_df2.sort_values(by = ['title'], ascending = True) |
|
|
|
output_df2 = output_df2.head(int(max_results)) |
|
output_df2 = output_df2[['url']].reset_index(drop = True) |
|
|
|
max_cols = 5 |
|
output_df2['row'] = output_df2.index % max_cols |
|
for x in range(0, max_cols): |
|
tmp = output_df2[output_df2['row'] == x].reset_index(drop = True) |
|
tmp = tmp[['url']] |
|
if x == 0: |
|
final_df = tmp |
|
else: |
|
final_df = pd.concat([final_df, tmp], axis = 1) |
|
|
|
final_df = final_df.fillna('') |
|
else: |
|
final_df = pd.DataFrame(['<h3>No Results Found :(</h3>']) |
|
|
|
if final_df.shape[0] == 0 : |
|
final_df = pd.DataFrame(['<h3>No Results Found :(</h3>']) |
|
|
|
return('<center>' + |
|
final_df.to_html(escape = False, render_links = True, index = False, header = False) + |
|
'</center>') |
|
|
|
|
|
def search_logging(x: str, request: gr.Request): |
|
session_id = getattr(request.cookies, 'access-token') |
|
logger.flag(['', x, session_id, str(datetime.now())]) |
|
|
|
|
|
with gr.Blocks(css="style.css") as app: |
|
with gr.Row(): |
|
with gr.Column(min_width = 10): |
|
with gr.Row(): |
|
gr.HTML("<center><p>If you can't see the images please make sure you are signed in to your Twinkl account on Google & you have access to the Shared Drives you are searching :)</p></center>") |
|
gr.HTML(ill_check_html) |
|
user_num = gr.Number(value = 0, label = 'Put lowest number of the alarm clock you can see') |
|
with gr.Row(): |
|
search_prompt = gr.Textbox(placeholder = 'search for an illustration', label = 'Search', elem_id = 'search_term') |
|
title_search = gr.Checkbox(label = 'Search title only') |
|
|
|
shared_drive = gr.Dropdown(choices = ['Illustrations - 01-10 to 07-22', 'Illustrations - Now'], multiselect = True, label = 'Shared Drive', value = ['Illustrations - 01-10 to 07-22', 'Illustrations - Now']) |
|
key_stage = gr.Dropdown(choices = ['EYFS', 'KS1', 'KS2'], multiselect = True, label = 'Key Stage', value = ['EYFS', 'KS1', 'KS2']) |
|
sort_by = gr.Dropdown(choices = ['Relevance', 'Date Created', 'A-Z'], value = 'Relevance', multiselect = False, label = 'Sort By') |
|
max_return = gr.Dropdown(choices = ['10', '25', '50', '75', '100', '250', '500'], value = '10', multiselect = False, label = 'No. of Results to Return') |
|
with gr.Row(): |
|
search_button = gr.Button(value="Search!") |
|
with gr.Row(): |
|
output_df = gr.HTML() |
|
search_button.click(search_index, inputs=[search_prompt, shared_drive, key_stage, sort_by, max_return, user_num, title_search], outputs=output_df) |
|
search_prompt.submit(search_index, inputs=[search_prompt, shared_drive, key_stage, sort_by, max_return, user_num, title_search], outputs=output_df) |
|
search_button.click(search_logging, inputs=[search_prompt], outputs=None) |
|
search_prompt.submit(search_logging, inputs=[search_prompt], outputs=None) |
|
|
|
app.load(_js = logging_js) |
|
|
|
app.auth = (same_auth) |
|
app.auth_message = '' |
|
|
|
|
|
fapi = FastAPI() |
|
|
|
fapi.add_middleware(SessionMiddleware, secret_key=os.environ['session_key']) |
|
|
|
@fapi.middleware("http") |
|
async def add_session_hash(request: Request, call_next): |
|
response = await call_next(request) |
|
session = request.cookies.get('session') |
|
if session: |
|
response.set_cookie(key='session', value=request.cookies.get('session'), httponly=True) |
|
return response |
|
|
|
|
|
@ fapi.get("/track") |
|
async def track(url: str, q: str, request: Request): |
|
|
|
if q is None: |
|
q = '' |
|
|
|
logger.flag([url, q, request.cookies['access-token'], str(datetime.now())]) |
|
return {"message": "ok"} |
|
|
|
|
|
|
|
app2 = gr.mount_gradio_app(fapi, app, path="/") |
|
|
|
if __name__ == "__main__": |
|
uvicorn.run(app2, host="0.0.0.0", port=7860) |
|
|