|
import os |
|
import requests |
|
from flask import Flask, request |
|
from simpleeval import simple_eval |
|
|
|
# Flask application object; the route decorators below register handlers on it.
app = Flask(__name__)

# Configuration read from the environment. os.environ.get returns None when
# the variable is not set, so both values may be None at runtime.
GROUP_ID = os.environ.get("GROUP_ID")
LINE_CHANNEL_ACCESS_TOKEN = os.environ.get("LINE_CHANNEL_ACCESS_TOKEN")

# In-memory ledger keyed by message id. api() stores one dict per tracked
# message ("user_id", "amount", plus either "quoted_msg_id" for replies or
# "quote_token"/"msg_text" for original messages). Lives only in this process.
message_list = {}

# Group id of the most recently parsed webhook event; overwritten by
# get_payload_dict() and read by api(), get_username() and get_users_number().
group_id = ""
|
|
|
|
|
@app.route('/', methods=['GET'])
def index():
    """Answer GET requests on the root path with an empty body and HTTP 200."""
    empty_body = {}
    status_code = 200
    return empty_body, status_code
|
|
|
|
|
def _group_replies():
    """Map each quoted message id to the ledger entries quoting it, in ledger order."""
    grouped = {}
    for entry in message_list.values():
        parent_id = entry.get("quoted_msg_id")
        if parent_id:
            grouped.setdefault(parent_id, []).append(entry)
    return grouped


def _unpaid_balance(entry, replies):
    """Return ``entry``'s amount minus the summed amounts of its ``replies``."""
    paid = 0.0
    for reply in replies:
        paid += reply.get("amount")
    return entry.get("amount") - paid


def _settle(payload):
    """Handle the "結算" command.

    Builds a debt matrix where ``matrix[i][j]`` is what ``users[i]`` owes
    ``users[j]``, nets each pair, and replies with the checkout carousel.

    Returns:
        False when a message with replies still has more than 1 unpaid (a
        reminder is sent instead and nothing is settled), True otherwise.
    """
    global message_list

    users_number = get_users_number()

    users = list({item["user_id"] for item in message_list.values()})
    if len(users) != users_number:
        # Not every group member appears in the ledger; the rest share one column.
        users.append("others")
    print(users)

    size = len(users)
    matrix = [[0.0] * size for _ in range(size)]
    replies_by_parent = _group_replies()

    for msg_id, data in message_list.items():
        quoted_msg_id = data.get("quoted_msg_id")

        if quoted_msg_id:
            creditor = message_list.get(quoted_msg_id)
            if creditor is None:
                # A reply to a message that was never tracked has nobody to be
                # paid; skipping it keeps one stray reply from aborting every
                # settlement with an AttributeError.
                print(f"Skipping reply {msg_id}: quoted message {quoted_msg_id} is not tracked")
                continue
            fr = users.index(data.get("user_id"))
            to = users.index(creditor.get("user_id"))
            matrix[fr][to] += data.get("amount")
            continue

        replies = replies_by_parent.get(msg_id, [])
        if replies:
            unpaid = _unpaid_balance(data, replies)
            if unpaid > 1:
                # Rounded so float noise such as 33.339999999999996 is not shown.
                send_text(payload.get("token"), f"$ {round(unpaid, 2)} 未付清", data.get("quote_token"))
                print(matrix)
                return False
        else:
            # Nobody replied: the amount is split evenly, so every row
            # (each tracked user and "others") owes the author one share.
            to = users.index(data.get("user_id"))
            for row in matrix:
                row[to] += data.get("amount") / users_number

    print(matrix)

    # Look each display name up at most once instead of twice per transfer.
    names = {}

    def name_of(user_id):
        if user_id not in names:
            names[user_id] = get_username(user_id)
        return names[user_id]

    result = []
    for i in range(size):
        for j in range(i + 1, size):
            net = matrix[i][j] - matrix[j][i]
            if net > 0:
                result.append({"from": name_of(users[i]), "to": name_of(users[j]), "amount": net})
            elif net < 0:
                result.append({"from": name_of(users[j]), "to": name_of(users[i]), "amount": -net})

    if result:
        # `or ""` keeps the sort from raising if a display name is missing.
        sorted_result = sorted(result, key=lambda item: item["from"] or "")
        bubble = get_checkout_bubble(sorted_result)
        send_flex_text(payload.get("token"), bubble)
        # NOTE(review): the ledger is cleared only when a settlement message
        # was sent; the source's indentation was lost, confirm this nesting.
        message_list = {}

    return True


