#!/usr/bin/env python3 """ NocoDB Migration Script Exports data from a source NocoDB instance and imports it into a target instance. Handles ID remapping so FK references stay correct regardless of auto-increment offsets. Usage: python3 scripts/nocodb-migrate.py Configure source/target via environment variables or edit the config below. """ import json import urllib.request import urllib.error import sys import time # ============================================ # Configuration # ============================================ SOURCE = { "url": "http://localhost:8090", "token": "Crn_mZnlStJ8BjB6a1fvx7_JhiEVFPPm_gI1AwVh", "base_id": "pqbl1a3yie3inqj", } TARGET = { "url": "https://nocodb.cloudron.hihala.com", "token": "j6DBMb9vkebA6i_tY1TtctwAToAsi_xQ3kOn9q5C", "workspace_id": "w0b7k8g8", } # Tables to migrate, in order (parents before children) TABLES = [ { "name": "Districts", "columns": [ {"column_name": "Name", "title": "Name", "uidt": "SingleLineText", "pv": True}, {"column_name": "Description", "title": "Description", "uidt": "LongText"}, ], "fields": ["Name", "Description"], "fk_mappings": {}, # No FK dependencies }, { "name": "Museums", "columns": [ {"column_name": "Code", "title": "Code", "uidt": "SingleLineText", "pv": True}, {"column_name": "Name", "title": "Name", "uidt": "SingleLineText"}, {"column_name": "Status", "title": "Status", "uidt": "SingleLineText"}, {"column_name": "DistrictId", "title": "DistrictId", "uidt": "Number"}, ], "fields": ["Code", "Name", "Status"], "fk_mappings": { # field_name: (source_fk_column_candidates, parent_table_name) "DistrictId": (["DistrictId", "nc_epk____Districts_id"], "Districts"), }, }, { "name": "DailyStats", "columns": [ {"column_name": "Date", "title": "Date", "uidt": "Date"}, {"column_name": "Visits", "title": "Visits", "uidt": "Number"}, {"column_name": "Tickets", "title": "Tickets", "uidt": "Number"}, {"column_name": "GrossRevenue", "title": "GrossRevenue", "uidt": "Number"}, {"column_name": "NetRevenue", "title": "NetRevenue", "uidt": "Decimal"}, {"column_name": "MuseumId", "title": "MuseumId", "uidt": "Number"}, ], "fields": ["Date", "Visits", "Tickets", "GrossRevenue", "NetRevenue"], "fk_mappings": { "MuseumId": (["MuseumId", "nc_epk____Museums_id"], "Museums"), }, }, { "name": "PilgrimStats", "columns": [ {"column_name": "Year", "title": "Year", "uidt": "Number"}, {"column_name": "Quarter", "title": "Quarter", "uidt": "SingleLineText"}, {"column_name": "TotalPilgrims", "title": "TotalPilgrims", "uidt": "Number"}, ], "fields": ["Year", "Quarter", "TotalPilgrims"], "fk_mappings": {}, }, ] # ============================================ # API Helpers # ============================================ def api_request(base_url, token, path, method="GET", data=None): url = f"{base_url}{path}" headers = {"xc-token": token, "Content-Type": "application/json"} body = json.dumps(data).encode() if data else None req = urllib.request.Request(url, data=body, headers=headers, method=method) try: with urllib.request.urlopen(req) as resp: return json.loads(resp.read().decode()) except urllib.error.HTTPError as e: error_body = e.read().decode() print(f" ERROR {e.code}: {error_body}") raise def fetch_all_records(base_url, token, table_id, limit=1000): """Fetch all records from a table with pagination.""" all_records = [] offset = 0 while True: data = api_request(base_url, token, f"/api/v2/tables/{table_id}/records?limit={limit}&offset={offset}") records = data.get("list", []) all_records.extend(records) if len(records) < limit: break offset += limit return all_records def insert_records(base_url, token, table_id, records, batch_size=100): """Insert records in batches, return list of created IDs in order.""" all_ids = [] for i in range(0, len(records), batch_size): batch = records[i:i + batch_size] result = api_request(base_url, token, f"/api/v2/tables/{table_id}/records", method="POST", data=batch) if isinstance(result, list): all_ids.extend([r["Id"] for r in result]) elif isinstance(result, dict) and "Id" in result: all_ids.append(result["Id"]) # Brief pause between batches to avoid rate limiting if i + batch_size < len(records): time.sleep(0.1) return all_ids # ============================================ # Discovery # ============================================ def discover_tables(base_url, token, base_id): """Get table name → table_id mapping.""" data = api_request(base_url, token, f"/api/v2/meta/bases/{base_id}/tables") return {t["title"]: t["id"] for t in data["list"]} # ============================================ # Migration # ============================================ def run_migration(): print("=" * 60) print("NocoDB Migration: Source → Target") print("=" * 60) # Step 1: Discover source tables print("\n[1/5] Discovering source tables...") source_tables = discover_tables(SOURCE["url"], SOURCE["token"], SOURCE["base_id"]) for name, tid in source_tables.items(): print(f" {name}: {tid}") # Step 2: Create target base print("\n[2/5] Creating target base...") base = api_request( TARGET["url"], TARGET["token"], f"/api/v2/meta/workspaces/{TARGET['workspace_id']}/bases/", method="POST", data={"title": "HiHala Dashboard"} ) target_base_id = base["id"] print(f" Created base: {target_base_id}") # Step 3: Create target tables print("\n[3/5] Creating target tables...") target_table_ids = {} for table_cfg in TABLES: name = table_cfg["name"] result = api_request( TARGET["url"], TARGET["token"], f"/api/v2/meta/bases/{target_base_id}/tables/", method="POST", data={ "table_name": name, "title": name, "columns": table_cfg["columns"], } ) target_table_ids[name] = result["id"] print(f" {name}: {result['id']}") # Step 4: Export source data and import with ID remapping print("\n[4/5] Migrating data...") # id_maps[table_name] = {old_id: new_id} id_maps = {} for table_cfg in TABLES: name = table_cfg["name"] print(f"\n --- {name} ---") if name not in source_tables: print(f" SKIP: not found in source") continue # Export from source source_records = fetch_all_records(SOURCE["url"], SOURCE["token"], source_tables[name]) print(f" Exported {len(source_records)} records from source") if not source_records: id_maps[name] = {} continue # Build clean records with FK remapping clean_records = [] for r in source_records: row = {} # Copy plain fields for field in table_cfg["fields"]: if field in r: row[field] = r[field] # Remap FK fields for fk_field, (source_candidates, parent_table) in table_cfg["fk_mappings"].items(): # Find the FK value from source (try multiple column name candidates) old_fk = None for candidate in source_candidates: if candidate in r and r[candidate] is not None: old_fk = r[candidate] break if old_fk is not None and parent_table in id_maps: new_fk = id_maps[parent_table].get(old_fk) if new_fk is not None: row[fk_field] = new_fk else: print(f" WARNING: No mapping for {parent_table}.Id={old_fk}") row[fk_field] = old_fk elif old_fk is not None: row[fk_field] = old_fk clean_records.append(row) # Insert into target new_ids = insert_records(TARGET["url"], TARGET["token"], target_table_ids[name], clean_records) print(f" Inserted {len(new_ids)} records into target") # Build ID mapping (old_id → new_id) based on insertion order old_ids = [r["Id"] for r in source_records] id_maps[name] = {} for old_id, new_id in zip(old_ids, new_ids): id_maps[name][old_id] = new_id if id_maps[name]: sample = list(id_maps[name].items())[:3] print(f" ID mapping sample: {sample}") # Step 5: Summary print("\n" + "=" * 60) print("[5/5] Migration complete!") print("=" * 60) print(f"\n Target base ID: {target_base_id}") print(f"\n Target tables:") for name, tid in target_table_ids.items(): print(f" {name}: {tid}") print(f"\n ID mappings:") for name, mapping in id_maps.items(): print(f" {name}: {len(mapping)} records ({list(mapping.items())[:2]}...)") print(f"\n Add this to your Gitea secrets:") print(f" VITE_NOCODB_BASE_ID = {target_base_id}") print(f"\n The VITE_NOCODB_URL and VITE_NOCODB_TOKEN secrets should point to Cloudron.") return target_base_id, target_table_ids if __name__ == "__main__": try: run_migration() except Exception as e: print(f"\nFATAL: {e}") sys.exit(1)