|
import os |
|
import re |
|
import json |
|
import requests |
|
from flask import Flask, request |
|
from simpleeval import simple_eval |
|
|
|
app = Flask(__name__) |
|
|
|
GROUP_ID = os.environ.get("GROUP_ID") |
|
LINE_CHANNEL_ACCESS_TOKEN = os.environ.get("LINE_CHANNEL_ACCESS_TOKEN") |
|
|
|
message_list = {} |
|
|
|
|
|
@app.route('/', methods=['GET']) |
|
def index(): |
|
return {}, 200 |
|
|
|
|
|
@app.route("/api/", methods=["POST"]) |
|
def api(): |
|
global message_list |
|
try: |
|
payload = get_payload_dict(request.get_json()) |
|
|
|
if payload.get("group_id") != GROUP_ID: |
|
raise ValueError("Invalid Group") |
|
|
|
if payload.get("unsend_msg_id"): |
|
unsend_msg_id = payload.get("unsend_msg_id") |
|
message_list.pop(unsend_msg_id, None) |
|
message_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") != unsend_msg_id} |
|
raise ValueError("Unsend Success") |
|
|
|
if "$$$" in payload.get("msg_text"): |
|
if "結算" in payload.get("msg_text"): |
|
users_number = get_users_number() |
|
|
|
users = list({item["user_id"] for item in message_list.values()}) |
|
if len(users) != users_number: |
|
users.append("others") |
|
print(users) |
|
|
|
matrix = [[0 for _ in range(len(users))] for _ in range(len(users))] |
|
|
|
for msg_id, data in message_list.items(): |
|
quoted_msg_id = data.get("quoted_msg_id") |
|
quoted_number = len([key for key, value in message_list.items() if value.get("quoted_msg_id") == msg_id]) |
|
|
|
if quoted_msg_id: |
|
fr = users.index(data.get("user_id")) |
|
to = users.index(message_list.get(quoted_msg_id).get("user_id")) |
|
matrix[fr][to] += data.get("amount") |
|
|
|
if not quoted_msg_id and quoted_number == 0: |
|
to = users.index(data.get("user_id")) |
|
for row in matrix: |
|
row[to] += (data.get("amount") / users_number) |
|
|
|
print(matrix) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
raise ValueError("Action Success") |
|
|
|
if "$" not in payload.get("msg_text"): |
|
raise ValueError("Keyword not Found") |
|
|
|
if get_amount(payload.get("msg_text")) == None: |
|
raise ValueError("Amount is None") |
|
|
|
if payload.get("quoted_msg_id"): |
|
message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"), |
|
"amount": get_amount(payload.get("msg_text")), |
|
"quoted_msg_id": payload.get("quoted_msg_id")} |
|
else: |
|
message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"), |
|
"amount": get_amount(payload.get("msg_text")), |
|
"quote_token": payload.get("quote_token"), |
|
"msg_text": payload.get("msg_text")} |
|
|
|
for msg_id, data in message_list.items(): |
|
quoted_msg_id = data.get("quoted_msg_id") |
|
quoted_msg_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") == msg_id} |
|
|
|
if not quoted_msg_id and len(quoted_msg_list) != 0: |
|
amount: float = data.get("amount") |
|
paid: float = 0.0 |
|
for _, value in quoted_msg_list.items(): |
|
paid += value.get("amount") |
|
print(amount, paid) |
|
|
|
if amount-paid <= 1 and payload.get("quoted_msg_id") == msg_id: |
|
s = f"{data.get("msg_text")} paid by {get_username(data.get("user_id"))}\n\n" |
|
for _, q_data in quoted_msg_list.items(): |
|
s += f"{get_username(q_data.get("user_id"))} : {q_data.get("amount")}\n" |
|
send_text(payload.get("token"), s, data.get("quote_token")) |
|
break |
|
|
|
if amount-paid > 1 and payload.get("quoted_msg_id") != msg_id: |
|
send_text(payload.get("token"), f"$ {amount-paid} 未付清", data.get("quote_token")) |
|
break |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e: |
|
print(f"An error occurred: {e}") |
|
|
|
print(message_list) |
|
print() |
|
return "", 200 |
|
|
|
|
|
def get_payload_dict(raw_payload) -> dict: |
|
|
|
events = raw_payload.get("events", [{}])[0] |
|
return {"token": events.get("replyToken"), |
|
"quote_token": events.get("message", {}).get("quoteToken"), |
|
"group_id": events.get("source", {}).get("groupId"), |
|
"user_id": events.get("source", {}).get("userId"), |
|
"msg_type": events.get("message", {}).get("type"), |
|
"msg_id": events.get("message", {}).get("id"), |
|
"msg_text": events.get("message", {}).get("text"), |
|
"quoted_msg_id": events.get("message", {}).get("quotedMessageId"), |
|
"unsend_msg_id": events.get("unsend", {}).get("messageId")} |
|
|
|
|
|
def send_text(token: str, text: str, quote_token: str | None = None): |
|
requests.post("https://api.line.me/v2/bot/message/reply", headers={ |
|
"Content-Type": "application/json; charset=UTF-8", |
|
"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN |
|
}, json={ |
|
"replyToken": token, |
|
"messages": [{"type": "text", "text": text, "quoteToken": quote_token}] |
|
}) |
|
|
|
|
|
def get_username(user_id: str): |
|
url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/member/{user_id}" |
|
try: |
|
res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json() |
|
return res_json.get("displayName") |
|
except: |
|
return "Unknow" |
|
|
|
|
|
def get_users_number() -> int: |
|
url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/members/count" |
|
try: |
|
res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json() |
|
return int(res_json.get("count")) |
|
except: |
|
return 0 |
|
|
|
|
|
def get_amount(text: str) -> float | None: |
|
try: |
|
after_dollar = text.split("$")[1] |
|
rows = after_dollar.split("\n") |
|
number_string = rows[0] |
|
divisor = 1 |
|
if len(rows) > 1 and "/" in rows[1]: |
|
divisor = int(rows[1].replace("/", "")) |
|
return round(float(simple_eval(number_string))/divisor, 2) |
|
except: |
|
return None |
|
|
|
|
|
if __name__ == "__main__": |
|
app.run(host="0.0.0.0", port=7860) |
|
|