@app.route("/api/", methods=["POST"])
def api():
    """LINE webhook endpoint: record "$" messages and answer bot commands.

    Only the first event of the request is processed (see get_payload_dict).
    Handled cases, in order:

    * events from a group other than GROUP_ID are rejected;
    * an unsend event removes that message and every reply quoting it;
    * "$$$" with "清除" empties the ledger, with "結算" runs the settlement;
    * text without "$" containing "計算" is evaluated as an expression;
    * any other text containing "$" is parsed by get_amount and stored, then
      either a summary (the quoted message is now paid) or an unpaid
      reminder for another message is sent.

    Finished or rejected requests are signalled with ValueError and logged by
    the handler at the bottom, so the function always answers ("", 200).
    """
    global message_list
    try:
        payload = get_payload_dict(request.get_json())
        print(group_id)

        # An unset GROUP_ID (None) would otherwise equal the missing groupId
        # of a non-group event and let it through.
        if not GROUP_ID or group_id != GROUP_ID:
            raise ValueError("Invalid Group")

        unsend_msg_id = payload.get("unsend_msg_id")
        if unsend_msg_id:
            message_list = {
                key: value
                for key, value in message_list.items()
                if key != unsend_msg_id and value.get("quoted_msg_id") != unsend_msg_id
            }
            raise ValueError("Unsend Success")

        msg_text = payload.get("msg_text")
        if msg_text is None:
            # Non-text events (stickers, images, joins, ...) carry no text.
            raise ValueError("Not a text message")

        if "$$$" in msg_text:
            if "清除" in msg_text:
                message_list = {}
                return "", 200

            if "結算" in msg_text and not _settle(payload):
                return "", 200

            # NOTE(review): nesting reconstructed from a source without
            # indentation; this is assumed to cover every remaining "$$$" case.
            raise ValueError("Action Success")

        if "$" not in msg_text:
            if "計算" in msg_text:
                try:
                    # NOTE(review): evaluates user-supplied text; relies on
                    # simple_eval's own sandboxing and limits.
                    number = float(simple_eval(msg_text.split("計算")[1]))
                except Exception as exc:
                    raise ValueError("Calculation Error") from exc
                # Outside the try so a failed send is not reported as a
                # calculation error.
                send_text(payload.get("token"), f"$ {round(number,2)}", payload.get("quote_token"))
                raise ValueError("Calculation Successful")
            raise ValueError("Keyword not Found")

        amount = get_amount(msg_text)
        if amount is None:
            raise ValueError("Amount is None")

        replied_to = payload.get("quoted_msg_id")
        if replied_to:
            message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"),
                                                   "amount": amount,
                                                   "quoted_msg_id": replied_to}
        else:
            message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"),
                                                   "amount": amount,
                                                   "quote_token": payload.get("quote_token"),
                                                   "msg_text": msg_text}

        replies_by_parent = _group_replies()
        for msg_id, data in message_list.items():
            if data.get("quoted_msg_id"):
                continue
            replies = replies_by_parent.get(msg_id, [])
            if not replies:
                continue

            unpaid = _unpaid_balance(data, replies)
            is_replied_message = replied_to == msg_id

            if unpaid <= 1 and is_replied_message:
                borrowers = [
                    {"name": get_username(reply.get("user_id")), "amount": reply.get("amount")}
                    for reply in replies
                ]
                bubble = get_summary_bubble(data.get("msg_text"), get_username(data.get("user_id")), borrowers)
                send_flex_text(payload.get("token"), bubble)
                break

            if unpaid > 1 and not is_replied_message:
                send_text(payload.get("token"), f"$ {round(unpaid, 2)} 未付清", data.get("quote_token"))
                break

    except Exception as e:
        print(f"An error occurred: {e}")

    # Dump the ledger after every request that reaches this point.
    print("="*20)
    for key in message_list:
        print(message_list[key])
    print("="*20, end="\n\n")

    return "", 200
