docs: add ETL pipeline design spec
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
242
docs/superpowers/specs/2026-03-26-etl-pipeline-design.md
Normal file
242
docs/superpowers/specs/2026-03-26-etl-pipeline-design.md
Normal file
@@ -0,0 +1,242 @@
|
||||
# ETL Pipeline: ERP → NocoDB Daily Sales
|
||||
|
||||
## Goal
|
||||
|
||||
Replace the current client-side ERP fetching (which downloads hundreds of MBs of raw transactions to the browser) with a server-side ETL pipeline that aggregates ERP data into NocoDB. The dashboard reads pre-aggregated data from NocoDB — fast and lightweight.
|
||||
|
||||
## Data Flow
|
||||
|
||||
```
|
||||
Daily (2am cron):
|
||||
ERP API → Server (fetch + aggregate) → NocoDB "DailySales" table
|
||||
|
||||
On page load:
|
||||
NocoDB "DailySales" → Dashboard client (small payload, fast)
|
||||
```
|
||||
|
||||
## NocoDB "DailySales" Table
|
||||
|
||||
One row per date/museum/channel combination. Flat — no lookup tables needed.
|
||||
|
||||
| Column | Type | Example |
|
||||
|--------|------|---------|
|
||||
| Date | string | `2025-03-01` |
|
||||
| MuseumName | string | `Revelation Exhibition` |
|
||||
| Channel | string | `HiHala Website/App` |
|
||||
| Visits | number | `702` |
|
||||
| Tickets | number | `71` |
|
||||
| GrossRevenue | number | `12049.00` |
|
||||
| NetRevenue | number | `10477.40` |
|
||||
|
||||
Museums are derived from product descriptions using a priority-ordered keyword mapping (46 products → 6 museums). Channels are derived from `OperatingAreaName` with display labels (e.g. B2C → "HiHala Website/App").
|
||||
|
||||
## Server Architecture
|
||||
|
||||
### New files
|
||||
|
||||
| File | Responsibility |
|
||||
|------|----------------|
|
||||
| `server/src/config/museumMapping.ts` | Product → museum mapping, channel labels (moved from client) |
|
||||
| `server/src/types.ts` | Server-side ERP types (`ERPSaleRecord`, `ERPProduct`, `ERPPayment`, `AggregatedRecord`) |
|
||||
| `server/src/services/nocodbClient.ts` | NocoDB table discovery (via `process.env`, NOT `import.meta.env`) + paginated read/write |
|
||||
| `server/src/services/etlSync.ts` | Orchestrate: fetch ERP → aggregate → write NocoDB |
|
||||
| `server/src/routes/etl.ts` | `POST /api/etl/sync` endpoint (protected by secret token) |
|
||||
|
||||
### Modified files
|
||||
|
||||
| File | Change |
|
||||
|------|--------|
|
||||
| `server/src/config.ts` | Add NocoDB config (`process.env.NOCODB_*`) |
|
||||
| `server/src/index.ts` | Mount ETL route |
|
||||
| `server/.env` | Add `NOCODB_*` and `ETL_SECRET` vars |
|
||||
| `server/.env.example` | Add `NOCODB_*` and `ETL_SECRET` placeholders |
|
||||
| `src/services/dataService.ts` | Revert to NocoDB fetch with paginated reads for DailySales |
|
||||
|
||||
### Removed files
|
||||
|
||||
| File | Reason |
|
||||
|------|--------|
|
||||
| `server/src/routes/erp.ts` | Client no longer calls ERP directly |
|
||||
| `src/services/erpService.ts` | Client no longer aggregates transactions |
|
||||
| `src/config/museumMapping.ts` | Moved to server |
|
||||
|
||||
## ETL Sync Endpoint
|
||||
|
||||
```
|
||||
POST /api/etl/sync?mode=full|incremental
|
||||
Authorization: Bearer <ETL_SECRET>
|
||||
```
|
||||
|
||||
Protected by a secret token (`ETL_SECRET` env var). Requests without a valid token get 401. The cron passes it: `curl -H "Authorization: Bearer $ETL_SECRET" -X POST ...`.
|
||||
|
||||
- **incremental** (default): fetch current month from ERP, aggregate, upsert into NocoDB. Used by daily cron.
|
||||
- **full**: fetch all months from 2024-01 to now, clear and replace all NocoDB DailySales data. Used for initial setup or recovery.
|
||||
|
||||
### Incremental date range
|
||||
|
||||
The current month is defined as:
|
||||
- `startDate`: `YYYY-MM-01T00:00:00` (first of current month)
|
||||
- `endDate`: `YYYY-{MM+1}-01T00:00:00` (first of next month, exclusive)
|
||||
|
||||
This matches the convention already used in `erpService.ts` month boundary generation.
|
||||
|
||||
Response:
|
||||
```json
|
||||
{
|
||||
"status": "ok",
|
||||
"mode": "incremental",
|
||||
"transactionsFetched": 12744,
|
||||
"recordsWritten": 342,
|
||||
"duration": "8.2s"
|
||||
}
|
||||
```
|
||||
|
||||
## Aggregation Logic
|
||||
|
||||
For each ERP transaction:
|
||||
1. Extract date from `TransactionDate` (split on space, take first part)
|
||||
2. Map `OperatingAreaName` → channel label via `getChannelLabel()`
|
||||
3. For each product in `Products[]`:
|
||||
- Map `ProductDescription` → museum name via `getMuseumFromProduct()` (priority-ordered keyword matching)
|
||||
- Accumulate into composite key `date|museum|channel`:
|
||||
- `visits += PeopleCount`
|
||||
- `tickets += UnitQuantity`
|
||||
- `GrossRevenue += TotalPrice`
|
||||
- `NetRevenue += TotalPrice - TaxAmount`
|
||||
|
||||
Negative quantities (refunds) sum correctly by default.
|
||||
|
||||
## NocoDB Upsert Strategy
|
||||
|
||||
For **incremental** sync:
|
||||
1. Delete all rows in DailySales where `Date` falls within the fetched month range
|
||||
2. Insert the newly aggregated rows
|
||||
|
||||
For **full** sync:
|
||||
1. Delete all rows in DailySales
|
||||
2. Insert all aggregated rows
|
||||
|
||||
This avoids duplicate detection complexity — just replace the month's data.
|
||||
|
||||
### Race condition note
|
||||
|
||||
During the delete/insert window, dashboard reads may see incomplete data. Mitigations:
|
||||
- The sync runs at 2am when traffic is minimal
|
||||
- The client's localStorage cache (7-day TTL) means most page loads never hit NocoDB
|
||||
- The client checks if fetched data is suspiciously small (< 10 rows) and prefers cached data over a likely-incomplete NocoDB read
|
||||
- For full syncs, the window is larger (~2-5 minutes). If this becomes a problem, a shadow-table swap pattern can be added later.
|
||||
|
||||
## Client Changes
|
||||
|
||||
### dataService.ts
|
||||
|
||||
Revert to reading from NocoDB. The `DailySales` table is flat, so no joins needed. **Must use paginated fetch** (NocoDB defaults to 25 rows per page, max 1000). The existing `fetchNocoDBTable()` helper already handles pagination — reintroduce it.
|
||||
|
||||
```typescript
|
||||
async function fetchFromNocoDB(): Promise<MuseumRecord[]> {
|
||||
const tables = await discoverTableIds();
|
||||
const rows = await fetchNocoDBTable<NocoDBDailySale>(tables['DailySales']);
|
||||
return rows.map(row => ({
|
||||
date: row.Date,
|
||||
museum_name: row.MuseumName,
|
||||
channel: row.Channel,
|
||||
visits: row.Visits,
|
||||
tickets: row.Tickets,
|
||||
revenue_gross: row.GrossRevenue,
|
||||
revenue_net: row.NetRevenue,
|
||||
year: row.Date.substring(0, 4),
|
||||
quarter: computeQuarter(row.Date),
|
||||
}));
|
||||
}
|
||||
```
|
||||
|
||||
Add a `NocoDBDailySale` type to `src/types/index.ts`:
|
||||
```typescript
|
||||
export interface NocoDBDailySale {
|
||||
Id: number;
|
||||
Date: string;
|
||||
MuseumName: string;
|
||||
Channel: string;
|
||||
Visits: number;
|
||||
Tickets: number;
|
||||
GrossRevenue: number;
|
||||
NetRevenue: number;
|
||||
}
|
||||
```
|
||||
|
||||
No `Districts`, `Museums`, or `DailyStats` tables needed — just `DailySales` and `PilgrimStats`.
|
||||
|
||||
### Suspicious data check
|
||||
|
||||
In `fetchData()`, if NocoDB returns fewer than 10 rows and a cache exists, prefer the cache:
|
||||
```typescript
|
||||
if (data.length < 10 && cached) {
|
||||
console.warn('NocoDB returned suspiciously few rows, using cache');
|
||||
return { data: cached.data, fromCache: true, cacheTimestamp: cached.timestamp };
|
||||
}
|
||||
```
|
||||
|
||||
## Server Environment
|
||||
|
||||
Add to `server/.env`:
|
||||
```
|
||||
NOCODB_URL=http://localhost:8090
|
||||
NOCODB_TOKEN=<token>
|
||||
NOCODB_BASE_ID=<base_id>
|
||||
ETL_SECRET=<random-secret-for-cron>
|
||||
```
|
||||
|
||||
**Note:** Client `.env.local` retains its existing `VITE_NOCODB_*` vars — the client still reads NocoDB directly for both DailySales and PilgrimStats.
|
||||
|
||||
Update `server/.env.example` with the same keys (placeholder values).
|
||||
|
||||
## Server-Side Types
|
||||
|
||||
ERP types are re-declared in `server/src/types.ts` (not imported from the client `src/types/index.ts`):
|
||||
|
||||
```typescript
|
||||
export interface ERPProduct {
|
||||
ProductDescription: string;
|
||||
SiteDescription: string | null;
|
||||
UnitQuantity: number;
|
||||
PeopleCount: number;
|
||||
TaxAmount: number;
|
||||
TotalPrice: number;
|
||||
}
|
||||
|
||||
export interface ERPSaleRecord {
|
||||
SaleId: number;
|
||||
TransactionDate: string;
|
||||
CustIdentification: string;
|
||||
OperatingAreaName: string;
|
||||
Payments: Array<{ PaymentMethodDescription: string }>;
|
||||
Products: ERPProduct[];
|
||||
}
|
||||
|
||||
export interface AggregatedRecord {
|
||||
Date: string;
|
||||
MuseumName: string;
|
||||
Channel: string;
|
||||
Visits: number;
|
||||
Tickets: number;
|
||||
GrossRevenue: number;
|
||||
NetRevenue: number;
|
||||
}
|
||||
```
|
||||
|
||||
## Cron
|
||||
|
||||
```bash
|
||||
0 2 * * * curl -s -H "Authorization: Bearer $ETL_SECRET" -X POST http://localhost:3002/api/etl/sync
|
||||
```
|
||||
|
||||
Runs daily at 2am. The incremental mode fetches only the current month (~15-25K transactions), aggregates server-side, and writes ~300-500 rows to NocoDB.
|
||||
|
||||
## What's NOT Changing
|
||||
|
||||
- PilgrimStats still fetched from NocoDB by the client (unchanged)
|
||||
- Client `.env.local` retains `VITE_NOCODB_*` vars (still needed for client reads)
|
||||
- All dashboard UI components (Dashboard, Comparison) stay as-is
|
||||
- Channel and museum filters stay as-is
|
||||
- Cache/offline fallback logic stays as-is (enhanced with suspicious-data check)
|
||||
- Dark mode, i18n, accessibility — all unchanged
|
||||
Reference in New Issue
Block a user