Discover NocoDB table IDs dynamically instead of hardcoding them
All checks were successful
Deploy HiHala Dashboard / deploy (push) Successful in 7s
All checks were successful
Deploy HiHala Dashboard / deploy (push) Successful in 7s
Table IDs are now fetched at runtime via the NocoDB meta API using VITE_NOCODB_BASE_ID, so the same code works against any NocoDB instance (local or Cloudron). Also adds a migration script for moving data between instances with correct FK remapping. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
278
scripts/nocodb-migrate.py
Normal file
278
scripts/nocodb-migrate.py
Normal file
@@ -0,0 +1,278 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
NocoDB Migration Script
|
||||
Exports data from a source NocoDB instance and imports it into a target instance.
|
||||
Handles ID remapping so FK references stay correct regardless of auto-increment offsets.
|
||||
|
||||
Usage:
|
||||
python3 scripts/nocodb-migrate.py
|
||||
|
||||
Configure source/target via environment variables or edit the config below.
|
||||
"""
|
||||
|
||||
import json
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
import sys
|
||||
import time
|
||||
|
||||
# ============================================
|
||||
# Configuration
|
||||
# ============================================
|
||||
|
||||
SOURCE = {
|
||||
"url": "http://localhost:8090",
|
||||
"token": "Crn_mZnlStJ8BjB6a1fvx7_JhiEVFPPm_gI1AwVh",
|
||||
"base_id": "pqbl1a3yie3inqj",
|
||||
}
|
||||
|
||||
TARGET = {
|
||||
"url": "https://nocodb.cloudron.hihala.com",
|
||||
"token": "j6DBMb9vkebA6i_tY1TtctwAToAsi_xQ3kOn9q5C",
|
||||
"workspace_id": "w0b7k8g8",
|
||||
}
|
||||
|
||||
# Tables to migrate, in order (parents before children)
|
||||
TABLES = [
|
||||
{
|
||||
"name": "Districts",
|
||||
"columns": [
|
||||
{"column_name": "Name", "title": "Name", "uidt": "SingleLineText", "pv": True},
|
||||
{"column_name": "Description", "title": "Description", "uidt": "LongText"},
|
||||
],
|
||||
"fields": ["Name", "Description"],
|
||||
"fk_mappings": {}, # No FK dependencies
|
||||
},
|
||||
{
|
||||
"name": "Museums",
|
||||
"columns": [
|
||||
{"column_name": "Code", "title": "Code", "uidt": "SingleLineText", "pv": True},
|
||||
{"column_name": "Name", "title": "Name", "uidt": "SingleLineText"},
|
||||
{"column_name": "Status", "title": "Status", "uidt": "SingleLineText"},
|
||||
{"column_name": "DistrictId", "title": "DistrictId", "uidt": "Number"},
|
||||
],
|
||||
"fields": ["Code", "Name", "Status"],
|
||||
"fk_mappings": {
|
||||
# field_name: (source_fk_column_candidates, parent_table_name)
|
||||
"DistrictId": (["DistrictId", "nc_epk____Districts_id"], "Districts"),
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "DailyStats",
|
||||
"columns": [
|
||||
{"column_name": "Date", "title": "Date", "uidt": "Date"},
|
||||
{"column_name": "Visits", "title": "Visits", "uidt": "Number"},
|
||||
{"column_name": "Tickets", "title": "Tickets", "uidt": "Number"},
|
||||
{"column_name": "GrossRevenue", "title": "GrossRevenue", "uidt": "Number"},
|
||||
{"column_name": "NetRevenue", "title": "NetRevenue", "uidt": "Decimal"},
|
||||
{"column_name": "MuseumId", "title": "MuseumId", "uidt": "Number"},
|
||||
],
|
||||
"fields": ["Date", "Visits", "Tickets", "GrossRevenue", "NetRevenue"],
|
||||
"fk_mappings": {
|
||||
"MuseumId": (["MuseumId", "nc_epk____Museums_id"], "Museums"),
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "PilgrimStats",
|
||||
"columns": [
|
||||
{"column_name": "Year", "title": "Year", "uidt": "Number"},
|
||||
{"column_name": "Quarter", "title": "Quarter", "uidt": "SingleLineText"},
|
||||
{"column_name": "TotalPilgrims", "title": "TotalPilgrims", "uidt": "Number"},
|
||||
],
|
||||
"fields": ["Year", "Quarter", "TotalPilgrims"],
|
||||
"fk_mappings": {},
|
||||
},
|
||||
]
|
||||
|
||||
# ============================================
|
||||
# API Helpers
|
||||
# ============================================
|
||||
|
||||
def api_request(base_url, token, path, method="GET", data=None):
|
||||
url = f"{base_url}{path}"
|
||||
headers = {"xc-token": token, "Content-Type": "application/json"}
|
||||
body = json.dumps(data).encode() if data else None
|
||||
req = urllib.request.Request(url, data=body, headers=headers, method=method)
|
||||
try:
|
||||
with urllib.request.urlopen(req) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except urllib.error.HTTPError as e:
|
||||
error_body = e.read().decode()
|
||||
print(f" ERROR {e.code}: {error_body}")
|
||||
raise
|
||||
|
||||
|
||||
def fetch_all_records(base_url, token, table_id, limit=1000):
|
||||
"""Fetch all records from a table with pagination."""
|
||||
all_records = []
|
||||
offset = 0
|
||||
while True:
|
||||
data = api_request(base_url, token, f"/api/v2/tables/{table_id}/records?limit={limit}&offset={offset}")
|
||||
records = data.get("list", [])
|
||||
all_records.extend(records)
|
||||
if len(records) < limit:
|
||||
break
|
||||
offset += limit
|
||||
return all_records
|
||||
|
||||
|
||||
def insert_records(base_url, token, table_id, records, batch_size=100):
|
||||
"""Insert records in batches, return list of created IDs in order."""
|
||||
all_ids = []
|
||||
for i in range(0, len(records), batch_size):
|
||||
batch = records[i:i + batch_size]
|
||||
result = api_request(base_url, token, f"/api/v2/tables/{table_id}/records", method="POST", data=batch)
|
||||
if isinstance(result, list):
|
||||
all_ids.extend([r["Id"] for r in result])
|
||||
elif isinstance(result, dict) and "Id" in result:
|
||||
all_ids.append(result["Id"])
|
||||
# Brief pause between batches to avoid rate limiting
|
||||
if i + batch_size < len(records):
|
||||
time.sleep(0.1)
|
||||
return all_ids
|
||||
|
||||
|
||||
# ============================================
|
||||
# Discovery
|
||||
# ============================================
|
||||
|
||||
def discover_tables(base_url, token, base_id):
|
||||
"""Get table name → table_id mapping."""
|
||||
data = api_request(base_url, token, f"/api/v2/meta/bases/{base_id}/tables")
|
||||
return {t["title"]: t["id"] for t in data["list"]}
|
||||
|
||||
|
||||
# ============================================
|
||||
# Migration
|
||||
# ============================================
|
||||
|
||||
def run_migration():
|
||||
print("=" * 60)
|
||||
print("NocoDB Migration: Source → Target")
|
||||
print("=" * 60)
|
||||
|
||||
# Step 1: Discover source tables
|
||||
print("\n[1/5] Discovering source tables...")
|
||||
source_tables = discover_tables(SOURCE["url"], SOURCE["token"], SOURCE["base_id"])
|
||||
for name, tid in source_tables.items():
|
||||
print(f" {name}: {tid}")
|
||||
|
||||
# Step 2: Create target base
|
||||
print("\n[2/5] Creating target base...")
|
||||
base = api_request(
|
||||
TARGET["url"], TARGET["token"],
|
||||
f"/api/v2/meta/workspaces/{TARGET['workspace_id']}/bases/",
|
||||
method="POST",
|
||||
data={"title": "HiHala Dashboard"}
|
||||
)
|
||||
target_base_id = base["id"]
|
||||
print(f" Created base: {target_base_id}")
|
||||
|
||||
# Step 3: Create target tables
|
||||
print("\n[3/5] Creating target tables...")
|
||||
target_table_ids = {}
|
||||
for table_cfg in TABLES:
|
||||
name = table_cfg["name"]
|
||||
result = api_request(
|
||||
TARGET["url"], TARGET["token"],
|
||||
f"/api/v2/meta/bases/{target_base_id}/tables/",
|
||||
method="POST",
|
||||
data={
|
||||
"table_name": name,
|
||||
"title": name,
|
||||
"columns": table_cfg["columns"],
|
||||
}
|
||||
)
|
||||
target_table_ids[name] = result["id"]
|
||||
print(f" {name}: {result['id']}")
|
||||
|
||||
# Step 4: Export source data and import with ID remapping
|
||||
print("\n[4/5] Migrating data...")
|
||||
# id_maps[table_name] = {old_id: new_id}
|
||||
id_maps = {}
|
||||
|
||||
for table_cfg in TABLES:
|
||||
name = table_cfg["name"]
|
||||
print(f"\n --- {name} ---")
|
||||
|
||||
if name not in source_tables:
|
||||
print(f" SKIP: not found in source")
|
||||
continue
|
||||
|
||||
# Export from source
|
||||
source_records = fetch_all_records(SOURCE["url"], SOURCE["token"], source_tables[name])
|
||||
print(f" Exported {len(source_records)} records from source")
|
||||
|
||||
if not source_records:
|
||||
id_maps[name] = {}
|
||||
continue
|
||||
|
||||
# Build clean records with FK remapping
|
||||
clean_records = []
|
||||
for r in source_records:
|
||||
row = {}
|
||||
# Copy plain fields
|
||||
for field in table_cfg["fields"]:
|
||||
if field in r:
|
||||
row[field] = r[field]
|
||||
|
||||
# Remap FK fields
|
||||
for fk_field, (source_candidates, parent_table) in table_cfg["fk_mappings"].items():
|
||||
# Find the FK value from source (try multiple column name candidates)
|
||||
old_fk = None
|
||||
for candidate in source_candidates:
|
||||
if candidate in r and r[candidate] is not None:
|
||||
old_fk = r[candidate]
|
||||
break
|
||||
|
||||
if old_fk is not None and parent_table in id_maps:
|
||||
new_fk = id_maps[parent_table].get(old_fk)
|
||||
if new_fk is not None:
|
||||
row[fk_field] = new_fk
|
||||
else:
|
||||
print(f" WARNING: No mapping for {parent_table}.Id={old_fk}")
|
||||
row[fk_field] = old_fk
|
||||
elif old_fk is not None:
|
||||
row[fk_field] = old_fk
|
||||
|
||||
clean_records.append(row)
|
||||
|
||||
# Insert into target
|
||||
new_ids = insert_records(TARGET["url"], TARGET["token"], target_table_ids[name], clean_records)
|
||||
print(f" Inserted {len(new_ids)} records into target")
|
||||
|
||||
# Build ID mapping (old_id → new_id) based on insertion order
|
||||
old_ids = [r["Id"] for r in source_records]
|
||||
id_maps[name] = {}
|
||||
for old_id, new_id in zip(old_ids, new_ids):
|
||||
id_maps[name][old_id] = new_id
|
||||
|
||||
if id_maps[name]:
|
||||
sample = list(id_maps[name].items())[:3]
|
||||
print(f" ID mapping sample: {sample}")
|
||||
|
||||
# Step 5: Summary
|
||||
print("\n" + "=" * 60)
|
||||
print("[5/5] Migration complete!")
|
||||
print("=" * 60)
|
||||
print(f"\n Target base ID: {target_base_id}")
|
||||
print(f"\n Target tables:")
|
||||
for name, tid in target_table_ids.items():
|
||||
print(f" {name}: {tid}")
|
||||
print(f"\n ID mappings:")
|
||||
for name, mapping in id_maps.items():
|
||||
print(f" {name}: {len(mapping)} records ({list(mapping.items())[:2]}...)")
|
||||
|
||||
print(f"\n Add this to your Gitea secrets:")
|
||||
print(f" VITE_NOCODB_BASE_ID = {target_base_id}")
|
||||
print(f"\n The VITE_NOCODB_URL and VITE_NOCODB_TOKEN secrets should point to Cloudron.")
|
||||
|
||||
return target_base_id, target_table_ids
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
run_migration()
|
||||
except Exception as e:
|
||||
print(f"\nFATAL: {e}")
|
||||
sys.exit(1)
|
||||
Reference in New Issue
Block a user