Spaces:
Sleeping
Sleeping
import pandas as pd | |
from sklearn.model_selection import train_test_split | |
import os | |
import logging | |
from encoder_utils import DataEncoder | |
import torch | |
import numpy as np | |
from tqdm import tqdm | |
# Configure the root logger at INFO so the progress messages emitted by this
# script are shown, then create the module-level logger used by the functions
# below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def generate_negative_samples(
    df: pd.DataFrame,
    num_negatives: int = 4,
    random_state: "int | None" = None,
) -> pd.DataFrame:
    """
    Append randomly drawn negative samples to a user-item interaction table.

    For every distinct ``user_id``, up to ``num_negatives`` ``music_id`` values
    that the user has no row for are drawn without replacement. Each drawn item
    becomes a new row that is a copy of the user's first row in ``df`` with
    ``music_id`` replaced by the drawn item and ``playcount`` set to 0.

    Args:
        df: DataFrame containing user-item interactions. Must have ``user_id``
            and ``music_id`` columns.
        num_negatives: Maximum number of negative samples drawn per user
            (fewer when the user has fewer unseen items). This is a per-user
            cap, not a count per positive interaction.
        random_state: Optional seed for the sampling. When ``None`` (default)
            NumPy's global random state is used, so ``np.random.seed`` still
            applies.
    Returns:
        A new DataFrame with the original rows first, followed by the negative
        rows, under a fresh 0..n-1 index.
    """
    # Distinct items in order of first appearance. A stable candidate order
    # (instead of set iteration order) keeps seeded runs reproducible.
    all_items = list(df['music_id'].unique())

    # The np.random module and RandomState both expose the same ``choice``.
    rng = np.random if random_state is None else np.random.RandomState(random_state)

    negative_rows = []
    # A single groupby pass replaces per-user boolean scans of the whole frame.
    # sort=False keeps users in order of first appearance; rows whose user_id
    # is NaN form no group (pandas default) and so receive no negatives.
    user_groups = df.groupby('user_id', sort=False)
    for _, user_rows in tqdm(user_groups, desc="Generating negative samples"):
        seen_items = set(user_rows['music_id'])
        candidates = [item for item in all_items if item not in seen_items]
        if not candidates:
            # The user has interacted with every item; nothing to sample.
            continue

        num_samples = min(len(candidates), num_negatives)
        sampled_negatives = rng.choice(candidates, size=num_samples, replace=False)

        # Every other column is copied from the user's first row.
        # NOTE(review): any item-specific columns will therefore describe that
        # first item rather than the sampled one - confirm this is intended.
        template = user_rows.iloc[0].to_dict()
        for item_id in sampled_negatives:
            # playcount == 0 marks the row as a negative sample.
            negative_rows.append({**template, 'music_id': item_id, 'playcount': 0})

    if not negative_rows:
        # Nothing to append; still return a new frame with a fresh index,
        # matching what the concat below would produce.
        return df.reset_index(drop=True)

    negative_df = pd.DataFrame(negative_rows)
    return pd.concat([df, negative_df], ignore_index=True)
def split_and_save_data(data_path: str, test_size: float = 0.2, random_state: int = 42):
    """
    Split data into train and test sets while maintaining consistent encoding.

    The CSV at ``data_path`` is loaded, augmented with negative samples, and
    used to fit a ``DataEncoder`` on the full (positive + negative) table so
    that train and test share the same encoders. Rows are then split by user:
    every row of a given ``user_id`` lands in exactly one of the two sets.
    The two splits and the fitted encoders are written next to the input file
    as ``train_data.csv``, ``test_data.csv`` and ``data_encoders.pt``.

    Args:
        data_path: Path to the input CSV of user-item interactions.
        test_size: Passed to ``train_test_split`` as the share of users placed
            in the test set.
        random_state: Seed for the user split. It is not passed to the
            negative sampling step.
    Returns:
        Tuple ``(train_path, test_path, encoder_path)`` of the files written.
    """
    # Read data
    df = pd.read_csv(data_path)
    logger.info(f"Total records: {len(df)}")

    # Generate negative samples
    df = generate_negative_samples(df)
    logger.info(f"Total records after negative sampling: {len(df)}")

    # Fit the encoders on the full dataset, before splitting, so both splits
    # are covered by the same fitted encoders.
    encoder = DataEncoder()
    encoder.fit(df)

    # Split by user to avoid data leakage
    users = df['user_id'].unique()
    train_users, test_users = train_test_split(
        users,
        test_size=test_size,
        random_state=random_state,
    )
    train_data = df[df['user_id'].isin(train_users)]
    test_data = df[df['user_id'].isin(test_users)]

    # Outputs go next to the input file. dirname is '' for a bare filename,
    # and os.makedirs('') raises FileNotFoundError, so only create the
    # directory when there is one to create.
    data_dir = os.path.dirname(data_path)
    if data_dir:
        os.makedirs(data_dir, exist_ok=True)
    train_path = os.path.join(data_dir, 'train_data.csv')
    test_path = os.path.join(data_dir, 'test_data.csv')
    encoder_path = os.path.join(data_dir, 'data_encoders.pt')

    # Save data splits
    train_data.to_csv(train_path, index=False)
    test_data.to_csv(test_path, index=False)

    # Save encoders
    torch.save(encoder.get_encoders(), encoder_path)

    logger.info(f"Training set size: {len(train_data)}")
    logger.info(f"Test set size: {len(test_data)}")
    logger.info("\nFiles saved to:")
    logger.info(f"Training data: {train_path}")
    logger.info(f"Test data: {test_path}")
    logger.info(f"Encoders: {encoder_path}")

    # Log some statistics about the encodings
    dims = encoder.get_dims()
    logger.info("\nEncoding dimensions:")
    for key, value in dims.items():
        logger.info(f"{key}: {value}")

    return train_path, test_path, encoder_path
if __name__ == "__main__":
    # Script entry point: run the full split pipeline on the default CSV path
    # and keep the three output paths it returns.
    data_path = 'data/o2_data.csv'
    (train_path, test_path, encoder_path) = split_and_save_data(data_path)