|
|
|
|
|
def get_payload_dict(raw_payload) -> dict:
    """Flatten the first event of a LINE webhook body into a plain dict.

    Only ``events[0]`` is read; any further events in the same request are
    ignored. Missing fields come back as None. A body that is None, has no
    "events" key, or has an empty "events" list yields a dict whose values
    are all None instead of raising.

    Side effect: stores the event's ``source.groupId`` in the module-level
    ``group_id``.

    Args:
        raw_payload: Decoded JSON body of the webhook request, or None.

    Returns:
        dict with the keys "token", "quote_token", "group_id", "user_id",
        "msg_type", "msg_id", "msg_text", "quoted_msg_id", "unsend_msg_id".
    """
    global group_id

    # `or` also covers an explicit null and the empty list, which would
    # otherwise raise on the [0] lookup.
    events = (raw_payload or {}).get("events") or [{}]
    event = events[0]
    source = event.get("source") or {}
    message = event.get("message") or {}
    unsend = event.get("unsend") or {}

    group_id = source.get("groupId")

    return {"token": event.get("replyToken"),
            "quote_token": message.get("quoteToken"),
            "group_id": source.get("groupId"),
            "user_id": source.get("userId"),
            "msg_type": message.get("type"),
            "msg_id": message.get("id"),
            "msg_text": message.get("text"),
            "quoted_msg_id": message.get("quotedMessageId"),
            "unsend_msg_id": unsend.get("messageId")}
|
|
|
|
|
def send_text(token: str, text: str, quote_token: str | None = None):
    """Reply to a LINE event with one text message.

    Args:
        token: Reply token of the event being answered.
        text: Message text.
        quote_token: Optional quote token; when given, the reply quotes that
            message. The field is left out of the request when None rather
            than being sent as JSON null.

    Raises:
        requests.RequestException: the request could not be completed,
            including hitting the 10 second timeout.
    """
    message = {"type": "text", "text": text}
    if quote_token is not None:
        message["quoteToken"] = quote_token

    # Without a timeout a stalled connection would block the webhook forever.
    response = requests.post(
        "https://api.line.me/v2/bot/message/reply",
        headers={
            "Content-Type": "application/json; charset=UTF-8",
            "Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN
        },
        json={
            "replyToken": token,
            "messages": [message]
        },
        timeout=10,
    )
    if not response.ok:
        # Make rejected replies visible instead of dropping them silently.
        print(f"LINE reply failed: {response.status_code} {response.text}")
|
|
|
|
|
def send_flex_text(token, bubble):
    """Reply to a LINE event with one flex message.

    Args:
        token: Reply token of the event being answered.
        bubble: Flex container (bubble or carousel dict) used as the
            message contents.

    Raises:
        requests.RequestException: the request could not be completed,
            including hitting the 10 second timeout.
    """
    # Without a timeout a stalled connection would block the webhook forever.
    response = requests.post(
        "https://api.line.me/v2/bot/message/reply",
        headers={
            "Content-Type": "application/json; charset=UTF-8",
            "Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN
        },
        json={
            "replyToken": token,
            "messages": [{
                "type": "flex",
                "altText": "您有一則新訊息",
                "contents": bubble
            }]
        },
        timeout=10,
    )
    if not response.ok:
        # Make rejected replies visible instead of dropping them silently.
        print(f"LINE reply failed: {response.status_code} {response.text}")
