feat: add server-side ETL pipeline, revert client to NocoDB reads
ETL Pipeline (server):
- POST /api/etl/sync?mode=full|incremental — fetches ERP, aggregates, writes NocoDB
- nocodbClient.ts: table discovery, paginated delete/insert
- etlSync.ts: orchestrates fetch → aggregate → upsert
- museumMapping.ts moved from client to server
- Auth via ETL_SECRET bearer token

Client:
- dataService.ts reverts to reading NocoDB DailySales table
- Paginated fetch via fetchNocoDBTable (handles >1000 rows)
- Suspicious data check: prefers cache if NocoDB returns <10 rows
- Deleted erpService.ts and client-side museumMapping.ts

First full sync: 391K transactions → 5,760 daily records in 108s.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
119
server/src/services/etlSync.ts
Normal file
119
server/src/services/etlSync.ts
Normal file
@@ -0,0 +1,119 @@
|
||||
import { fetchSales } from './erpClient';
|
||||
import { discoverTableIds, deleteRowsByMonth, deleteAllRows, insertRecords } from './nocodbClient';
|
||||
import { getMuseumFromProduct, getChannelLabel } from '../config/museumMapping';
|
||||
import type { ERPSaleRecord, AggregatedRecord } from '../types';
|
||||
|
||||
/**
 * Builds one `[start, end]` pair per calendar month, from the given start
 * month up to and including the month that contains `now`.
 *
 * Both values are strings of the form `YYYY-MM-01T00:00:00`, derived from the
 * local-time year and month of `now`; `end` is the first day of the month that
 * follows `start`, so consecutive pairs share a boundary.
 *
 * @param startYear - Calendar year of the first pair.
 * @param startMonth - Calendar month (1–12) of the first pair.
 * @param now - Reference date that decides the last pair; defaults to the current time.
 * @returns The pairs in chronological order; empty when the start month lies after `now`.
 * @throws RangeError when `startYear` is not an integer or `startMonth` is not an integer in 1–12.
 */
function generateMonthBoundaries(
  startYear: number,
  startMonth: number,
  now: Date = new Date(),
): Array<[string, string]> {
  if (!Number.isInteger(startYear)) {
    throw new RangeError(`startYear must be an integer, got ${startYear}`);
  }
  // A month outside 1–12 (or a fractional one) never hits the December
  // roll-over below, so the loop would run until memory is exhausted.
  if (!Number.isInteger(startMonth) || startMonth < 1 || startMonth > 12) {
    throw new RangeError(`startMonth must be an integer between 1 and 12, got ${startMonth}`);
  }

  const firstOfMonth = (year: number, month: number): string =>
    `${year}-${String(month).padStart(2, '0')}-01T00:00:00`;

  const endYear = now.getFullYear();
  const endMonth = now.getMonth() + 1; // getMonth() is zero-based
  const boundaries: Array<[string, string]> = [];

  let y = startYear;
  let m = startMonth;
  while (y < endYear || (y === endYear && m <= endMonth)) {
    const nextM = m === 12 ? 1 : m + 1;
    const nextY = m === 12 ? y + 1 : y;
    boundaries.push([firstOfMonth(y, m), firstOfMonth(nextY, nextM)]);
    y = nextY;
    m = nextM;
  }

  return boundaries;
}
|
||||
|
||||
/**
 * Returns the `[start, end]` pair for the calendar month that contains `now`.
 *
 * Both values are strings of the form `YYYY-MM-01T00:00:00`, derived from the
 * local-time year and month of `now`; `end` is the first day of the following
 * month (rolling over to January of the next year after December).
 *
 * @param now - Reference date; defaults to the current time.
 * @returns A two-element tuple `[start, end]`.
 */
