Spaces:
Running
Running
File size: 3,357 Bytes
6842c08 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
"""Update chat table
Revision ID: 242a2047eae0
Revises: 6a39f3d8e55c
Create Date: 2024-10-09 21:02:35.241684
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, select, update
import json
revision = "242a2047eae0"
down_revision = "6a39f3d8e55c"
branch_labels = None
depends_on = None
def upgrade():
conn = op.get_bind()
inspector = sa.inspect(conn)
columns = inspector.get_columns("chat")
column_dict = {col["name"]: col for col in columns}
chat_column = column_dict.get("chat")
old_chat_exists = "old_chat" in column_dict
if chat_column:
if isinstance(chat_column["type"], sa.Text):
print("Converting 'chat' column to JSON")
if old_chat_exists:
print("Dropping old 'old_chat' column")
op.drop_column("chat", "old_chat")
# Step 1: Rename current 'chat' column to 'old_chat'
print("Renaming 'chat' column to 'old_chat'")
op.alter_column(
"chat", "chat", new_column_name="old_chat", existing_type=sa.Text()
)
# Step 2: Add new 'chat' column of type JSON
print("Adding new 'chat' column of type JSON")
op.add_column("chat", sa.Column("chat", sa.JSON(), nullable=True))
else:
# If the column is already JSON, no need to do anything
pass
# Step 3: Migrate data from 'old_chat' to 'chat'
chat_table = table(
"chat",
sa.Column("id", sa.String(), primary_key=True),
sa.Column("old_chat", sa.Text()),
sa.Column("chat", sa.JSON()),
)
# - Selecting all data from the table
connection = op.get_bind()
results = connection.execute(select(chat_table.c.id, chat_table.c.old_chat))
for row in results:
try:
# Convert text JSON to actual JSON object, assuming the text is in JSON format
json_data = json.loads(row.old_chat)
except json.JSONDecodeError:
json_data = None # Handle cases where the text cannot be converted to JSON
connection.execute(
sa.update(chat_table)
.where(chat_table.c.id == row.id)
.values(chat=json_data)
)
# Step 4: Drop 'old_chat' column
print("Dropping 'old_chat' column")
op.drop_column("chat", "old_chat")
def downgrade():
# Step 1: Add 'old_chat' column back as Text
op.add_column("chat", sa.Column("old_chat", sa.Text(), nullable=True))
# Step 2: Convert 'chat' JSON data back to text and store in 'old_chat'
chat_table = table(
"chat",
sa.Column("id", sa.String(), primary_key=True),
sa.Column("chat", sa.JSON()),
sa.Column("old_chat", sa.Text()),
)
connection = op.get_bind()
results = connection.execute(select(chat_table.c.id, chat_table.c.chat))
for row in results:
text_data = json.dumps(row.chat) if row.chat is not None else None
connection.execute(
sa.update(chat_table)
.where(chat_table.c.id == row.id)
.values(old_chat=text_data)
)
# Step 3: Remove the new 'chat' JSON column
op.drop_column("chat", "chat")
# Step 4: Rename 'old_chat' back to 'chat'
op.alter_column("chat", "old_chat", new_column_name="chat", existing_type=sa.Text())
|