import os
from pyrogram import Client, filters
from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant, MediaEmpty, PhotoInvalidDimensions, WebpageMediaEmpty
from info import IMDB_TEMPLATE
from utils import extract_user, get_file_id, get_poster, last_online
import time
from datetime import datetime
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
import logging
# Module-level logger named after this module; its level is ERROR, so
# records below ERROR logged through it are discarded.
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)
@Client.on_message(filters.command('id'))
async def showid(client, message):
    """Reply to the ``id`` command with the relevant Telegram identifiers.

    In a private chat the reply lists the sender's first name, last name,
    username, the chat ID and the sender's data centre ID. In a group or
    supergroup it lists the chat ID, the sender's ID, the replied-to user's
    ID (only when the command is a reply) and, when ``get_file_id`` returns
    something for the inspected message, that media's type and file ID.
    Any other chat type gets no reply.

    Args:
        client: The client that received the update (not used here).
        message: The incoming command message.
    """
    chat_type = message.chat.type
    if chat_type == "private":
        sender = message.from_user
        user_id = message.chat.id
        first = sender.first_name
        # Optional profile fields fall back to an empty string.
        last = sender.last_name or ""
        username = sender.username or ""
        dc_id = sender.dc_id or ""
        await message.reply_text(
            f"➲ First Name: {first}\n"
            f"➲ Last Name: {last}\n"
            f"➲ Username: {username}\n"
            f"➲ Telegram ID: {user_id}\n"
            f"➲ Data Centre: {dc_id}",
            quote=True
        )
    elif chat_type in ["group", "supergroup"]:
        # from_user can be missing on a message; report that as 'Anonymous'.
        sender_id = message.from_user.id if message.from_user else 'Anonymous'
        lines = [
            f"➲ Chat ID: {message.chat.id}\n",
            f"➲ User ID: {sender_id}\n",
        ]
        replied = message.reply_to_message
        if replied:
            replied_id = replied.from_user.id if replied.from_user else 'Anonymous'
            lines.append(f"➲ Replied User ID: {replied_id}\n")
            # When replying, the media of interest is on the replied message.
            file_info = get_file_id(replied)
        else:
            file_info = get_file_id(message)
        if file_info:
            lines.append(f"{file_info.message_type}: {file_info.file_id}\n")
        await message.reply_text(
            "".join(lines),
            quote=True
        )
@Client.on_message(filters.command(["info"]))
async def who_is(client, message):
    """Reply to the ``info`` command with a summary of the targeted user.

    The target is whatever ``extract_user(message)`` resolves. The summary
    contains the user's names, ID, data centre and username, plus the date
    the user joined the current chat when the chat is a supergroup or
    channel and the user is a participant. If the user has a profile photo
    and it can be downloaded, the summary is sent as that photo's caption;
    otherwise it is sent as plain text. A temporary status message is shown
    while working and deleted once the reply has been sent.

    Args:
        client: The client used to look up the user and download the photo.
        message: The incoming command message.

    Returns:
        The edited status message when the lookup yields no user; otherwise
        ``None``.
    """
    # https://github.com/SpEcHiDe/PyroGramBot/blob/master/pyrobot/plugins/admemes/whois.py#L19
    status_message = await message.reply_text(
        "`Fetching user info...`"
    )
    await status_message.edit(
        "`Processing user info...`"
    )
    from_user_id, _ = extract_user(message)
    try:
        from_user = await client.get_users(from_user_id)
    except Exception as error:
        # Surface the lookup failure to the user instead of failing silently.
        await status_message.edit(str(error))
        return
    if from_user is None:
        return await status_message.edit("no valid user_id / message specified")

    last_name = from_user.last_name or "None"
    username = from_user.username or "None"
    dc_id = from_user.dc_id or "[User Doesn't Have A Valid DP]"
    message_out_str = (
        f"➲First Name: {from_user.first_name}\n"
        f"➲Last Name: {last_name}\n"
        f"➲Telegram ID: {from_user.id}\n"
        f"➲Data Centre: {dc_id}\n"
        f"➲User Name: @{username}\n"
        # NOTE(review): no link target is interpolated on this line - confirm
        # whether markup around "Click Here" was lost.
        "➲User 𝖫𝗂𝗇𝗄: Click Here\n"
    )

    if message.chat.type in ("supergroup", "channel"):
        try:
            chat_member_p = await message.chat.get_member(from_user.id)
        except UserNotParticipant:
            # Not a member of this chat: there is no join date to report.
            pass
        else:
            joined_date = datetime.fromtimestamp(
                chat_member_p.joined_date or time.time()
            ).strftime("%Y.%m.%d %H:%M:%S")
            message_out_str += f"➲Joined this Chat on: {joined_date}\n"

    reply_markup = InlineKeyboardMarkup([[
        InlineKeyboardButton('🔐 Close', callback_data='close_data')
    ]])

    local_user_photo = None
    if chat_photo := from_user.photo:
        local_user_photo = await client.download_media(
            message=chat_photo.big_file_id
        )

    if local_user_photo:
        try:
            await message.reply_photo(
                photo=local_user_photo,
                quote=True,
                reply_markup=reply_markup,
                caption=message_out_str,
                disable_notification=True
            )
        finally:
            # Always clean up the temporary download, even if sending failed.
            if os.path.exists(local_user_photo):
                os.remove(local_user_photo)
    else:
        # No profile photo, or the download produced nothing: send text only.
        await message.reply_text(
            text=message_out_str,
            reply_markup=reply_markup,
            quote=True,
            disable_notification=True
        )
    await status_message.delete()
