import uuid import zipfile from datetime import datetime from pathlib import Path from time import sleep, time import torch from email_validator import validate_email, EmailNotValidError from Bio import SeqIO import gradio as gr from gradio_rangeslider import RangeSlider from omegaconf import OmegaConf import pandas as pd from rdkit import Chem from rdkit.Chem import PandasTools from inference import (read_fragment_library, process_fragment_library, extract_pockets, dock_fragments, generate_linkers, select_fragment_pairs) from app import static, fn, db gr.set_static_paths(paths=["data/", "results/"]) job_db = db.init_job_db() FRAG_LIBS = { lib_path.stem.replace('_', ' '): str(lib_path) for lib_path in Path('data/fragment_libraries').glob('*') } FRAG_LIB_PROCESS_OPTS = { 'Dehalogenate Fragments': 'dehalogenate', 'Discard Inorganic Fragments': 'discard_inorganic' } POCKET_EXTRACT_OPTS = { 'Topological Prediction with Fpocket': { 'name': 'fpocket', 'info': 'If your protein structure contains co-crystallized ligands, you may CLICK ON ' 'the ligand with your desired binding pose to predict its corresponding pocket. ' 'Otherwise, pockets will be predicted based on the protein structure alone. After extracting ' 'the pocket(s), CLICK ON your desired pocket to SELECT ONE for fragment linking.', 'params': {} }, 'Fragment Conformer Clustering': { 'name': 'clustering', 'info': 'Conformers of docked fragments will be clustered based on their spatial similarity, and conformers ' 'within a cluster will be selected for linking. This strategy takes delayed effect AFTER DOCKING.' 
# TODO import from inference
def process_drug_library_upload(library_upload):
    """Load an uploaded compound library file into a DataFrame.

    Args:
        library_upload: Path of the uploaded file. ``.csv`` files are read
            with ``pd.read_csv``; ``.sdf`` files are read with
            ``PandasTools.LoadSDF`` (SMILES column ``X1``, molecule column
            ``mol``). The extension check is case-insensitive.

    Returns:
        The loaded DataFrame, after ``fn.validate_columns`` has been called
        on it for the ``X1`` column.

    Raises:
        gr.Error: If the file extension is neither ``.csv`` nor ``.sdf``.
    """
    # Compare on a lowered copy so that e.g. "LIB.CSV" is accepted; the
    # original path is still what gets handed to the readers.
    lowered_name = str(library_upload).lower()
    if lowered_name.endswith('.csv'):
        df = pd.read_csv(library_upload)
    elif lowered_name.endswith('.sdf'):
        df = PandasTools.LoadSDF(
            library_upload,
            smilesName='X1',
            molColName='mol',
        )
    else:
        raise gr.Error('Current supported fragment library formats only include CSV and SDF files.')
    fn.validate_columns(df, ['X1'])
    return df


def query_job_status(job_id):
    """Poll the job database and stream status updates for ``job_id``.

    This is a generator. Each yielded dict maps Gradio components
    (``pred_lookup_status``, ``pred_lookup_btn``, ``pred_lookup_stop_btn``,
    ``tabs``, ``result_state``) to their new values.

    Polling behaviour, one lookup every 5 seconds:
        * ``RUNNING``: yield a progress message and keep polling.
        * ``COMPLETED``: yield the completion message, switch to the
          ``result`` tab, publish the job record to ``result_state``, stop.
        * ``FAILED``: yield the failure message (including the recorded
          error, when there is one), switch to the ``job`` tab, stop.
        * job not found: yield a "not found" message and retry; polling
          stops once the retry counter exceeds 3.
        * any other status: nothing is yielded and polling continues.

    Args:
        job_id: Identifier passed to ``job_db.job_lookup``.

    Raises:
        gr.Error: If the lookup (or building an update) raises any exception.
    """
    gr.Info('Start querying the job database...')
    stop = False
    retry = 0
    while not stop:
        try:
            sleep(5)
            job = job_db.job_lookup(job_id)
            if job:
                status = job['status']
                if status == "RUNNING":
                    running_msg = (
                        f" Your job (ID: **{job['id']}**) started at **{job['start_time']}** "
                        "and is **RUNNING...** "
                        "It might take a few minutes up to a few hours depending on the input size "
                        "and the queue status. "
                        "You may keep the page open and wait for job completion, or close the page "
                        "and revisit later to look up the job status using the job id. "
                        "You will also receive an email notification once the job is done. "
                    )
                    yield {
                        pred_lookup_status: running_msg,
                        pred_lookup_btn: gr.Button(visible=False),
                        pred_lookup_stop_btn: gr.Button(visible=True)
                    }
                elif status == "COMPLETED":
                    stop = True
                    msg = f"Your GenFBDD job (ID: {job['id']}) has been **COMPLETED**"
                    msg += f" at {job['end_time']}" if job.get('end_time') else ""
                    msg += f" and the results will expire by {job['expiry_time']}." if job.get('expiry_time') else "."
                    msg += ' Redirecting to the Results page...'
                    gr.Info(msg)
                    yield {
                        pred_lookup_status: msg,
                        pred_lookup_btn: gr.Button(visible=True),
                        pred_lookup_stop_btn: gr.Button(visible=False),
                        tabs: gr.Tabs(selected='result'),
                        result_state: job
                    }
                elif status == "FAILED":
                    stop = True
                    msg = f'Your GenFBDD job (ID: {job_id}) has **FAILED**'
                    msg += f" at {job['end_time']}" if job.get('end_time') else ''
                    # Show the error whenever one was recorded. This used to
                    # be gated on 'expiry_time', which hid the error text for
                    # failed jobs without an expiry.
                    msg += f" due to error: {job['error']}." if job.get('error') else '.'
                    gr.Info(msg)
                    yield {
                        pred_lookup_status: msg,
                        pred_lookup_btn: gr.Button(visible=True),
                        pred_lookup_stop_btn: gr.Button(visible=False),
                        tabs: gr.Tabs(selected='job'),
                    }
            else:
                # Give up once the counter exceeds 3; the counter is bumped
                # after the message is built, so the final message reports
                # the number of retries already made.
                stop = (retry > 3)
                if not stop:
                    msg = f'Job ID {job_id} not found. Retrying... ({retry})'
                else:
                    msg = f'Job ID {job_id} not found after {retry} retries. Please double-check the job ID.'
                gr.Info(msg)
                retry += 1
                yield {
                    pred_lookup_status: msg,
                    pred_lookup_btn: gr.Button(visible=True),
                    pred_lookup_stop_btn: gr.Button(visible=False),
                    tabs: gr.Tabs(selected='job'),
                }
        except Exception as e:
            # Chain the cause so the original traceback is not lost.
            raise gr.Error(f'Failed to retrieve job status due to error: {str(e)}') from e


def checkbox_group_selections_to_kwargs(selected_options, option_mapping):
    """Translate checkbox-group selections into boolean keyword arguments.

    Args:
        selected_options: Collection of the labels that are ticked. ``None``
            is treated the same as an empty selection.
        option_mapping: Mapping of checkbox label -> keyword-argument name.

    Returns:
        A dict with one entry per label in ``option_mapping``: the mapped
        keyword name, set to ``True`` if the label is in
        ``selected_options`` and ``False`` otherwise.
    """
    chosen = selected_options if selected_options is not None else ()
    return {
        kwarg_name: label in chosen
        for label, kwarg_name in option_mapping.items()
    }


def _find_pocket_file(pocket_name, pocket_files):
    """Return the pocket file whose stem matches ``pocket_name``, or ``None``.

    A stem that equals ``pocket_name`` or continues with a non-alphanumeric
    character (e.g. ``pocket1_atm`` for ``pocket1``) is preferred over a stem
    that merely starts with it (e.g. ``pocket10_atm``). Within each group the
    last matching file wins.
    """
    delimited_match = None
    loose_match = None
    for pocket_file in pocket_files:
        stem = Path(pocket_file).stem
        if not stem.startswith(pocket_name):
            continue
        remainder = stem[len(pocket_name):]
        if not remainder or not remainder[0].isalnum():
            delimited_match = pocket_file
        else:
            loose_match = pocket_file
    return delimited_match if delimited_match is not None else loose_match


def job_validate(
        frag_file, frag_df, prot_file, pocket_name, pocket_method, pocket_fs, email, run_info,
        session_info: gr.Request
):
    """Validate the submission form and register a new RUNNING job.

    Args:
        frag_file: Fragment library file; must be truthy.
        frag_df: Fragment DataFrame; must be non-empty.
        prot_file: Protein structure file; must be truthy.
        pocket_name: Name of the pocket selected in the viewer (used only for
            the ``fpocket`` method).
        pocket_method: A key of ``POCKET_EXTRACT_OPTS``.
        pocket_fs: Extracted pocket files (used only for ``fpocket``).
        email: Optional e-mail address; validated and normalized when given.
        run_info: Current ``run_state`` value; a non-empty value means a job
            is already running in this session.
        session_info: The Gradio request, used for the IP address, cookies
            and the ``job_db.check_user_running_job`` check.

    Returns:
        The job record (dict) that was inserted into ``job_db``.

    Raises:
        gr.Error: On any missing or invalid input, on an invalid e-mail
            address, or when a job is already running for this session or
            user.
    """
    # Check the file first so that a missing DataFrame (None) produces the
    # user-facing error instead of a TypeError from len().
    if not frag_file or frag_df is None or len(frag_df) == 0:
        raise gr.Error("Please provide a valid fragment library.")
    if not prot_file:
        raise gr.Error("Please provide a valid protein structure.")

    if pocket_method not in POCKET_EXTRACT_OPTS:
        raise gr.Error("Please select a pocket extraction method.")
    pocket_extraction_method = POCKET_EXTRACT_OPTS[pocket_method]['name']
    pocket_path_dict = {}
    if pocket_extraction_method == 'fpocket':
        if not pocket_name or not pocket_fs:
            raise gr.Error("If you wish to use a protein pocket predicted by Fpocket, "
                           "please select a pocket after clicking on 'Extract Pocket'.")
        matched_file = _find_pocket_file(pocket_name, pocket_fs)
        if matched_file is not None:
            pocket_path_dict[pocket_name] = matched_file
        else:
            # The job is still submitted with an empty pocket mapping, but
            # the user is told about it rather than it passing silently.
            gr.Warning(f"The selected pocket '{pocket_name}' does not match any extracted pocket file.")

    if email:
        try:
            email_info = validate_email(email, check_deliverability=False)
            email = email_info.normalized
        except EmailNotValidError as e:
            raise gr.Error(f"Invalid email address: {str(e)}.") from e

    if run_info:
        raise gr.Error(f"You already have a running prediction job (ID: {run_info['id']}) under this session. "
                       "Please wait for it to complete before submitting another job.")

    if check := job_db.check_user_running_job(email, session_info):
        raise gr.Error(check)

    gr.Info('Finished processing inputs. Initiating the GenFBDD job... '
            'You will be redirected to Job Status page.')
    job_id = str(uuid.uuid4())
    job_info = {
        'id': job_id,
        'status': 'RUNNING',
        'fragment_library_file': frag_file,
        'protein_structure_file': prot_file,
        'pocket_extraction_method': pocket_extraction_method,
        'protein_pocket_files': pocket_path_dict,
        'email': email,
        'ip': session_info.headers.get('x-forwarded-for', session_info.client.host),
        'cookies': dict(session_info.cookies),
        'start_time': time(),
        'end_time': None,
        'expiry_time': None,
        'error': None
    }
    job_db.insert(job_info)

    return job_info


def dock_link(
        frag_lib, prot,
        dock_n_steps, dock_n_poses, dock_confidence_threshold,
        linker_frag_dist, linker_strategy, linker_n_mols, linker_size, linker_steps,
        job_info
):
    """Run the dock-then-link pipeline for one job and record the outcome.

    Steps: dock the fragments (``dock_fragments``), pick fragment pairs
    (``select_fragment_pairs``) and, if any pairs were found, generate
    linkers (``generate_linkers``). Results are written under
    ``results/<timestamp>``. The job record is updated through
    ``job_db.job_update`` whether the run succeeds or fails.

    Args:
        frag_lib: Fragment DataFrame. It is copied before the ``X2`` and
            ``ID2`` columns are added, so the caller's object is untouched.
        prot: Path of the protein structure file.
        dock_n_steps: Passed to ``dock_fragments`` as ``inference_steps``.
        dock_n_poses: Passed to ``dock_fragments`` as ``n_poses``.
        dock_confidence_threshold: Passed to ``select_fragment_pairs`` as
            ``confidence_threshold``.
        linker_frag_dist: Passed to ``select_fragment_pairs`` as
            ``frag_dist_range``.
        linker_strategy: Accepted for interface compatibility; not used by
            this function.
        linker_n_mols: Passed to ``generate_linkers`` as ``n_samples``.
        linker_size: Passed to ``generate_linkers`` as ``linker_size``.
        linker_steps: Passed to ``generate_linkers`` as ``n_steps``.
        job_info: Job record produced by ``job_validate``.

    Returns:
        A dict of component updates. On success ``result_state`` receives the
        job record merged with the completion info; on failure it is reset to
        ``{}``. ``run_state`` is reset to ``{}`` in both cases.
    """
    job_id = job_info['id']
    update_info = {}

    try:
        # Setup sits inside the try block so that a failure here (missing
        # config file, bad input frame, ...) is recorded as FAILED rather
        # than leaving the job marked RUNNING.
        pocket_extract_method = job_info['pocket_extraction_method']
        pocket_path_dict = job_info['protein_pocket_files']

        config = OmegaConf.load('configs/gen_fbdd_v1.yaml')
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f'Using device: {device}')
        date_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.%f")
        out_dir = f'results/{date_time}'

        # Work on a copy: the incoming frame is the UI's shared state value.
        frag_lib = frag_lib.copy()
        frag_lib['X2'] = prot
        frag_lib['ID2'] = Path(prot).stem

        docking_df = dock_fragments(
            df=frag_lib, out_dir=out_dir,
            score_ckpt=config.score_ckpt, confidence_ckpt=config.confidence_ckpt,
            inference_steps=dock_n_steps, n_poses=dock_n_poses,
            docking_batch_size=config.docking_batch_size,
            initial_noise_std_proportion=config.initial_noise_std_proportion,
            no_final_step_noise=config.no_final_step_noise,
            temp_sampling_tr=config.temp_sampling_tr,
            temp_sampling_rot=config.temp_sampling_rot,
            temp_sampling_tor=config.temp_sampling_tor,
            temp_psi_tr=config.temp_psi_tr,
            temp_psi_rot=config.temp_psi_rot,
            temp_psi_tor=config.temp_psi_tor,
            temp_sigma_data_tr=config.temp_sigma_data_tr,
            temp_sigma_data_rot=config.temp_sigma_data_rot,
            temp_sigma_data_tor=config.temp_sigma_data_tor,
            save_docking=pocket_extract_method == 'clustering',
            device=device,
        )

        linking_df = select_fragment_pairs(
            docking_df,
            method=pocket_extract_method,
            pocket_path_dict=pocket_path_dict,
            frag_dist_range=linker_frag_dist,
            confidence_threshold=dock_confidence_threshold,
            rmsd_threshold=1.5,
            out_dir=out_dir,
        )

        if linking_df is not None and len(linking_df) > 0:
            # Generate linkers
            generate_linkers(
                linking_df,
                backbone_atoms_only=True,
                output_dir=out_dir,
                n_samples=linker_n_mols,
                n_steps=linker_steps,
                linker_size=linker_size,
                anchors=None,
                max_batch_size=config.linker_batch_size,
                random_seed=None,
                robust=False,
                linker_ckpt=config.linker_ckpt,
                size_ckpt=config.size_ckpt,
                linker_condition=None,
                device=device,
            )
            job_type = 'linking'
        else:
            gr.Warning('No fragment-conformer pairs found for linking. Please adjust the docking / linking settings.')
            job_type = 'docking'

        update_info = {
            'status': "COMPLETED",
            'error': None,
            'output_dir': out_dir,
            'type': job_type,
        }
        return {result_state: job_info | update_info, run_state: {}}

    except Exception as e:
        gr.Warning(f"Job failed due to error: {str(e)}")
        update_info = {
            'status': "FAILED",
            'error': str(e),
            'output_dir': None
        }
        return {result_state: {}, run_state: {}}

    finally:
        # Runs on both paths, so the database always sees the final status.
        job_db.job_update(
            job_id=job_id,
            update_info=update_info
        )


def get_session_state(request: gr.Request):
    """Return the incoming Gradio request object unchanged."""
    return request
button_primary_background_fill='#2e6ab4', # button_primary_text_color='white', # body_text_color='#28496F', # block_background_fill='#fbfcfd', # block_title_text_color='#28496F', # block_label_text_color='#28496F', # block_info_text_color='#505358', # block_border_color=None, # input_border_color='#4372c4', # panel_border_color='#4372c4', # input_background_fill='#F1F2F4', ) with gr.Blocks(theme=THEME, title='GenFBDD', css=static.CSS, delete_cache=(3600, 48 * 3600)) as demo: run_state = gr.State(value={}) session_state = gr.State(value={}) # script_init_frame = gr.HTML(static.PROTEIN_VIEW_IFRAME) with gr.Tabs() as tabs: with gr.Tab(label='Home', id='home'): gr.Markdown(''' # GenFBDD - A Fragment-Based Drug Design Protocol Based on SOTA Molecular Generative Models Given a fragment library and a target protein, GenFBDD blindly docks the fragments to the protein and generates linkers connecting the selected fragments, generating novel scaffolds or drug-like molecules with desirable binding conformations. 
''') with gr.Row(): with gr.Column(variant='panel'): gr.Markdown('## Chemical Fragment Library') # Fragment settings frag_lib_dropdown = gr.Dropdown( label='Select a Preset Fragment Library', choices=list(FRAG_LIBS.keys()), value=None, ) # with gr.Row(): # gr.File(label='Example SDF fragment library', # value='data/examples/fragment_library.sdf', interactive=False) # gr.File(label='Example CSV fragment library', # value='data/examples/fragment_library.csv', interactive=False) frag_lib_upload_btn = gr.UploadButton( label='OR Upload Your Own Library', variant='primary' ) frag_lib_file = gr.File( label='Fragment Library File (Original)', file_count='single', interactive=False, visible=False ) frag_lib_orig_df = gr.State(value=pd.DataFrame()) frag_lib_mod_df = gr.State(value=pd.DataFrame()) # TODO: Tabulator with gr.HTML() for fragment library preview frag_lib_view = gr.DataFrame( visible=True, interactive=False, elem_id='frag_lib_view', ) with gr.Group(): frag_lib_process_opts = gr.CheckboxGroup( label='Fragment Preparation Options', info='1) All fragments consisting of multiple fragments will be split into individual ' 'fragments. 2) All fragments consisting of a single heavy atom will be discarded. ' '3) All fragments will then be processed in the order of the selected options. 
' '4) Finally, fragments will be deduplicated based on their SMILES.', choices=list(FRAG_LIB_PROCESS_OPTS.keys()), value=['Dehalogenate Fragments', 'Discard Inorganic Fragments'], interactive=True, ) frag_lib_process_btn = gr.Button(value='Process Fragments', variant='primary') # Fragment library preview with gr.Column(variant='panel'): gr.Markdown('## Target Protein Structure') # Protein settings with gr.Row(equal_height=True): prot_query_dropdown = gr.Dropdown( label='Select a Protein Structure Query Strategy', choices=[ 'PDB ID', 'UniProt ID', 'FASTA Sequence', ], interactive=True, scale=4 ) prot_query_input = gr.Textbox( show_label=False, placeholder='Enter the protein query here', scale=3, ) with gr.Row(): prot_query_btn = gr.Button(value='Query', variant='primary', scale=1) prot_upload_btn = gr.UploadButton( label='OR Upload Your PDB/FASTA File', variant='primary', file_types=['.pdb', '.fasta'], scale=2 ) input_prot_file = gr.File( label='Protein Structure File (Original)', file_count='single', interactive=False, visible=False ) input_prot_view = gr.HTML('
') with gr.Group(): pocket_extract_dropdown = gr.Dropdown( label='Select a Pocket Extraction Method', choices=list(POCKET_EXTRACT_OPTS.keys()), info=POCKET_EXTRACT_OPTS[list(POCKET_EXTRACT_OPTS.keys())[0]]['info'], value=list(POCKET_EXTRACT_OPTS.keys())[0], interactive=True, ) selected_pocket = gr.Textbox(visible=False) selected_ligand = gr.Textbox(visible=False) pocket_files = gr.Files(visible=False) pocket_extract_btn = gr.Button(value='Extract Pocket', variant='primary') # Target protein preview with gr.Row(): with gr.Column(variant='panel'): gr.Markdown('## Dock Phase Settings') n_confs_per_frag = gr.Slider( value=5, minimum=1, maximum=20, step=1, label="Number of conformers to generate per fragment", interactive=True ) dock_confidence_cutoff = gr.Slider( value=-1.0, minimum=-2.0, maximum=0, step=0.1, label="Confidence cutoff for filtering conformers of docked fragments (>0: high, <=-1.5: low)", interactive=True ) with gr.Accordion(label='Advanced Options', open=False): dock_model = gr.Dropdown( label='Select a Fragment Docking Model', choices=['DiffDock-L'], interactive=True, ) dock_steps = gr.Slider( minimum=20, maximum=40, step=1, label="Number of Denoising Steps for Docking Fragments", interactive=True ) with gr.Column(variant='panel'): gr.Markdown('## Link Phase Settings') frag_conf_combo_strategy = gr.Radio( label='Select a Fragment-Conformer Linking Strategy', choices=[ 'Link Pairs of Fragment-Conformers Contacting the Pocket', # 'Link Maximal Fragment-Conformers Spanning the Entire Pocket', ], value='Link Pairs of Fragment-Conformers Contacting the Pocket', ) frag_dist_range_slider = RangeSlider( value=[2, 8], minimum=1, maximum=10, step=1, label="Fragment-Conformer Distance Range (Å) Eligible for Linking", interactive=True ) n_mols_per_combo_slider = gr.Slider( value=10, minimum=1, maximum=20, step=1, label="Number of molecules to generate per fragment conformer combination", interactive=True ) with gr.Accordion(label='Advanced Options', open=False): 
link_model = gr.Dropdown( label='Select a Linker Generation Model', choices=['DiffLinker'], interactive=True, ) linker_size_slider = gr.Slider( minimum=0, maximum=20, step=1, label="Linker Size", info="0: automatically predicted; >=1: fixed size", interactive=True ) linker_steps_slider = gr.Slider( minimum=100, maximum=500, step=10, label="Number of Denoising Steps for Generating Linkers", interactive=True ) with gr.Row(equal_height=True): email_input =gr.Textbox( label='Email Address (Optional)', info="Your email address will be used to notify you of the status of your job. " "If you cannot receive the email, please check your spam/junk folder." ) with gr.Column(): clr_btn = gr.ClearButton( value='Reset Inputs', components=[] ) run_btn = gr.Button(value='Run GenFBDD', variant='primary') with gr.Tab(label='Results', id='result'): # Results result_state = gr.State(value={}) result_table_orig_df = gr.State(value=pd.DataFrame()) result_table_mod_df = gr.State(value=pd.DataFrame()) result_protein_file = gr.File(visible=False, interactive=False) with gr.Column(variant='panel'): with gr.Row(): scores = gr.CheckboxGroup(list(fn.SCORE_MAP.keys()), label='Compound Scores') filters = gr.CheckboxGroup(list(fn.FILTER_MAP.keys()), label='Compound Filters') with gr.Row(): prop_clr_btn = gr.ClearButton(value='Clear Properties', interactive=False) prop_calc_btn = gr.Button(value='Calculate Properties', interactive=False) with gr.Row(): result_table_view = gr.HTML('') with gr.Column(): result_prot_view = gr.HTML('') result_file_btn = gr.Button(value='Create Result File', visible=False) result_download_file = gr.File(label='Download Result File', visible=False) with gr.Tab(label='Job Status', id='job'): gr.Markdown(''' To check the status of an in-progress or historical job using the job ID and retrieve the predictions if the job has completed. Note that predictions are only kept for 48 hours upon job completion. 
You will be redirected to `Results` for carrying out further analysis and generating the full report when the job is done. If the the query fails to respond, please wait for a few minutes and refresh the page to try again. ''') with gr.Row(): with gr.Column(scale=1): loader_html = gr.HTML('', visible=False) with gr.Column(scale=4): pred_lookup_id = gr.Textbox( label='Input Your Job ID', placeholder='e.g., e9dfd149-3f5c-48a6-b797-c27d027611ac', info="Your job ID is a UUID4 string that you receive after submitting a job on the " "page or in the email notification.") pred_lookup_btn = gr.Button(value='Lookup the Job Status', variant='primary', visible=True) pred_lookup_stop_btn = gr.Button(value='Stop Tracking', variant='stop', visible=False) pred_lookup_status = gr.Markdown() # Event handlers ## Home tab ### Fragment Library frag_lib_dropdown.change( fn=lambda lib: gr.File(FRAG_LIBS[lib], visible=True), inputs=[frag_lib_dropdown], outputs=[frag_lib_file], ) frag_lib_upload_btn.upload( fn=lambda file: gr.File(str(Path(file)), visible=True), inputs=[frag_lib_upload_btn], outputs=[frag_lib_file], ) # Changing the file updates the original df, the modified df, and the view frag_lib_file.change( fn=read_fragment_library, inputs=[frag_lib_file], outputs=[frag_lib_orig_df], ).success( fn=lambda df: [df, gr.DataFrame(df.drop(columns='mol'), visible=True)], inputs=[frag_lib_orig_df], outputs=[frag_lib_mod_df, frag_lib_view], ) # Processing the fragment library updates the modified df frag_lib_process_btn.click( fn=lambda: gr.Info('Processing fragment library...'), ).then( fn=lambda df, opts: [ new_df:=process_fragment_library( df, **checkbox_group_selections_to_kwargs(opts, FRAG_LIB_PROCESS_OPTS) ), gr.DataFrame(new_df.drop(columns='mol'), visible=True) ], inputs=[frag_lib_orig_df, frag_lib_process_opts], outputs=[frag_lib_mod_df, frag_lib_view], ) def preprocess_protein_file(file): filepath = Path(file.name) if filepath.suffix == '.pdb': return { input_prot_file: 
gr.File(str(filepath), visible=True), } elif filepath.suffix == '.fasta': seq = next(SeqIO.parse(file, 'fasta')).seq filepath = fn.pdb_query(seq, method='FASTA Sequence') return { input_prot_file: gr.File(str(filepath), visible=True), prot_query_input: seq, prot_query_dropdown: 'FASTA Sequence', } ### Protein Structure # prot_upload_btn.upload( # fn=lambda file: gr.File(str(Path(file)), visible=True), # inputs=[prot_upload_btn], # outputs=[prot_file], # ) # prot_file.change( # fn=lambda file: gr.HTML(fn.create_complex_view_html(file), visible=True), # inputs=[prot_file], # outputs=[input_prot_view], # ) prot_upload_btn.upload( fn=preprocess_protein_file, inputs=[prot_upload_btn], outputs=[input_prot_file, prot_query_dropdown, prot_query_input], ) prot_query_btn.click( fn=fn.pdb_query, inputs=[prot_query_input, prot_query_dropdown], outputs=[input_prot_file], ) input_prot_file.change( fn=lambda x, y: [gr.File(str(x), visible=True)], inputs=[input_prot_file, input_prot_view], outputs=[input_prot_file], js=static.CREATE_MOL_VIEW, ) #### Pocket Extraction pocket_extract_dropdown.select( fn=lambda method: gr.Button(visible=False) if POCKET_EXTRACT_OPTS[method] == 'clustering' else gr.Button(visible=True), inputs=[pocket_extract_dropdown], outputs=[pocket_extract_btn], ) # pocket_extract_btn.click( # fn=lambda: gr.Info('Extracting pocket...'), # ).then( # fn=fn.extract_pockets_and_update_view, # js=static.RETURN_LIGAND_SELECTION_JS, # inputs=[prot_file, selected_ligand], # outputs=[input_prot_view, pocket_path_dict, selected_ligand, selected_pocket], # ) pocket_extract_btn.click( fn=lambda: gr.Info('Extracting pocket...') ).success( fn=lambda x, y: [x, y], js=static.RETURN_SELECTION, inputs=[selected_ligand, selected_pocket], outputs=[selected_ligand, selected_pocket], ).then( fn=lambda prot, lig: [list(extract_pockets(prot, lig).values()), '', ''], inputs=[input_prot_file, selected_ligand], outputs=[pocket_files, selected_ligand, selected_pocket], ).success( fn=lambda 
x, y: gr.Info('Pocket extraction completed.'), js=static.UPDATE_PROT_VIEW, inputs=[pocket_files, input_prot_view], ) ### Dock-Link Pipeline job_valid = run_btn.click( fn=lambda x, y: [x, y], js=static.RETURN_SELECTION, inputs=[selected_ligand, selected_pocket], outputs=[selected_ligand, selected_pocket], ).success( fn=job_validate, inputs=[ frag_lib_file, frag_lib_mod_df, input_prot_file, selected_pocket, pocket_extract_dropdown, pocket_files, email_input, run_state ], outputs=[run_state], ) job_valid.success( fn=dock_link, inputs=[ frag_lib_mod_df, input_prot_file, dock_steps, n_confs_per_frag, dock_confidence_cutoff, frag_dist_range_slider, frag_conf_combo_strategy,n_mols_per_combo_slider, linker_size_slider, linker_steps_slider, run_state ], outputs=[result_state, run_state], concurrency_limit=1, concurrency_id="gpu_queue" ) ### Job Status user_job_lookup = pred_lookup_btn.click( lambda: '