Spaces:
Runtime error
Runtime error
import asyncio | |
import csv | |
import glob | |
import json | |
import shutil | |
from datetime import datetime | |
from typing import Optional | |
from langchain import PromptTemplate | |
from langchain.chains import LLMChain, MapReduceChain | |
from langchain.chains.combine_documents.map_reduce import MapReduceDocumentsChain, ReduceDocumentsChain | |
from langchain.chains.combine_documents.stuff import StuffDocumentsChain | |
from langchain.chains.summarize import load_summarize_chain | |
from langchain.chat_models import ChatOpenAI | |
from langchain.document_loaders import DirectoryLoader, UnstructuredHTMLLoader | |
from langchain.output_parsers import PydanticOutputParser | |
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter, Language | |
from pathvalidate import sanitize_filename | |
from pydantic import BaseModel, Field | |
from tqdm import tqdm | |
import os | |
class Grader: | |
def __init__(self, model): | |
print("Setting up environment for grading") | |
os.environ["LANGCHAIN_TRACING"] = "true" | |
self.title = None | |
self.model = model | |
self.rubric_file = 'docs/rubric_data.json' | |
self.discussions_file_path = "docs/discussion_entries.json" | |
self.fieldnames = ['student_name', 'total_score', 'student_feedback', 'grader_comments', 'summary'] | |
self.docs = self.get_html_files() | |
self.llm = ChatOpenAI(temperature=0, model_name=model) | |
self.parser: PydanticOutputParser = self.create_parser() | |
self.rubric_text = self.create_rubric_text() | |
self.prompt = self.create_prompt() | |
self.splitter = None | |
self.tokens = self.get_num_tokens() | |
self.llm_chain = self.create_llm_chain(model) | |
self.csv = self.get_csv_file_name() | |
self.outputs = [] | |
self.completed = 0 | |
self.lock = asyncio.Lock() | |
class ToolArgsSchema(BaseModel): | |
student_name: Optional[str] = Field(description="The name of the student") | |
total_score: int = Field(description="The grade of the student's answer") | |
student_feedback: Optional[str] = Field( | |
description="The developmental feedback from Grader's point of view to the student, some examples are: 'Great work, ...', 'Although, your submission is relevant to the question, it doesn't answer the question entirely...'. Give customized feedback based on student's answer") | |
grader_comments: Optional[str] = Field( | |
description="The grade split breakup based on rubric added as grader's one liner customized comments to explain how the grade was calculated for that particular student's answer") | |
summary: Optional[str] = Field( | |
description="The overall summary of the student's answer outlining key points from the student's answer based on the rubric which can be used as a portion of a vectorstore, used to answer summary based questions about all the discussions") | |
class Config: | |
schema_extra = { | |
"required": ["student_name", "total_score", "student_feedback", "grader_comments", "summary"] | |
} | |
def create_parser(self): | |
# print("in parser") | |
return PydanticOutputParser(pydantic_object=self.ToolArgsSchema) | |
def create_rubric_text(self): | |
with open(self.rubric_file, 'r') as file: | |
rubric = json.load(file) | |
rubric_text = [] | |
self.title = None # Initialize title | |
for r in rubric: | |
if 'description' in r and 'ratings' in r: | |
rubric_text.append(f"description:{r['description']}\n" + "\n".join( | |
[f"points:{rating['points']} points: {rating['description']}" for rating in r['ratings']])) | |
elif 'points_possible' in r: | |
print("added points_possible") | |
elif 'title' in r: # Check if title exists in rubric | |
self.title = r['title'] # Save title for later use | |
rubric_text.append(f"title:{self.title}") | |
elif 'instruction' in r: | |
rubric_text.append(f"instruction:{r['instruction']}") | |
rubric_text = "\n".join(rubric_text) | |
# print(rubric_text) Add this to log when moving to application | |
return rubric_text | |
def create_map_prompt(self): | |
map_template_string = f"""I am an expert concise Canvas Discussion Summarizer! I am here to concisely summarize the following sections of a long canvas discussion responses of this student on the basis of instructions and rubric provided. | |
The aim is to capture the important and key points on the basis of instructions and rubric provided and create a short summary, so that grading can be done on all the summarized sections of canvas discussion of a student's response. | |
-------------------- | |
Following is the canvas instruction and rubric: | |
{self.rubric_text} | |
-------------------- | |
I will summarize this extracted part of a long canvas discussion: | |
{{input_documents}} | |
""" | |
return PromptTemplate(template=map_template_string, input_variables=["input_documents"]) | |
def create_reduce_prompt(self): | |
reduce_template_string = f"""I am a Canvas Discussion Grader! I am here to grade the following summarized sections of canvas discussion responses of the student on the basis of instructions and rubric provided. | |
-------------------- | |
To grade student discussion, I will follow the rubric below. I will not deviate from the grading scheme. | |
{self.rubric_text} | |
-------------------- | |
I will be able to identify each student by name, their key interests, key features pertinent to the discussion intruction and rubric. | |
I will be able to summarize the entire discussion in concise manner including key points from each student's answer. | |
-------------------- | |
I will grade the following summarized canvas discussion: {{input_documents}} | |
-------------------- | |
My grading results will ALWAYS be in following format: | |
Format instructions: {{format_instructions}} | |
""" | |
return PromptTemplate( | |
template=reduce_template_string, | |
input_variables=["input_documents"], | |
output_parser=self.parser, | |
partial_variables={"format_instructions": self.parser.get_format_instructions()} | |
) | |
def create_map_llm_chain(self): | |
print("Ready to grade!") | |
map_llm_chain = LLMChain( | |
llm=self.llm, | |
prompt=self.map_prompt, | |
verbose=True, | |
) | |
return map_llm_chain | |
def create_reduce_llm_chain(self): | |
reduce_llm_chain = LLMChain( | |
llm=self.llm, | |
prompt=self.reduce_prompt, | |
verbose=True, | |
) | |
return reduce_llm_chain | |
async def process_file(self, file, pbar): | |
if self.model == 'gpt-4': | |
await asyncio.sleep(10) # Add a 3-second delay before each request | |
result = await self.llm_chain.arun(file) | |
output: self.ToolArgsSchema = self.parser.parse(result) | |
async with self.lock: | |
self.completed += 1 | |
pbar.update(1) | |
return result | |
async def run_chain(self): | |
print("Grading Started! Now sit back and get a coffee \u2615") | |
total = len(self.docs) | |
pbar = tqdm(total=total) | |
# if model is gpt-4, batch size is 2, else batch size is 5 | |
batch_size = 2 if self.model == 'gpt-4' else 5 | |
batches = [self.docs[i:i + batch_size] for i in range(0, len(self.docs), batch_size)] | |
for batch in batches: | |
tasks = [self.process_file(file, pbar) for file in batch] | |
results = await asyncio.gather(*tasks) | |
for result in results: | |
output: self.ToolArgsSchema = self.parser.parse(result) | |
self.outputs.append(output) | |
if self.model == 'gpt-4': | |
await asyncio.sleep(3) # Add a delay between each batch | |
pbar.close() | |
self.save_csv() | |
return True | |
def create_csv(self): | |
# remove existing csvs in output folder | |
if os.path.exists('output'): | |
shutil.rmtree('output') | |
os.mkdir('output') | |
now = datetime.now() # current date and time | |
date_time = now.strftime("%m-%d-%Y_%H-%M-%S") | |
if self.title: # If title exists, use it in the filename | |
file_name = f"{self.title}-{self.llm.model_name}-{date_time}.csv" | |
else: # If title doesn't exist, use 'output' in the filename | |
file_name = f"output-{self.llm.model_name}-{date_time}.csv" | |
# Sanitize the entire filename | |
sanitized_file_name = sanitize_filename(file_name) | |
sanitized_file_name = os.path.join('output', sanitized_file_name) | |
with open(sanitized_file_name, 'w', newline='') as csvfile: | |
writer = csv.DictWriter(csvfile, fieldnames=self.fieldnames) | |
writer.writeheader() | |
return sanitized_file_name | |
def save_csv(self): | |
# Use the filename created in create_csv method | |
self.csv = self.create_csv() | |
with open(self.csv, 'a', newline='') as csvfile: | |
writer = csv.DictWriter(csvfile, fieldnames=self.fieldnames) | |
rows = [output.dict() for output in self.outputs] # Convert each output to a dictionary | |
writer.writerows(rows) # Write all rows to the CSV | |
print(f"Saved grades for {len(self.outputs)} students in {self.csv}") | |
return True | |
return False | |
def get_html_files(self): | |
loader = DirectoryLoader('docs', glob="**/*.html", loader_cls=UnstructuredHTMLLoader, recursive=True) | |
document_list = loader.load() | |
for document in document_list: | |
document.metadata["name"] = document.metadata["source"].split("/")[-1].split(".")[0] | |
break | |
return document_list | |
def create_prompt(self): | |
# print("in prompt") | |
prompt_template = f"""I am a Canvas Discussion Grader! I am here to grade the following canvas discussion on the basis of instructions and rubric provided. | |
To grade student discussion, I will follow the rubric below. I will not deviate from the grading scheme. | |
{self.rubric_text} | |
I will be able to identify each student by name, identify their key interests, key features of the responses pertinent to the discussion intruction and rubric. | |
I will be able to summarize the entire discussion in concise manner including key points from each student's answer. | |
I will grade the following canvas discussion: {{input_documents}} | |
My grading results will ALWAYS be in following format: | |
Format instructions: {{format_instructions}} | |
""" | |
return PromptTemplate(template=prompt_template, input_variables=["input_documents"], output_parser=self.parser, | |
partial_variables={"format_instructions": self.parser.get_format_instructions()}) | |
def create_llm_chain(self, model): | |
print("Ready to grade!") | |
return LLMChain( | |
llm=self.llm, | |
prompt=self.prompt, | |
) | |
def get_num_tokens(self): | |
total_tokens = 0 | |
for doc in self.docs: | |
summary_prompt = self.prompt.format(input_documents=doc) | |
num_tokens = self.llm.get_num_tokens(summary_prompt) | |
total_tokens += num_tokens | |
# summary = self.llm(summary_prompt) | |
# print (f"Summary: {summary.strip()}") | |
# print ("\n") | |
return total_tokens | |
def get_csv_file_name(self): | |
output_dir = 'output' | |
if os.path.exists(output_dir): | |
csv_files = glob.glob(os.path.join(output_dir, '*.csv')) | |
if csv_files: | |
return csv_files[0] # return the first csv file found | |
return None | |
def run(model): | |
grader = Grader(model) | |
asyncio.run(grader.run_chain()) | |
print("Grading successful") | |