Spaces:
Running
Running
import gradio as gr | |
import random | |
import time | |
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler | |
from langchain.chat_models import ChatOpenAI | |
from langchain.llms import OpenAI | |
import os | |
import json | |
import openai | |
import random | |
import asyncio | |
from langchain.prompts import ( | |
ChatPromptTemplate, | |
HumanMessagePromptTemplate | |
) | |
from ast import literal_eval | |
from langchain.chat_models import ChatOpenAI | |
from langchain.schema import ( | |
HumanMessage, | |
) | |
import json | |
from langchain.chat_models import ChatOpenAI | |
from langchain.llms import OpenAI | |
from langchain.chat_models import ChatAnthropic | |
from langchain.output_parsers import PydanticOutputParser | |
from pydantic import BaseModel, Field | |
from typing import List | |
import json | |
from ast import literal_eval | |
import os | |
import openai | |
import random | |
import time | |
import copy | |
import asyncio | |
from prompts import system_structure_template, system_epics_template, story_cards_template, schema_template, entities_template, update_schema_template, check_message_template, update_story_cards_template | |
os.environ['OPENAI_API_KEY'] = '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' | |
openai.api_key = "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" | |
llm=ChatOpenAI( | |
temperature=.7, | |
model='gpt-4', | |
) | |
# SYSTEM ACTORS | |
class SystemActor(BaseModel): | |
actor: str = Field(description="name of the system actor") | |
desc: str = Field(description="role description of the system actor") | |
class SystemActors(BaseModel): | |
actors: List[SystemActor] | |
parser_actors = PydanticOutputParser(pydantic_object=SystemActors) | |
# ACTOR EPICS | |
class SystemEPIC(BaseModel): | |
epic: str = Field(description="system epic for the given actor") | |
features: List | |
class SystemEPICs(BaseModel): | |
epics: List[SystemEPIC] | |
parser_epics = PydanticOutputParser(pydantic_object=SystemEPICs) | |
class Entities(BaseModel): | |
entities: List[str] = Field(description="the entity name.") | |
entities_schema = PydanticOutputParser(pydantic_object=Entities) | |
class Attrib(BaseModel): | |
system_attrib_id: str = Field(description="the id of the system data model followed by the this property id (e.g., 54-3)") | |
system_attrib_name: str = Field(description="this property name.") | |
system_attrib_datatype: str = Field(description="this property datatype. It must only be among these values: text, number, date)") | |
class DataModel(BaseModel): | |
data_model_id: int = Field(description="an id of the corresponding data model (patient, room, booking, etc.)") | |
data_model_name: str = Field(description="the name of the system data model") | |
user_story_card_ids: List[int] = Field(description="a list of related user story card ids. Should be an empty list.") | |
data_model_attribs: List[Attrib] | |
parser_schema = PydanticOutputParser(pydantic_object=DataModel) | |
class Attrib(BaseModel): | |
system_attrib_id: str = Field(description="the id of the system data model followed by the this property id (e.g., 54-3)") | |
system_attrib_name: str = Field(description="this property name.") | |
system_attrib_datatype: str = Field(description="this property datatype. It must only be among these values: text, number, date.") | |
class DataModel(BaseModel): | |
data_model_id: int = Field(description="an id of the corresponding data model (patient, room, booking, etc.)") | |
data_model_name: str = Field(description="the name of the system data model") | |
user_story_card_ids: List[int] = Field(description="a list of related user story card ids. Should be an empty list.") | |
data_model_attribs: List[Attrib] | |
parser_schema = PydanticOutputParser(pydantic_object=DataModel) | |
class Message(BaseModel): | |
action_id: str = Field(description="Action unique ID") | |
action_case: str = Field(description="Explaination of the action case") | |
action_text: str = Field(description="The text body of the message to the user") | |
action_type: str = Field(description="Has to be one of the following: Email Message, SMS message, Alerting text") # Feature ID in data shema | |
class FlowStep(BaseModel): | |
step: int = Field(description="step number") | |
actor: str = Field(description="The actor performing the action e.g., User, System") | |
action: str = Field(description="The action description to be performed") | |
class DataField(BaseModel): | |
field_id: int = Field(description="the field id") | |
field_name: str = Field(description="the name of the field") | |
field_type: str = Field(description="the type of the field only from the following: text, number, date") | |
required: bool = Field(description="whether field is required or not") | |
default_value: str = Field(description="the default value of the field") | |
# STORY CARD | |
class StoryCard(BaseModel): | |
user_story_id: str = Field(description="Unique User Story ID") # Feature ID in data shema | |
user_story: str = Field(description="story title in the format of <as a user, I would be able to ..>") | |
affected_components: List[str] = Field(description="a list of affected components in the system by this features") | |
acceptance_criteria: List[str] = Field(description="a list of acceptance criteria") | |
preconditions: List[str] = Field(description="a list of preconditions with the corresponding acceptance criteria (if any)") | |
main_flow: List[FlowStep] = Field(description="Main Flow details the actions taken by the user and the system in a step-by-step manner. Make sure you differentiate the user and system steps.") | |
alternative_flow: List[FlowStep] = Field(description="Alternative Flow that covers the scenarios in which the main flow might fail the actions taken by the user and the system in a step-by-step manner. Make sure you differentiate the user and system steps.") | |
data_sections: List[DataField] = Field(description="A list contains Field Name, Field type, Required or not required or filed by the system and you can’t edit it, Default value, Note explains the filed input data type. You must use the provided schema to generate data section.") | |
messages: List[Message] = Field(description="A mandatory field that contains a list of system messages (both succesful and unsuccesfull flows) that result from the main flow.") | |
related_data_shema_ids: List[int] = Field(description="A list of IDs contains relevant Schema Entity IDs for each entity used in the story card, Only show the Entity IDs. Rely on the privded system schema only.") | |
parser_cards = PydanticOutputParser(pydantic_object=StoryCard) | |
def str_to_json(text): | |
updated_schemas = [] | |
try: | |
print("GGGGGGGGGGGGG") | |
updated_schemas.append(parser_schema.parse(text).dict()) | |
updated_schemas = check_schema(updated_schemas) | |
print("GGGGGGGGGGGGG") | |
except: | |
if '```' in text: | |
resp = text.split('```')[1] # Assume only one json object in the response | |
resp = resp.replace('json', ' ').strip() # Remove the word json that comes after ``` | |
if resp[0] == '[' and resp[-1] == ']': # If it has brackets, i.e. multiple instances | |
for obj in literal_eval(resp): # For each instance | |
updated_schemas.append(eval(str(obj))) # Add schema to updated schemas | |
else: # If it does not have brackets, i.e. multiple instances | |
obj = literal_eval(resp) | |
updated_schemas.append(eval(str(obj))) # Add schema to updated schemas | |
updated_schemas = check_schema(updated_schemas) | |
else: | |
print('unable to parse...') | |
return updated_schemas | |
def update_data_schema(schemas, card, file_path): | |
for s in schemas: | |
print(f'processing {s["name"]}') | |
if s['schema']['data_model_id'] in card['related_data_shema_ids']: | |
if card['user_story_id'] not in s['schema']['user_story_card_ids']: | |
print(f"adding id to {s['name']}") | |
s['schema']['user_story_card_ids'].append(card['user_story_id']) | |
with open(file_path, 'w') as json_file: | |
json.dump(schemas, json_file) | |
def ensure_max_three_actors(actors_list): | |
print("hi inside three actor") | |
return actors_list[:3] | |
def ensure_max_three_entities(data): | |
if 'entities' in data: | |
data['entities'] = data['entities'][:3] | |
return data | |
def updating_cards(card, card_path, list_cards_path, list_of_cards=None): | |
if card is None: | |
print("card is non case") | |
with open(list_cards_path, 'w') as json_file: | |
json.dump(list_of_cards, json_file) | |
else: | |
print(card, card_path, list_cards_path, list_of_cards) | |
with open(card_path, 'w') as json_file: | |
json.dump(card, json_file) | |
if os.path.exists(list_cards_path): | |
with open(list_cards_path, 'r') as json_file: | |
list_cards = json.load(json_file) | |
list_cards.append(card) | |
with open(list_cards_path, 'w') as json_file: | |
json.dump(list_cards, json_file) | |
else: | |
with open(list_cards_path, 'w') as json_file: | |
json.dump([card], json_file) | |
def check_schema(updated_schemas): | |
if type(updated_schemas)==dict: | |
u_shem = {} | |
if 'name' not in updated_schemas.keys(): | |
u_shem['name'] = updated_schemas['data_model_name'] | |
u_shem['schema'] = copy.deepcopy(updated_schemas) | |
return u_shem | |
elif type(updated_schemas)==list: | |
u_shem = [] | |
for updated_schema in updated_schemas: | |
temp_shem = {} | |
if 'name' not in updated_schema.keys(): | |
temp_shem['name'] = updated_schema['data_model_name'] | |
temp_shem['schema'] = copy.deepcopy(updated_schema) | |
u_shem.append(temp_shem) | |
else: | |
u_shem.append(updated_schema) | |
return u_shem | |
return updated_schemas | |
def logs(log,filePath): | |
if os.path.exists(filePath): | |
with open(filePath, 'r') as json_file: | |
logs = json.load(json_file) | |
logs.append(log) | |
with open(filePath, 'w') as json_file: | |
json.dump(logs, json_file) | |
else: | |
with open(filePath, 'w') as json_file: | |
json.dump([log], json_file) | |
def delete(json_file): | |
if os.path.exists(json_file): | |
os.remove(json_file) | |
print(f"The file {json_file} has been deleted.") | |
else: | |
print("The file does not exist.") | |
def check_None(the_list): | |
while None in the_list: | |
the_list.remove(None) | |
return the_list | |
def delete(json_file): | |
if os.path.exists(json_file): | |
os.remove(json_file) | |
print(f"The file {json_file} has been deleted.") | |
else: | |
print("The file does not exist.") | |
def is_all_english_alpha_numeric_or_space(text): | |
allowed_chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 \".,:'!?+=" | |
return all(char in allowed_chars for char in text) | |
def Validating_Input(text): | |
""" | |
Check if the text contains "GPT" as a prefix in any substring, regardless of case. | |
Also, checks for the presence of "OpenAI" or "Open AI". | |
Parameters: | |
- text (str): The text to search within. | |
Returns: | |
- bool: True if the "GPT" or "OpenAI" conditions are met, False otherwise. | |
""" | |
text_lower = text.lower() | |
print("HI FROM VALIDATING INPUT =========") | |
# Check if "GPT" is a prefix in any substring | |
if "gpt" in text_lower: | |
return True | |
# Check for the "OpenAI" condition | |
if "openai" in text_lower or "open ai" in text_lower: | |
return True | |
if "llm" in text_lower or "llms" in text_lower: | |
return True | |
print("HI FROM VALIDATING INPUT RETURNNING FALSE =========") | |
return False | |
def check_message(user_message): | |
llm=ChatOpenAI( | |
temperature=.7, | |
model='gpt-4', | |
) | |
prompt = check_message_template.format_prompt( | |
message=user_message, | |
) | |
response = llm(prompt.to_messages()) | |
return response.content.lower() | |
def apologise(step): | |
if step ==0: | |
return "Your request is unclear. Please provide additional information or try using different terms.\n\n" | |
if step ==1: | |
return "Oops! Your input is a bit ambiguous. Can you please provide more clarity or details?\n\n" | |
if step ==2: | |
return "We're not sure what you meant by that. Could you rephrase or be more specific?\n\n" | |
if step ==3: | |
return "Sorry, we didn't quite get that. Can you clarify or provide more context?" | |
def apologise_lang(step): | |
if step ==0: | |
return "Apologies, but at this moment, we only support English for 'فزاع'. We appreciate your understanding.\n\n" | |
if step ==1: | |
return "Thank you for your interest in 'فزاع'. Currently, we are offering support exclusively in English.\n\n" | |
if step ==2: | |
return "Please note that 'فزاع' is presently supported in English only. We're working to expand our language offerings in the future.\n\n" | |
if step ==3: | |
return "We value our diverse user base, but as of now, our support for 'فزاع' is limited to English. Thanks for your patience.\n\n" | |
def first_prompt(system_name): | |
prompt = system_structure_template.format_prompt( | |
system_name= system_name, | |
format_instructions= parser_actors.get_format_instructions() | |
) | |
response = llm(prompt.to_messages()) | |
return response | |
async def aget_epics(actor_obj, system_name): | |
print(f'processing actor {actor_obj["actor"]}') | |
prompt = system_epics_template.format_prompt( | |
system_name= system_name, | |
format_instructions= parser_epics.get_format_instructions(), | |
system_actor = actor_obj['actor'], | |
actor_description = actor_obj['desc'] | |
) | |
response = await llm.apredict(prompt.to_string()) | |
aepics = parser_epics.parse(response).dict() | |
aepics['system_name'] = system_name | |
aepics['system_actor'] = actor_obj['actor'] | |
aepics['actor_description'] = actor_obj['desc'] | |
return aepics | |
async def second_prompt(response, system_name): | |
actors = parser_actors.parse(response.content).dict()['actors'] | |
actors=ensure_max_three_actors(actors) | |
epics = await asyncio.gather(*[aget_epics(actor,system_name) for actor in actors]) | |
with open('epics_gradio.json', 'w') as json_file: | |
json.dump(epics, json_file) | |
return epics | |
def third_prompt(epics): | |
prompt = entities_template.format_prompt( | |
system_context = str(epics), | |
format_instructions= entities_schema.get_format_instructions(), | |
) | |
response = llm(prompt.to_messages()) | |
entities = entities_schema.parse(response.content).dict() | |
entities = ensure_max_three_entities(entities) | |
with open('entities_gradio.json', 'w') as json_file: | |
json.dump(entities, json_file) | |
return entities['entities'] | |
async def fetch_schemas(entity,epics ): | |
try: | |
print(f" Generating {entity}") | |
prompt = schema_template.format_prompt( | |
system_entity= entity, | |
system_context = str(epics), | |
format_instructions= parser_schema.get_format_instructions(), | |
) | |
response = await llm.apredict_messages(messages=prompt.to_messages()) | |
schema = parser_schema.parse(response.content).dict() | |
return check_schema(schema) | |
except Exception as e: | |
print(f"Error processing"+str(e)) | |
return None | |
async def fourth_prompt(entities,epics): | |
all_schemas = [] | |
all_schemas = await asyncio.gather(*[fetch_schemas(entity,epics) for entity in entities]) | |
# all_schemas = check_None(all_schemas) | |
for ind,s in enumerate(all_schemas): | |
s['schema']['data_model_id'] = (ind+1) | |
with open('schemas_gradio.json', 'w') as json_file: | |
json.dump(all_schemas, json_file) | |
logs(all_schemas,'schemas_logs.json') | |
return all_schemas | |
def json_to_markdown_table(data): | |
md_table = "| System Name | System Actor | Actor Description | Epic | Features |\n" | |
md_table += "|------------|--------------|--------------------|------|----------|\n" | |
for entry in data: | |
system_name = entry['system_name'] | |
system_actor = entry['system_actor'] | |
actor_description = entry['actor_description'] | |
for epic in entry['epics']: | |
epic_name = epic['epic'] | |
features = ', '.join(epic['features']) | |
md_table += f"| {system_name} | {system_actor} | {actor_description} | {epic_name} | {features} |\n" | |
return md_table | |
def markdown(file_name): | |
with open(file_name, 'r') as json_file: | |
json_data = json.load(json_file) | |
return json_to_markdown_table(json_data) | |
def create_user_story(first = False): | |
if not first: | |
return "\n\n\n Please write a short description for a new user story card\n\n\n" | |
return "\n\n\n Please write a short description for a user story card\n\n\n" | |
def affected_parts(user_request): | |
with open('schemas_gradio.json', 'r') as json_file: | |
schemas = json.load(json_file) | |
with open('epics_gradio.json', 'r') as json_file: | |
epics = json.load(json_file) | |
prompt = update_schema_template.format_prompt( | |
schema= str(schemas), | |
system_context = str(epics), | |
new_usecase = user_request, | |
format_instructions= parser_schema.get_format_instructions(), | |
) | |
response = llm(prompt.to_messages()) | |
updated_schemas = str_to_json(response.content) | |
unlucky = [] | |
updated_schemas = check_schema(updated_schemas) | |
for u in updated_schemas: | |
check_var = True | |
for s in schemas: | |
print(s,u) | |
if s['name'] == u['name']: | |
s['schema'] = u['schema'] | |
check_var = False | |
if check_var: | |
unlucky.append(u) | |
for u in unlucky: | |
schemas.append(u) | |
with open('schemas_updated_part_gradio.json', 'w') as json_file: | |
json.dump(updated_schemas, json_file) | |
logs(updated_schemas,"schemas_updated_part_logs.json") | |
with open('schemas_gradio.json', 'w') as json_file: | |
json.dump(schemas, json_file) | |
logs(schemas,'schemas_logs.json') | |
def generate_story_cards(user_request): | |
with open('schemas_gradio.json', 'r') as json_file: | |
schemas = json.load(json_file) | |
with open('epics_gradio.json', 'r') as json_file: | |
epics = json.load(json_file) | |
prompt = story_cards_template.format_prompt( | |
format_instructions= parser_cards.get_format_instructions(), | |
schema = str(schemas), | |
system_epic = str(epics), | |
epic_feature = user_request # use the same as above | |
) | |
response = llm(prompt.to_messages()) | |
card = parser_cards.parse(response.content).dict() | |
updating_cards(card, 'card_gradio.json', 'cards_gradio.json') | |
logs(card,'cards_logs.json') | |
update_data_schema(schemas, card, 'schemas_gradio.json') | |
logs(schemas,'schemas_logs.json') | |
return card | |
def check_affected_story_ids(schemas_affected_part_path = 'schemas_updated_part_gradio.json'): | |
with open(schemas_affected_part_path, 'r') as json_file: | |
schemas_affected_part = json.load(json_file) | |
story_ids = [] | |
for item in schemas_affected_part: | |
for userStory_id in item['schema']['user_story_card_ids']: | |
if userStory_id not in story_ids: | |
story_ids.append(userStory_id) | |
logs(json.dumps([{'Story_ids': story_ids}]).replace('\"',''),"Story_ids_logs.json") | |
return story_ids | |
async def fetch_cards(cards, card, schemas, ix, story_ids): | |
print(f"Card details {card}") | |
try: | |
if card['user_story_id'] not in story_ids: | |
print(f'skipping ..') | |
return | |
prompt = update_story_cards_template.format_prompt( | |
format_instructions= parser_cards.get_format_instructions(), | |
schema = str(schemas), | |
story_card = str(card), | |
) | |
response = await llm.apredict_messages(messages=prompt.to_messages()) | |
try: | |
new_card = parser_cards.parse(response.content).dict() | |
except Exception as e: | |
print(e) | |
new_card = json.loads(response.content) | |
print(new_card) | |
# override old card | |
cards[ix] = new_card | |
print("Affected User Story: \n\n", new_card) | |
return new_card | |
except Exception as e: | |
print(f"Error processing"+str(e)) | |
return None | |
async def update_affected_story_cards(story_ids): | |
with open('schemas_gradio.json', 'r') as json_file: | |
schemas = json.load(json_file) | |
with open('cards_gradio.json', 'r') as json_file: | |
cards = json.load(json_file) | |
updated_cards = [] | |
updated_cards = await asyncio.gather(*[fetch_cards(cards, card, schemas, ix, story_ids) for ix, card in enumerate(cards)]) | |
updated_cards = check_None(updated_cards) | |
logs(cards,'affected_Cards_logs.json') | |
updating_cards(card = None, card_path = None, list_cards_path = 'cards.json', list_of_cards=cards) | |
return updated_cards | |
def user_story_to_markdown(story): | |
markdown = "" | |
# Title and ID | |
markdown += f"## User Story {story['user_story_id']}\n\n" | |
# Table headers and data | |
markdown += "| Attribute | Information |\n" | |
markdown += "| --------- | ----------- |\n" | |
markdown += f"| User Story | {story['user_story']} |\n" | |
markdown += f"| Affected Components | {', '.join(story['affected_components'])} |\n" | |
markdown += f"| Acceptance Criteria | {'<br>'.join(story['acceptance_criteria'])} |\n" | |
markdown += f"| Preconditions | {'<br>'.join(story['preconditions'])} |\n" | |
main_flow = '<br>'.join([f"{step['step']}. **{step['actor']}** - {step['action']}" for step in story['main_flow']]) | |
markdown += f"| Main Flow | {main_flow} |\n" | |
alt_flow = '<br>'.join([f"{step['step']}. **{step['actor']}** - {step['action']}" for step in story['alternative_flow']]) | |
markdown += f"| Alternative Flow | {alt_flow} |\n" | |
data_sections = '<br>'.join([f"**{section['field_name']}** (Type: {section['field_type']}, Required: {'Yes' if section['required'] else 'No'}, Default: {section['default_value']})" for section in story['data_sections']]) | |
markdown += f"| Data Sections | {data_sections} |\n" | |
messages = '<br>'.join([f"**{msg['action_case']}** - {msg['action_text']} ({msg['action_type']})" for msg in story['messages']]) | |
markdown += f"| Messages | {messages} |\n" | |
markdown += f"| Related Data Schema IDs | {', '.join(map(str, story['related_data_shema_ids']))} |\n" | |
return markdown | |
def choose(): | |
return "\n\n\n\nPlease Choose one of the following options:\n\n 1- Adding user story to your existing system.\n\n 2- Update existing user story.\n\n\n" | |
for file in os.listdir(): | |
if file.endswith('_gradio.json'): | |
delete(file) | |