File size: 4,599 Bytes
fa6e3b2 bf00a95 fa6e3b2 822d179 fa6e3b2 8300a7c 6a17841 d5e4b99 6a17841 fa6e3b2 77bb999 2ba3143 eb1cacb fa6e3b2 2ba3143 da3b0f7 35130c5 53558ec da3b0f7 6abbd80 8d13b89 da3b0f7 8d13b89 da3b0f7 9065332 2ba3143 5c18dcf fa6e3b2 eb1cacb 6a17841 fa6e3b2 af688b0 fa6e3b2 6a17841 fa6e3b2 6a17841 4bc235e fa6e3b2 13edd55 0a39678 1bb9b5d 9da9263 fa6e3b2 35130c5 fa6e3b2 9da9263 fa6e3b2 27407e5 fa6e3b2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
import os
import re
import json
import requests
from flask import Flask, request
from simpleeval import simple_eval
app = Flask(__name__)
GROUP_ID = os.environ.get("GROUP_ID")
LINE_CHANNEL_ACCESS_TOKEN = os.environ.get("LINE_CHANNEL_ACCESS_TOKEN")
message_list = {}
@app.route('/', methods=['GET'])
def index():
return {}, 200
@app.route("/api/", methods=["POST"])
def api():
global message_list
try:
payload = get_payload_dict(request.get_json())
if payload.get("group_id") != GROUP_ID:
raise ValueError("Invalid Group")
if payload.get("unsend_msg_id"):
unsend_msg_id = payload.get("unsend_msg_id")
message_list.pop(unsend_msg_id, None)
message_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") != unsend_msg_id}
raise ValueError("Unsend Success")
if "$$$" in payload.get("msg_text"):
if "計算" in payload.get("msg_text"):
users_number = get_users_number()
users = list({item["user_id"] for item in message_list.values()})
if len(users) != users_number:
users.append("others")
print(users)
matrix = [[0 for _ in range(len(users))] for _ in range(len(users))]
print(matrix)
for msg_id in message_list:
print([key for key, value in message_list.items() if value.get("quoted_msg_id") == msg_id])
# users.index("b")
# username_list = get_username_list()
# print(username_list)
# for user_id in username_list:
# print(get_username(user_id))
raise ValueError("Action Success")
if "$" not in payload.get("msg_text"):
raise ValueError("Keyword not Found")
if get_amount(payload.get("msg_text")) == None:
raise ValueError("Amount is None")
if payload.get("quoted_msg_id"):
message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"),
"amount": get_amount(payload.get("msg_text")),
"quoted_msg_id": payload.get("quoted_msg_id")}
else:
message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"),
"amount": get_amount(payload.get("msg_text"))}
# username = get_username(payload)
# text = payload.get("msg_text")
# print(username)
# send_text(payload.get("token"), f"{username}:{text}")
except Exception as e:
print(f"An error occurred: {e}")
print(message_list)
print()
return "", 200
def get_payload_dict(raw_payload) -> dict:
# print(raw_payload)
events = raw_payload.get("events", [{}])[0]
return {"token": events.get("replyToken"),
"group_id": events.get("source", {}).get("groupId"),
"user_id": events.get("source", {}).get("userId"),
"msg_type": events.get("message", {}).get("type"),
"msg_id": events.get("message", {}).get("id"),
"msg_text": events.get("message", {}).get("text"),
"quoted_msg_id": events.get("message", {}).get("quotedMessageId"),
"unsend_msg_id": events.get("unsend", {}).get("messageId")}
def send_text(token: str, text: str):
requests.post("https://api.line.me/v2/bot/message/reply", headers={
"Content-Type": "application/json; charset=UTF-8",
"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN
}, json={
"replyToken": token,
"messages": [{"type": "text", "text": text}]
})
def get_username(user_id: str):
url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/member/{user_id}"
try:
res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json()
return res_json.get("displayName")
except:
return "Unknow"
def get_users_number() -> int:
url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/members/count"
try:
res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json()
return int(res_json.get("count"))
except:
return 0
def get_amount(text: str) -> float | None:
try:
return round(float(simple_eval(text.split("$")[1])), 2)
except:
return None
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)
|