Spaces:
Sleeping
Sleeping
import streamlit as st | |
import json | |
from peft import AutoPeftModelForCausalLM | |
from transformers import GenerationConfig, AutoTokenizer | |
import torch | |
import re | |
def process_data_sample(example):
    """Build the text prompt that is fed to the documentation model.

    Args:
        example: Mapping that may contain an "Instruction" entry. A string
            instruction is used verbatim; any other value (dict, list,
            number, ...) is serialized with ``json.dumps``. A missing or
            ``None`` instruction is replaced by "No Instruction Provided".

    Returns:
        The prompt string, ending with the "### Response :" marker.

    Raises:
        TypeError: If the instruction is not a string and cannot be
            serialized to JSON.
    """
    instruction = example.get("Instruction")
    if instruction is None:
        instruction = "No Instruction Provided"
    elif not isinstance(instruction, str):
        # Anything that is not already text must be serialized; otherwise the
        # string concatenation below would raise TypeError.
        instruction = json.dumps(instruction)
    return (
        "You have to generate api developer documentation json object which helps the user to create api documentation. ### Instruction : "
        + instruction
        + ". ### Response :"
    )
def processing_ouput(model_response):
    """Extract the JSON object from the model's text and normalize it.

    The text after the first "### Response :" marker is searched for the
    first JSON object. If the marker is absent, the whole text is searched
    instead. Anything following that object (for example extra text the model
    kept generating) is ignored.

    Args:
        model_response: Decoded model output as a string.

    Returns:
        A JSON string (4-space indent) with the keys Name, Endpoint, Method,
        Description, Headers, Request_Body, Response_Body, Steps_To_Use,
        Edge_Cases, Exceptions and Usage_Examples. Fields the model did not
        produce are ``null``; Name, Request_Body and Edge_Cases fall back to
        placeholder content when missing.

    Raises:
        json.JSONDecodeError: If no JSON object can be found or parsed.
    """
    _, marker, tail = model_response.partition("### Response :")
    candidate = (tail if marker else model_response).strip()

    start = candidate.find("{")
    if start == -1:
        raise json.JSONDecodeError(
            "No JSON object found in model response", candidate, 0
        )
    # raw_decode stops at the end of the first complete object, so trailing
    # text after the JSON does not break parsing.
    original_json, _ = json.JSONDecoder().raw_decode(candidate, start)

    # Placeholders used only when the model output lacks these sections.
    default_request_body = {
        "ProductID": "Unique identifier of the product for which the price is to be updated.",
        "NewPrice": "New price to be set for the specified product.",
    }
    default_edge_cases = {
        "Invalid_ProductID": "If the specified product ID is invalid, an error message will be returned.",
        "Negative_Price": "If the new price is negative, an error message will be returned.",
    }

    new_json = {
        "Name": original_json.get("Name", "API Name will come here"),
        "Endpoint": original_json.get("Endpoint"),
        "Method": original_json.get("Method"),
        "Description": original_json.get("Description"),
        "Headers": original_json.get("Headers"),
        "Request_Body": original_json.get("Request_Body", default_request_body),
        "Response_Body": original_json.get("Response_Body"),
        "Steps_To_Use": original_json.get("Steps_To_Use"),
        "Edge_Cases": original_json.get("Edge_Cases", default_edge_cases),
        "Exceptions": original_json.get("Exceptions"),
        "Usage_Examples": original_json.get("Usage_Examples"),
    }
    return json.dumps(new_json, indent=4)
