yuripeyamashita's picture
Update app.py
ed0747c verified
raw
history blame
15.7 kB
import os
import requests
from flask import Flask, request
from simpleeval import simple_eval
app = Flask(__name__)
GROUP_ID = os.environ.get("GROUP_ID")
LINE_CHANNEL_ACCESS_TOKEN = os.environ.get("LINE_CHANNEL_ACCESS_TOKEN")
message_list = {}
@app.route('/', methods=['GET'])
def index():
return {}, 200
@app.route("/api/", methods=["POST"])
def api():
global message_list
try:
payload = get_payload_dict(request.get_json())
if payload.get("group_id") != GROUP_ID:
raise ValueError("Invalid Group")
if payload.get("unsend_msg_id"):
unsend_msg_id = payload.get("unsend_msg_id")
message_list.pop(unsend_msg_id, None)
message_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") != unsend_msg_id}
raise ValueError("Unsend Success")
if "$$$" in payload.get("msg_text"):
if "結算" in payload.get("msg_text"):
users_number = get_users_number()
users = list({item["user_id"] for item in message_list.values()})
if len(users) != users_number:
users.append("others")
print(users)
matrix = [[0 for _ in range(len(users))] for _ in range(len(users))]
should_checkout = True
for msg_id, data in message_list.items():
quoted_msg_id = data.get("quoted_msg_id")
quoted_msg_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") == msg_id}
if not quoted_msg_id and len(quoted_msg_list) != 0: # 檢查是否付清
amount: float = data.get("amount")
paid: float = 0.0
for _, value in quoted_msg_list.items():
paid += value.get("amount")
if amount-paid > 1:
send_text(payload.get("token"), f"$ {amount-paid} 未付清", data.get("quote_token"))
should_checkout = False
break
if not quoted_msg_id and len(quoted_msg_list) == 0: # 要平分的情況
to = users.index(data.get("user_id"))
for row in matrix:
row[to] += (data.get("amount") / users_number)
if quoted_msg_id:
fr = users.index(data.get("user_id"))
to = users.index(message_list.get(quoted_msg_id).get("user_id"))
matrix[fr][to] += data.get("amount")
print(matrix)
if not should_checkout:
return
matrix_copy = [[0 for _ in range(len(matrix))] for _ in range(len(matrix))]
for i in range(len(matrix_copy)):
for j in range(len(matrix_copy)):
if i < j:
matrix_copy[i][j] = matrix[i][j] - matrix[j][i]
for r in matrix_copy:
print(r)
result = []
for i in range(len(matrix_copy)):
for j in range(len(matrix_copy)):
amount = matrix_copy[i][j]
if amount > 0:
result.append({"from": get_username(users[i]), "to": get_username(users[j]), "amount": amount})
if amount < 0:
result.append({"from": get_username(users[j]), "to": get_username(users[i]), "amount": -amount})
sorted_result = sorted(result, key=lambda x: x["from"])
for r in sorted_result:
print(r)
bubble = get_checkout_bubble(sorted_result)
print(bubble)
send_flex_text(payload.get("token"), bubble)
raise ValueError("Action Success")
if "$" not in payload.get("msg_text"):
raise ValueError("Keyword not Found")
if get_amount(payload.get("msg_text")) == None:
raise ValueError("Amount is None")
if payload.get("quoted_msg_id"):
message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"),
"amount": get_amount(payload.get("msg_text")),
"quoted_msg_id": payload.get("quoted_msg_id")}
else:
message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"),
"amount": get_amount(payload.get("msg_text")),
"quote_token": payload.get("quote_token"),
"msg_text": payload.get("msg_text")}
for msg_id, data in message_list.items():
quoted_msg_id = data.get("quoted_msg_id")
quoted_msg_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") == msg_id}
if not quoted_msg_id and len(quoted_msg_list) != 0:
amount: float = data.get("amount")
paid: float = 0.0
for _, value in quoted_msg_list.items():
paid += value.get("amount")
if amount-paid <= 1 and payload.get("quoted_msg_id") == msg_id:
borrowers = []
for _, q_data in quoted_msg_list.items():
borrowers.append({"name": get_username(q_data.get("user_id")), "amount": q_data.get("amount")})
bubble = get_summary_bubble(data.get("msg_text"), get_username(data.get("user_id")), borrowers)
send_flex_text(payload.get("token"), bubble)
break
if amount-paid > 1 and payload.get("quoted_msg_id") != msg_id:
send_text(payload.get("token"), f"$ {amount-paid} 未付清", data.get("quote_token"))
break
except Exception as e:
print(f"An error occurred: {e}")
print(message_list)
print()
return "", 200
def get_payload_dict(raw_payload) -> dict:
# print(raw_payload)
events = raw_payload.get("events", [{}])[0]
return {"token": events.get("replyToken"),
"quote_token": events.get("message", {}).get("quoteToken"),
"group_id": events.get("source", {}).get("groupId"),
"user_id": events.get("source", {}).get("userId"),
"msg_type": events.get("message", {}).get("type"),
"msg_id": events.get("message", {}).get("id"),
"msg_text": events.get("message", {}).get("text"),
"quoted_msg_id": events.get("message", {}).get("quotedMessageId"),
"unsend_msg_id": events.get("unsend", {}).get("messageId")}
def send_text(token: str, text: str, quote_token: str | None = None):
requests.post("https://api.line.me/v2/bot/message/reply", headers={
"Content-Type": "application/json; charset=UTF-8",
"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN
}, json={
"replyToken": token,
"messages": [{"type": "text", "text": text, "quoteToken": quote_token}]
})
def send_flex_text(token, bubble):
requests.post("https://api.line.me/v2/bot/message/reply", headers={
"Content-Type": "application/json; charset=UTF-8",
"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN
}, json={
"replyToken": token,
"messages": [{
"type": "flex",
"altText": "您有一則新訊息",
"contents": bubble
}]
})
def get_username(user_id: str):
if user_id == "others":
return "其他人"
url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/member/{user_id}"
try:
res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json()
return res_json.get("displayName")
except:
return "未知"
def get_users_number() -> int:
url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/members/count"
try:
res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json()
return int(res_json.get("count"))
except:
return 0
def get_amount(text: str) -> float | None:
try:
after_dollar = text.split("$")[1]
rows = after_dollar.split("\n")
number_string = rows[0]
divisor = 1
if len(rows) > 1 and "/" in rows[1]:
divisor = int(rows[1].replace("/", ""))
return round(float(simple_eval(number_string))/divisor, 2)
except:
return None
def get_summary_bubble(title: str, payer: str, borrowers: list):
total = 0
borrower_contents = []
for borrower in borrowers:
amount = float(borrower.get("amount"))
total += amount
borrower_contents.append({
"type": "box",
"layout": "horizontal",
"contents": [
{
"type": "text",
"text": borrower.get("name"),
"size": "sm",
"color": "#555555",
"flex": 0
},
{
"type": "text",
"text": f"${round(amount,2)}",
"size": "sm",
"color": "#111111",
"align": "end"
}
]
})
return {
"type": "bubble",
"body": {
"type": "box",
"layout": "vertical",
"contents": [
{
"type": "text",
"text": "對帳明細",
"weight": "bold",
"color": "#1DB446",
"size": "sm"
},
{
"type": "text",
"text": title,
"weight": "bold",
"size": "xxl",
"margin": "md"
},
{
"type": "text",
"text": f"由 {payer} 付款",
"size": "sm",
"color": "#aaaaaa",
"wrap": True,
"weight": "bold"
},
{
"type": "separator",
"margin": "lg"
},
{
"type": "box",
"layout": "vertical",
"margin": "lg",
"spacing": "sm",
"contents": borrower_contents
},
{
"type": "separator",
"margin": "lg"
},
{
"type": "box",
"layout": "horizontal",
"contents": [
{
"type": "text",
"text": "合計",
"size": "sm",
"color": "#555555"
},
{
"type": "text",
"text": f"${round(total,2)}",
"size": "sm",
"color": "#111111",
"align": "end",
"weight": "bold"
}
],
"margin": "lg"
}
]
},
"styles": {
"footer": {
"separator": True
}
}
}
def get_checkout_bubble(checkout_list: list):
current_user = checkout_list[0].get("from")
current_total = 0
current_pay_to_contents = []
carousel_contents = []
for item in checkout_list:
if current_user != item.get("from"):
carousel_contents.append({
"type": "bubble",
"body": {
"type": "box",
"layout": "vertical",
"contents": [
{
"type": "text",
"text": "結算明細",
"weight": "bold",
"color": "#1DB446",
"size": "sm"
},
{
"type": "text",
"text": current_user,
"weight": "bold",
"size": "xxl",
"margin": "md"
},
{
"type": "text",
"text": "需付款給:",
"size": "sm",
"color": "#aaaaaa",
"wrap": True,
"margin": "lg"
},
{
"type": "box",
"layout": "vertical",
"margin": "md",
"spacing": "sm",
"contents": current_pay_to_contents},
{
"type": "separator",
"margin": "lg"
},
{
"type": "box",
"layout": "horizontal",
"contents": [
{
"type": "text",
"text": "合計",
"size": "sm",
"color": "#555555"
},
{
"type": "text",
"text": f"${round(current_total,2)}",
"size": "sm",
"color": "#111111",
"align": "end",
"weight": "bold"
}
],
"margin": "lg"
}
]
},
"styles": {
"footer": {
"separator": True
}
}
}
)
current_user = item.get("from")
current_total = 0
current_pay_to_contents = []
amount = float(item.get("amount"))
current_total += amount
current_pay_to_contents.append({
"type": "box",
"layout": "horizontal",
"contents": [
{
"type": "text",
"text": item.get("to"),
"size": "sm",
"color": "#555555",
"flex": 0
},
{
"type": "text",
"text": f"${round(amount,2)}",
"size": "sm",
"color": "#111111",
"align": "end"
}
]
},)
return {
"type": "carousel",
"contents": carousel_contents
}
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)