|
|
|
|
|
def get_username(user_id: str):
    """Return the display name of a group member.

    The placeholder id "others" maps to "其他人" without any request. Every
    other id is looked up in the group stored in the module-level
    ``group_id``. The lookup is best-effort: a failed request, a
    non-JSON or unexpected response, or a response without "displayName"
    all yield "未知", so callers always get a non-empty string.

    Args:
        user_id: LINE user id, or the literal "others".

    Returns:
        The display name, "其他人" or "未知".
    """
    if user_id == "others":
        return "其他人"

    url = f"https://api.line.me/v2/bot/group/{group_id}/member/{user_id}"
    try:
        response = requests.get(
            url,
            headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN},
            timeout=10,
        )
        # An error response has no "displayName"; returning None here would
        # break sorting by name and put null text into flex messages.
        return response.json().get("displayName") or "未知"
    except (requests.RequestException, ValueError, TypeError, AttributeError):
        return "未知"
|
|
|
|
|
def get_users_number() -> int:
    """Return the member count of the group stored in ``group_id``.

    Best-effort: returns 0 when the request fails or times out, or when the
    response is not JSON or has no usable "count" value.
    """
    url = f"https://api.line.me/v2/bot/group/{group_id}/members/count"
    try:
        response = requests.get(
            url,
            headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN},
            timeout=10,
        )
        return int(response.json().get("count"))
    except (requests.RequestException, ValueError, TypeError, AttributeError):
        # ValueError: body is not JSON / count is not numeric;
        # TypeError: count missing (int(None)) or the access token is unset;
        # AttributeError: JSON body is not an object.
        return 0
|
|
|
|
|
def get_amount(text: str) -> float | None: |
|
try: |
|
after_dollar = text.split("$")[1] |
|
rows = after_dollar.split("\n") |
|
number = float(simple_eval(rows[0])) |
|
divisor = 1 |
|
if len(rows) > 1 and "/" in rows[1]: |
|
divisor = int(rows[1].replace("/", "")) |
|
if divisor % 3 == 0: |
|
number += 0.3 |
|
if divisor == 0: |
|
raise ValueError("divisor = 0") |
|
return round(number/divisor, 2) |
|
except: |
|
return None |
|
|
|
|
|
def get_summary_bubble(title: str, payer: str, borrowers: list):
    """Build the flex bubble that summarises one bill.

    Args:
        title: Text shown as the large heading.
        payer: Name shown in the "由 ... 付款" line.
        borrowers: Dicts with "name" and "amount"; each becomes one row.

    Returns:
        A flex "bubble" dict listing every borrower row followed by the
        total of their amounts.
    """
    def text(content, **style):
        # Text component; keyword order is kept as the property order.
        return {"type": "text", "text": content, **style}

    def separator():
        return {"type": "separator", "margin": "lg"}

    running_total = 0.0
    rows = []
    for entry in borrowers:
        share = float(entry.get("amount"))
        running_total += share
        name_cell = text(entry.get("name"), size="sm", color="#555555", flex=0)
        amount_cell = text(f"${round(share,2)}", size="sm", color="#111111", align="end")
        rows.append({
            "type": "box",
            "layout": "horizontal",
            "contents": [name_cell, amount_cell],
        })

    total_row = {
        "type": "box",
        "layout": "horizontal",
        "contents": [
            text("合計", size="sm", color="#555555"),
            text(f"${round(running_total,2)}", size="sm", color="#111111", align="end", weight="bold"),
        ],
        "margin": "lg",
    }

    body_contents = [
        text("對帳明細", weight="bold", color="#1DB446", size="sm"),
        text(title, weight="bold", size="xxl", wrap=True, margin="md"),
        text(f"由 {payer} 付款", size="sm", color="#aaaaaa", wrap=True, weight="bold"),
        separator(),
        {
            "type": "box",
            "layout": "vertical",
            "margin": "lg",
            "spacing": "sm",
            "contents": rows,
        },
        separator(),
        total_row,
    ]

    return {
        "type": "bubble",
        "body": {
            "type": "box",
            "layout": "vertical",
            "contents": body_contents,
        },
        "styles": {
            "footer": {
                "separator": True,
            },
        },
    }
