import logging from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM from imdb import Cinemagoer import asyncio from pyrogram.types import Message from typing import Union import re import os from datetime import datetime from typing import List from pyrogram.types import InlineKeyboardButton from database.users_chats_db import db from database.tvseriesfilters import find_tvseries_filter from bs4 import BeautifulSoup import requests import json import aiohttp from database.ia_filterdb import get_search_results from database.notification import remove_notification import pyshorteners shortner = pyshorteners.Shortener() logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) BTN_URL_REGEX = re.compile( r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))" ) imdb = Cinemagoer() BANNED = {} SMART_OPEN = '“' SMART_CLOSE = '”' START_CHAR = ('\'', '"', SMART_OPEN) btns = [] # temp db for banned class temp(object): BANNED_USERS = [] BANNED_CHATS = [] ME = None CURRENT = int(os.environ.get("SKIP", 2)) CANCEL = False MELCOW = {} U_NAME = None B_NAME = None SETTINGS = {} async def is_subscribed(bot, query): try: user = await bot.get_chat_member(AUTH_CHANNEL, query.from_user.id) except UserNotParticipant: pass except Exception as e: logger.exception(e) else: if user.status != 'kicked': return True return False async def get_poster(query, bulk=False, id=False, file=None): if not id: query = query.strip().lower() title = query year = re.findall('[1-2]\d{3}$', query, re.IGNORECASE) if year: year = list_to_str(year[:1]) title = query.replace(year, "").strip() elif file is not None: year = re.findall('[1-2]\d{3}', file, re.IGNORECASE) if year: year = list_to_str(year[:1]) else: year = None try: movieid = imdb.search_movie(title.lower(), results=10) except Exception: return None if not movieid: return None if year: filtered = list(filter(lambda k: str( k.get('year')) == str(year), movieid)) if not filtered: filtered = movieid else: filtered = movieid movieid = list(filter(lambda k: k.get('kind') in [ 'movie', 'tv series'], filtered)) if not movieid: movieid = filtered if bulk: return movieid movieid = movieid[0].movieID else: movieid = query movie = imdb.get_movie(movieid) if movie.get("original air date"): date = movie["original air date"] elif movie.get("year"): date = movie.get("year") else: date = "N/A" plot = "" if not LONG_IMDB_DESCRIPTION: plot = movie.get('plot') if plot and len(plot) > 0: plot = plot[0] else: plot = movie.get('plot outline') if plot and len(plot) > 800: plot = f"{plot[:800]}..." return {'title': movie.get('title'), 'votes': movie.get('votes'), "aka": list_to_str(movie.get("akas")), "seasons": movie.get("number of seasons"), "box_office": movie.get('box office'), 'localized_title': movie.get('localized title'), 'kind': movie.get("kind"), "imdb_id": f"tt{movie.get('imdbID')}", "cast": list_to_str(movie.get("cast")), "runtime": list_to_str(movie.get("runtimes")), "countries": list_to_str(movie.get("countries")), "certificates": list_to_str(movie.get("certificates")), "languages": list_to_str(movie.get("languages")), "director": list_to_str(movie.get("director")), "writer": list_to_str(movie.get("writer")), "producer": list_to_str(movie.get("producer")), "composer": list_to_str(movie.get("composer")), "cinematographer": list_to_str(movie.get("cinematographer")), "music_team": list_to_str(movie.get("music department")), "distributors": list_to_str(movie.get("distributors")), 'release_date': date, 'year': movie.get('year'), 'genres': list_to_str(movie.get("genres")), 'poster': movie.get('full-size cover url'), 'plot': plot, 'rating': str(movie.get("rating")), 'url': f'https://www.imdb.com/title/tt{movieid}'} # https://github.com/odysseusm async def broadcast_notification(user_id, message): try: await message.copy(chat_id=user_id) return True, "Succes" except FloodWait as e: await asyncio.sleep(e.x) return await broadcast_notification(user_id, message) except InputUserDeactivated: await remove_notification(user_id) logging.info( f"{user_id}-Removed from Database, since deleted account.") return False, "Deleted" except UserIsBlocked: logging.info(f"{user_id} -Blocked the bot.") return False, "Blocked" except PeerIdInvalid: await remove_notification(user_id) logging.info(f"{user_id} - PeerIdInvalid") return False, "Error" except Exception as e: return False, "Error" async def broadcast_messages(user_id, message): try: await message.copy(chat_id=user_id) return True, "Succes" except FloodWait as e: await asyncio.sleep(e.x) return await broadcast_messages(user_id, message) except InputUserDeactivated: await db.delete_user(int(user_id)) logging.info( f"{user_id}-Removed from Database, since deleted account.") return False, "Deleted" except UserIsBlocked: logging.info(f"{user_id} -Blocked the bot.") return False, "Blocked" except PeerIdInvalid: await db.delete_user(int(user_id)) logging.info(f"{user_id} - PeerIdInvalid") return False, "Error" except Exception as e: return False, "Error" async def search_gagala(text): usr_agent = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/61.0.3163.100 Safari/537.36' } text = text.replace(" ", '+') url = f'https://www.google.com/search?q={text}' response = requests.get(url, headers=usr_agent) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') titles = soup.find_all('h3') return [title.getText() for title in titles] async def get_settings(group_id): settings = temp.SETTINGS.get(group_id) if not settings: settings = await db.get_settings(group_id) temp.SETTINGS[group_id] = settings return settings async def save_group_settings(group_id, key, value): current = await get_settings(group_id) current[key] = value temp.SETTINGS[group_id] = current await db.update_settings(group_id, current) async def send_more_files(name): name = get_name(name) name = name.split(".")[:3] name = ' '.join(name) name = name.split(" ")[:3] name = ' '.join(name) files, offset, total_results = await get_search_results(name.lower(), offset=0, filter=True) if len(files) > 15: files = files[:15] if files: return files def get_size(size): """Get size in readable format""" units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"] size = float(size) i = 0 while size >= 1024.0 and i < len(units): i += 1 size /= 1024.0 return "%.2f %s" % (size, units[i]) def get_name(name): name = name.lower() name = name.replace("@cc", '') name = name.replace("telegram", '') name = name.replace("www", '') name = name.replace("join", '') name = name.replace("tg", '') name = name.replace("link", '') name = name.replace("@", '') name = name.replace("Team_Tony", '') name = name.replace("massmovies0", '') name = name.replace("bullmoviee", '') name = name.replace("massmovies", '') name = name.replace("filmy4cab", '') name = name.replace("maassmovies", '') name = name.replace("theproffesorr", '') name = name.replace("primeroom", '') name = name.replace("team_hdt", '') name = name.replace("Pulikesi_Meme", '') name = name.replace("telugudubbing", '') name = name.replace("rickychannel", '') name = name.replace("tif", '') name = name.replace("cvm", '') name = name.replace("playtk", '') name = name.replace("tel", '') name = name.replace("hw", '') name = name.replace("f&t", '') name = name.replace("fimy", '') name = name.replace("film", '') name = name.replace("xyz", '') name = name.replace("fbm", '') name = name.replace("mwkott", '') name = name.replace("team_hdt", '') name = name.replace("worldcinematoday", '') name = name.replace("cinematic_world", '') name = name.replace("cinema", '') name = name.replace("hotstar", '') name = name.replace("jesseverse", '') name = name.replace("apdackup", '') name = name.replace("streamersHub", '') name = name.replace("tg", '') name = name.replace("movies", '') name = name.replace("[ava]", '') name = name.replace("tamilrockers", '') name = name.replace("imax5", '') name = name.replace("kerala rock", '') name = name.replace("ott", '') name = name.replace("rarefilms", '') name = name.replace("linkzz", '') name = name.replace("movems", '') name = name.replace("moviezz", '') name = name.replace("clipmate", '') name = name.replace("southtamilall", '') name = name.replace("apdbackup", '') name = name.replace("wmr", '') name = name.replace("web", '') name = name.replace("rowdystudios", '') name = name.replace("alpacinodump", '') name = name.replace("fans", '') name = name.replace("movie", '') name = name.replace("mlf", '') name = name.replace("[rmk]", '') name = name.replace("[mc]", '') name = name.replace("[mfa]", '') name = name.replace("[mm]", '') name = name.replace("[me]", '') name = name.replace("[", '') name = name.replace("]", '') name = name.replace("mlm", '') name = name.replace("RMK", '') name = name.replace("1tamilmv", '') name = name.replace("linkz", '') name = name.replace("tamilMob", '') name = name.replace("tg", '') name = name.replace("bollyarchives", '') name = name.replace("🎞", '') name = name.replace("🎬", '') name = name.replace("(", '') name = name.replace(")", '') name = name.replace(" ", '.') name = name.replace("_", '.') name = name.replace("...", '.') name = name.replace("..", '.') if name[0] == '.': name = name[1:] name = name.capitalize() return name def getseries(name): name = name.lower() name = name.replace("season", "") name = name.replace("series", "") name = name.replace("tv", "") name = name.replace("episode", "") name = name.replace("480p", "") name = name.replace("720p", "") name = name.replace("1080p", "") name = name.replace("hindi", "") name = name.replace("tamil", "") name = name.replace("english", "") name = name.replace("web", "") # name = ''.join([i for i in name if not i.isdigit()]) name = name.replace(" ", "") return name def gen_url(link): link = f"https://rocklinks.net/st?api=85b949240ee33cb797db1efc7aa94cb265c6ad35&url={link}" try: urllink = shortner.tinyurl.short(link) except Exception: urllink = link return urllink def split_list(l, n): for i in range(0, len(l), n): yield l[i:i + n] def get_file_id(msg: Message): if msg.media: for message_type in ("photo", "animation", "audio", "document", "video", "video_note", "voice", "sticker"): obj = getattr(msg, message_type) if obj: setattr(obj, "message_type", message_type) return obj def extract_user(message: Message) -> Union[int, str]: """extracts the user from a message""" # https://github.com/SpEcHiDe/PyroGramBot/blob/f30e2cca12002121bad1982f68cd0ff9814ce027/pyrobot/helper_functions/extract_user.py#L7 user_id = None user_first_name = None if message.reply_to_message: user_id = message.reply_to_message.from_user.id user_first_name = message.reply_to_message.from_user.first_name elif len(message.command) > 1: if ( len(message.entities) > 1 and message.entities[1].type == "text_mention" ): required_entity = message.entities[1] user_id = required_entity.user.id user_first_name = required_entity.user.first_name else: user_id = message.command[1] # don't want to make a request -_- user_first_name = user_id try: user_id = int(user_id) except ValueError: pass else: user_id = message.from_user.id user_first_name = message.from_user.first_name return (user_id, user_first_name) def list_to_str(k): if not k: return "N/A" elif len(k) == 1: return str(k[0]) elif MAX_LIST_ELM: k = k[:int(MAX_LIST_ELM)] return ' '.join(f'{elem}, ' for elem in k) else: return ' '.join(f'{elem}, ' for elem in k) def last_online(from_user): time = "" if from_user.is_bot: time += "🤖 Bot :(" elif from_user.status == 'recently': time += "Recently" elif from_user.status == 'within_week': time += "Within the last week" elif from_user.status == 'within_month': time += "Within the last month" elif from_user.status == 'long_time_ago': time += "A long time ago :(" elif from_user.status == 'online': time += "Currently Online" elif from_user.status == 'offline': time += datetime.fromtimestamp( from_user.last_online_date).strftime("%a, %d %b %Y, %H:%M:%S") return time def split_quotes(text: str) -> List: if not any(text.startswith(char) for char in START_CHAR): return text.split(None, 1) counter = 1 # ignore first char -> is some kind of quote while counter < len(text): if text[counter] == "\\": counter += 1 elif text[counter] == text[0] or (text[0] == SMART_OPEN and text[counter] == SMART_CLOSE): break counter += 1 else: return text.split(None, 1) # 1 to avoid starting quote, and counter is exclusive so avoids ending key = remove_escapes(text[1:counter].strip()) # index will be in range, or `else` would have been executed and returned rest = text[counter + 1:].strip() if not key: key = text[0] + text[0] return list(filter(None, [key, rest])) def parser(text, keyword): if "buttonalert" in text: text = text.replace("\n", "\\n").replace("\t", "\\t") buttons = [] note_data = "" prev = 0 i = 0 alerts = [] for match in BTN_URL_REGEX.finditer(text): n_escapes = 0 to_check = match.start(1) - 1 while to_check > 0 and text[to_check] == "\\": n_escapes += 1 to_check -= 1 if n_escapes % 2 == 0: note_data += text[prev:match.start(1)] prev = match.end(1) if match.group(3) == "buttonalert": if bool(match.group(5)) and buttons: buttons[-1].append(InlineKeyboardButton(text=match.group(2), callback_data=f"alertmessage:{i}:{keyword}")) else: buttons.append([InlineKeyboardButton(text=match.group( 2), callback_data=f"alertmessage:{i}:{keyword}")]) i += 1 alerts.append(match.group(4)) elif bool(match.group(5)) and buttons: buttons[-1].append(InlineKeyboardButton(text=match.group(2), url=match.group(4).replace(" ", ""))) else: buttons.append([InlineKeyboardButton( text=match.group(2), url=match.group(4).replace(" ", ""))]) else: note_data += text[prev:to_check] prev = match.start(1) - 1 note_data += text[prev:] try: return note_data, buttons, alerts except Exception: return note_data, buttons, None def remove_escapes(text: str) -> str: res = "" is_escaped = False for counter in range(len(text)): if is_escaped: res += text[counter] is_escaped = False elif text[counter] == "\\": is_escaped = True else: res += text[counter] return res def humanbytes(size): if not size: return "" power = 2**10 n = 0 Dic_powerN = {0: ' ', 1: 'Ki', 2: 'Mi', 3: 'Gi', 4: 'Ti'} while size > power: size /= power n += 1 return f"{str(round(size, 2))} {Dic_powerN[n]}B"