def _load_model_and_tokenizer():
    """Load the fine-tuned PEFT model and its tokenizer on the available device."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    tokenizer = AutoTokenizer.from_pretrained("Shubhang999/shu3")
    model = AutoPeftModelForCausalLM.from_pretrained(
        "Shubhang999/shu3",
        low_cpu_mem_usage=True,
        return_dict=True,
        # Half precision is only used on GPU; fall back to float32 on CPU.
        torch_dtype=torch.float16 if device == "cuda" else torch.float32,
        device_map=device,
    )
    return model, tokenizer, device


def _get_model_and_tokenizer():
    """Return the model/tokenizer/device triple, cached by Streamlit when possible."""
    # Streamlit re-executes the script on every interaction, so the cache has
    # to live in Streamlit rather than in a module-level variable. Older
    # Streamlit releases have no `cache_resource`; load directly in that case.
    cache_resource = getattr(st, "cache_resource", None)
    if cache_resource is None:
        return _load_model_and_tokenizer()
    return cache_resource(_load_model_and_tokenizer)()


def model_function(input_data):
    """Generate API documentation JSON for the given request description.

    Args:
        input_data: The user's API description; it is embedded in the prompt
            as the instruction via ``process_data_sample``.

    Returns:
        The JSON string produced by ``processing_ouput`` from the decoded
        model output.
    """
    model, tokenizer, device = _get_model_and_tokenizer()

    inp_str = process_data_sample(
        {
            "Instruction": input_data,
        }
    )
    inputs = tokenizer(inp_str, return_tensors="pt").to(device)

    generation_config = GenerationConfig(
        do_sample=True,
        top_k=1,
        temperature=0.1,
        max_new_tokens=800,
        pad_token_id=tokenizer.eos_token_id,
    )
    outputs = model.generate(**inputs, generation_config=generation_config)

    # generate() returns token ids; they must be decoded to text before the
    # response parser can search it for the "### Response :" marker.
    decoded = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return processing_ouput(decoded)
# Streamlit UI code | |
def key_value_input(container, label, key, value, index):
    """Render one key/value input row and return what the user entered.

    Args:
        container: Streamlit container (or compatible object) providing
            ``columns``.
        label: Section name; "Request Body" and "Response Object" get a
            free-text key field, any other label gets a dropdown of common
            HTTP headers.
        key: Initial key for the row.
        value: Initial value for the row.
        index: Row number, used to build unique widget labels and keys.

    Returns:
        A ``(key_input, value_input)`` tuple with the widgets' current values.
    """
    col1, col2 = container.columns(2)
    # Use a simple text_input for keys in Request Body and Response Object
    if label in ["Request Body", "Response Object"]:
        key_input = col1.text_input(f"{label} Key {index}", key, key=f"{label}_key_{index}")
    else:
        common_headers = [
            "Host", "User-Agent", "Accept", "Accept-Language",
            "Accept-Encoding", "Connection", "Referer", "Cookie",
            "Authorization", "Cache-Control", "Content-Type", "API-Key"
        ]
        # The blank first entry means "no header selected".
        options = [''] + common_headers
        # Look the key up in the list that is actually displayed; indexing
        # into common_headers would be off by one because of the blank entry.
        default_index = options.index(key) if key in options else 0
        key_input = col1.selectbox(
            f"{label} Key {index}",
            options=options,
            index=default_index,
            key=f"{label}_key_{index}",
        )
    value_input = col2.text_input(f"{label} Value {index}", value, key=f"{label}_value_{index}")
    return key_input, value_input
def dynamic_key_value_pairs(label):
    """Render a growable list of key/value rows for one section.

    The rows are stored in ``st.session_state[label]`` as a list of
    ``{'key': ..., 'value': ...}`` dicts, starting with one empty row.

    Args:
        label: Section name; used as the session-state key and in widget
            labels.

    Returns:
        A list of ``(key, value)`` tuples with the current widget values,
        one per row.
    """
    container = st.container()
    if label not in st.session_state:
        st.session_state[label] = [{'key': '', 'value': ''}]

    all_pairs = [
        key_value_input(container, label, pair['key'], pair['value'], i)
        for i, pair in enumerate(st.session_state[label])
    ]

    def _add_pair():
        st.session_state[label].append({'key': '', 'value': ''})

    # The callback runs before the script reruns, so the new row is rendered
    # immediately. Appending after the loop would only show it one
    # interaction later.
    container.button(f"Add more to {label}", on_click=_add_pair)
    return all_pairs
# Streamlit UI layout
st.title('API Documentation Generator')

# Text input for API Endpoint
api_endpoint = st.text_input("API Endpoint", "https://example.com/api")

# Dropdown for API Method
api_methods = ["GET", "POST", "PUT", "DELETE", "PATCH"]
api_method = st.selectbox("API Method", api_methods)

# Dynamic key-value pairs for Request Header, Body, and Response Object
request_header_pairs = dynamic_key_value_pairs("Request Header")
request_body_pairs = dynamic_key_value_pairs("Request Body")
response_object_pairs = dynamic_key_value_pairs("Response Object")

# Button to Generate Documentation
if st.button('Generate Documentation'):
    # Rows where either the key or the value was left blank are dropped.
    request_header = {k: v for k, v in request_header_pairs if k and v}
    request_body = {k: v for k, v in request_body_pairs if k and v}
    response_object = {k: v for k, v in response_object_pairs if k and v}
    user_input = {
        "API_Endpoint": api_endpoint,
        "API_Method": api_method,
        "Request_Object": request_header,
        # Previously the request body the user typed was collected but never
        # sent to the model.
        # NOTE(review): the key name "Request_Body" is an assumption; confirm
        # it against the prompt schema the model was fine-tuned on.
        "Request_Body": request_body,
        "Response_Object": response_object,
    }
    try:
        # Generation can take a while; show progress instead of a frozen page.
        with st.spinner("Generating documentation..."):
            documentation = model_function(user_input)
    except (ValueError, KeyError) as exc:
        # Raised when the model output is not the expected JSON object.
        st.error(f"The model did not return usable documentation JSON: {exc}")
    else:
        # st.write would interpret the JSON string as Markdown; render it as
        # a code block so indentation and quoting are preserved.
        st.code(documentation, language="json")