import os import re import json import requests from flask import Flask, request from simpleeval import simple_eval app = Flask(__name__) GROUP_ID = os.environ.get("GROUP_ID") LINE_CHANNEL_ACCESS_TOKEN = os.environ.get("LINE_CHANNEL_ACCESS_TOKEN") message_list = {} @app.route('/', methods=['GET']) def index(): return {}, 200 @app.route("/api/", methods=["POST"]) def api(): global message_list try: payload = get_payload_dict(request.get_json()) if payload.get("group_id") != GROUP_ID: raise ValueError("Invalid Group") if payload.get("unsend_msg_id"): unsend_msg_id = payload.get("unsend_msg_id") message_list.pop(unsend_msg_id, None) message_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") != unsend_msg_id} raise ValueError("Unsend Success") if "$$$" in payload.get("msg_text"): if "結算" in payload.get("msg_text"): users_number = get_users_number() users = list({item["user_id"] for item in message_list.values()}) if len(users) != users_number: users.append("others") print(users) matrix = [[0 for _ in range(len(users))] for _ in range(len(users))] for msg_id, data in message_list.items(): quoted_msg_id = data.get("quoted_msg_id") quoted_number = len([key for key, value in message_list.items() if value.get("quoted_msg_id") == msg_id]) if quoted_msg_id: fr = users.index(data.get("user_id")) to = users.index(message_list.get(quoted_msg_id).get("user_id")) matrix[fr][to] += data.get("amount") if not quoted_msg_id and quoted_number == 0: # 要平分的情況 to = users.index(data.get("user_id")) for row in matrix: row[to] += (data.get("amount") / users_number) print(matrix) # users.index("b") # username_list = get_username_list() # print(username_list) # for user_id in username_list: # print(get_username(user_id)) raise ValueError("Action Success") if "$" not in payload.get("msg_text"): raise ValueError("Keyword not Found") if get_amount(payload.get("msg_text")) == None: raise ValueError("Amount is None") if payload.get("quoted_msg_id"): message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"), "amount": get_amount(payload.get("msg_text")), "quoted_msg_id": payload.get("quoted_msg_id"), } else: message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"), "amount": get_amount(payload.get("msg_text")), "quote_token": payload.get("quote_token")} for msg_id, data in message_list.items(): quoted_msg_id = data.get("quoted_msg_id") quoted_msg_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") == msg_id} if not quoted_msg_id and len(quoted_msg_list) != 0 and payload.get("quoted_msg_id") != msg_id: amount: float = data.get("amount") paid: float = 0.0 for _, value in quoted_msg_list.items(): paid += value.get("amount") print(amount, paid) if (amount-paid > 1): send_text(payload.get("token"), f"$ {amount-paid} 未付清", data.get("quote_token")) break # if quoted_msg_id: # fr = users.index(data.get("user_id")) # to = users.index(message_list.get(quoted_msg_id).get("user_id")) # matrix[fr][to] += data.get("amount") # if not quoted_msg_id and quoted_number == 0: # 要平分的情況 # to = users.index(data.get("user_id")) # for row in matrix: # row[to] += (data.get("amount") / users_number) # username = get_username(payload) # text = payload.get("msg_text") # print(username) # send_text(payload.get("token"), f"{username}:{text}") except Exception as e: print(f"An error occurred: {e}") print(message_list) print() return "", 200 def get_payload_dict(raw_payload) -> dict: print(raw_payload) events = raw_payload.get("events", [{}])[0] return {"token": events.get("replyToken"), "quote_token": events.get("message", {}).get("quoteToken"), "group_id": events.get("source", {}).get("groupId"), "user_id": events.get("source", {}).get("userId"), "msg_type": events.get("message", {}).get("type"), "msg_id": events.get("message", {}).get("id"), "msg_text": events.get("message", {}).get("text"), "quoted_msg_id": events.get("message", {}).get("quotedMessageId"), "unsend_msg_id": events.get("unsend", {}).get("messageId")} def send_text(token: str, text: str, quote_token: str | None = None): requests.post("https://api.line.me/v2/bot/message/reply", headers={ "Content-Type": "application/json; charset=UTF-8", "Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN }, json={ "replyToken": token, "messages": [{"type": "text", "text": text, "quoteToken": quote_token}] }) def get_username(user_id: str): url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/member/{user_id}" try: res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json() return res_json.get("displayName") except: return "Unknow" def get_users_number() -> int: url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/members/count" try: res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json() return int(res_json.get("count")) except: return 0 def get_amount(text: str) -> float | None: try: after_dollar = text.split("$")[1] rows = after_dollar.split("\n") number_string = rows[0] divisor = 1 if len(rows) > 1 and "/" in rows[1]: divisor = int(rows[1].replace("/", "")) return round(float(simple_eval(number_string))/divisor, 2) except: return None if __name__ == "__main__": app.run(host="0.0.0.0", port=7860)