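"""Basic usage monitor for vote logs stored in Alibaba Cloud OSS.

Downloads "*-conv.json" log files from an OSS bucket, aggregates chat and
anonymous-vote events, and prints per-action, per-model, and per-vote counts
(plus hourly chat counts for the last 24 hours) as GitHub-flavored Markdown
tables.
"""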
import argparse
import code
import datetime
import json
import os
import time

import oss2
import pandas as pd  # pandas>=2.0.3
import plotly.express as px
import plotly.graph_objects as go
from pytz import timezone
from tqdm import tqdm

NUM_SERVERS = 1

LOG_ROOT_DIR = os.getenv("LOG_ROOT_DIR", "")
OSS_ACCESS_KEY_ID = os.getenv("OSS_ACCESS_KEY_ID", "")
OSS_ACCESS_KEY_SECRET = os.getenv("OSS_ACCESS_KEY_SECRET", "")
OSS_ENDPOINT = os.getenv("OSS_ENDPOINT", "")
OSS_BUCKET_NAME = os.getenv("OSS_BUCKET_NAME", "")
OSS_FILE_PREFIX = "logs/vote_log/"

auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)
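
# The OSS credentials, endpoint, and bucket name above are read from the
# environment. Example (placeholder values only, not real credentials or a
# real bucket):
#
#   export OSS_ACCESS_KEY_ID="your-access-key-id"
#   export OSS_ACCESS_KEY_SECRET="your-access-key-secret"
#   export OSS_ENDPOINT="https://oss-cn-hangzhou.aliyuncs.com"
#   export OSS_BUCKET_NAME="your-bucket-name"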
def get_log_files(bucket, max_num_files=None):
    """
    Fetch log file paths from OSS, sorted by last modified timestamp.

    :param bucket: oss2.Bucket instance
    :param max_num_files: Maximum number of files to return
    :return: List of log file paths (sorted by timestamp)
    """
    # List objects in the OSS bucket with the specified prefix
    filenames = []
    for obj in oss2.ObjectIterator(bucket, prefix=OSS_FILE_PREFIX):
        if obj.key.endswith("-conv.json"):  # Filter log files by extension
            filenames.append((obj.key, obj.last_modified))

    # Sort filenames by the last modified timestamp
    filenames = sorted(filenames, key=lambda x: x[1])
    # Extract only the file paths (keys)
    filenames = [x[0] for x in filenames]

    # Apply the max_num_files limit if specified (keep the most recent files)
    max_num_files = max_num_files or len(filenames)
    filenames = filenames[-max_num_files:]
    return filenames
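
# Illustrative usage sketch (assumes the credentials above are valid and the
# bucket contains vote logs under OSS_FILE_PREFIX):
#
#   recent_files = get_log_files(bucket, max_num_files=100)
#   # recent_files[-1] is the key of the most recently modified "-conv.json" file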
def load_log_files(filename):
    data = []
    # Retry a few times in case the file is missing or still being written
    lines = []
    for retry in range(5):
        try:
            with open(filename) as f:
                lines = f.readlines()
            break
        except FileNotFoundError:
            time.sleep(2)

    for l in lines:
        row = json.loads(l)
        data.append(
            dict(
                type=row["type"],
                tstamp=row["tstamp"],
                model=row.get("model", ""),
                models=row.get("models", ["", ""]),
            )
        )
    return data
def load_log_files_parallel(log_files, num_threads=16):
    data_all = []
    from multiprocessing import Pool

    # Note: despite the parameter name, multiprocessing.Pool uses worker
    # processes, not threads.
    with Pool(num_threads) as p:
        ret_all = list(tqdm(p.imap(load_log_files, log_files), total=len(log_files)))
        for ret in ret_all:
            data_all.extend(ret)
    return data_all
def load_log_files_from_oss(bucket, filename):
    """
    Load log data from a file stored in OSS.

    :param bucket: oss2.Bucket instance
    :param filename: Path to the file in OSS
    :return: Parsed log data as a list of dictionaries
    """
    data = []
    lines = []
    for retry in range(5):
        try:
            # Read the file from OSS and split its content into lines
            result = bucket.get_object(filename)
            lines = result.read().decode("utf-8").splitlines()
            break
        except oss2.exceptions.NoSuchKey:
            print(f"File not found in OSS: {filename}, retrying ({retry + 1}/5)...")
            time.sleep(2)
        except Exception as e:
            print(f"Error reading file {filename} from OSS: {e}")
            time.sleep(2)

    for line in lines:
        row = json.loads(line)
        data.append(
            dict(
                type=row["type"],
                tstamp=row["tstamp"],
                model=row.get("model", ""),
                models=row.get("models", ["", ""]),
            )
        )
    return data
def load_log_files_parallel_from_oss(bucket, log_files, num_threads=16):
    """
    Load log files from OSS in parallel using a multiprocessing pool.

    :param bucket: oss2.Bucket instance
    :param log_files: List of log file paths in OSS
    :param num_threads: Number of worker processes to use for parallel loading
    :return: Combined log data from all files
    """
    data_all = []
    from multiprocessing import Pool
    from functools import partial

    # Bind the bucket as the first argument so each worker only receives a
    # filename. The partial (including the bucket) must be picklable, since
    # Pool sends it to worker processes.
    load_function = partial(load_log_files_from_oss, bucket)

    with Pool(num_threads) as p:
        ret_all = list(tqdm(p.imap(load_function, log_files), total=len(log_files)))
        for ret in ret_all:
            data_all.extend(ret)
    return data_all
def get_anony_vote_df(df):
    # Keep only vote events; for anonymous battles the model names are logged
    # as empty strings, so filter on an empty first entry in "models".
    anony_vote_df = df[
        df["type"].isin(["leftvote", "rightvote", "tievote", "bothbad_vote"])
    ]
    anony_vote_df = anony_vote_df[anony_vote_df["models"].apply(lambda x: x[0] == "")]
    return anony_vote_df
def merge_counts(series, on, names):
    # Merge a list of value-count Series on the shared key and rename the
    # resulting count columns to `names`.
    ret = pd.merge(series[0], series[1], on=on)
    for i in range(2, len(series)):
        ret = pd.merge(ret, series[i], on=on)
    ret = ret.reset_index()
    old_names = list(ret.columns)[-len(series):]
    rename = {old_name: new_name for old_name, new_name in zip(old_names, names)}
    ret = ret.rename(columns=rename)
    return ret
def report_basic_stats(bucket, log_files):
    df_all = load_log_files_parallel_from_oss(bucket, log_files)
    df_all = pd.DataFrame(df_all)
    now_t = df_all["tstamp"].max()
    df_1_hour = df_all[df_all["tstamp"] > (now_t - 3600)]
    df_1_day = df_all[df_all["tstamp"] > (now_t - 3600 * 24)]
    anony_vote_df_all = get_anony_vote_df(df_all)

    # Chat trends (daily counts of chats and anonymous votes, US/Pacific dates)
    chat_dates = [
        datetime.datetime.fromtimestamp(x, tz=timezone("US/Pacific")).strftime(
            "%Y-%m-%d"
        )
        for x in df_all[df_all["type"] == "chat"]["tstamp"]
    ]
    # Use Series.value_counts; the top-level pd.value_counts is deprecated in
    # recent pandas releases.
    chat_dates_counts = pd.Series(chat_dates).value_counts()
    vote_dates = [
        datetime.datetime.fromtimestamp(x, tz=timezone("US/Pacific")).strftime(
            "%Y-%m-%d"
        )
        for x in anony_vote_df_all["tstamp"]
    ]
    vote_dates_counts = pd.Series(vote_dates).value_counts()
    chat_dates_bar = go.Figure(
        data=[
            go.Bar(
                name="Anony. Vote",
                x=vote_dates_counts.index,
                y=vote_dates_counts,
                text=[f"{val:.0f}" for val in vote_dates_counts],
                textposition="auto",
            ),
            go.Bar(
                name="Chat",
                x=chat_dates_counts.index,
                y=chat_dates_counts,
                text=[f"{val:.0f}" for val in chat_dates_counts],
                textposition="auto",
            ),
        ]
    )
    chat_dates_bar.update_layout(
        barmode="stack",
        xaxis_title="Dates",
        yaxis_title="Count",
        height=300,
        width=1200,
    )

    # Model call counts
    model_hist_all = df_all[df_all["type"] == "chat"]["model"].value_counts()
    model_hist_1_day = df_1_day[df_1_day["type"] == "chat"]["model"].value_counts()
    model_hist_1_hour = df_1_hour[df_1_hour["type"] == "chat"]["model"].value_counts()
    model_hist = merge_counts(
        [model_hist_all, model_hist_1_day, model_hist_1_hour],
        on="model",
        names=["All", "Last Day", "Last Hour"],
    )
    model_hist_md = model_hist.to_markdown(index=False, tablefmt="github")

    # Action counts
    action_hist_all = df_all["type"].value_counts()
    action_hist_1_day = df_1_day["type"].value_counts()
    action_hist_1_hour = df_1_hour["type"].value_counts()
    action_hist = merge_counts(
        [action_hist_all, action_hist_1_day, action_hist_1_hour],
        on="type",
        names=["All", "Last Day", "Last Hour"],
    )
    action_hist_md = action_hist.to_markdown(index=False, tablefmt="github")

    # Anony vote counts
    anony_vote_hist_all = anony_vote_df_all["type"].value_counts()
    anony_vote_df_1_day = get_anony_vote_df(df_1_day)
    anony_vote_hist_1_day = anony_vote_df_1_day["type"].value_counts()
    # anony_vote_df_1_hour = get_anony_vote_df(df_1_hour)
    # anony_vote_hist_1_hour = anony_vote_df_1_hour["type"].value_counts()
    anony_vote_hist = merge_counts(
        [anony_vote_hist_all, anony_vote_hist_1_day],
        on="type",
        names=["All", "Last Day"],
    )
    anony_vote_hist_md = anony_vote_hist.to_markdown(index=False, tablefmt="github")

    # Last 24 hours: hourly chat counts, most recent hour first
    chat_1_day = df_1_day[df_1_day["type"] == "chat"]
    num_chats_last_24_hours = []
    base = df_1_day["tstamp"].min()
    for i in range(24, 0, -1):
        left = base + (i - 1) * 3600
        right = base + i * 3600
        num = ((chat_1_day["tstamp"] >= left) & (chat_1_day["tstamp"] < right)).sum()
        num_chats_last_24_hours.append(num)
    times = [
        datetime.datetime.fromtimestamp(
            base + i * 3600, tz=timezone("US/Pacific")
        ).strftime("%Y-%m-%d %H:%M:%S %Z")
        for i in range(24, 0, -1)
    ]
    last_24_hours_df = pd.DataFrame({"time": times, "value": num_chats_last_24_hours})
    last_24_hours_md = last_24_hours_df.to_markdown(index=False, tablefmt="github")

    # Last update datetime
    last_updated_tstamp = now_t
    last_updated_datetime = datetime.datetime.fromtimestamp(
        last_updated_tstamp, tz=timezone("US/Pacific")
    ).strftime("%Y-%m-%d %H:%M:%S %Z")

    # code.interact(local=locals())

    return {
        "chat_dates_bar": chat_dates_bar,
        "model_hist_md": model_hist_md,
        "action_hist_md": action_hist_md,
        "anony_vote_hist_md": anony_vote_hist_md,
        "num_chats_last_24_hours": last_24_hours_md,
        "last_updated_datetime": last_updated_datetime,
    }
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--max-num-files", type=int)
    args = parser.parse_args()

    log_files = get_log_files(bucket, args.max_num_files)
    basic_stats = report_basic_stats(bucket, log_files)

    print(basic_stats["action_hist_md"] + "\n")
    print(basic_stats["model_hist_md"] + "\n")
    print(basic_stats["anony_vote_hist_md"] + "\n")
    print(basic_stats["num_chats_last_24_hours"] + "\n")