import os
import re
import json

import requests
from flask import Flask, request
from simpleeval import simple_eval

app = Flask(__name__)

GROUP_ID = os.environ.get("GROUP_ID")
LINE_CHANNEL_ACCESS_TOKEN = os.environ.get("LINE_CHANNEL_ACCESS_TOKEN")

# Upper bound (seconds) for outbound HTTP calls; without one, `requests`
# waits forever on an unresponsive server and blocks the worker.
REQUEST_TIMEOUT_SECONDS = 10

# In-memory store of recorded amounts, keyed by message id.
message_list = {}


@app.route('/', methods=['GET'])
def index():
    """Return an empty JSON object with HTTP 200."""
    return {}, 200


@app.route("/api/", methods=["POST"])
def api():
    """Handle a webhook POST and record or unsend an amount message.

    Always answers ``("", 200)``; every rejection or failure is only
    printed, never surfaced to the HTTP caller.
    """
    try:
        payload = get_payload_dict(request.get_json())
        _record_event(payload)
        # username = get_username(payload.get("user_id"))
        # text = payload.get("msg_text")
        # print(username)
        # send_text(payload.get("token"), f"{username}:{text}")
    except Exception as e:
        # Top-level boundary: rejections raised by _record_event (including
        # the "Unsend Success" notice) and unexpected errors all land here.
        print(f"An error occurred: {e}")
    print(message_list)
    print()
    return "", 200


def _record_event(payload: dict) -> None:
    """Apply one flattened webhook payload to ``message_list``.

    Raises:
        ValueError: when the event is from another group, is an unsend
            (after removing the stored entry), has no ``$`` keyword, or
            its amount expression cannot be evaluated.
    """
    if payload.get("group_id") != GROUP_ID:
        raise ValueError("Invalid Group")

    unsend_id = payload.get("unsend_msg_id")
    if unsend_id:
        message_list.pop(unsend_id, None)
        raise ValueError("Unsend Success")

    text = payload.get("msg_text")
    # Non-text messages carry no text; treat them like a missing keyword
    # instead of letting `"$" not in None` raise a TypeError.
    if not isinstance(text, str) or "$" not in text:
        raise ValueError("Keyword not Found")

    # Evaluate the expression once and reuse the result.
    amount = get_amount(text)
    if amount is None:
        raise ValueError("Amount is None")

    entry = {"user_id": payload.get("user_id"), "amount": amount}
    quoted_id = payload.get("quoted_msg_id")
    if quoted_id:
        entry["quoted_msg_id"] = quoted_id
    message_list[payload.get("msg_id")] = entry


def get_payload_dict(raw_payload) -> dict:
    """Flatten the first event of a webhook body into a flat dict.

    Only the first entry of ``events`` is read. A missing body, a missing
    or null ``events`` key, or an empty ``events`` list (e.g. a webhook
    verification request) yields a dict whose values are all ``None``
    instead of raising.
    """
    events = (raw_payload or {}).get("events") or [{}]
    event = events[0] or {}
    source = event.get("source", {})
    message = event.get("message", {})
    return {
        "token": event.get("replyToken"),
        "group_id": source.get("groupId"),
        "user_id": source.get("userId"),
        "msg_type": message.get("type"),
        "msg_id": message.get("id"),
        "msg_text": message.get("text"),
        "quoted_msg_id": message.get("quotedMessageId"),
        "unsend_msg_id": event.get("unsend", {}).get("messageId"),
    }


def send_text(token: str, text: str):
    """Reply to the event identified by ``token`` with one text message.

    Raises:
        requests.RequestException: on connection failure or timeout.
    """
    requests.post(
        "https://api.line.me/v2/bot/message/reply",
        headers={
            "Content-Type": "application/json; charset=UTF-8",
            "Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN,
        },
        json={
            "replyToken": token,
            "messages": [{"type": "text", "text": text}],
        },
        timeout=REQUEST_TIMEOUT_SECONDS,
    )


def get_member_ids() -> list:
    """Return the ``memberIds`` list of group ``GROUP_ID``.

    Best effort: returns ``[]`` on any request/parse failure or when the
    response has no ``memberIds``. Only the first response page is read.
    """
    url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/members/ids"
    try:
        res_json = requests.get(
            url,
            headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN},
            timeout=REQUEST_TIMEOUT_SECONDS,
        ).json()
        return res_json.get("memberIds") or []
    except (requests.RequestException, ValueError, TypeError, AttributeError):
        return []


def get_username(user_id: str | None = None):
    """Return the display name of ``user_id`` within group ``GROUP_ID``.

    Falls back to ``"Unknow"`` on any request/parse failure or when the
    response has no ``displayName``.

    Backward compatibility: this module previously defined
    ``get_username`` twice, and the later zero-argument version (which
    returned the group's member ids) shadowed this one. Calling
    ``get_username()`` with no argument therefore still returns the
    member-id list; new code should call ``get_member_ids()`` instead.
    """
    if user_id is None:
        return get_member_ids()

    url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/member/{user_id}"
    try:
        res_json = requests.get(
            url,
            headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN},
            timeout=REQUEST_TIMEOUT_SECONDS,
        ).json()
        return res_json.get("displayName") or "Unknow"
    except (requests.RequestException, ValueError, TypeError, AttributeError):
        return "Unknow"


def get_amount(text: str) -> float | None:
    """Evaluate the expression between the first and second ``$`` in ``text``.

    Returns the result rounded to 2 decimals, or ``None`` when there is no
    ``$`` or the expression cannot be evaluated to a number.
    """
    try:
        # NOTE(security): `text` is untrusted chat input. `simple_eval` is a
        # restricted evaluator, but confirm its limits are acceptable here.
        return round(float(simple_eval(text.split("$")[1])), 2)
    except Exception:
        # Any parse/evaluation failure means "no amount"; unlike a bare
        # `except:`, this lets KeyboardInterrupt/SystemExit propagate.
        return None


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)