Spaces:
Runtime error
Runtime error
import collections | |
import os | |
from datetime import datetime, timedelta | |
import json | |
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer | |
from urllib.parse import parse_qs, urlparse | |
from huggingface_hub import list_datasets, set_access_token, HfFolder | |
from datasets import load_dataset, DatasetDict, Dataset | |
import numpy as np | |
# Hub authentication: the token is read from the environment (a missing
# HF_TOKEN raises KeyError at start-up) and handed to huggingface_hub both
# through set_access_token and through the HfFolder token store.
# NOTE(review): set_access_token may be unavailable in recent huggingface_hub
# releases -- confirm against the version this app is pinned to.
HF_TOKEN = os.environ['HF_TOKEN']
set_access_token(HF_TOKEN)
HfFolder.save_token(HF_TOKEN)

# Preprocessed metrics, keyed by metric name. The pip data is sorted by its
# 'day' column right after loading.
datasets = dict(
    stars=load_dataset("open-source-metrics/preprocessed_stars"),
    issues=load_dataset("open-source-metrics/preprocessed_issues"),
    pip=load_dataset("open-source-metrics/preprocessed_pip").sort('day'),
)

# Metrics for external libraries, keyed the same way. Only the keys of each
# entry are inspected elsewhere in this file (by the /initialize route).
external_datasets = dict(
    pip=load_dataset("open-source-metrics/pip-external").sort('day'),
    stars=load_dataset("open-source-metrics/stars-external"),
    issues=load_dataset("open-source-metrics/issues-external"),
)
def cut_output(full_output: "Dataset", library_names: list):
    """Select the requested columns and trim them to a common length.

    The ``'day'`` column and every column named in ``library_names`` are
    kept. Each library column is scanned for its first ``None``; a column
    without any ``None`` counts as ending at its full length. All kept
    columns (including ``'day'``) are then truncated at the largest of those
    positions.

    Args:
        full_output: Object exposing ``to_dict()`` that returns a mapping of
            column name to list of values.
        library_names: Names of the library columns to keep.

    Returns:
        A dict mapping each kept column name to its truncated list. When none
        of the requested libraries is a column, the kept columns (``'day'``
        if present) are returned untrimmed instead of raising.
    """
    # Build the membership set once instead of concatenating a list per key.
    wanted = set(library_names) | {'day'}
    columns = {k: v for k, v in full_output.to_dict().items() if k in wanted}

    cut_points = []
    for name, values in columns.items():
        if name == 'day':
            continue
        try:
            cut_points.append(values.index(None))
        except ValueError:
            # No None in this column: it is populated all the way through.
            cut_points.append(len(values))

    # Without any library column there is nothing to measure against;
    # max() on an empty sequence would raise ValueError.
    if not cut_points:
        return columns

    last_value = max(cut_points)
    return {k: v[:last_value] for k, v in columns.items()}
def parse_name_and_options(path):
    """Extract the library names and option codes from a request path.

    Both values are read from the query string as comma-separated lists:
    ``input`` carries the library names and ``options`` the option codes.
    Only the first occurrence of each parameter is used.

    Args:
        path: Request path including the query string.

    Returns:
        A ``(library_names, options)`` tuple of string lists. A parameter
        that is absent or blank yields an empty list.
    """
    query = parse_qs(urlparse(path).query)

    def _comma_separated(key):
        # parse_qs drops blank values by default, so "options=" shows up as a
        # missing key; treat that as "nothing selected" rather than indexing
        # into None.
        values = query.get(key)
        return values[0].split(',') if values else []

    return _comma_separated("input"), _comma_separated("options")