function currentMonthBoundary(now: Date = new Date()): [string, string] {
  const firstOfMonth = (year: number, month: number): string =>
    `${year}-${String(month).padStart(2, '0')}-01T00:00:00`;

  const year = now.getFullYear();
  const month = now.getMonth() + 1; // getMonth() is zero-based
  const isDecember = month === 12;

  return [
    firstOfMonth(year, month),
    firstOfMonth(isDecember ? year + 1 : year, isDecember ? 1 : month + 1),
  ];
}
|
||||
|
||||
/**
 * Collapses ERP sale transactions into one record per
 * (calendar day, museum, channel) combination.
 *
 * The channel is resolved once per sale via `getChannelLabel`, the museum once
 * per product line via `getMuseumFromProduct`. For every product line the
 * matching record accumulates `PeopleCount` into `Visits`, `UnitQuantity` into
 * `Tickets`, `TotalPrice` into `GrossRevenue` and `TotalPrice - TaxAmount`
 * into `NetRevenue`.
 *
 * @param sales - Raw ERP sale records; a sale without `Products` contributes nothing.
 * @returns Aggregated records in the order their keys were first encountered.
 */
export function aggregateTransactions(sales: ERPSaleRecord[]): AggregatedRecord[] {
  const totals = new Map<string, AggregatedRecord>();

  for (const sale of sales) {
    // Keep only the day part of the timestamp. "YYYY-MM-DD HH:mm:ss" is cut at
    // the space; an ISO "YYYY-MM-DDTHH:mm:ss" value has no space, so it is cut
    // at the "T" instead — otherwise every distinct timestamp would become its
    // own bucket.
    const beforeSpace = sale.TransactionDate.split(' ')[0] ?? sale.TransactionDate;
    const date = /^\d{4}-\d{2}-\d{2}T/.test(beforeSpace) ? beforeSpace.slice(0, 10) : beforeSpace;
    const channel = getChannelLabel(sale.OperatingAreaName);

    // Tolerate sales that arrive without a product list instead of aborting the whole run.
    for (const product of sale.Products ?? []) {
      const museum = getMuseumFromProduct(product.ProductDescription);
      const key = `${date}|${museum}|${channel}`;

      let entry = totals.get(key);
      if (!entry) {
        entry = {
          Date: date,
          MuseumName: museum,
          Channel: channel,
          Visits: 0,
          Tickets: 0,
          GrossRevenue: 0,
          NetRevenue: 0,
        };
        totals.set(key, entry);
      }

      entry.Visits += product.PeopleCount;
      entry.Tickets += product.UnitQuantity;
      entry.GrossRevenue += product.TotalPrice;
      entry.NetRevenue += product.TotalPrice - product.TaxAmount;
    }
  }

  return Array.from(totals.values());
}
|
||||
|
||||
/** Summary that `runSync` resolves with after a sync run has finished. */
export interface SyncResult {
  /** Outcome marker; `runSync` reports `'ok'` on completion. */
  status: string;

  /** Sync mode that was executed. */
  mode: string;

  /** Number of raw ERP transactions that were fetched. */
  transactionsFetched: number;

  /** Number of aggregated records written to the target table. */
  recordsWritten: number;

  /** Elapsed wall-clock time in seconds with one decimal and an `s` suffix, e.g. `"108.0s"`. */
  duration: string;
}
|
||||
|
||||
/**
 * Runs one ETL pass: fetch ERP sales, aggregate them, and replace the
 * corresponding rows of the NocoDB `DailySales` table.
 *
 * - `'full'` fetches every month from January 2024 through the current month,
 *   clears the whole table and inserts the aggregated records.
 * - Any other value (default `'incremental'`) fetches only the current month,
 *   clears that month's rows and inserts the aggregated records.
 *
 * All ERP data is fetched and aggregated before anything is deleted, so an ERP
 * failure leaves the table untouched.
 *
 * NOTE(review): delete and insert are separate requests. If an insert fails
 * part-way the table is left partially filled, and a full sync whose fetch
 * returns no transactions still clears the table — confirm whether a guard is
 * wanted.
 *
 * @param mode - Sync mode; defaults to `'incremental'`.
 * @returns Counts and timing for the completed run.
 * @throws Error when the `DailySales` table cannot be found; errors from the
 *   ERP and NocoDB helpers are not caught here.
 */
