import base64
import datetime
import io
import json
import re
from collections import Counter
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import tiktoken
import yaml
from openai import OpenAI
def extract_json_from_response(text: str) -> str:
"""Extract JSON from a response that might contain markdown code blocks."""
# Try to find JSON within code blocks first
json_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if json_match:
return json_match.group(1)
# If no code blocks, try to find raw JSON
json_match = re.search(r"\{.*\}", text, re.DOTALL)
if json_match:
return json_match.group(0)
# If no JSON found, return the original text
return text
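# Illustrative sketch of the two extraction paths (inputs are hypothetical):
#   extract_json_from_response('Sure!\n```json\n{"a": 1}\n```')  -> '{"a": 1}'
#   extract_json_from_response('The result is {"a": 1}.')        -> '{"a": 1}'
# Anything without braces is returned unchanged.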
def count_tokens(text: str, model: str = "gpt-4") -> int:
"""Count tokens in text using tiktoken."""
try:
encoder = tiktoken.encoding_for_model(model)
return len(encoder.encode(str(text)))
except Exception as e:
print(f"Error counting tokens: {e}")
return 0
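# Rough illustration: with the cl100k_base encoding that tiktoken resolves for
# "gpt-4", count_tokens("hello world") returns 2; model names unknown to
# tiktoken fall into the except branch and yield 0.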
def create_distribution_plot(data, column):
"""Create a distribution plot using Plotly and convert to image."""
try:
# Check if the column contains lists
if isinstance(data[column].iloc[0], list):
print(f"Processing list column: {column}")
value_counts = flatten_list_column(data, column)
fig = go.Figure(
[
go.Bar(
x=value_counts.index,
y=value_counts.values,
marker=dict(
color=value_counts.values,
colorscale=px.colors.sequential.Plotly3,
),
)
]
)
else:
if data[column].dtype in ["int64", "float64"]:
# Continuous data - use histogram
fig = go.Figure()
fig.add_trace(
go.Histogram(
x=data[column],
name="Count",
nbinsx=30,
marker=dict(
color="rgba(110, 68, 255, 0.7)",
line=dict(color="rgba(184, 146, 255, 1)", width=1),
),
)
)
else:
# Categorical data
value_counts = data[column].value_counts()
fig = go.Figure(
[
go.Bar(
x=value_counts.index,
y=value_counts.values,
marker=dict(
color=value_counts.values,
colorscale=px.colors.sequential.Plotly3,
),
)
]
)
# Common layout updates
fig.update_layout(
title=dict(text=f"Distribution of {column}", x=0.5, y=0.95),
xaxis_title=column,
yaxis_title="Count",
template="plotly_white",
margin=dict(t=50, l=50, r=30, b=50),
width=600,
height=400,
showlegend=False,
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
)
# Rotate x-axis labels if needed
if isinstance(data[column].iloc[0], list) or data[column].dtype not in [
"int64",
"float64",
]:
fig.update_layout(xaxis_tickangle=-45)
# Update grid style
fig.update_yaxes(gridcolor="rgba(128,128,128,0.1)", gridwidth=1)
fig.update_xaxes(gridcolor="rgba(128,128,128,0.1)", gridwidth=1)
# Convert to PNG with moderate resolution
img_bytes = fig.to_image(format="png", scale=1.5)
# Encode to base64
img_base64 = base64.b64encode(img_bytes).decode()
return img_base64
except Exception as e:
print(f"Error creating distribution plot for {column}: {str(e)}")
raise e
def create_wordcloud(data, column):
"""Create a word cloud visualization."""
import matplotlib.pyplot as plt
from wordcloud import WordCloud
try:
# Handle list columns
if isinstance(data[column].iloc[0], list):
text = " ".join(
[
" ".join(map(str, sublist))
for sublist in data[column]
if isinstance(sublist, list)
]
)
else:
# Handle regular columns
text = " ".join(data[column].astype(str))
wordcloud = WordCloud(
width=600,
height=300,
background_color="white",
colormap="plasma",
max_words=100,
).generate(text)
# Create matplotlib figure
plt.figure(figsize=(8, 4))
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis("off")
plt.title(f"Word Cloud for {column}")
# Save to bytes
buf = io.BytesIO()
plt.savefig(buf, format="png", bbox_inches="tight", dpi=150)
plt.close()
buf.seek(0)
# Convert to base64
img_base64 = base64.b64encode(buf.getvalue()).decode()
return img_base64
except Exception as e:
print(f"Error creating word cloud for {column}: {str(e)}")
raise e
def analyze_dataset_with_openai(client: OpenAI, data) -> dict:
"""Analyze dataset using OpenAI API with improved type inference and efficient sampling."""
# Convert dictionary to DataFrame if needed
if isinstance(data, dict):
df = pd.DataFrame(data)
else:
df = data
# Take a very small sample for efficiency
sample_size = min(3, len(df))
if len(df) > 3:
sample_indices = df.index[
:sample_size
] # Take first 3 rows instead of random sampling
sample_df = df.loc[sample_indices]
else:
sample_df = df
dataset_sample = sample_df.to_dict("records")
single_record = dataset_sample[0]
# Create type hints dictionary - only process the sample
type_hints = {}
for column in sample_df.columns:
# Get the pandas dtype
dtype = sample_df[column].dtype
# Efficiently identify types without complex operations
if pd.api.types.is_integer_dtype(dtype):
type_hints[column] = "integer"
elif pd.api.types.is_float_dtype(dtype):
type_hints[column] = "number"
elif pd.api.types.is_bool_dtype(dtype):
type_hints[column] = "boolean"
elif pd.api.types.is_datetime64_any_dtype(dtype):
type_hints[column] = "datetime"
elif pd.api.types.is_categorical_dtype(dtype):
type_hints[column] = "categorical"
elif pd.api.types.is_string_dtype(dtype):
# Simple check for list-like values
first_val = sample_df[column].iloc[0]
if isinstance(first_val, list):
type_hints[column] = "array"
else:
type_hints[column] = "string"
else:
type_hints[column] = "unknown"
prompt = f"""Analyze this dataset sample and provide the following in a JSON response:
1. A concise description that includes:
- A one-sentence overview of what the dataset contains
- A bullet-pointed list of key features and statistics
- A brief statement about potential ML/AI applications
2. A schema showing each field's type and description. Here is the actual DataFrame type information:
{json.dumps(type_hints, indent=2)}
And here's a single record for reference:
{json.dumps(single_record, indent=2)}
3. A formatted example record
Format your response as JSON with these exact keys:
{{
"description": {{
"overview": "One clear sentence describing the dataset...",
"key_features": [
"Feature or statistic 1",
"Feature or statistic 2"
],
"ml_applications": "Brief statement about ML/AI use cases..."
}},
"schema": {{
"field_name": {{
"type": "use the type from the provided type_hints",
"description": "Description of what this field contains"
}}
}},
"example": {{"key": "value"}}
}}
For context, here are more sample records:
{json.dumps(dataset_sample, indent=2)}
"""
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2000,
)
# Get the response content
response_text = response.choices[0].message.content
# Extract JSON from the response
json_str = extract_json_from_response(response_text)
# Parse the JSON
result = json.loads(json_str)
return result
except Exception as e:
print(f"OpenAI API error: {str(e)}")
return {
"description": {
"overview": "Error analyzing dataset",
"key_features": ["Error: Failed to analyze dataset"],
"ml_applications": "Analysis unavailable",
},
"schema": {},
"example": {},
}
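# Hedged usage sketch (variable names here are illustrative only):
#   client = OpenAI()  # reads OPENAI_API_KEY from the environment
#   analysis = analyze_dataset_with_openai(client, df)
#   analysis["description"]["overview"]   # one-sentence summary
#   analysis["schema"]                    # {field: {"type": ..., "description": ...}}
#   analysis["example"]                   # formatted example record
# The exact wording depends on the model; only the top-level keys
# ("description", "schema", "example") are pinned down by the prompt above.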
def analyze_dataset_statistics(df):
"""Generate simplified dataset statistics with token counting."""
stats = {
"basic_stats": {
"total_records": len(df),
"total_features": len(df.columns),
"memory_usage": f"{df.memory_usage(deep=True).sum() / (1024*1024):.2f} MB",
},
"token_stats": {"total": 0, "by_column": {}},
}
# Count tokens for each column
for column in df.columns:
try:
if df[column].dtype == "object" or isinstance(df[column].iloc[0], list):
# For list columns, join items into strings
if isinstance(df[column].iloc[0], list):
token_counts = df[column].apply(
lambda x: count_tokens(" ".join(str(item) for item in x))
)
else:
token_counts = df[column].apply(lambda x: count_tokens(str(x)))
total_tokens = int(token_counts.sum())
stats["token_stats"]["total"] += total_tokens
stats["token_stats"]["by_column"][column] = total_tokens
except Exception as e:
print(f"Error processing column {column}: {str(e)}")
continue
return stats
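# The returned structure looks roughly like this (values are illustrative):
#   {
#       "basic_stats": {"total_records": 1000, "total_features": 5,
#                       "memory_usage": "1.23 MB"},
#       "token_stats": {"total": 45678, "by_column": {"text": 45678}},
#   }
# Only object/list columns are tokenized; numeric columns contribute no tokens.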
def format_dataset_stats(stats):
"""Format simplified dataset statistics as markdown."""
md = """## Dataset Overview
### Basic Statistics
* Total Records: {total_records:,}
* Total Features: {total_features}
* Memory Usage: {memory_usage}
""".format(
**stats["basic_stats"]
)
# Token Statistics
if stats["token_stats"]["total"] > 0:
md += "\n### Token Info\n"
md += f"* Total Tokens: {stats['token_stats']['total']:,}\n"
if stats["token_stats"]["by_column"]:
md += "\nTokens by Column:\n"
for col, count in stats["token_stats"]["by_column"].items():
md += f"* {col}: {count:,}\n"
return md
def generate_dataset_card(
dataset_info: dict,
distribution_plots: dict,
wordcloud_plots: dict,
openai_analysis: dict,
df: pd.DataFrame,
) -> str:
"""Generate a beautiful and clean dataset card."""
# Basic dataset metadata
yaml_content = {
"language": ["en"],
"license": "apache-2.0",
"multilinguality": "monolingual",
"size_categories": [get_size_category(len(df))],
"task_categories": ["other"],
}
yaml_string = yaml.dump(yaml_content, sort_keys=False)
# Generate dataset statistics
stats = analyze_dataset_statistics(df)
description = openai_analysis["description"]
# Build the markdown content with proper spacing
readme_content = f"""---
{yaml_string}---
# {dataset_info['dataset_name']}
{description['overview']}
### Key Features
{chr(10).join(f'* {feature}' for feature in description['key_features'])}
### Potential Applications
{description['ml_applications']}
## Dataset Statistics
* Total Records: {stats['basic_stats']['total_records']:,}
* Total Features: {stats['basic_stats']['total_features']}
* Memory Usage: {stats['basic_stats']['memory_usage']}
## Dataset Schema
| Field | Type | Description |
| --- | --- | --- |
{chr(10).join(f"| {field} | {info['type']} | {info['description']} |" for field, info in openai_analysis['schema'].items())}
## Example Record
```json
{json.dumps(openai_analysis['example'], indent=2)}
```
## Data Distribution Analysis
The following visualizations show the distribution patterns and characteristics of key features in the dataset:
"""
# Add individual distribution plots with clean spacing
    for col, img_str in distribution_plots.items():
        readme_content += f"""### Distribution of {col}

<img src="data:image/png;base64,{img_str}" alt="Distribution of {col}">

"""
# Add word clouds with clean spacing
if wordcloud_plots:
readme_content += "## Feature Word Clouds\n\n"
        for col, img_str in wordcloud_plots.items():
            readme_content += f"""### Word Cloud for {col}

<img src="data:image/png;base64,{img_str}" alt="Word cloud for {col}">

"""
# Add token statistics if available
if stats.get("token_stats") and stats["token_stats"]["total"] > 0:
readme_content += """## Token Statistics
"""
readme_content += f"* Total Tokens: {stats['token_stats']['total']:,}\n"
if stats["token_stats"].get("by_column"):
readme_content += "\n**Tokens by Column:**\n"
for col, count in stats["token_stats"]["by_column"].items():
readme_content += f"* {col}: {count:,}\n"
# Add citation section
clean_name = dataset_info["dataset_name"].replace("/", "_")
readme_content += f"""
## Citation
```bibtex
@dataset{{{clean_name},
title = {{{dataset_info['dataset_name']}}},
year = {{{datetime.datetime.now().year}}},
publisher = {{Hugging Face}},
url = {{https://huggingface.co/datasets/{dataset_info['dataset_name']}}}
}}
```
### Usage Guidelines
This dataset is released under the Apache 2.0 License. When using this dataset:
* 📚 Cite the dataset using the BibTeX entry above
* 🤝 Consider contributing improvements or reporting issues
* 💡 Share derivative works with the community when possible
"""
return readme_content
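# Minimal usage sketch, assuming the plot dicts map column names to the
# base64 PNG strings returned by create_distribution_plot / create_wordcloud:
#   card = generate_dataset_card(
#       dataset_info={"dataset_name": "user/my-dataset"},  # hypothetical repo id
#       distribution_plots={"label": create_distribution_plot(df, "label")},
#       wordcloud_plots={"text": create_wordcloud(df, "text")},
#       openai_analysis=analyze_dataset_with_openai(client, df),
#       df=df,
#   )
#   # `card` is a README-ready markdown string with YAML front matter.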
def get_size_category(record_count: int) -> str:
    """Determine the size category based on record count."""
    if record_count < 1000:
        return "n<1K"
    elif record_count < 10000:
        return "1K<n<10K"
    elif record_count < 100000:
        return "10K<n<100K"
    elif record_count < 1000000:
        return "100K<n<1M"
    else:
        return "n>1M"
def format_overview_section(analysis: dict, stats: dict) -> str:
"""Create a comprehensive overview section."""
description = analysis["description"]
overview = f"""
{description['overview']}
### Key Features and Characteristics
{chr(10).join(f'* {feature}' for feature in description['key_features'])}
### Potential Applications
{description['ml_applications']}
### Dataset Size
* Total Records: {stats['basic_stats']['total_records']:,}
* Total Features: {stats['basic_stats']['total_features']}
* Memory Usage: {stats['basic_stats']['memory_usage']}
"""
return overview.strip()
def format_schema_section(schema: dict, df: pd.DataFrame) -> str:
"""Generate an enhanced schema section with statistics."""
# Table header
table = "| Field | Type | Description | Non-Null Count | Unique Values |\n"
table += "| --- | --- | --- | --- | --- |\n"
# Generate rows with additional statistics
for field, info in schema.items():
try:
non_null = df[field].count()
unique = df[field].nunique()
row = f"| {field} | {info['type']} | {info['description']} | {non_null:,} | {unique:,} |"
table += row + "\n"
except Exception as e:
print(f"Error processing field {field}: {e}")
continue
return table
def format_visualization_section(
distribution_plots: dict, wordcloud_plots: dict
) -> str:
"""Format the visualization section with improved layout."""
content = (
"""The following visualizations show key characteristics of the dataset:\n\n"""
)
# Add distribution plots
if distribution_plots:
content += "### Distribution Plots\n\n"
        content += '<div style="display: flex; flex-wrap: wrap; gap: 16px;">\n'
        for col, img_str in distribution_plots.items():
            content += f"""<figure>
<img src="data:image/png;base64,{img_str}" alt="Distribution of {col}">
<figcaption>Distribution of {col}</figcaption>
</figure>\n"""
        content += "</div>\n\n"
# Add word clouds
if wordcloud_plots:
content += "### Word Clouds\n\n"
        content += '<div style="display: flex; flex-wrap: wrap; gap: 16px;">\n'
        for col, img_str in wordcloud_plots.items():
            content += f"""<figure>
<img src="data:image/png;base64,{img_str}" alt="Word cloud for {col}">
<figcaption>Word Cloud for {col}</figcaption>
</figure>\n"""
        content += "</div>\n"
return content
def generate_limitations_section(df: pd.DataFrame, analysis: dict) -> str:
"""Generate a section about dataset limitations and potential biases."""
limitations = [
"This dataset may not be representative of all possible scenarios or use cases.",
f"The dataset contains {len(df):,} records, which may limit its applicability to certain tasks.",
"There may be inherent biases in the data collection or annotation process.",
]
# Add warnings about missing values if present
missing_values = df.isnull().sum()
if missing_values.any():
limitations.append(
f"Some fields contain missing values: {', '.join(missing_values[missing_values > 0].index)}"
)
return f"""The following limitations and potential biases should be considered when using this dataset:
{chr(10).join(f'* {limitation}' for limitation in limitations)}
Please consider these limitations when using the dataset and validate results accordingly."""
def generate_usage_section(dataset_info: dict, analysis: dict) -> str:
"""Generate comprehensive usage guidelines."""
return f"""This dataset is released under the Apache 2.0 License. When using this dataset:
* 📚 Cite the dataset using the BibTeX entry provided below
* 🤝 Consider contributing improvements or reporting issues
* 💡 Share derivative works with the community when possible
* 🔍 Validate the dataset's suitability for your specific use case
* ⚠️ Be aware of the limitations and biases discussed above
* 📊 Consider the dataset size and computational requirements for your application
For questions or additional information, please visit the dataset repository on Hugging Face.
"""
def get_task_categories(df: pd.DataFrame, analysis: dict) -> list:
"""Infer potential task categories based on the data and analysis."""
categories = ["other"] # Default category
# Add more sophisticated task inference logic based on column names and content
text_columns = df.select_dtypes(include=["object"]).columns
numeric_columns = df.select_dtypes(include=["int64", "float64"]).columns
if len(text_columns) > 0:
categories.append("text-classification")
if len(numeric_columns) > 0:
categories.append("regression")
return list(set(categories)) # Remove duplicates
def clean_dataset_name(name: str) -> str:
"""Clean dataset name for citation."""
return name.replace("/", "_").replace("-", "_").lower()
def generate_schema_table(schema: dict) -> str:
"""Generate a markdown table for the schema, handling nested structures."""
# Table header
table = "| Field | Type | Description |\n| --- | --- | --- |\n"
# Generate rows recursively
rows = []
for field, info in schema.items():
rows.extend(format_schema_item(field, info))
# Join all rows
table += "\n".join(rows)
return table
def format_stats_section(stats: dict) -> str:
"""Format the statistics section of the dataset card."""
content = """### Basic Statistics
"""
# Add basic stats
for key, value in stats["basic_stats"].items():
# Convert key from snake_case to Title Case
formatted_key = key.replace("_", " ").title()
content += f"* {formatted_key}: {value}\n"
# Add token statistics if available
if stats.get("token_stats") and stats["token_stats"]["total"] > 0:
content += "\n### Token Statistics\n"
content += f"* Total Tokens: {stats['token_stats']['total']:,}\n"
if stats["token_stats"].get("by_column"):
content += "\n**Tokens by Column:**\n"
for col, count in stats["token_stats"]["by_column"].items():
content += f"* {col}: {count:,}\n"
return content
def format_schema_item(field_name: str, field_info: dict, prefix: str = "") -> list:
"""Recursively format schema items for nested structures."""
rows = []
# Handle nested objects
if isinstance(field_info, dict):
if "type" in field_info and "description" in field_info:
# This is a leaf node with type and description
rows.append(
f"| {prefix}{field_name} | {field_info['type']} | {field_info['description']} |"
)
else:
# This is a nested object, recurse through its properties
for subfield, subinfo in field_info.items():
                # Build the dotted prefix for the nested field
                new_prefix = f"{prefix}{field_name}."
rows.extend(format_schema_item(subfield, subinfo, new_prefix))
return rows
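# Illustrative example of the recursion on a nested schema (hypothetical input):
#   format_schema_item("user", {"name": {"type": "string", "description": "Full name"}})
#   -> ["| user.name | string | Full name |"]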
def flatten_list_column(data, column):
"""Flatten a column containing lists into individual values with counts."""
# Flatten the lists into individual items
flattened = [
item
for sublist in data[column]
if isinstance(sublist, list)
for item in sublist
]
# Count occurrences
value_counts = pd.Series(Counter(flattened))
return value_counts
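# Illustrative example (hypothetical DataFrame):
#   df = pd.DataFrame({"tags": [["nlp", "qa"], ["nlp"]]})
#   flatten_list_column(df, "tags")  -> Series({"nlp": 2, "qa": 1})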