class RequestHandler(SimpleHTTPRequestHandler):
    """Serve static files plus the JSON metric endpoints.

    Routes handled by ``do_GET``:

    * ``/`` is rewritten to ``index.html`` and served as a static file.
    * ``/initialize`` returns the internal/external library lists and
      warnings.
    * ``/retrievePipInstalls``, ``/retrieveStars`` and ``/retrieveIssues``
      return column data for the libraries named in the query string.
    * Any other path falls through to the static file handler.
    """

    def do_GET(self):
        """Dispatch a GET request to the matching route handler."""
        print(self.path)

        if self.path == "/":
            self.path = "index.html"
            return SimpleHTTPRequestHandler.do_GET(self)

        if self.path.startswith("/initialize"):
            return self._handle_initialize()

        if self.path.startswith("/retrievePipInstalls"):
            return self._handle_pip_installs()

        if self.path.startswith("/retrieveStars"):
            return self._handle_stars()

        if self.path.startswith("/retrieveIssues"):
            return self._handle_issues()

        return SimpleHTTPRequestHandler.do_GET(self)

    @staticmethod
    def _select_columns(columns, library_names):
        """Keep the ``'day'`` column and the columns named in ``library_names``."""
        wanted = set(library_names) | {'day'}
        return {k: v for k, v in columns.items() if k in wanted}

    @staticmethod
    def _sum_of_lists(lists):
        """Sum ``lists`` position by position, ignoring ``None`` entries.

        The result is as long as the shortest input list; no input lists
        gives an empty result, and a position holding only ``None`` sums to 0.
        """
        return [
            sum(value for value in row if value is not None)
            for row in zip(*lists)
        ]

    @staticmethod
    def _summarize_libraries(split_columns, external_keys):
        """Build the ``/initialize`` payload from plain names.

        Args:
            split_columns: Mapping of split name to list of column names.
            external_keys: Mapping of external dataset name to the set of
                its keys.

        Returns:
            A dict with the sorted ``'internal'`` names (columns of the
            widest split, minus ``'day'`` and minus the external names), the
            sorted ``'external'`` names (the largest key set), and one
            ``'warnings'`` entry per external dataset with fewer keys than
            the largest one.
        """
        most_external = max(external_keys.values(), key=len)
        widest_columns = max(split_columns.values(), key=len)

        # Filter rather than list.remove(): an external name that is not a
        # column must not raise ValueError and take the endpoint down.
        internal = sorted(
            name for name in widest_columns
            if name != 'day' and name not in most_external
        )

        warnings = [
            f"The {k} external dataset does not contain all splits. Missing: {most_external - v}"
            ".\nSelecting that split to show the pip install numbers will not work."
            for k, v in external_keys.items()
            if len(v) < len(most_external)
        ]

        return {
            'internal': internal,
            'external': sorted(most_external),
            'warnings': warnings,
        }

    def _handle_initialize(self):
        """Answer ``/initialize`` with the library lists and warnings."""
        external_dataset_keys = {k: set(v.keys()) for k, v in external_datasets.items()}
        print("Initializing ...")

        res = self._summarize_libraries(datasets['stars'].column_names, external_dataset_keys)

        print(f"Returning: {res}")
        return self.response(res)

    def _handle_pip_installs(self):
        """Answer ``/retrievePipInstalls``.

        Option ``'1'`` requests a single ``'Cumulated'`` series summed over
        the selected libraries; option ``'2'`` selects the ``'wow'`` split
        instead of ``'raw'``.
        """
        library_names, options = parse_name_and_options(self.path)
        cumulated = '1' in options
        week_over_week = '2' in options

        # Convert once; the cumulated branch needs both the library columns
        # and the 'day' column from the same dict.
        columns = datasets['pip']['wow' if week_over_week else 'raw'].to_dict()

        if cumulated:
            return self.response({
                'Cumulated': self._sum_of_lists(
                    [v for k, v in columns.items() if k in library_names]
                ),
                'day': columns['day'],
            })

        return self.response(self._select_columns(columns, library_names))

    def _handle_stars(self):
        """Answer ``/retrieveStars``; option ``'1'`` selects the ``'wow'`` split."""
        library_names, options = parse_name_and_options(self.path)
        week_over_week = '1' in options

        columns = datasets['stars']['wow' if week_over_week else 'raw'].to_dict()
        return self.response(self._select_columns(columns, library_names))

    def _handle_issues(self):
        """Answer ``/retrieveIssues``.

        Option ``'1'`` excludes org members and option ``'2'`` asks for
        week-over-week data, giving the splits ``'raw'``, ``'eom'``,
        ``'wow'`` and ``'eom_wow'``. Only ``'eom_wow'`` is passed through
        ``cut_output``; the other splits are returned at full length.
        """
        library_names, options = parse_name_and_options(self.path)
        exclude_org_members = '1' in options
        week_over_week = '2' in options

        if week_over_week and exclude_org_members:
            return self.response(cut_output(datasets['issues']['eom_wow'], library_names))

        if week_over_week:
            split = 'wow'
        elif exclude_org_members:
            split = 'eom'
        else:
            split = 'raw'

        columns = datasets['issues'][split].to_dict()
        return self.response(self._select_columns(columns, library_names))

    def response(self, output):
        """Send ``output`` as a 200 response with a JSON body.

        Args:
            output: JSON-serialisable object to send.

        Returns:
            The ``SimpleHTTPRequestHandler`` class, kept as-is from the
            original implementation; the route handlers in this class only
            pass it through.
        """
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps(output).encode("utf-8"))
        return SimpleHTTPRequestHandler
# Start the threaded HTTP server. The empty host string is passed straight
# to ThreadingHTTPServer together with port 7860, and serve_forever() then
# blocks for the rest of the process lifetime.
server_address = ("", 7860)
server = ThreadingHTTPServer(server_address, RequestHandler)
print("Running on port 7860")
server.serve_forever()