yuripeyamashita's picture
Update app.py
d1df012 verified
raw
history blame
7.4 kB
import json
import math
import os
import re

import requests
from flask import Flask, request
from simpleeval import simple_eval
# Flask application object; the route decorators below register on it.
app = Flask(__name__)

# Deployment configuration, read once from the environment at import time.
# Either value is None when the corresponding variable is not set.
GROUP_ID = os.getenv("GROUP_ID")
LINE_CHANNEL_ACCESS_TOKEN = os.getenv("LINE_CHANNEL_ACCESS_TOKEN")

# In-memory record of tracked "$" messages, keyed by message id.
# Each value is a dict holding at least "user_id" and "amount".
message_list = {}
@app.route("/", methods=["GET"])
def index():
    """Answer GET requests on the root path with an empty dict and status 200."""
    empty_body = {}
    return empty_body, 200
@app.route("/api/", methods=["POST"])
def api():
    """Handle a webhook POST: track "$" messages and reply with payment status.

    Steps, in order (every early exit is done by raising ``ValueError``,
    which the ``except`` at the bottom catches and prints):

    1. Flatten the first event of the request body and reject events whose
       group id differs from ``GROUP_ID``.
    2. For an unsend event, drop that message and every entry quoting it.
    3. For a message containing "$$$", run a command; with "結算" a
       user-by-user matrix is built from ``message_list`` and printed.
    4. Otherwise require a "$" and a parsable amount, then store the message
       in ``message_list`` (keyed by message id).
    5. Scan the stored entries and send at most one reply about how much of
       a quoted entry's amount has been covered by the entries quoting it.

    Returns:
        Always ``("", 200)``, whether or not an error occurred.
    """
    global message_list
    try:
        payload = get_payload_dict(request.get_json())
        if payload.get("group_id") != GROUP_ID:
            raise ValueError("Invalid Group")
        if payload.get("unsend_msg_id"):
            # Remove the unsent message itself, then every entry that quoted it.
            unsend_msg_id = payload.get("unsend_msg_id")
            message_list.pop(unsend_msg_id, None)
            message_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") != unsend_msg_id}
            raise ValueError("Unsend Success")
        # NOTE(review): "msg_text" is None when the event carries no text; the
        # `in` test below then raises TypeError, which ends up in the except
        # handler at the bottom.
        if "$$$" in payload.get("msg_text"):
            # "結算" ("settle up") command: build and print the matrix.
            if "結算" in payload.get("msg_text"):
                users_number = get_users_number()
                # Distinct user ids that appear in the stored entries.
                users = list({item["user_id"] for item in message_list.values()})
                # Add one "others" slot when the tracked users do not match the
                # member count reported for the group.
                if len(users) != users_number:
                    users.append("others")
                print(users)
                # Square matrix indexed [row user][column user], all zeros.
                # NOTE(review): cell meaning (owed vs. paid) is not evident
                # from this function alone - confirm before relying on it.
                matrix = [[0 for _ in range(len(users))] for _ in range(len(users))]
                for msg_id, data in message_list.items():
                    quoted_msg_id = data.get("quoted_msg_id")
                    # How many stored entries quote this one.
                    quoted_number = len([key for key, value in message_list.items() if value.get("quoted_msg_id") == msg_id])
                    if quoted_msg_id:
                        # Entry quoting another one: add its amount at
                        # [author of this entry][author of the quoted entry].
                        # NOTE(review): if the quoted message is not in
                        # message_list, .get() returns None and the chained
                        # .get raises AttributeError, aborting the command.
                        fr = users.index(data.get("user_id"))
                        to = users.index(message_list.get(quoted_msg_id).get("user_id"))
                        matrix[fr][to] += data.get("amount")
                    if not quoted_msg_id and quoted_number == 0:  # the split-evenly case
                        # Unquoted entry that nobody quoted: add an equal share
                        # to the author's column in every row.
                        # NOTE(review): users_number can be 0 (the fallback of
                        # get_users_number), which raises ZeroDivisionError.
                        to = users.index(data.get("user_id"))
                        for row in matrix:
                            row[to] += (data.get("amount") / users_number)
                print(matrix)
                # Disabled draft code:
                # users.index("b")
                # username_list = get_username_list()
                # print(username_list)
                # for user_id in username_list:
                # print(get_username(user_id))
            # NOTE(review): nesting reconstructed from a copy without
            # indentation; confirm this raise belongs to the "$$$" branch
            # rather than to the "結算" branch.
            raise ValueError("Action Success")
        if "$" not in payload.get("msg_text"):
            raise ValueError("Keyword not Found")
        if get_amount(payload.get("msg_text")) == None:
            raise ValueError("Amount is None")
        if payload.get("quoted_msg_id"):
            # Message that quotes another one: keep the link to the quoted id.
            message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"),
                                                   "amount": get_amount(payload.get("msg_text")),
                                                   "quoted_msg_id": payload.get("quoted_msg_id")}
        else:
            # Stand-alone message: keep its text and quote token so later
            # replies can quote it.
            message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"),
                                                   "amount": get_amount(payload.get("msg_text")),
                                                   "quote_token": payload.get("quote_token"),
                                                   "msg_text": payload.get("msg_text")}
        for msg_id, data in message_list.items():
            quoted_msg_id = data.get("quoted_msg_id")
            # All stored entries that quote this entry.
            quoted_msg_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") == msg_id}
            # Only unquoted entries that have at least one quoting entry.
            if not quoted_msg_id and len(quoted_msg_list) != 0:
                amount: float = data.get("amount")
                paid: float = 0.0
                # Sum the amounts of every entry quoting this one.
                for _, value in quoted_msg_list.items():
                    paid += value.get("amount")
                print(amount, paid)
                # The incoming message quotes this entry and at most 1 remains:
                # reply with a summary listing each quoting entry.
                if amount-paid <= 1 and payload.get("quoted_msg_id") == msg_id:
                    # NOTE: reusing double quotes inside these f-strings needs
                    # Python 3.12+ (PEP 701).
                    s = f"{data.get("msg_text")} paid by {get_username(data.get("user_id"))}\n\n"
                    for _, q_data in quoted_msg_list.items():
                        s += f"{get_username(q_data.get("user_id"))} : {q_data.get("amount")}\n"
                    send_text(payload.get("token"), s, data.get("quote_token"))
                    break
                # Another entry still has more than 1 outstanding and the
                # incoming message did not quote it: reply quoting that entry
                # with the remaining amount ("未付清" = "not fully paid").
                if amount-paid > 1 and payload.get("quoted_msg_id") != msg_id:
                    send_text(payload.get("token"), f"$ {amount-paid} 未付清", data.get("quote_token"))
                    break
        # Disabled draft code:
        # if quoted_msg_id:
        # fr = users.index(data.get("user_id"))
        # to = users.index(message_list.get(quoted_msg_id).get("user_id"))
        # matrix[fr][to] += data.get("amount")
        # if not quoted_msg_id and quoted_number == 0: # the split-evenly case
        # to = users.index(data.get("user_id"))
        # for row in matrix:
        # row[to] += (data.get("amount") / users_number)
        # username = get_username(payload)
        # text = payload.get("msg_text")
        # print(username)
        # send_text(payload.get("token"), f"{username}:{text}")
    except Exception as e:
        # Catches both the ValueError "status" exits above and genuine errors.
        print(f"An error occurred: {e}")
    # Dump the current state after every request, then acknowledge with 200.
    print(message_list)
    print()
    return "", 200
