Spaces:
Sleeping
Sleeping
File size: 9,410 Bytes
2d00e5a 16939ac 2d00e5a 16939ac 2d00e5a 850fcc9 2d00e5a 850fcc9 2d00e5a 850fcc9 2d00e5a 850fcc9 2d00e5a 850fcc9 2d00e5a 850fcc9 2d00e5a 850fcc9 2d00e5a 850fcc9 2d00e5a 850fcc9 2d00e5a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
import json
import nltk
import pandas as pd
import re
nltk.download('punkt')
from dataclasses import dataclass
from nltk.tokenize import sent_tokenize
from typing import Dict, List, Sequence
from utils_report_parser import get_section_from_report
from transformers import (
AutoModelForTokenClassification,
AutoTokenizer,
pipeline,
)
@dataclass
class Report:
patient_id: str | int
text: str
date: str
summary: str | None = None
def clean(s: str) -> str:
s = s.replace("\n", " ") # Concatenate into one string
s = s.replace("_", "") # Remove long lines and underscores
s = re.sub(r"\[.*?\]", "", s) # Remove brackets and parentheses
s = re.sub(r"\(.*?\)", "", s)
s = " ".join(s.split()) # Replace multiple white spaces
return s
def split_paragraphs(text: str) -> List[str]:
paragraphs = text.split("\n\n")
paragraphs = list(map(clean, paragraphs))
paragraphs = list(filter(lambda s: len(s.split()) > 10, paragraphs))
return paragraphs
def format_casemaker_data(
df: pd.DataFrame, patient_id_column: str, text_column: str, date_column: str
):
"""Take in a pandas dataframe where each row corresponds to one report for a patient,
and output a dataframe where each row corresponds to a patient, and the "records" column
contains a list of dictionaries of all their reports sorted by date
Args:
df (pd.DataFrame): Input dataframe on report level
patient_id_column (str): Patient ID
text_column (str): Text/Report
date_column (str): Date (will be used to sort)
"""
df = df.rename(
columns={
patient_id_column: "patient_id",
text_column: "text",
date_column: "date",
}
)
df = (
df.sort_values(by=["patient_id", "date"])
.groupby("patient_id")
.apply(lambda df: df[["patient_id", "text", "date"]].to_dict("records"))
)
reports_by_patient = dict[str, Sequence[Report]]()
for patient_id, report_list in zip(df.index, df):
patient_id = str(patient_id)
report_list = [Report(**report) for report in report_list]
reports_by_patient[patient_id] = report_list
return reports_by_patient
class CaseMaker:
def __init__(self, organ_keywords_dict_path: str = "../assets/terms.json"):
self.organ_keyword_dict = json.load(open(organ_keywords_dict_path, "r"))
self.ner_pipe = pipeline(
"ner",
model=AutoModelForTokenClassification.from_pretrained(
"d4data/biomedical-ner-all"
),
tokenizer=AutoTokenizer.from_pretrained("d4data/biomedical-ner-all"),
aggregation_strategy="simple",
device_map="auto",
)
# self.summ_pipe = pipeline(
# "text2text-generation", model="starmpcc/Asclepius-7B", device_map="auto"
# )
def standardize_organ(self, organ_entity: Dict) -> Dict:
"""Given an entity, map its name to a set of recognized entities provided in
organ_keyword_dict if it matches any of the keywords; otherwise set it as "Other"
Args:
organ_entity (Dict): Dictionary corresponding to entity; should contain "word" key
which is the entity
Returns:
Dict: Same dictionary where the "word" key has been updated to either a set of standard
body organs or "Other"
"""
# If the organ matches any of the keys or their synonyms, replace the name and return
for key in self.organ_keyword_dict:
if (organ_entity["word"].lower() == key.lower()) or (
organ_entity["word"].lower() in self.organ_keyword_dict[key]
):
organ_entity["word"] = key
return organ_entity
# Otherwise, it's a bad match so set the score to 0 and return other
organ_entity["word"] = "Other"
organ_entity["score"] = 0.0
return organ_entity
def pick_organ_by_keyword(self, s: str):
words = s.lower()
for organ in self.organ_keyword_dict.keys():
if any(
[
keyword.lower() in words
for keyword in [organ] + self.organ_keyword_dict[organ]
]
):
return organ
return "other"
def parse_report_by_organ(self, report: str):
"""Take in a text report and output a dictionary of body organs
and a list of all the sentences corresponding to that organ
Args:
report (str): Input report
"""
report_string_by_organ = dict[str, str]()
# Split the report into a list of paragraphs
paragraphs = split_paragraphs(report)
# Collect a list of paragraphs related to each organ
for p in paragraphs:
# Figure out which organ is being referenced
selected_organ = self.pick_organ_by_keyword(p)
# Concatenate the report to its corresponding organ
if selected_organ not in report_string_by_organ:
report_string_by_organ[selected_organ] = p
else:
report_string_by_organ[selected_organ] += p
return report_string_by_organ
def trim_to_relevant_portion(self, report: str):
# Only keep sentences with symptoms and disease descriptions
relevant_sentences = list[str]()
for sentence in sent_tokenize(report):
if any(
[
ent["entity_group"] in ["Sign_symptom", "Disease_disorder"]
for ent in self.ner_pipe(sentence)
]
):
relevant_sentences.append(str(sentence))
return "\n".join(relevant_sentences)
def summarize_report(self, text: str) -> str:
"""Format text into prompt and summarize clinical text
Args:
text (str): Input report
Returns:
str: Output
"""
question = (
"Can you provide a succinct summary of the key clinical findings "
"and treatment recommendations outlined in this discharge summary?"
)
prompt = """
You are an intelligent clinical languge model.
Below is a snippet of patient's discharge summary and a following instruction from healthcare professional.
Write a response that appropriately completes the instruction.
The response should provide the accurate answer to the instruction, while being concise.
[Discharge Summary Begin]
{note}
[Discharge Summary End]
[Instruction Begin]
{question}
[Instruction End]
""".format(
question=question, note=text
)
output = self.summ_pipe(prompt, max_new_tokens=len(text.split()) // 2)[0][
"generated_text"
]
answer = output.split("[Instruction End]")[-1]
answer = clean(answer)
return answer
def parse_records(
self,
reports: Sequence[Report],
):
"""Given a list of reports (represented by dictionaries), split each of them
by body part using parse_report_by_organ, then compile all the text for the same
organ across different reports
(i.e. for each body part, have a list of dicts which contain the text from various reports)
Args:
records (Sequence[Report]): List of reports represented by dictionaries; each dictionary
must contain "text" and "date" keys
"""
# Split the reports by organ
reports_by_organ = dict[str, Sequence[Report]]()
for report in reports:
# Cut the report to the findings
report_findings = get_section_from_report(report.text, "findings")
# For each organ, collect a list of relevant records containing the text and date
report_by_organ = self.parse_report_by_organ(report_findings)
for organ, report_text in report_by_organ.items():
organ_level_record = Report(
text=report_text, date=report.date, patient_id=report.patient_id
)
if organ in reports_by_organ:
reports_by_organ[organ].append(organ_level_record)
else:
reports_by_organ[organ] = [organ_level_record]
# For each organ, then filter only to the relevant reports and summarize them
summarized_reports_by_organ = dict[str, Sequence[Report]]()
for organ in reports_by_organ.keys():
cleaned_reports = list[Report]()
for report in reports_by_organ[organ]:
# Trim the report
report_text = self.trim_to_relevant_portion(report.text)
if report_text:
report.summary = report_text
cleaned_reports.append(report)
summarized_reports_by_organ[organ] = cleaned_reports
return summarized_reports_by_organ
def format_reports(self, all_reports: Dict[str, List[Dict]]):
new_reports = {}
for organ, organ_reports in all_reports.items():
new_reports[organ] = "\n\n".join(
[f"**Report {str(r.date)}**\n\n{str(r.summary)}" for r in organ_reports]
)
return new_reports
|