""" Database migration script for Supplier Management feature. This script adds: 1. suppliers table with unique name constraint 2. Unique index on persons.name 3. Unique index on items.name 4. supplier_id foreign key on items table 5. is_settled field on work_records table Compatible with both SQLite and PostgreSQL. Requirements: 10.1, 10.2, 10.3, 10.4, 10.5, 10.6, 10.7 """ import os import sys from datetime import datetime, timezone # Add parent directory to path for imports sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from sqlalchemy import create_engine, text, inspect from sqlalchemy.exc import OperationalError, IntegrityError def get_database_url(): """Get database URL from environment or use default SQLite.""" return os.environ.get('DATABASE_URL') or \ 'sqlite:///' + os.path.join(os.path.dirname(os.path.dirname(__file__)), 'dev.db') def is_sqlite(engine): """Check if the database is SQLite.""" return 'sqlite' in engine.dialect.name.lower() def is_postgresql(engine): """Check if the database is PostgreSQL.""" return 'postgresql' in engine.dialect.name.lower() def index_exists(engine, table_name, index_name): """Check if an index exists on a table.""" inspector = inspect(engine) indexes = inspector.get_indexes(table_name) return any(idx['name'] == index_name for idx in indexes) def column_exists(engine, table_name, column_name): """Check if a column exists in a table.""" inspector = inspect(engine) columns = inspector.get_columns(table_name) return any(col['name'] == column_name for col in columns) def table_exists(engine, table_name): """Check if a table exists in the database.""" inspector = inspect(engine) return table_name in inspector.get_table_names() def upgrade(engine): """ Upgrade the database schema. Operations: 1. Create suppliers table with unique name index 2. Add unique index on persons.name 3. Add unique index on items.name 4. Add supplier_id column to items table 5. Add is_settled column to work_records table """ with engine.connect() as conn: # 1. Create suppliers table if not table_exists(engine, 'suppliers'): print("Creating suppliers table...") conn.execute(text(""" CREATE TABLE suppliers ( id INTEGER PRIMARY KEY AUTOINCREMENT, name VARCHAR(100) NOT NULL, created_at DATETIME, updated_at DATETIME ) """) if is_sqlite(engine) else text(""" CREATE TABLE suppliers ( id SERIAL PRIMARY KEY, name VARCHAR(100) NOT NULL, created_at TIMESTAMP, updated_at TIMESTAMP ) """)) conn.commit() print(" - suppliers table created") else: print(" - suppliers table already exists, skipping") # 2. Add unique index on suppliers.name if not index_exists(engine, 'suppliers', 'ix_suppliers_name'): print("Adding unique index on suppliers.name...") conn.execute(text( "CREATE UNIQUE INDEX ix_suppliers_name ON suppliers (name)" )) conn.commit() print(" - ix_suppliers_name index created") else: print(" - ix_suppliers_name index already exists, skipping") # 3. Add unique index on persons.name if not index_exists(engine, 'persons', 'ix_persons_name_unique'): print("Adding unique index on persons.name...") try: conn.execute(text( "CREATE UNIQUE INDEX ix_persons_name_unique ON persons (name)" )) conn.commit() print(" - ix_persons_name_unique index created") except (OperationalError, IntegrityError) as e: print(f" - Warning: Could not create unique index on persons.name: {e}") print(" This may be due to duplicate names in existing data.") conn.rollback() else: print(" - ix_persons_name_unique index already exists, skipping") # 4. Add unique index on items.name if not index_exists(engine, 'items', 'ix_items_name_unique'): print("Adding unique index on items.name...") try: conn.execute(text( "CREATE UNIQUE INDEX ix_items_name_unique ON items (name)" )) conn.commit() print(" - ix_items_name_unique index created") except (OperationalError, IntegrityError) as e: print(f" - Warning: Could not create unique index on items.name: {e}") print(" This may be due to duplicate names in existing data.") conn.rollback() else: print(" - ix_items_name_unique index already exists, skipping") # 5. Add supplier_id column to items table if not column_exists(engine, 'items', 'supplier_id'): print("Adding supplier_id column to items table...") conn.execute(text( "ALTER TABLE items ADD COLUMN supplier_id INTEGER REFERENCES suppliers(id)" )) conn.commit() print(" - supplier_id column added to items") else: print(" - supplier_id column already exists in items, skipping") # 6. Add is_settled column to work_records table if not column_exists(engine, 'work_records', 'is_settled'): print("Adding is_settled column to work_records table...") if is_sqlite(engine): # SQLite doesn't support DEFAULT in ALTER TABLE well, # so we add the column and then update existing rows conn.execute(text( "ALTER TABLE work_records ADD COLUMN is_settled BOOLEAN DEFAULT 0" )) conn.execute(text( "UPDATE work_records SET is_settled = 0 WHERE is_settled IS NULL" )) else: # PostgreSQL supports DEFAULT in ALTER TABLE conn.execute(text( "ALTER TABLE work_records ADD COLUMN is_settled BOOLEAN NOT NULL DEFAULT FALSE" )) conn.commit() print(" - is_settled column added to work_records") else: print(" - is_settled column already exists in work_records, skipping") print("\nMigration upgrade completed successfully!") def downgrade(engine): """ Downgrade the database schema (reverse the migration). Operations: 1. Remove is_settled column from work_records 2. Remove supplier_id column from items 3. Remove unique index from items.name 4. Remove unique index from persons.name 5. Remove unique index from suppliers.name 6. Drop suppliers table """ with engine.connect() as conn: # For SQLite, we need to recreate tables to remove columns # For PostgreSQL, we can use ALTER TABLE DROP COLUMN if is_sqlite(engine): print("SQLite detected - column removal requires table recreation") print("Downgrade for SQLite is not fully supported.") print("Please manually recreate the database if needed.") # We can still drop indexes and the suppliers table print("Dropping indexes...") try: conn.execute(text("DROP INDEX IF EXISTS ix_items_name_unique")) conn.execute(text("DROP INDEX IF EXISTS ix_persons_name_unique")) conn.execute(text("DROP INDEX IF EXISTS ix_suppliers_name")) conn.commit() except Exception as e: print(f" - Warning: {e}") conn.rollback() print("Dropping suppliers table...") try: conn.execute(text("DROP TABLE IF EXISTS suppliers")) conn.commit() except Exception as e: print(f" - Warning: {e}") conn.rollback() else: # PostgreSQL downgrade print("Removing is_settled column from work_records...") try: conn.execute(text("ALTER TABLE work_records DROP COLUMN IF EXISTS is_settled")) conn.commit() except Exception as e: print(f" - Warning: {e}") conn.rollback() print("Removing supplier_id column from items...") try: conn.execute(text("ALTER TABLE items DROP COLUMN IF EXISTS supplier_id")) conn.commit() except Exception as e: print(f" - Warning: {e}") conn.rollback() print("Dropping indexes...") try: conn.execute(text("DROP INDEX IF EXISTS ix_items_name_unique")) conn.execute(text("DROP INDEX IF EXISTS ix_persons_name_unique")) conn.execute(text("DROP INDEX IF EXISTS ix_suppliers_name")) conn.commit() except Exception as e: print(f" - Warning: {e}") conn.rollback() print("Dropping suppliers table...") try: conn.execute(text("DROP TABLE IF EXISTS suppliers")) conn.commit() except Exception as e: print(f" - Warning: {e}") conn.rollback() print("\nMigration downgrade completed!") def main(): """Main entry point for the migration script.""" import argparse parser = argparse.ArgumentParser( description='Database migration for Supplier Management feature' ) parser.add_argument( 'action', choices=['upgrade', 'downgrade'], help='Migration action to perform' ) parser.add_argument( '--database-url', help='Database URL (overrides DATABASE_URL environment variable)' ) args = parser.parse_args() # Get database URL database_url = args.database_url or get_database_url() print(f"Database: {database_url}") # Create engine engine = create_engine(database_url) # Perform migration if args.action == 'upgrade': print("\n=== Running upgrade migration ===\n") upgrade(engine) else: print("\n=== Running downgrade migration ===\n") downgrade(engine) if __name__ == '__main__': main()