def get_payload_dict(raw_payload) -> dict:
    """Flatten the first event of a webhook body into a flat dict.

    Only the first entry of ``events`` is read; any further events in the
    same request are ignored. A missing body, a missing or empty ``events``
    list, and missing or null sub-objects all yield ``None`` for the
    affected fields instead of raising.

    Args:
        raw_payload: Parsed JSON body of the webhook request (a dict), or
            ``None``.

    Returns:
        A dict with the keys ``token``, ``quote_token``, ``group_id``,
        ``user_id``, ``msg_type``, ``msg_id``, ``msg_text``,
        ``quoted_msg_id`` and ``unsend_msg_id``; each value is ``None`` when
        the event does not carry it.
    """
    # Treat "no body" / "no events" as a single empty event so callers get
    # all-None fields rather than AttributeError or IndexError.
    events = (raw_payload or {}).get("events") or [{}]
    event = events[0] or {}

    # Look each sub-object up once; `or {}` also covers explicit nulls.
    message = event.get("message") or {}
    source = event.get("source") or {}
    unsend = event.get("unsend") or {}

    return {
        "token": event.get("replyToken"),
        "quote_token": message.get("quoteToken"),
        "group_id": source.get("groupId"),
        "user_id": source.get("userId"),
        "msg_type": message.get("type"),
        "msg_id": message.get("id"),
        "msg_text": message.get("text"),
        "quoted_msg_id": message.get("quotedMessageId"),
        "unsend_msg_id": unsend.get("messageId"),
    }
