|
import os |
|
import pandas as pd |
|
import re |
|
import requests |
|
import time |
|
|
|
from collections import defaultdict |
|
from io import StringIO |
|
|
|
|
|
# Groups of characters that are substituted for one another; every member of a
# group maps to all the other members (presumably look-alike characters from
# OCR/typing errors -- confirm with the data source).
_CONFUSABLE_GROUPS = ('17ILT', '0DOV', '4AX', '5S', 'FH', '9P')

# character -> list of characters it may be replaced with, in group order.
mistakes = {
    char: [other for other in group if other != char]
    for group in _CONFUSABLE_GROUPS
    for char in group
}

# Download endpoint for the GWAS Catalog export (tab-separated).
raw_url = "https://www.ebi.ac.uk/gwas/api/search/downloads/alternative"
# Location of a local copy of the catalog.
gwas_path = "resources/gwas_catalog.tsv"
|
|
|
def permutate(word):
    """Return every spelling of *word* reachable through `mistakes`.

    Each character is either kept or replaced by one of its entries in
    `mistakes`; all combinations are produced. The unchanged word comes
    first, and the leftmost character varies slowest. An empty word yields
    ``['']``.
    """
    variants = ['']

    # Build from the last character backwards so each step prepends one
    # position's choices to all already-built suffixes.
    for char in reversed(word):
        choices = [char]
        if char in mistakes:
            choices.extend(mistakes[char])
        variants = [choice + suffix for choice in choices for suffix in variants]

    return variants
|
|
|
def call(url, max_retries=None, retry_delay=1, timeout=30):
    """GET *url*, retrying on failure, and return the response.

    Args:
        url: Address to request.
        max_retries: Maximum number of retries after the first failed
            attempt. ``None`` (the default) retries indefinitely.
        retry_delay: Seconds to wait before retrying a failed request, so a
            persistent failure does not turn into a tight request loop.
        timeout: Seconds passed to ``requests.get`` so a stalled connection
            raises (and is retried) instead of blocking forever.

    Returns:
        The ``requests.Response`` of the first attempt that did not raise.
        The HTTP status code is not checked.

    Raises:
        Exception: The last error, re-raised once *max_retries* is exhausted.
    """
    failures = 0
    while True:
        try:
            res = requests.get(url, timeout=timeout)
        except Exception as e:
            # Deliberately broad: any failure is reported and retried.
            print(e)
            failures += 1
            if max_retries is not None and failures > max_retries:
                raise
            time.sleep(retry_delay)
        else:
            # Pause after every successful request to space out calls.
            time.sleep(1)
            return res
|
|
|
# Catalog columns kept for downstream use.
_CATALOG_COLUMNS = ['DISEASE/TRAIT', 'CHR_ID', 'MAPPED_GENE', 'SNPS', 'P-VALUE', 'OR or BETA']

# Separators inside a MAPPED_GENE / SNPS cell: a whitespace-delimited "x",
# a comma, or a hyphen. The "x" must be matched before whitespace is removed
# and before upper-casing, otherwise it can never be told apart from a letter
# that belongs to an identifier.
# NOTE(review): splitting on every "-" also breaks hyphenated identifiers
# apart; kept as in the original -- confirm this is intended.
_FIELD_SEPARATOR = re.compile(r"\s+x\s+|[,\-]")


def _split_identifiers(cell, upper=False):
    """Split one catalog cell into space-free, non-empty identifiers."""
    tokens = (part.replace(' ', '') for part in _FIELD_SEPARATOR.split(cell))
    if upper:
        tokens = (token.upper() for token in tokens)
    return [token for token in tokens if token]


