Spaces:
Running
Running
"""Update chat table | |
Revision ID: 242a2047eae0 | |
Revises: 6a39f3d8e55c | |
Create Date: 2024-10-09 21:02:35.241684 | |
""" | |
from alembic import op | |
import sqlalchemy as sa | |
from sqlalchemy.sql import table, select, update | |
import json | |
revision = "242a2047eae0" | |
down_revision = "6a39f3d8e55c" | |
branch_labels = None | |
depends_on = None | |
def upgrade(): | |
conn = op.get_bind() | |
inspector = sa.inspect(conn) | |
columns = inspector.get_columns("chat") | |
column_dict = {col["name"]: col for col in columns} | |
chat_column = column_dict.get("chat") | |
old_chat_exists = "old_chat" in column_dict | |
if chat_column: | |
if isinstance(chat_column["type"], sa.Text): | |
print("Converting 'chat' column to JSON") | |
if old_chat_exists: | |
print("Dropping old 'old_chat' column") | |
op.drop_column("chat", "old_chat") | |
# Step 1: Rename current 'chat' column to 'old_chat' | |
print("Renaming 'chat' column to 'old_chat'") | |
op.alter_column( | |
"chat", "chat", new_column_name="old_chat", existing_type=sa.Text() | |
) | |
# Step 2: Add new 'chat' column of type JSON | |
print("Adding new 'chat' column of type JSON") | |
op.add_column("chat", sa.Column("chat", sa.JSON(), nullable=True)) | |
else: | |
# If the column is already JSON, no need to do anything | |
pass | |
# Step 3: Migrate data from 'old_chat' to 'chat' | |
chat_table = table( | |
"chat", | |
sa.Column("id", sa.String(), primary_key=True), | |
sa.Column("old_chat", sa.Text()), | |
sa.Column("chat", sa.JSON()), | |
) | |
# - Selecting all data from the table | |
connection = op.get_bind() | |
results = connection.execute(select(chat_table.c.id, chat_table.c.old_chat)) | |
for row in results: | |
try: | |
# Convert text JSON to actual JSON object, assuming the text is in JSON format | |
json_data = json.loads(row.old_chat) | |
except json.JSONDecodeError: | |
json_data = None # Handle cases where the text cannot be converted to JSON | |
connection.execute( | |
sa.update(chat_table) | |
.where(chat_table.c.id == row.id) | |
.values(chat=json_data) | |
) | |
# Step 4: Drop 'old_chat' column | |
print("Dropping 'old_chat' column") | |
op.drop_column("chat", "old_chat") | |
def downgrade(): | |
# Step 1: Add 'old_chat' column back as Text | |
op.add_column("chat", sa.Column("old_chat", sa.Text(), nullable=True)) | |
# Step 2: Convert 'chat' JSON data back to text and store in 'old_chat' | |
chat_table = table( | |
"chat", | |
sa.Column("id", sa.String(), primary_key=True), | |
sa.Column("chat", sa.JSON()), | |
sa.Column("old_chat", sa.Text()), | |
) | |
connection = op.get_bind() | |
results = connection.execute(select(chat_table.c.id, chat_table.c.chat)) | |
for row in results: | |
text_data = json.dumps(row.chat) if row.chat is not None else None | |
connection.execute( | |
sa.update(chat_table) | |
.where(chat_table.c.id == row.id) | |
.values(old_chat=text_data) | |
) | |
# Step 3: Remove the new 'chat' JSON column | |
op.drop_column("chat", "chat") | |
# Step 4: Rename 'old_chat' back to 'chat' | |
op.alter_column("chat", "old_chat", new_column_name="chat", existing_type=sa.Text()) | |