|
|
|
|
|
def get_checkout_bubble(checkout_list: list):
    """Build the flex carousel sent when the ledger is settled.

    The first bubble lists every original (non-reply) entry of the
    module-level ``message_list`` with its amount and the overall total.
    It is followed by one bubble per run of consecutive ``checkout_list``
    items sharing the same "from" value, listing whom that person pays and
    how much; callers are expected to pass the list sorted by "from".

    Args:
        checkout_list: Dicts with "from", "to" and "amount". The list is
            not modified. An empty list yields only the first bubble.

    Returns:
        A flex "carousel" dict.
    """
    from itertools import groupby

    def amount_row(label, amount, **extra):
        # One "label ... $amount" line; `extra` adds trailing properties.
        return {
            "type": "box",
            "layout": "horizontal",
            "contents": [
                {"type": "text", "text": label, "size": "sm", "color": "#555555", "flex": 0},
                {"type": "text", "text": f"${amount}", "size": "sm", "color": "#111111", "align": "end"},
            ],
            **extra,
        }

    def bubble(heading, rows, total):
        # Shared layout: title, heading components, rows, separator, total.
        return {
            "type": "bubble",
            "body": {
                "type": "box",
                "layout": "vertical",
                "contents": [
                    {"type": "text", "text": "結算明細", "weight": "bold", "color": "#1DB446", "size": "sm"},
                    *heading,
                    {"type": "box", "layout": "vertical", "margin": "md", "spacing": "sm", "contents": rows},
                    {"type": "separator", "margin": "lg"},
                    {
                        "type": "box",
                        "layout": "horizontal",
                        "contents": [
                            {"type": "text", "text": "合計", "size": "sm", "color": "#555555"},
                            {
                                "type": "text",
                                "text": f"${round(total,2)}",
                                "size": "sm",
                                "color": "#111111",
                                "align": "end",
                                "weight": "bold",
                            },
                        ],
                        "margin": "lg",
                    },
                ],
            },
            "styles": {"footer": {"separator": True}},
        }

    # First bubble: every original message included in this settlement.
    item_rows = []
    checkout_total = 0.0
    for entry in message_list.values():
        if entry.get("quoted_msg_id"):
            continue
        amount = round(float(entry.get("amount")), 2)
        checkout_total += amount

        # The label is whatever precedes the first "$". A message such as
        # "$100" has none, and LINE rejects flex text that is empty or null.
        label = (entry.get("msg_text") or "").split("$")[0]
        if not label.strip():
            label = "未命名"
        item_rows.append(amount_row(label, amount, margin="sm"))

    carousel_contents = [
        bubble(
            [{"type": "text", "text": "此次結算包含:", "size": "sm", "color": "#aaaaaa", "wrap": True, "margin": "lg"}],
            item_rows,
            checkout_total,
        )
    ]

    # One bubble per payer. groupby groups consecutive items, so no sentinel
    # has to be appended to the caller's list and the last group is always
    # emitted, whatever its name is.
    for payer, transfers in groupby(checkout_list, key=lambda item: item.get("from")):
        pay_to_rows = []
        payer_total = 0.0
        for transfer in transfers:
            amount = round(float(transfer.get("amount")), 2)
            payer_total += amount
            pay_to_rows.append(amount_row(transfer.get("to"), amount))

        carousel_contents.append(
            bubble(
                [
                    {"type": "text", "text": payer, "weight": "bold", "size": "xxl", "wrap": True, "margin": "md"},
                    {"type": "text", "text": "需付款給:", "size": "sm", "color": "#aaaaaa", "wrap": True, "margin": "lg"},
                ],
                pay_to_rows,
                payer_total,
            )
        )

    return {
        "type": "carousel",
        "contents": carousel_contents
    }
|
|
|
|
|
# When this file is executed directly, start the Flask server listening on
# all interfaces (0.0.0.0), port 7860.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)
|
|