yuripeyamashita's picture
Update app.py
5c18dcf verified
raw
history blame
3.5 kB
import os
import re
import json
import requests
from flask import Flask, request
from simpleeval import simple_eval
app = Flask(__name__)
GROUP_ID = os.environ.get("GROUP_ID")
LINE_CHANNEL_ACCESS_TOKEN = os.environ.get("LINE_CHANNEL_ACCESS_TOKEN")
message_list = {}
@app.route('/', methods=['GET'])
def index():
return {}, 200
@app.route("/api/", methods=["POST"])
def api():
try:
payload = get_payload_dict(request.get_json())
if payload.get("group_id") != GROUP_ID:
raise ValueError("Invalid Group")
if payload.get("unsend_msg_id"):
message_list.pop(payload.get("unsend_msg_id"), None)
raise ValueError("Unsend Success")
if "$" not in payload.get("msg_text"):
raise ValueError("Keyword not Found")
if get_amount(payload.get("msg_text")) == None:
raise ValueError("Amount is None")
if payload.get("quoted_msg_id"):
message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"),
"amount": get_amount(payload.get("msg_text")),
"quoted_msg_id": payload.get("quoted_msg_id")}
else:
message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"),
"amount": get_amount(payload.get("msg_text"))}
# username = get_username(payload)
# text = payload.get("msg_text")
# print(username)
# send_text(payload.get("token"), f"{username}:{text}")
except Exception as e:
print(f"An error occurred: {e}")
print(message_list)
print()
return "", 200
def get_payload_dict(raw_payload) -> dict:
# print(raw_payload)
events = raw_payload.get("events", [{}])[0]
return {"token": events.get("replyToken"),
"group_id": events.get("source", {}).get("groupId"),
"user_id": events.get("source", {}).get("userId"),
"msg_type": events.get("message", {}).get("type"),
"msg_id": events.get("message", {}).get("id"),
"msg_text": events.get("message", {}).get("text"),
"quoted_msg_id": events.get("message", {}).get("quotedMessageId"),
"unsend_msg_id": events.get("unsend", {}).get("messageId")}
def send_text(token: str, text: str):
requests.post("https://api.line.me/v2/bot/message/reply", headers={
"Content-Type": "application/json; charset=UTF-8",
"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN
}, json={
"replyToken": token,
"messages": [{"type": "text", "text": text}]
})
def get_username(user_id: str):
url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/member/{user_id}"
try:
res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json()
return res_json.get("displayName")
except:
return "Unknow"
def get_username() -> list:
url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/members/ids"
try:
res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json()
return res_json.get("memberIds")
except:
return []
def get_amount(text: str) -> float | None:
try:
return round(float(simple_eval(text.split("$")[1])), 2)
except:
return None
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)