Spaces:
Running
Running
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Copyright 2020-2024 (c) Randy W @xtdevs, @xtsea | |
# | |
# from : https://github.com/TeamKillerX | |
# Channel : @RendyProjects | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU Affero General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU Affero General Public License for more details. | |
# | |
# You should have received a copy of the GNU Affero General Public License | |
# along with this program. If not, see <https://www.gnu.org/licenses/>. | |
import requests | |
import time | |
import json | |
import asyncio | |
import logging | |
import io | |
import os | |
import re | |
from os import getenv | |
from dotenv import load_dotenv | |
from PIL import Image | |
from pyrogram import * | |
from pyrogram import Client, filters | |
from pyrogram.types import * | |
from pyrogram.errors import * | |
from RyuzakiLib import FaceAI, FullStackDev, GeminiLatest, RendyDevChat | |
import google.generativeai as genai | |
from google.api_core.exceptions import InvalidArgument | |
from database import db | |
from logger import LOGS | |
logging.basicConfig(level=logging.INFO) | |
logging.getLogger("pyrogram.syncer").setLevel(logging.WARNING) | |
logging.getLogger("pyrogram.client").setLevel(logging.WARNING) | |
loop = asyncio.get_event_loop() | |
load_dotenv() | |
API_ID = os.environ["API_ID"] | |
API_HASH = os.environ["API_HASH"] | |
BOT_TOKEN = os.environ["BOT_TOKEN"] | |
GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"] | |
client = Client( | |
"chatbotai", | |
api_id=API_ID, | |
api_hash=API_HASH, | |
bot_token=BOT_TOKEN | |
) | |
async def geni_files_delete(name: str): | |
url = f"https://generativelanguage.googleapis.com/v1beta/{name}" | |
params = {"key": GOOGLE_API_KEY} | |
response = requests.delete(url, params=params) | |
if response.status_code != 200: | |
return None | |
return response.text | |
async def chatbot_talk(client: Client, message: Message): | |
chat_user = await db.get_chatbot(message.chat.id) | |
genai.configure(api_key=GOOGLE_API_KEY) | |
if message.photo: | |
file_path = await message.download() | |
caption = message.caption or "What's this?" | |
x = GeminiLatest(api_keys=GOOGLE_API_KEY) | |
if client.me.is_premium: | |
ai_reply = await message.reply_text(f"{custom_loading}Processing...") | |
else: | |
ai_reply = await message.reply_text(f"Processing...") | |
try: | |
backup_chat = await db._get_chatbot_chat_from_db(message.from_user.id) | |
backup_chat.append({"role": "user", "parts": [{"text": caption}]}) | |
response_reads = x.get_response_image(caption, file_path) | |
if len(response_reads) > 4096: | |
with open("chat.txt", "w+", encoding="utf8") as out_file: | |
out_file.write(response_reads) | |
await message.reply_document( | |
document="chat.txt", | |
disable_notification=True | |
) | |
await ai_reply.delete() | |
os.remove("chat.txt") | |
else: | |
await ai_reply.edit_text(response_reads) | |
backup_chat.append({"role": "model", "parts": [{"text": response_reads}]}) | |
await db._update_chatbot_chat_in_db(message.from_user.id, backup_chat) | |
os.remove(file_path) | |
return | |
except InvalidArgument as e: | |
return await ai_reply.edit_text(f"Error: {e}") | |
except Exception as e: | |
return await ai_reply.edit_text(f"Error: {e}") | |
if message.audio or message.voice: | |
if client.me.is_premium: | |
ai_reply = await message.reply_text(f"{custom_loading}Processing...") | |
else: | |
ai_reply = await message.reply_text(f"Processing...") | |
if message.audio: | |
audio_file_name = await message.download() | |
if message.voice: | |
audio_file_name = await message.download() | |
caption = message.caption or "What's this?" | |
model = genai.GenerativeModel( | |
model_name="gemini-1.5-flash", | |
safety_settings={ | |
genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE, | |
genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE, | |
genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE, | |
genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE, | |
} | |
) | |
backup_chat = await db._get_chatbot_chat_from_db(message.from_user.id) | |
backup_chat.append({"role": "user", "parts": [{"text": caption}]}) | |
if client.me.is_premium: | |
await ai_reply.edit_text(f"{custom_loading}Uploading file..") | |
else: | |
await ai_reply.edit_text("Uploading file..") | |
audio_file = genai.upload_file(path=audio_file_name) | |
while audio_file.state.name == "PROCESSING": | |
await asyncio.sleep(10) | |
audio_file = genai.get_file(audio_file.name) | |
if audio_file.state.name == "FAILED": | |
return await ai_reply.edit_text(f"Error: {audio_file.state.name}") | |
try: | |
response = model.generate_content( | |
[audio_file, caption], | |
request_options={"timeout": 600} | |
) | |
if len(response.text) > 4096: | |
with open("chat.txt", "w+", encoding="utf8") as out_file: | |
out_file.write(response.text) | |
await message.reply_document( | |
document="chat.txt", | |
disable_notification=True | |
) | |
await ai_reply.delete() | |
os.remove("chat.txt") | |
else: | |
await ai_reply.edit_text(response.text) | |
backup_chat.append({"role": "model", "parts": [{"text": response.text}]}) | |
await db._update_chatbot_chat_in_db(message.from_user.id, backup_chat) | |
audio_file.delete() | |
os.remove(audio_file_name) | |
return | |
except InvalidArgument as e: | |
return await ai_reply.edit_text(f"Error: {e}") | |
except Exception as e: | |
return await ai_reply.edit_text(f"Error: {e}") | |
if message.video: | |
if client.me.is_premium: | |
ai_reply = await message.reply_text(f"{custom_loading}Processing...") | |
else: | |
ai_reply = await message.reply_text(f"Processing...") | |
video_file_name = await message.download(file_name="newvideo.mp4") | |
caption = message.caption or "What's this?" | |
model = genai.GenerativeModel( | |
model_name="gemini-1.5-pro", | |
safety_settings={ | |
genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE, | |
genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE, | |
genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE, | |
genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE, | |
} | |
) | |
backup_chat = await db._get_chatbot_chat_from_db(message.from_user.id) | |
backup_chat.append({"role": "user", "parts": [{"text": caption}]}) | |
if client.me.is_premium: | |
await ai_reply.edit_text(f"{custom_loading}Uploading file..") | |
else: | |
await ai_reply.edit_text("Uploading file..") | |
video_file = genai.upload_file(path=video_file_name) | |
while video_file.state.name == "PROCESSING": | |
await asyncio.sleep(10) | |
video_file = genai.get_file(video_file.name) | |
if video_file.state.name == "FAILED": | |
return await ai_reply.edit_text(f"Error: {video_file.state.name}") | |
try: | |
response = model.generate_content( | |
[video_file, caption], | |
request_options={"timeout": 600} | |
) | |
if len(response.text) > 4096: | |
with open("chat.txt", "w+", encoding="utf8") as out_file: | |
out_file.write(response.text) | |
await message.reply_document( | |
document="chat.txt", | |
disable_notification=True | |
) | |
await ai_reply.delete() | |
os.remove("chat.txt") | |
else: | |
await ai_reply.edit_text(response.text) | |
backup_chat.append({"role": "model", "parts": [{"text": response.text}]}) | |
await db._update_chatbot_chat_in_db(message.from_user.id, backup_chat) | |
video_file.delete() | |
os.remove(video_file_name) | |
return | |
except InvalidArgument as e: | |
return await ai_reply.edit_text(f"Error: {e}") | |
except Exception as e: | |
return await ai_reply.edit_text(f"Error: {e}") | |
if message.text: | |
query = message.text.strip() | |
match = re.search(r"\b(Randy|Rendi)\b(.*)", query, flags=re.IGNORECASE) | |
if match: | |
rest_of_sentence = match.group(2).strip() | |
query_base = rest_of_sentence if rest_of_sentence else query | |
else: | |
query_base = query | |
parts = query.split(maxsplit=1) | |
command = parts[0].lower() | |
pic_query = parts[1].strip() if len(parts) > 1 else "" | |
try: | |
model_flash = genai.GenerativeModel( | |
model_name="gemini-1.5-flash" | |
) | |
backup_chat = await db._get_chatbot_chat_from_db(message.from_user.id) | |
backup_chat.append({"role": "user", "parts": [{"text": query_base}]}) | |
chat_session = model_flash.start_chat(history=backup_chat) | |
response_data = chat_session.send_message(query_base) | |
output = response_data.text | |
if len(output) > 4096: | |
with open("chat.txt", "w+", encoding="utf8") as out_file: | |
out_file.write(output) | |
await message.reply_document( | |
document="chat.txt", | |
disable_notification=True | |
) | |
os.remove("chat.txt") | |
else: | |
await message.reply_text(output) | |
backup_chat.append({"role": "model", "parts": [{"text": output}]}) | |
await db._update_chatbot_chat_in_db(message.from_user.id, backup_chat) | |
except Exception as e: | |
return await message.reply_text(str(e)) | |
async def main(): | |
await db.connect() | |
await client.start() | |
if __name__ == "__main__": | |
asyncio.run(main()) |