import argparse import code import datetime import json import os import oss2 from pytz import timezone import time import pandas as pd # pandas>=2.0.3 import plotly.express as px import plotly.graph_objects as go from tqdm import tqdm NUM_SERVERS = 1 LOG_ROOT_DIR = os.getenv("LOG_ROOT_DIR", "") OSS_ACCESS_KEY_ID = os.getenv("OSS_ACCESS_KEY_ID", "") OSS_ACCESS_KEY_SECRET = os.getenv("OSS_ACCESS_KEY_SECRET", "") OSS_ENDPOINT = os.getenv("OSS_ENDPOINT", "") OSS_BUCKET_NAME = os.getenv("OSS_BUCKET_NAME", "") OSS_FILE_PREFIX = "logs/vote_log/" auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET) bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME) def get_log_files(bucket, max_num_files=None): """ Fetch log file paths from OSS, sorted by last modified timestamp. :param bucket: oss2.Bucket instance :param max_num_files: Maximum number of files to return :return: List of log file paths (sorted by timestamp) """ # List objects in the OSS bucket with the specified prefix filenames = [] for obj in oss2.ObjectIterator(bucket, prefix=OSS_FILE_PREFIX): if obj.key.endswith("-conv.json"): # Filter log files by extension filenames.append((obj.key, obj.last_modified)) # Sort filenames by the last modified timestamp filenames = sorted(filenames, key=lambda x: x[1]) # Extract only the file paths (keys) filenames = [x[0] for x in filenames] # Apply the max_num_files limit if specified max_num_files = max_num_files or len(filenames) filenames = filenames[-max_num_files:] return filenames def load_log_files(filename): data = [] for retry in range(5): try: lines = open(filename).readlines() break except FileNotFoundError: time.sleep(2) for l in lines: row = json.loads(l) data.append( dict( type=row["type"], tstamp=row["tstamp"], model=row.get("model", ""), models=row.get("models", ["", ""]), ) ) return data def load_log_files_parallel(log_files, num_threads=16): data_all = [] from multiprocessing import Pool with Pool(num_threads) as p: ret_all = list(tqdm(p.imap(load_log_files, log_files), total=len(log_files))) for ret in ret_all: data_all.extend(ret) return data_all def load_log_files_from_oss(bucket, filename): """ Load log data from a file stored in OSS. :param bucket: oss2.Bucket instance :param filename: Path to the file in OSS :return: Parsed log data as a list of dictionaries """ data = [] for retry in range(5): try: # Read the file from OSS result = bucket.get_object(filename) lines = result.read().decode('utf-8').splitlines() # Read file content and split into lines break except oss2.exceptions.NoSuchKey: print(f"File not found in OSS: {filename}, retrying ({retry + 1}/5)...") time.sleep(2) except Exception as e: print(f"Error reading file {filename} from OSS: {e}") time.sleep(2) for line in lines: row = json.loads(line) data.append( dict( type=row["type"], tstamp=row["tstamp"], model=row.get("model", ""), models=row.get("models", ["", ""]), ) ) return data def load_log_files_parallel_from_oss(bucket, log_files, num_threads=16): """ Load log files from OSS in parallel using multiple threads. :param bucket: oss2.Bucket instance :param log_files: List of log file paths in OSS :param num_threads: Number of threads to use for parallel loading :return: Combined log data from all files """ data_all = [] from multiprocessing import Pool from functools import partial # Partial function to include the bucket in the function arguments load_function = partial(load_log_files_from_oss, bucket) # Parallel processing using multiple threads with Pool(num_threads) as p: ret_all = list(tqdm(p.imap(load_function, log_files), total=len(log_files))) for ret in ret_all: data_all.extend(ret) return data_all def get_anony_vote_df(df): anony_vote_df = df[ df["type"].isin(["leftvote", "rightvote", "tievote", "bothbad_vote"]) ] anony_vote_df = anony_vote_df[anony_vote_df["models"].apply(lambda x: x[0] == "")] return anony_vote_df def merge_counts(series, on, names): ret = pd.merge(series[0], series[1], on=on) for i in range(2, len(series)): ret = pd.merge(ret, series[i], on=on) ret = ret.reset_index() old_names = list(ret.columns)[-len(series) :] rename = {old_name: new_name for old_name, new_name in zip(old_names, names)} ret = ret.rename(columns=rename) return ret def report_basic_stats(bucket, log_files): df_all = load_log_files_parallel_from_oss(bucket, log_files) df_all = pd.DataFrame(df_all) now_t = df_all["tstamp"].max() df_1_hour = df_all[df_all["tstamp"] > (now_t - 3600)] df_1_day = df_all[df_all["tstamp"] > (now_t - 3600 * 24)] anony_vote_df_all = get_anony_vote_df(df_all) # Chat trends chat_dates = [ datetime.datetime.fromtimestamp(x, tz=timezone("US/Pacific")).strftime( "%Y-%m-%d" ) for x in df_all[df_all["type"] == "chat"]["tstamp"] ] chat_dates_counts = pd.value_counts(chat_dates) vote_dates = [ datetime.datetime.fromtimestamp(x, tz=timezone("US/Pacific")).strftime( "%Y-%m-%d" ) for x in anony_vote_df_all["tstamp"] ] vote_dates_counts = pd.value_counts(vote_dates) chat_dates_bar = go.Figure( data=[ go.Bar( name="Anony. Vote", x=vote_dates_counts.index, y=vote_dates_counts, text=[f"{val:.0f}" for val in vote_dates_counts], textposition="auto", ), go.Bar( name="Chat", x=chat_dates_counts.index, y=chat_dates_counts, text=[f"{val:.0f}" for val in chat_dates_counts], textposition="auto", ), ] ) chat_dates_bar.update_layout( barmode="stack", xaxis_title="Dates", yaxis_title="Count", height=300, width=1200, ) # Model call counts model_hist_all = df_all[df_all["type"] == "chat"]["model"].value_counts() model_hist_1_day = df_1_day[df_1_day["type"] == "chat"]["model"].value_counts() model_hist_1_hour = df_1_hour[df_1_hour["type"] == "chat"]["model"].value_counts() model_hist = merge_counts( [model_hist_all, model_hist_1_day, model_hist_1_hour], on="model", names=["All", "Last Day", "Last Hour"], ) model_hist_md = model_hist.to_markdown(index=False, tablefmt="github") # Action counts action_hist_all = df_all["type"].value_counts() action_hist_1_day = df_1_day["type"].value_counts() action_hist_1_hour = df_1_hour["type"].value_counts() action_hist = merge_counts( [action_hist_all, action_hist_1_day, action_hist_1_hour], on="type", names=["All", "Last Day", "Last Hour"], ) action_hist_md = action_hist.to_markdown(index=False, tablefmt="github") # Anony vote counts anony_vote_hist_all = anony_vote_df_all["type"].value_counts() anony_vote_df_1_day = get_anony_vote_df(df_1_day) anony_vote_hist_1_day = anony_vote_df_1_day["type"].value_counts() # anony_vote_df_1_hour = get_anony_vote_df(df_1_hour) # anony_vote_hist_1_hour = anony_vote_df_1_hour["type"].value_counts() anony_vote_hist = merge_counts( [anony_vote_hist_all, anony_vote_hist_1_day], on="type", names=["All", "Last Day"], ) anony_vote_hist_md = anony_vote_hist.to_markdown(index=False, tablefmt="github") # Last 24 hours chat_1_day = df_1_day[df_1_day["type"] == "chat"] num_chats_last_24_hours = [] base = df_1_day["tstamp"].min() for i in range(24, 0, -1): left = base + (i - 1) * 3600 right = base + i * 3600 num = ((chat_1_day["tstamp"] >= left) & (chat_1_day["tstamp"] < right)).sum() num_chats_last_24_hours.append(num) times = [ datetime.datetime.fromtimestamp( base + i * 3600, tz=timezone("US/Pacific") ).strftime("%Y-%m-%d %H:%M:%S %Z") for i in range(24, 0, -1) ] last_24_hours_df = pd.DataFrame({"time": times, "value": num_chats_last_24_hours}) last_24_hours_md = last_24_hours_df.to_markdown(index=False, tablefmt="github") # Last update datetime last_updated_tstamp = now_t last_updated_datetime = datetime.datetime.fromtimestamp( last_updated_tstamp, tz=timezone("US/Pacific") ).strftime("%Y-%m-%d %H:%M:%S %Z") # code.interact(local=locals()) return { "chat_dates_bar": chat_dates_bar, "model_hist_md": model_hist_md, "action_hist_md": action_hist_md, "anony_vote_hist_md": anony_vote_hist_md, "num_chats_last_24_hours": last_24_hours_md, "last_updated_datetime": last_updated_datetime, } if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--max-num-files", type=int) args = parser.parse_args() log_files = get_log_files(bucket, args.max_num_files) basic_stats = report_basic_stats(bucket, log_files) print(basic_stats["action_hist_md"] + "\n") print(basic_stats["model_hist_md"] + "\n") print(basic_stats["anony_vote_hist_md"] + "\n") print(basic_stats["num_chats_last_24_hours"] + "\n")