RAGOndevice / app.py
cutechicken's picture
Update app.py
cf528b4 verified
raw
history blame
15.4 kB
import os
from dotenv import load_dotenv
import gradio as gr
import pandas as pd
import json
from datetime import datetime
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import spaces
from threading import Thread
# ν™˜κ²½ λ³€μˆ˜ μ„€μ •
HF_TOKEN = os.getenv("HF_TOKEN")
MODEL_ID = "CohereForAI/c4ai-command-r7b-12-2024"
class ModelManager:
def __init__(self):
self.tokenizer = None
self.model = None
def ensure_model_loaded(self):
if self.model is None or self.tokenizer is None:
self.setup_model()
@spaces.GPU
def setup_model(self):
try:
print("ν† ν¬λ‚˜μ΄μ € λ‘œλ”© μ‹œμž‘...")
self.tokenizer = AutoTokenizer.from_pretrained(
MODEL_ID,
use_fast=True,
token=HF_TOKEN,
trust_remote_code=True
)
if not self.tokenizer.pad_token:
self.tokenizer.pad_token = self.tokenizer.eos_token
print("ν† ν¬λ‚˜μ΄μ € λ‘œλ”© μ™„λ£Œ")
print("λͺ¨λΈ λ‘œλ”© μ‹œμž‘...")
self.model = AutoModelForCausalLM.from_pretrained(
MODEL_ID,
token=HF_TOKEN,
torch_dtype=torch.float16,
device_map="auto",
trust_remote_code=True,
low_cpu_mem_usage=True
)
print("λͺ¨λΈ λ‘œλ”© μ™„λ£Œ")
except Exception as e:
print(f"λͺ¨λΈ λ‘œλ”© 쀑 였λ₯˜ λ°œμƒ: {e}")
raise Exception(f"λͺ¨λΈ λ‘œλ”© μ‹€νŒ¨: {e}")
@spaces.GPU
def generate_response(self, messages, max_tokens=4000, temperature=0.7, top_p=0.9):
try:
# λͺ¨λΈμ΄ λ‘œλ“œλ˜μ–΄ μžˆλŠ”μ§€ 확인
self.ensure_model_loaded()
# μž…λ ₯ ν…μŠ€νŠΈ μ€€λΉ„
prompt = ""
for msg in messages:
role = msg["role"]
content = msg["content"]
if role == "system":
prompt += f"System: {content}\n"
elif role == "user":
prompt += f"Human: {content}\n"
elif role == "assistant":
prompt += f"Assistant: {content}\n"
prompt += "Assistant: "
# ν† ν¬λ‚˜μ΄μ§• 및 device μ„€μ •
inputs = self.tokenizer(
prompt,
return_tensors="pt",
padding=True,
truncation=True,
max_length=4096
)
# λͺ¨λ“  ν…μ„œλ₯Ό GPU둜 이동
input_ids = inputs.input_ids.to(self.model.device)
attention_mask = inputs.attention_mask.to(self.model.device)
# 생성
with torch.no_grad():
outputs = self.model.generate(
input_ids=input_ids,
attention_mask=attention_mask,
max_new_tokens=max_tokens,
do_sample=True,
temperature=temperature,
top_p=top_p,
pad_token_id=self.tokenizer.pad_token_id,
eos_token_id=self.tokenizer.eos_token_id,
num_return_sequences=1
)
# λ””μ½”λ”© 전에 CPU둜 이동
outputs = outputs.cpu()
generated_text = self.tokenizer.decode(
outputs[0][input_ids.shape[1]:],
skip_special_tokens=True
)
# 단어 λ‹¨μœ„λ‘œ 슀트리밍
words = generated_text.split()
for word in words:
yield type('Response', (), {
'choices': [type('Choice', (), {
'delta': {'content': word + " "}
})()]
})()
except Exception as e:
print(f"응닡 생성 쀑 였λ₯˜ λ°œμƒ: {e}")
raise Exception(f"응닡 생성 μ‹€νŒ¨: {e}")
class ChatHistory:
def __init__(self):
self.history = []
self.history_file = "/tmp/chat_history.json"
self.load_history()
def add_conversation(self, user_msg: str, assistant_msg: str):
conversation = {
"timestamp": datetime.now().isoformat(),
"messages": [
{"role": "user", "content": user_msg},
{"role": "assistant", "content": assistant_msg}
]
}
self.history.append(conversation)
self.save_history()
def format_for_display(self):
formatted = []
for conv in self.history:
formatted.append([
conv["messages"][0]["content"],
conv["messages"][1]["content"]
])
return formatted
def get_messages_for_api(self):
messages = []
for conv in self.history:
messages.extend([
{"role": "user", "content": conv["messages"][0]["content"]},
{"role": "assistant", "content": conv["messages"][1]["content"]}
])
return messages
def clear_history(self):
self.history = []
self.save_history()
def save_history(self):
try:
with open(self.history_file, 'w', encoding='utf-8') as f:
json.dump(self.history, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"νžˆμŠ€ν† λ¦¬ μ €μž₯ μ‹€νŒ¨: {e}")
def load_history(self):
try:
if os.path.exists(self.history_file):
with open(self.history_file, 'r', encoding='utf-8') as f:
self.history = json.load(f)
except Exception as e:
print(f"νžˆμŠ€ν† λ¦¬ λ‘œλ“œ μ‹€νŒ¨: {e}")
self.history = []
# μ „μ—­ μΈμŠ€ν„΄μŠ€ 생성
chat_history = ChatHistory()
model_manager = ModelManager()
def analyze_file_content(content, file_type):
"""Analyze file content and return structural summary"""
if file_type in ['parquet', 'csv']:
try:
lines = content.split('\n')
header = lines[0]
columns = header.count('|') - 1
rows = len(lines) - 3
return f"πŸ“Š 데이터셋 ꡬ쑰: {columns}개 컬럼, {rows}개 데이터"
except:
return "❌ 데이터셋 ꡬ쑰 뢄석 μ‹€νŒ¨"
lines = content.split('\n')
total_lines = len(lines)
non_empty_lines = len([line for line in lines if line.strip()])
if any(keyword in content.lower() for keyword in ['def ', 'class ', 'import ', 'function']):
functions = len([line for line in lines if 'def ' in line])
classes = len([line for line in lines if 'class ' in line])
imports = len([line for line in lines if 'import ' in line or 'from ' in line])
return f"πŸ’» μ½”λ“œ ꡬ쑰: {total_lines}쀄 (ν•¨μˆ˜: {functions}, 클래슀: {classes}, μž„ν¬νŠΈ: {imports})"
paragraphs = content.count('\n\n') + 1
words = len(content.split())
return f"πŸ“ λ¬Έμ„œ ꡬ쑰: {total_lines}쀄, {paragraphs}단락, μ•½ {words}단어"
def read_uploaded_file(file):
if file is None:
return "", ""
try:
file_ext = os.path.splitext(file.name)[1].lower()
if file_ext == '.parquet':
df = pd.read_parquet(file.name, engine='pyarrow')
content = df.head(10).to_markdown(index=False)
return content, "parquet"
elif file_ext == '.csv':
encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1']
for encoding in encodings:
try:
df = pd.read_csv(file.name, encoding=encoding)
content = f"πŸ“Š 데이터 미리보기:\n{df.head(10).to_markdown(index=False)}\n\n"
content += f"\nπŸ“ˆ 데이터 정보:\n"
content += f"- 전체 ν–‰ 수: {len(df)}\n"
content += f"- 전체 μ—΄ 수: {len(df.columns)}\n"
content += f"- 컬럼 λͺ©λ‘: {', '.join(df.columns)}\n"
content += f"\nπŸ“‹ 컬럼 데이터 νƒ€μž…:\n"
for col, dtype in df.dtypes.items():
content += f"- {col}: {dtype}\n"
null_counts = df.isnull().sum()
if null_counts.any():
content += f"\n⚠️ 결츑치:\n"
for col, null_count in null_counts[null_counts > 0].items():
content += f"- {col}: {null_count}개 λˆ„λ½\n"
return content, "csv"
except UnicodeDecodeError:
continue
raise UnicodeDecodeError(f"❌ μ§€μ›λ˜λŠ” μΈμ½”λ”©μœΌλ‘œ νŒŒμΌμ„ 읽을 수 μ—†μŠ΅λ‹ˆλ‹€ ({', '.join(encodings)})")
else:
encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1']
for encoding in encodings:
try:
with open(file.name, 'r', encoding=encoding) as f:
content = f.read()
return content, "text"
except UnicodeDecodeError:
continue
raise UnicodeDecodeError(f"❌ μ§€μ›λ˜λŠ” μΈμ½”λ”©μœΌλ‘œ νŒŒμΌμ„ 읽을 수 μ—†μŠ΅λ‹ˆλ‹€ ({', '.join(encodings)})")
except Exception as e:
return f"❌ 파일 읽기 였λ₯˜: {str(e)}", "error"
def chat(message, history, uploaded_file, system_message="", max_tokens=4000, temperature=0.7, top_p=0.9):
if not message:
return "", history
system_prefix = """μ €λŠ” μ—¬λŸ¬λΆ„μ˜ μΉœκ·Όν•˜κ³  지적인 AI μ–΄μ‹œμŠ€ν„΄νŠΈ 'GiniGEN'μž…λ‹ˆλ‹€.. λ‹€μŒκ³Ό 같은 μ›μΉ™μœΌλ‘œ μ†Œν†΅ν•˜κ² μŠ΅λ‹ˆλ‹€:
1. 🀝 μΉœκ·Όν•˜κ³  곡감적인 νƒœλ„λ‘œ λŒ€ν™”
2. πŸ’‘ λͺ…ν™•ν•˜κ³  μ΄ν•΄ν•˜κΈ° μ‰¬μš΄ μ„€λͺ… 제곡
3. 🎯 질문의 μ˜λ„λ₯Ό μ •ν™•νžˆ νŒŒμ•…ν•˜μ—¬ λ§žμΆ€ν˜• λ‹΅λ³€
4. πŸ“š ν•„μš”ν•œ 경우 μ—…λ‘œλ“œλœ 파일 λ‚΄μš©μ„ μ°Έκ³ ν•˜μ—¬ ꡬ체적인 도움 제곡
5. ✨ 좔가적인 톡찰과 μ œμ•ˆμ„ ν†΅ν•œ κ°€μΉ˜ μžˆλŠ” λŒ€ν™”
항상 예의 λ°”λ₯΄κ³  μΉœμ ˆν•˜κ²Œ μ‘λ‹΅ν•˜λ©°, ν•„μš”ν•œ 경우 ꡬ체적인 μ˜ˆμ‹œλ‚˜ μ„€λͺ…을 μΆ”κ°€ν•˜μ—¬
이해λ₯Ό λ•κ² μŠ΅λ‹ˆλ‹€."""
try:
# 첫 λ©”μ‹œμ§€μΌ λ•Œ λͺ¨λΈ λ‘œλ”©
model_manager.ensure_model_loaded()
if uploaded_file:
content, file_type = read_uploaded_file(uploaded_file)
if file_type == "error":
error_message = content
chat_history.add_conversation(message, error_message)
return "", history + [[message, error_message]]
file_summary = analyze_file_content(content, file_type)
if file_type in ['parquet', 'csv']:
system_message += f"\n\n파일 λ‚΄μš©:\n```markdown\n{content}\n```"
else:
system_message += f"\n\n파일 λ‚΄μš©:\n```\n{content}\n```"
if message == "파일 뢄석을 μ‹œμž‘ν•©λ‹ˆλ‹€...":
message = f"""[파일 ꡬ쑰 뢄석] {file_summary}
λ‹€μŒ κ΄€μ μ—μ„œ 도움을 λ“œλ¦¬κ² μŠ΅λ‹ˆλ‹€:
1. πŸ“‹ μ „λ°˜μ μΈ λ‚΄μš© νŒŒμ•…
2. πŸ’‘ μ£Όμš” νŠΉμ§• μ„€λͺ…
3. 🎯 μ‹€μš©μ μΈ ν™œμš© λ°©μ•ˆ
4. ✨ κ°œμ„  μ œμ•ˆ
5. πŸ’¬ μΆ”κ°€ μ§ˆλ¬Έμ΄λ‚˜ ν•„μš”ν•œ μ„€λͺ…"""
messages = [{"role": "system", "content": system_prefix + system_message}]
if history:
for user_msg, assistant_msg in history:
messages.append({"role": "user", "content": user_msg})
messages.append({"role": "assistant", "content": assistant_msg})
messages.append({"role": "user", "content": message})
partial_message = ""
for response in model_manager.generate_response(
messages,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p
):
token = response.choices[0].delta.get('content', '')
if token:
partial_message += token
current_history = history + [[message, partial_message]]
yield "", current_history
chat_history.add_conversation(message, partial_message)
except Exception as e:
error_msg = f"❌ 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€: {str(e)}"
chat_history.add_conversation(message, error_msg)
yield "", history + [[message, error_msg]]
with gr.Blocks(theme="Yntec/HaleyCH_Theme_Orange", title="GiniGEN πŸ€–") as demo:
initial_history = chat_history.format_for_display()
with gr.Row():
with gr.Column(scale=2):
chatbot = gr.Chatbot(
value=initial_history,
height=600,
label="λŒ€ν™”μ°½ πŸ’¬",
show_label=True
)
msg = gr.Textbox(
label="λ©”μ‹œμ§€ μž…λ ₯",
show_label=False,
placeholder="무엇이든 λ¬Όμ–΄λ³΄μ„Έμš”... πŸ’­",
container=False
)
with gr.Row():
clear = gr.ClearButton([msg, chatbot], value="λŒ€ν™”λ‚΄μš© μ§€μš°κΈ°")
send = gr.Button("보내기 πŸ“€")
with gr.Column(scale=1):
gr.Markdown("### GiniGEN πŸ€– [파일 μ—…λ‘œλ“œ] πŸ“\n지원 ν˜•μ‹: ν…μŠ€νŠΈ, μ½”λ“œ, CSV, Parquet 파일")
file_upload = gr.File(
label="파일 선택",
file_types=["text", ".csv", ".parquet"],
type="filepath"
)
with gr.Accordion("κ³ κΈ‰ μ„€μ • βš™οΈ", open=False):
system_message = gr.Textbox(label="μ‹œμŠ€ν…œ λ©”μ‹œμ§€ πŸ“", value="")
max_tokens = gr.Slider(minimum=1, maximum=8000, value=4000, label="μ΅œλŒ€ 토큰 수 πŸ“Š")
temperature = gr.Slider(minimum=0, maximum=1, value=0.7, label="μ°½μ˜μ„± μˆ˜μ€€ 🌑️")
top_p = gr.Slider(minimum=0, maximum=1, value=0.9, label="응닡 λ‹€μ–‘μ„± πŸ“ˆ")
gr.Examples(
examples=[
["μ•ˆλ…•ν•˜μ„Έμš”! μ–΄λ–€ 도움이 ν•„μš”ν•˜μ‹ κ°€μš”? 🀝"],
["μ œκ°€ μ΄ν•΄ν•˜κΈ° μ‰½κ²Œ μ„€λͺ…ν•΄ μ£Όμ‹œκ² μ–΄μš”? πŸ“š"],
["이 λ‚΄μš©μ„ μ‹€μ œλ‘œ μ–΄λ–»κ²Œ ν™œμš©ν•  수 μžˆμ„κΉŒμš”? 🎯"],
["μΆ”κ°€λ‘œ μ‘°μ–Έν•΄ μ£Όμ‹€ λ‚΄μš©μ΄ μžˆμœΌμ‹ κ°€μš”? ✨"],
["κΆκΈˆν•œ 점이 더 μžˆλŠ”λ° 여쭀봐도 λ κΉŒμš”? πŸ€”"],
],
inputs=msg,
)
def clear_chat():
chat_history.clear_history()
return None, None
msg.submit(
chat,
inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p],
outputs=[msg, chatbot]
)
send.click(
chat,
inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p],
outputs=[msg, chatbot]
)
clear.click(
clear_chat,
outputs=[msg, chatbot]
)
file_upload.change(
lambda: "파일 뢄석을 μ‹œμž‘ν•©λ‹ˆλ‹€...",
outputs=msg
).then(
chat,
inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p],
outputs=[msg, chatbot]
)
if __name__ == "__main__":
demo.launch(
share=False,
show_error=True,
server_name="0.0.0.0",
server_port=7860,
max_threads=1
)