def generate_raw_files():
    """Load the GWAS Catalog and build a gene <-> rsID lookup.

    The catalog is read from `gwas_path` when that file exists; otherwise it
    is downloaded from `raw_url` (the download is not written to disk).

    Returns:
        A ``(gwas, ground_truth)`` tuple. ``gwas`` is the catalog restricted
        to the trait, chromosome, mapped gene, SNP, p-value and OR/beta
        columns. ``ground_truth`` is a ``defaultdict(list)`` mapping every
        upper-cased gene name to the SNP identifiers listed with it and every
        SNP identifier to those gene names (duplicates are kept).

    Raises:
        requests.HTTPError: If the download answers with an error status.
    """
    if os.path.exists(gwas_path):
        source = gwas_path
    else:
        response = requests.get(raw_url, timeout=60)
        # Fail here rather than while parsing an error page as TSV.
        response.raise_for_status()
        source = StringIO(response.content.decode('utf-8'))

    gwas = pd.read_csv(source, delimiter='\t')[_CATALOG_COLUMNS]

    # dropna() returns a new frame, so nothing is modified through a slice
    # of `gwas`.
    pairs = gwas[['MAPPED_GENE', 'SNPS']].dropna()

    ground_truth = defaultdict(list)
    for gene_cell, snp_cell in zip(pairs['MAPPED_GENE'], pairs['SNPS']):
        genes = _split_identifiers(gene_cell, upper=True)
        snps = _split_identifiers(snp_cell)

        for gene in genes:
            for snp in snps:
                ground_truth[gene].append(snp)
                ground_truth[snp].append(gene)

    return gwas, ground_truth


# Loaded once at import time; `integrate` reads `gwas` from module scope.
gwas, ground_truth = generate_raw_files()
|
|
|
def integrate(df, catalog=None):
    """Append matching GWAS Catalog rows to *df*.

    For every row of *df*, catalog rows whose ``MAPPED_GENE`` contains the
    row's ``Genes`` value and whose ``SNPS`` contains its ``rsID`` value are
    collected (plain substring match). The hits are renamed to the column
    names used by *df*, tagged with ``Source == 'Database'``, aligned to
    *df*'s columns and appended below the original rows.

    Args:
        df: Table with at least ``Genes`` and ``rsID`` columns. Rows whose
            gene or rsID is not a string (e.g. NaN) are skipped when
            searching.
        catalog: Catalog table to search. Defaults to the module-level
            ``gwas`` frame.

    Returns:
        A new DataFrame with the rows of *df* followed by the catalog hits,
        re-indexed from 0. Columns of *df* that the catalog does not provide
        are NaN in the appended rows.
    """
    if catalog is None:
        catalog = gwas

    gene_column = catalog['MAPPED_GENE']
    snp_column = catalog['SNPS']

    hits = []
    for gene, snp in zip(df['Genes'], df['rsID']):
        if not isinstance(gene, str) or not isinstance(snp, str):
            continue
        # regex=False: identifiers are literal text, not patterns.
        mask = (gene_column.str.contains(gene, na=False, regex=False)
                & snp_column.str.contains(snp, na=False, regex=False))
        hits.append(catalog[mask])

    # Concatenate once instead of growing a frame inside the loop.
    df_db = pd.concat(hits) if hits else pd.DataFrame()
    if df_db.empty:
        return df.reset_index(drop=True)

    df_db = df_db.rename(columns={
        'DISEASE/TRAIT': 'Traits',
        'MAPPED_GENE': 'Genes',
        'SNPS': 'rsID',
        'P-VALUE': 'P Value',
        'OR or BETA': 'OR Value',
    })
    df_db = df_db.drop(columns=['CHR_ID'], errors='ignore')
    # The catalog has a single "OR or BETA" column; it feeds both fields.
    df_db['Beta Value'] = df_db.get('OR Value')
    df_db['Source'] = 'Database'

    # reindex (unlike DataFrame.get) keeps the hits even when df has columns
    # the catalog lacks; those columns are filled with NaN.
    df_db = df_db.reindex(columns=df.columns)

    return pd.concat([df, df_db]).reset_index(drop=True)
|
|