def send_text(token: str, text: str, quote_token: str | None = None, timeout: float = 10.0):
    """Send one text message through the LINE reply endpoint.

    Args:
        token: Reply token of the event being answered.
        text: Message text to send.
        quote_token: Quote token of the message to quote. The ``quoteToken``
            field is left out of the request when this is ``None`` or empty,
            instead of being sent as JSON null.
        timeout: Seconds to wait for the HTTP request before ``requests``
            raises a timeout error, so a stalled connection cannot block the
            caller indefinitely.

    Returns:
        None. A non-success HTTP status is printed, not raised; connection
        and timeout errors from ``requests`` propagate to the caller.
    """
    message = {"type": "text", "text": text}
    if quote_token:
        message["quoteToken"] = quote_token

    response = requests.post(
        "https://api.line.me/v2/bot/message/reply",
        headers={
            "Content-Type": "application/json; charset=UTF-8",
            "Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN,
        },
        json={"replyToken": token, "messages": [message]},
        timeout=timeout,
    )
    # Surface rejected replies in the log instead of dropping them silently.
    if not response.ok:
        print(f"send_text failed: {response.status_code} {response.text}")
def get_username(user_id: str, timeout: float = 10.0) -> str:
    """Look up a group member's display name.

    Queries the member-profile endpoint for ``user_id`` within ``GROUP_ID``.

    Args:
        user_id: Id of the member to look up.
        timeout: Seconds to wait for the HTTP request.

    Returns:
        The ``displayName`` field of the response, or the fallback
        ``"Unknow"`` when the request fails, the body is not the expected
        JSON, or the field is missing or empty.
    """
    url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/member/{user_id}"
    try:
        response = requests.get(
            url,
            headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN},
            timeout=timeout,
        )
        display_name = response.json().get("displayName")
    except Exception as exc:
        # Best-effort lookup: report the failure but never break the caller,
        # while still letting KeyboardInterrupt/SystemExit through.
        print(f"get_username failed: {exc}")
        return "Unknow"
    # An error body has no displayName; use the same fallback so callers
    # never format the literal "None" into a reply.
    return display_name or "Unknow"
def get_users_number(timeout: float = 10.0) -> int:
    """Fetch the member count of the group identified by ``GROUP_ID``.

    Args:
        timeout: Seconds to wait for the HTTP request.

    Returns:
        The ``count`` field of the response as an int, or ``0`` when the
        request fails or the response does not contain a usable count.
        Callers that divide by the result must handle ``0``.
    """
    url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/members/count"
    try:
        response = requests.get(
            url,
            headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN},
            timeout=timeout,
        )
        # int(None) raises TypeError when "count" is absent; handled below.
        return int(response.json().get("count"))
    except Exception as exc:
        # Best-effort lookup: report the failure and fall back to 0, while
        # still letting KeyboardInterrupt/SystemExit through.
        print(f"get_users_number failed: {exc}")
        return 0
def get_amount(text: str) -> float | None:
try:
after_dollar = text.split("$")[1]
rows = after_dollar.split("\n")
number_string = rows[0]
divisor = 1
if len(rows) > 1 and "/" in rows[1]:
divisor = int(rows[1].replace("/", ""))
return round(float(simple_eval(number_string))/divisor, 2)
except:
return None
if __name__ == "__main__":
    # Direct execution only: start Flask's built-in server on every
    # interface, port 7860.
    app.run(port=7860, host="0.0.0.0")