@Client.on_message(filters.command(["imdb", 'search']))
async def imdb_search(client, message):
    """Search IMDb for the title given after the command and list the matches.

    Everything after the command word is passed to ``get_poster`` with
    ``bulk=True``. Each returned movie becomes one inline button whose
    callback data is ``imdb#<movieID>``. When no title is supplied the user
    is asked for one.

    Args:
        client: The client that received the update (not used here).
        message: The incoming command message.

    Returns:
        The "No results Found" reply message when the search is empty;
        otherwise ``None``.
    """
    # The command filter also matches media captions, in which case
    # message.text is None; fall back to the caption instead of crashing.
    text = message.text or message.caption or ""
    parts = text.split(None, 1)
    if len(parts) < 2:
        await message.reply('Give me a movie / series Name')
        return
    title = parts[1]

    k = await message.reply('Searching ImDB')
    movies = await get_poster(title, bulk=True)
    if not movies:
        return await message.reply("No results Found")
    btn = [
        [
            InlineKeyboardButton(
                text=f"{movie.get('title')} - {movie.get('year')}",
                callback_data=f"imdb#{movie.movieID}",
            )
        ]
        for movie in movies
    ]
    await k.edit('Here is what i found on IMDb', reply_markup=InlineKeyboardMarkup(btn))
@Client.on_callback_query(filters.regex('^imdb'))
async def imdb_callback(bot: Client, quer_y: CallbackQuery):
    """Show the details of the IMDb entry picked from a search result list.

    The callback data has the form ``imdb#<id>``; the id is passed to
    ``get_poster`` with ``id=True``. The result is rendered through
    ``IMDB_TEMPLATE`` and sent with a single URL button. If the entry has a
    poster the details are sent as a photo (retrying once with a resized
    poster URL, then falling back to a text message) and the result list is
    deleted; without a poster the result list message is edited in place.
    When the lookup returns nothing the message is edited to "No Results".

    Args:
        bot: The client that received the callback query.
        quer_y: The callback query raised by pressing a search-result button.
    """
    i, movie = quer_y.data.split('#', 1)
    imdb = await get_poster(query=movie, id=True)
    if not imdb:
        # Nothing to render: bail out before touching any imdb[...] field.
        await quer_y.message.edit("No Results", disable_web_page_preview=False)
        await quer_y.answer()
        return

    btn = [
        [
            InlineKeyboardButton(
                text=f"{imdb.get('title')}",
                url=imdb['url'],
            )
        ]
    ]
    markup = InlineKeyboardMarkup(btn)
    message = quer_y.message.reply_to_message or quer_y.message

    # Fields of the lookup result that are exposed to the template by name.
    template_keys = (
        'title', 'votes', 'aka', 'seasons', 'box_office', 'localized_title',
        'kind', 'imdb_id', 'cast', 'runtime', 'countries', 'certificates',
        'languages', 'director', 'writer', 'producer', 'composer',
        'cinematographer', 'music_team', 'distributors', 'release_date',
        'year', 'genres', 'poster', 'plot', 'rating', 'url',
    )
    caption = IMDB_TEMPLATE.format(
        query=imdb['title'],
        **{key: imdb[key] for key in template_keys},
        # These names used to reach the template through **locals(); they are
        # passed explicitly so templates that reference them keep rendering.
        bot=bot,
        quer_y=quer_y,
        i=i,
        movie=movie,
        imdb=imdb,
        btn=btn,
        message=message,
    )

    poster = imdb.get('poster')
    if poster:
        try:
            try:
                await quer_y.message.reply_photo(photo=poster, caption=caption, reply_markup=markup)
            except (MediaEmpty, PhotoInvalidDimensions, WebpageMediaEmpty):
                # Retry once with the resized poster variant.
                resized = poster.replace('.jpg', "._V1_UX360.jpg")
                await quer_y.message.reply_photo(photo=resized, caption=caption, reply_markup=markup)
        except Exception as e:
            # Any other failure, including a failed retry, degrades to text.
            logger.exception(e)
            await quer_y.message.reply(caption, reply_markup=markup, disable_web_page_preview=False)
        await quer_y.message.delete()
    else:
        await quer_y.message.edit(caption, reply_markup=markup, disable_web_page_preview=False)
    await quer_y.answer()