Spaces:
Running
Running
import os | |
import re | |
import uuid | |
import json | |
import argparse | |
import torch | |
import gradio as gr | |
import pandas as pd | |
import plotly.express as px | |
import numpy as np | |
from data import load_tokenizer | |
from model import load_model | |
from datetime import datetime | |
from dateutil import parser | |
from demo_assets import * | |
from typing import List, Dict, Any, Optional, Tuple | |
from dataclasses import dataclass | |
from collections import defaultdict | |
def get_args(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument('--data_dir', default='/data/mohamed/data') | |
parser.add_argument('--aim_repo', default='/data/mohamed/') | |
parser.add_argument('--ckpt', default='electra-base.pt') | |
parser.add_argument('--aim_exp', default='mimic-decisions-1215') | |
parser.add_argument('--label_encoding', default='multiclass') | |
parser.add_argument('--multiclass', action='store_true') | |
parser.add_argument('--debug', action='store_true') | |
parser.add_argument('--save_losses', action='store_true') | |
parser.add_argument('--task', default='token', choices=['seq', 'token']) | |
parser.add_argument('--max_len', type=int, default=512) | |
parser.add_argument('--num_layers', type=int, default=3) | |
parser.add_argument('--kernels', nargs=3, type=int, default=[1,2,3]) | |
parser.add_argument('--model', default='roberta-base',) | |
parser.add_argument('--model_name', default='google/electra-base-discriminator',) | |
parser.add_argument('--gpu', default='0') | |
parser.add_argument('--grad_accumulation', default=2, type=int) | |
parser.add_argument('--pheno_id', type=int) | |
parser.add_argument('--unseen_pheno', type=int) | |
parser.add_argument('--text_subset') | |
parser.add_argument('--pheno_n', type=int, default=500) | |
parser.add_argument('--hidden_size', type=int, default=100) | |
parser.add_argument('--emb_size', type=int, default=400) | |
parser.add_argument('--total_steps', type=int, default=5000) | |
parser.add_argument('--train_log', type=int, default=500) | |
parser.add_argument('--val_log', type=int, default=1000) | |
parser.add_argument('--seed', default = '0') | |
parser.add_argument('--num_phenos', type=int, default=10) | |
parser.add_argument('--num_decs', type=int, default=9) | |
parser.add_argument('--num_umls_tags', type=int, default=33) | |
parser.add_argument('--batch_size', type=int, default=8) | |
parser.add_argument('--pos_weight', type=float, default=1.25) | |
parser.add_argument('--alpha_distil', type=float, default=1) | |
parser.add_argument('--distil', action='store_true') | |
parser.add_argument('--distil_att', action='store_true') | |
parser.add_argument('--distil_ckpt') | |
parser.add_argument('--use_umls', action='store_true') | |
parser.add_argument('--include_nolabel', action='store_true') | |
parser.add_argument('--truncate_train', action='store_true') | |
parser.add_argument('--truncate_eval', action='store_true') | |
parser.add_argument('--load_ckpt', action='store_true') | |
parser.add_argument('--gradio', action='store_true') | |
parser.add_argument('--optuna', action='store_true') | |
parser.add_argument('--mimic_data', action='store_true') | |
parser.add_argument('--eval_only', action='store_true') | |
parser.add_argument('--lr', type=float, default=4e-5) | |
parser.add_argument('--resample', default='') | |
parser.add_argument('--verbose', type=bool, default=True) | |
parser.add_argument('--use_crf', type=bool) | |
parser.add_argument('--print_spans', action='store_true') | |
return parser.parse_args() | |
args = get_args() | |
if args.task == 'seq' and args.pheno_id is not None: | |
args.num_labels = 1 | |
elif args.task == 'seq': | |
args.num_labels = args.num_phenos | |
elif args.task == 'token': | |
if args.use_umls: | |
args.num_labels = args.num_umls_tags | |
else: | |
args.num_labels = args.num_decs | |
if args.label_encoding == 'multiclass': | |
args.num_labels = args.num_labels * 2 + 1 | |
elif args.label_encoding == 'bo': | |
args.num_labels *= 2 | |
elif args.label_encoding == 'boe': | |
args.num_labels *= 3 | |
class KeyDef: | |
key: str | |
name: str | |
desc: str = '' | |
color: str = 'lightblue' | |
symbol: str = '' | |
class AnnotationState: | |
def __init__(self): | |
self.entity_regex = r'\[\@.*?\#.*?\*\](?!\#)' | |
self.recommend_regex = r'\[\$.*?\#.*?\*\](?!\#)' | |
self.history = [] | |
self.config_file = "configs/default.config" | |
self.press_commands = self.read_config() | |
# Internal state holds the actual annotations | |
self.annotations = [] | |
self.raw_text = "" | |
def read_config(self) -> List[KeyDef]: | |
if not os.path.exists(self.config_file): | |
default_config = [{ | |
'key': key, | |
'name': name, | |
'color': color, | |
'symbol': symbol | |
} | |
for key, name, color, symbol in zip(keys, categories, colors, unicode_symbols) | |
] | |
os.makedirs("configs", exist_ok=True) | |
with open(self.config_file, 'w') as fp: | |
json.dump(default_config, fp) | |
with open(self.config_file, 'r') as fp: | |
config_dict = json.load(fp) | |
return [KeyDef(**entry) for entry in config_dict] | |
def get_cmd_by_key(self, key: str) -> Optional[KeyDef]: | |
return next((cmd for cmd in self.press_commands if cmd.key == key), None) | |
def set_text(self, text: str): | |
"""Initialize with new text, clearing annotations""" | |
self.raw_text = text | |
self.annotations = [] | |
self.history = [] | |
def add_annotation(self, start: int, end: int, entity_type: str) -> str: | |
"""Add new annotation and return display text""" | |
# Save current state to history | |
self.history.append((self.raw_text, list(self.annotations))) | |
if len(self.history) > 20: | |
self.history.pop(0) | |
# Add new annotation | |
self.annotations.append((start, end, entity_type)) | |
return self.get_display_text() | |
def remove_annotation(self, start: int, end: int) -> str: | |
"""Remove annotation at position if it exists, splitting spans if needed""" | |
self.history.append((self.raw_text, list(self.annotations))) | |
new_annotations = [] | |
for a in self.annotations: | |
annotation_start, annotation_end, entity_type = a | |
# If the current annotation does not overlap, keep it as is | |
if annotation_end < start or annotation_start > end: | |
new_annotations.append(a) | |
# If the removed span is a proper subset, split the annotation | |
elif annotation_start < start and annotation_end > end: | |
new_annotations.append((annotation_start, start - 1, entity_type)) | |
new_annotations.append((end + 1, annotation_end, entity_type)) | |
# If there's overlap with the start, but not the end | |
elif annotation_start < start <= annotation_end: | |
new_annotations.append((annotation_start, start - 1, entity_type)) | |
# If there's overlap with the end, but not the start | |
elif annotation_start <= end < annotation_end: | |
new_annotations.append((end + 1, annotation_end, entity_type)) | |
self.annotations = new_annotations | |
return self.get_display_text() | |
def undo(self) -> str: | |
"""Undo last annotation action""" | |
if not self.history: | |
return self.get_display_text() | |
self.raw_text, self.annotations = self.history.pop() | |
return self.get_display_text() | |
def get_display_text(self) -> str: | |
"""Generate display text with HTML formatting for annotations""" | |
if not self.annotations: | |
return f'<div id="annotated-text">{self.raw_text}</div> <div id="legend"></div>' | |
# Sort annotations by start position | |
sorted_annotations = sorted(self.annotations, key=lambda x: (x[0], -x[1])) | |
# Build display text with HTML spans | |
result = ['<div id="annotated-text">'] | |
last_end = 0 | |
for start, end, entity_type in sorted_annotations: | |
if start < last_end and end > last_end: | |
start = last_end | |
elif start < last_end: | |
continue | |
# Add text before annotation | |
result.append(self.raw_text[last_end:start]) | |
# Add annotated text with highlighting | |
text = self.raw_text[start:end] | |
cmd = self.get_cmd_by_key(entity_type) | |
color = cmd.color | |
result.append(f'<span style="background-color: {color};" title="{cmd.name}">{text}</span>') # Nicer tooltip | |
last_end = end | |
# Add remaining text | |
result.append(self.raw_text[last_end:]) | |
result.append('</div>') | |
# Generate legend | |
legend = ['<div id="legend" style="margin-top: 10px;"><span style="font-weight: bold;">Legend:</span > '] # Margin and bold legend title | |
used_categories = sorted(list(set([a[2] for a in self.annotations]))) | |
for cat in used_categories: | |
cmd = self.get_cmd_by_key(cat) | |
legend.append(f'<span style="background-color: {cmd.color}; padding: 3px 5px; border-radius: 3px; margin-right: 5px; font-size:0.9em; display: inline-block; vertical-align: middle; color: black; font-family: sans-serif;">{cmd.name}</span>') # Improved legend item styling | |
legend.append('</div>') | |
result.extend(legend) | |
return "".join(result) | |
def get_annotated_text(self, annotator_id=None, discharge_summary_id=None) -> dict: | |
"""Generate a dictionary containing annotation data.""" | |
unique_id = str(uuid.uuid4())[:8] | |
annotations = [] | |
if self.annotations: | |
sorted_annotations = sorted(self.annotations, key=lambda x: (x[0], -x[1])) | |
for idx, (start, end, entity_type) in enumerate(sorted_annotations): | |
cmd = self.get_cmd_by_key(entity_type) | |
annotations.append({ | |
"decision": self.raw_text[start:end], | |
"category": f'Category {categories.index(cmd.name) + 1}: {cmd.name}', | |
"start_offset": start, | |
"end_offset": end, | |
"annotation_id": f'{unique_id}_{idx}' | |
}) | |
return { | |
"annotator_id": annotator_id if annotator_id else None, | |
"discharge_summary_id": discharge_summary_id if discharge_summary_id else None, | |
"annotations": annotations | |
} | |
def init_text(text): | |
if text: | |
state.set_text(text) | |
return state.get_display_text() | |
return "<div id='annotated-text'>Enter text to begin...</div>" | |
def add_entity(cmd_key, start: int, end: int): | |
"""Handle adding new entity annotations""" | |
if start == end: | |
return state.get_display_text(), "No text selected" | |
cmd = state.get_cmd_by_key(cmd_key) | |
if not cmd: | |
return state.get_display_text(), "Invalid command" | |
new_text = state.add_annotation(start, end, cmd.key) | |
return new_text, f"Added {cmd.name} entity" | |
def remove_entity(start: int, end: int): | |
"""Handle removal of annotations""" | |
if start == end: | |
return state.get_display_text(), "No text selected" | |
return state.remove_annotation(start, end), "Removed annotation" | |
def undo(): | |
"""Handle undoing the last action""" | |
return state.undo(), "Undid last action" | |
def download_annotations(annotator_id, discharge_summary_id): | |
"""Generates and provides annotation data for download.""" | |
annotation_data = state.get_annotated_text(annotator_id, discharge_summary_id) | |
with open(OUTPUT_PATH, 'w') as f: | |
json.dump(annotation_data, f, indent=4) | |
return OUTPUT_PATH | |
def refresh_annotations(annotator_id, discharge_summary_id): | |
"""Refreshes the displayed annotation JSON.""" | |
return state.get_annotated_text(annotator_id, discharge_summary_id) | |
def clear_annotations(): | |
state.set_text(state.raw_text) # Clears annotations by setting empty text | |
return gr.update(interactive=True, elem_classes=[]), state.get_display_text() # added value | |
def model_predict(text): | |
"""Placeholder for model prediction logic""" | |
output, t2c = predict(text) | |
spans = indicators_to_spans(output.argmax(-1), t2c) | |
spans = [(s, e, keys[c]) for c, s, e in spans] | |
return spans | |
def apply_predictions(text): | |
predictions = model_predict(text) | |
state.set_text(text) | |
for start, end, entity_type in predictions: | |
state.add_annotation(start, end, entity_type) | |
return state.get_display_text() | |
state = AnnotationState() | |
all_keys = [f'"{cmd.key}"' for cmd in state.press_commands] | |
key_list_str = f'[{", ".join(all_keys)}]' | |
shortcut_js = shortcut_js_template%key_list_str | |
def postprocess_labels(text, logits, t2c): | |
tags = [None for _ in text] | |
labels = logits.argmax(-1) | |
for i,cat in enumerate(labels): | |
if cat != OTHERS_ID: | |
char_ids = t2c(i) | |
if char_ids is None: | |
continue | |
for idx in range(char_ids.start, char_ids.end): | |
if tags[idx] is None and idx < len(text): | |
tags[idx] = categories[cat // 2] | |
for i in range(len(text)-1): | |
if text[i] == ' ' and (text[i+1] == ' ' or tags[i-1] == tags[i+1]): | |
tags[i] = tags[i-1] | |
return tags | |
def indicators_to_spans(labels, t2c = None): | |
def add_span(c, start, end): | |
if t2c(start) is None or t2c(end) is None: | |
start, end = -1, -1 | |
else: | |
start = t2c(start).start | |
end = t2c(end).end | |
span = (c, start, end) | |
spans.add(span) | |
spans = set() | |
num_tokens = len(labels) | |
num_classes = OTHERS_ID // 2 | |
start = None | |
cls = None | |
for t in range(num_tokens): | |
if start and labels[t] == cls + 1: | |
continue | |
elif start: | |
add_span(cls // 2, start, t - 1) | |
start = None | |
# if not start and labels[t] in [2*x for x in range(num_classes)]: | |
if not start and labels[t] != OTHERS_ID: | |
start = t | |
cls = int(labels[t]) // 2 * 2 | |
return spans | |
def extract_date(text): | |
pattern = r'(?<=Date: )\s*(\[\*\*.*?\*\*\]|\d{1,4}[-/]\d{1,2}[-/]\d{1,4})' | |
match = re.search(pattern, text).group(1) | |
start, end = None, None | |
for i, c in enumerate(match): | |
if start is None and c.isnumeric(): | |
start = i | |
elif c.isnumeric(): | |
end = i + 1 | |
match = match[start:end] | |
return match | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
tokenizer = load_tokenizer(args.model_name) | |
model = load_model(args, device)[0] | |
model.eval() | |
torch.set_grad_enabled(False) | |
def predict(text): | |
encoding = tokenizer.encode_plus(text) | |
x = torch.tensor(encoding['input_ids']).unsqueeze(0).to(device) | |
mask = torch.ones_like(x) | |
output = model.generate(x, mask)[0] | |
return output, encoding.token_to_chars | |
def process(text): | |
if text is not None: | |
output, t2c = predict(text) | |
tags = postprocess_labels(text, output, t2c) | |
with open('log.csv', 'a') as f: | |
f.write(f'{datetime.now()},{text}\n') | |
return list(zip(text, tags)) | |
else: | |
return text | |
def process_sum(*inputs): | |
global sum_c | |
dates = {} | |
for i in range(sum_c): | |
text = inputs[i] | |
output, t2c = predict(text) | |
spans = indicators_to_spans(output.argmax(-1), t2c) | |
date = extract_date(text) | |
present_decs = set(cat for cat, _, _ in spans) | |
decs = {k: [] for k in sorted(present_decs)} | |
for c, s, e in spans: | |
decs[c].append(text[s:e]) | |
dates[date] = decs | |
out = "" | |
for date in sorted(dates.keys(), key = lambda x: parser.parse(x)): | |
out += f'## **[{date}]**\n\n' | |
decs = dates[date] | |
for c in decs: | |
out += f'### {unicode_symbols[c]} ***{categories[c]}***\n\n' | |
for dec in decs[c]: | |
out += f'{dec}\n\n' | |
return out | |
def get_structured_data(*inputs): | |
global sum_c | |
data = [] | |
for i in range(sum_c): | |
text = inputs[i] | |
output, t2c = predict(text) | |
spans = indicators_to_spans(output.argmax(-1), t2c) | |
date = extract_date(text) | |
for c, s, e in spans: | |
data.append({ | |
'date': date, | |
'timestamp': parser.parse(date), | |
'decision_cat': c, | |
'decision_type': categories[c], 'details': text[s:e]}) | |
return data | |
def update_inputs(inputs): | |
outputs = [] | |
if inputs is None: | |
c = 0 | |
else: | |
inputs = [open(f.name).read() for f in inputs] | |
for i, text in enumerate(inputs): | |
outputs.append(gr.update(value=text, visible=True)) | |
c = len(inputs) | |
n = SUM_INPUTS | |
for i in range(n - c): | |
outputs.append(gr.update(value='', visible=False)) | |
global sum_c; sum_c = c | |
global structured_data | |
structured_data = get_structured_data(*inputs) if inputs is not None else [] | |
return outputs | |
def add_ex(*inputs): | |
global sum_c | |
new_idx = sum_c | |
if new_idx < SUM_INPUTS: | |
out = inputs[:new_idx] + (gr.update(visible=True),) + inputs[new_idx+1:] | |
sum_c += 1 | |
else: | |
out = inputs | |
return out | |
def sub_ex(*inputs): | |
global sum_c | |
new_idx = sum_c - 1 | |
if new_idx > 0: | |
out = inputs[:new_idx] + (gr.update(visible=False),) + inputs[new_idx+1:] | |
sum_c -= 1 | |
else: | |
out = inputs | |
return out | |
def create_timeline_plot(data: List[Dict[str, Any]]): | |
df = pd.DataFrame(data) | |
# df['int_cat'] = pd.factorize(df['decision_type'])[0] | |
# df['int_cat_jittered'] = df['int_cat'] + np.random.uniform(-0.1, 0.1, size=len(df)) | |
# fig = px.scatter(df, x='date', y='int_cat_jittered', color='decision_type', hover_data=['details'], | |
# title='Patient Timeline') | |
# fig.update_layout( | |
# yaxis=dict( | |
# tickmode='array', | |
# tickvals=df['int_cat'].unique(), | |
# ticktext=df['decision_type'].unique()), | |
# xaxis_title='Date', | |
# yaxis_title='Category') | |
fig = px.strip(df, x='date', y='decision_type', color='decision_type', hover_data=['details'], | |
stripmode = "overlay", | |
title='Patient Timeline') | |
fig.update_traces(jitter=1.0, marker=dict(size=10, opacity=0.6)) | |
fig.update_layout(height=600) | |
return fig | |
def filter_timeline(decision_types: str, start_date: str, end_date: str) -> px.scatter: | |
global structured_data | |
filtered_data = structured_data | |
if 'All' not in decision_types: | |
filtered_data = [event for event in filtered_data if event['decision_type'] in decision_types] | |
start = parser.parse(start_date) | |
end = parser.parse(end_date) | |
filtered_data = [event for event in filtered_data if start <= event['timestamp'] <= end] | |
return create_timeline_plot(filtered_data) | |
def generate_summary(*inputs) -> str: | |
global structured_data | |
structured_data = get_structured_data(*inputs) | |
dates = defaultdict(lambda: defaultdict(list)) | |
for event in structured_data: | |
dates[event['date']][event['decision_cat']].append(event['details']) | |
out = "" | |
for date in sorted(dates.keys(), key = lambda x: parser.parse(x)): | |
out += f'## **[{date}]**\n\n' | |
decs = dates[date] | |
for c in decs: | |
out += f'### {unicode_symbols[c]} ***{categories[c]}***\n\n' | |
for dec in decs[c]: | |
out += f'{dec}\n\n' | |
return out, create_timeline_plot(structured_data) | |
global sum_c | |
sum_c = 1 | |
structured_data = [] | |
device = model.backbone.device | |
with gr.Blocks(head=shortcut_js, | |
title='MedDecXtract', css=css) as demo: | |
gr.Image('assets/logo.png', height=100, container=False, show_download_button=False) | |
gr.Markdown(title) | |
with gr.Tab("Label a Clinical Note"): | |
gr.Markdown(label_desc) | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown("## Enter a Discharge Summary or Clinical Note"), | |
text_input = gr.Textbox( | |
# value=examples[0], | |
label="", | |
placeholder="Enter text here...") | |
text_btn = gr.Button('Run') | |
with gr.Column(): | |
gr.Markdown("## Labeled Summary or Note"), | |
text_out = gr.Highlight(label="", combine_adjacent=True, show_legend=False, color_map=color_map) | |
gr.Examples(text_examples, inputs=text_input) | |
with gr.Tab("Patient History Visualization"): | |
gr.Markdown(vis_desc) | |
with gr.Column(): | |
sum_inputs = [gr.Text(label='Clinical Note 1', elem_classes='text-limit')] | |
sum_inputs.extend([gr.Text(label='Clinical Note %d'%i, visible=False, elem_classes='text-limit') | |
for i in range(2, SUM_INPUTS + 1)]) | |
with gr.Row(): | |
ex_add = gr.Button("+") | |
ex_sub = gr.Button("-") | |
upload = gr.File(label='Upload clinical notes', file_types=['text'], file_count='multiple') | |
gr.Examples(sum_examples, inputs=upload, | |
fn = update_inputs, outputs=sum_inputs, run_on_click=True) | |
with gr.Column(): | |
with gr.Row(): | |
decision_type = gr.Dropdown(["All"] + categories, | |
multiselect=True, | |
label="Decision Type", value="All") | |
start_date = gr.Textbox(label="Start Date (MM/DD/YYYY)", value="01/01/2006") | |
end_date = gr.Textbox(label="End Date (MM/DD/YYYY)", value="12/31/2024") | |
filter_button = gr.Button("Filter Timeline") | |
timeline_plot = gr.Plot() | |
summary_button = gr.Button("Generate Summary") | |
with gr.Accordion('Summary'): | |
summary_output = gr.Markdown(elem_id='sum-out') #gr.Textbox(label="Summary") | |
with gr.Tab("Interactive Text Annotator"): | |
gr.Markdown(annotator_desc) | |
with gr.Row(): | |
with gr.Column(): | |
annot_text_input = gr.Textbox( | |
label="Enter Text to Annotate", | |
placeholder="Enter or paste text here...", | |
lines=5, | |
elem_id='annot_text_input' | |
) | |
gr.Examples(text_examples, inputs=annot_text_input) | |
msg_output = gr.Textbox(label="Status Messages", interactive=False) | |
display_area = gr.HTML( | |
label="Annotated Text", | |
value="<div id='annotated-text'><i>Output box</i></div>" | |
) | |
k = 3 # Set the maximum number of buttons per row | |
num_buttons = len(state.press_commands) | |
rows = (num_buttons + k - 1) // k | |
entity_buttons = [] | |
with gr.Group(): | |
predict_btn = gr.Button("Generate Predictions", size='lg', variant='primary') | |
for i in range(rows): | |
with gr.Row(): | |
for j in range(min(k, num_buttons - i * k)): | |
real_idx = i * k + j | |
cmd = state.press_commands[real_idx] | |
entity_buttons.append( | |
gr.Button(f"{cmd.symbol} {cmd.name} ({cmd.key})", | |
elem_id=f'btn_{cmd.key}', | |
size='sm')) | |
if i == (rows - 1): | |
remove_btn = gr.Button("Remove (q)", size='sm', variant='secondary', elem_id='btn_q') | |
undo_btn = gr.Button("Undo (z)", size='sm', elem_id='btn_z') | |
clear_btn = gr.Button("Clear Annotations", size='lg', variant='stop') | |
with gr.Accordion("Download/View Annotations \U0001F4BE", open=False): # Combined Accordion | |
with gr.Row(): | |
annotator_id = gr.Textbox(label="Annotator ID", placeholder="Enter your annotator ID") | |
discharge_summary_id = gr.Textbox(label="Discharge Summary ID", placeholder="Enter the discharge summary ID") | |
with gr.Row(): | |
download_file = gr.File(interactive=False, visible=True, label="Download") # download_btn renamed and made into gr.File | |
annotations_json = gr.JSON(label="Annotations JSON") | |
refresh_btn = gr.Button("🔄 Refresh Annotations", elem_id="refresh_btn") # Renamed for clarity | |
download_btn = gr.Button("Download Annotated Text", elem_id="download_btn") # Added a button to trigger download | |
# Hidden state components for selection | |
selection_start = gr.Number(value=0, visible=False) | |
selection_end = gr.Number(value=0, visible=False) | |
gr.Markdown(desc) | |
# Functions | |
# Wire up event handlers | |
annot_text_input.change(init_text, annot_text_input, display_area) | |
# Wire up the buttons with the selection JavaScript | |
for btn, cmd in zip(entity_buttons, state.press_commands): | |
btn.click(lambda s=None, e=None, c=cmd.key: add_entity(c, s, e),[selection_start, selection_end], [display_area, msg_output], js=select_js).then( | |
lambda: gr.update(interactive=state.annotations == [], elem_classes=[] if state.annotations == [] else ['locked-input']), # Disable input if annotations exist | |
outputs=annot_text_input | |
) | |
remove_btn.click( remove_entity, [selection_start, selection_end], [display_area, msg_output], js=select_js).then( | |
lambda: gr.update(interactive=state.annotations == [], elem_classes=[] if state.annotations == [] else ['locked-input']), | |
outputs=annot_text_input | |
) | |
undo_btn.click(undo, None, [display_area, msg_output]).then( | |
lambda: gr.update(interactive=state.annotations == [], elem_classes=[] if state.annotations == [] else ['locked-input']), | |
outputs=annot_text_input | |
) | |
download_btn.click(download_annotations, [annotator_id, discharge_summary_id], download_file) # Output to download_file | |
refresh_btn.click(refresh_annotations, [annotator_id, discharge_summary_id], annotations_json) # No change in functionality | |
clear_btn.click(clear_annotations, outputs=[annot_text_input, display_area]) | |
predict_btn.click(apply_predictions, annot_text_input, display_area).then( | |
lambda: gr.update(interactive=state.annotations == [], elem_classes=[] if state.annotations == [] else ['locked-input']), | |
outputs=text_input | |
) | |
text_input.submit(process, inputs=text_input, outputs=text_out) | |
text_btn.click(process, inputs=text_input, outputs=text_out) | |
upload.change(update_inputs, inputs=upload, outputs=sum_inputs) | |
ex_add.click(add_ex, inputs=sum_inputs, outputs=sum_inputs) | |
ex_sub.click(sub_ex, inputs=sum_inputs, outputs=sum_inputs) | |
filter_button.click(filter_timeline, inputs=[decision_type, start_date, end_date], outputs=timeline_plot) | |
summary_button.click(generate_summary, inputs=sum_inputs, outputs=[summary_output, timeline_plot]) | |
demo.launch(share=True) | |