export async function runSync(mode: 'full' | 'incremental' = 'incremental'): Promise<SyncResult> {
  const startedAt = Date.now();

  const tables = await discoverTableIds();
  const tableId = tables['DailySales'];
  if (!tableId) throw new Error("NocoDB table 'DailySales' not found");

  // In incremental mode keep the single boundary in a variable so the month
  // to clear can be derived without indexing into `months`.
  const currentMonth = mode === 'full' ? null : currentMonthBoundary();
  const months: Array<[string, string]> =
    currentMonth === null ? generateMonthBoundaries(2024, 1) : [currentMonth];

  // Fetch from ERP sequentially (API can't handle concurrent requests)
  const allSales: ERPSaleRecord[] = [];
  for (const [startDate, endDate] of months) {
    console.log(` Fetching ${startDate.slice(0, 7)}...`);
    const chunk = await fetchSales(startDate, endDate) as ERPSaleRecord[];
    // Append element by element: `push(...chunk)` passes every element as a
    // separate call argument and throws a RangeError once a month is larger
    // than the engine's argument limit.
    for (const sale of chunk) {
      allSales.push(sale);
    }
  }

  const records = aggregateTransactions(allSales);

  // Write to NocoDB
  if (currentMonth === null) {
    console.log(' Clearing all DailySales rows...');
    await deleteAllRows(tableId);
  } else {
    const yearMonth = currentMonth[0].slice(0, 7); // "YYYY-MM"
    console.log(` Clearing ${yearMonth} rows...`);
    await deleteRowsByMonth(tableId, yearMonth);
  }

  console.log(` Inserting ${records.length} records...`);
  const written = await insertRecords(tableId, records);

  const duration = ((Date.now() - startedAt) / 1000).toFixed(1) + 's';

  return {
    status: 'ok',
    mode,
    transactionsFetched: allSales.length,
    recordsWritten: written,
    duration,
  };
}
|
||||
109
server/src/services/nocodbClient.ts
Normal file
109
server/src/services/nocodbClient.ts
Normal file
@@ -0,0 +1,109 @@
|
||||
import { nocodb } from '../config';
|
||||
import type { AggregatedRecord } from '../types';
|
||||
|
||||
// Cache of table title → table id, filled by the first successful
// discoverTableIds() call and returned by later calls; null until then.
// NOTE(review): nothing visible here ever resets it — confirm a restart is the intended way to pick up new tables.
let discoveredTables: Record<string, string> | null = null;
|
||||
|
||||
/**
 * Performs an HTTP request against NocoDB and returns the parsed JSON body.
 *
 * The `xc-token` and `Content-Type: application/json` headers are always sent;
 * headers supplied in `options.headers` are applied on top and override them
 * when the names match (case-insensitively).
 *
 * @param url - Absolute request URL.
 * @param options - Standard `fetch` options (method, body, extra headers, ...).
 * @returns The parsed JSON body, or `undefined` when a successful response has an empty body.
 * @throws Error `NocoDB <status>: <first 200 chars of body>` for non-2xx responses;
 *   SyntaxError when a non-empty success body is not valid JSON.
 */
async function fetchJson(url: string, options: RequestInit = {}): Promise<unknown> {
  const headers = new Headers({
    'xc-token': nocodb.token,
    'Content-Type': 'application/json',
  });
  // `options.headers` may be a plain object, a Headers instance or an array of
  // pairs. Object spread only handles the first form correctly, and can emit
  // the same header twice when names differ only in case.
  new Headers(options.headers).forEach((value, name) => {
    headers.set(name, value);
  });

  const res = await fetch(url, { ...options, headers });

  if (!res.ok) {
    const text = await res.text().catch(() => '');
    throw new Error(`NocoDB ${res.status}: ${text.slice(0, 200)}`);
  }

  // Read as text first so an empty success body (e.g. 204 No Content) does not
  // surface as a JSON parse error.
  const body = await res.text();
  return body.length > 0 ? (JSON.parse(body) as unknown) : undefined;
}
|
||||
|
||||
/**
 * Resolves the NocoDB table ids of the configured base, keyed by table title.
 *
 * The first successful lookup is stored in the module-level `discoveredTables`
 * cache and returned by every later call without another request.
 *
 * @returns A map of table title → table id.
 * @throws Error when `nocodb.baseId` is not configured, when the request fails,
 *   or when the response does not contain a `list` array.
 */
export async function discoverTableIds(): Promise<Record<string, string>> {
  if (discoveredTables) return discoveredTables;
  if (!nocodb.baseId) throw new Error('NOCODB_BASE_ID not configured');

  const json = await fetchJson(`${nocodb.url}/api/v2/meta/bases/${nocodb.baseId}/tables`);

  // Check the shape before iterating: an unexpected payload would otherwise
  // fail with an opaque "not iterable" TypeError.
  const list = (json as { list?: unknown } | null | undefined)?.list;
  if (!Array.isArray(list)) {
    throw new Error("NocoDB table listing response has no 'list' array");
  }

  const tables: Record<string, string> = {};
  for (const table of list as Array<{ title: string; id: string }>) {
    tables[table.title] = table.id;
  }

  discoveredTables = tables;
  return tables;
}
|
||||
|
||||
/**
 * Deletes every row of a table whose `Date` field matches `<yearMonth>%`.
 *
 * Rows are removed in pages: up to 100 matching ids are requested, deleted
 * with one bulk DELETE, and the query is repeated from the start until it
 * returns no rows.
 *
 * @param tableId - NocoDB table id.
 * @param yearMonth - Prefix to match against `Date`, e.g. `"2024-03"`.
 * @returns The number of row ids sent for deletion.
 */
export async function deleteRowsByMonth(tableId: string, yearMonth: string): Promise<number> {
  const monthFilter = `(Date,like,${yearMonth}%)`;
  let removed = 0;

  for (;;) {
    const page = await fetchJson(
      `${nocodb.url}/api/v2/tables/${tableId}/records?where=${encodeURIComponent(monthFilter)}&limit=100&fields=Id`
    ) as { list: Array<{ Id: number }> };

    // An empty page means nothing matching the filter is left.
    if (page.list.length === 0) {
      return removed;
    }

    // The bulk-delete body is a list of `{ Id }` objects.
    const targets = page.list.map(row => ({ Id: row.Id }));
    await fetchJson(`${nocodb.url}/api/v2/tables/${tableId}/records`, {
      method: 'DELETE',
      body: JSON.stringify(targets),
    });

    removed += targets.length;
  }
}
|
||||
|
||||
/**
 * Deletes every row of a table.
 *
 * Up to 100 row ids are requested at a time and removed with one bulk DELETE;
 * the cycle repeats until the listing comes back empty.
 *
 * @param tableId - NocoDB table id.
 * @returns The number of row ids sent for deletion.
 */
export async function deleteAllRows(tableId: string): Promise<number> {
  let removed = 0;

  while (true) {
    const page = await fetchJson(
      `${nocodb.url}/api/v2/tables/${tableId}/records?limit=100&fields=Id`
    ) as { list: Array<{ Id: number }> };

    const rowIds = page.list.map(row => row.Id);
    if (rowIds.length === 0) break; // table is empty

    await fetchJson(`${nocodb.url}/api/v2/tables/${tableId}/records`, {
      method: 'DELETE',
      body: JSON.stringify(rowIds.map(Id => ({ Id }))),
    });

    removed += rowIds.length;
  }

  return removed;
}
|
||||
|
||||
/**
 * Inserts aggregated records into a table in sequential batches.
 *
 * @param tableId - NocoDB table id.
 * @param records - Records to insert; an empty array results in no request.
 * @returns The number of records sent for insertion.
 */
export async function insertRecords(tableId: string, records: AggregatedRecord[]): Promise<number> {
  // NocoDB bulk insert accepts max 100 records at a time
  const perRequest = 100;
  let sent = 0;
  let offset = 0;

  while (offset < records.length) {
    const slice = records.slice(offset, offset + perRequest);
    await fetchJson(`${nocodb.url}/api/v2/tables/${tableId}/records`, {
      method: 'POST',
      body: JSON.stringify(slice),
    });
    sent += slice.length;
    offset += perRequest;
  }

  return sent;
}
|
||||
Reference in New Issue
Block a user