CAP-Anonymizer als 4-Stage-Pipeline mit Faker, HMAC und Validator
Source-Connector, Transform, Validator, Sink-Loader – vier Stages in einem CAP-Modul, lauffähig in BTP. Wie die Pipeline strukturiert ist, was @sap/cds + Faker + crypto.HMAC + NER zusammen leisten, und was im Validator stehen muss damit nichts durchrutscht.
Was die Pipeline leisten muss
Aus PRD anonymisierte Daten in QAS bringen, deterministisch, prüfbar, ohne dass der Salt PRD verlässt. Auf BTP, in einem CAP-Modul, nightly.
Die vier Stages
```mermaid
flowchart LR
PRD[(PRD HDI
Live-Daten)]
PRD -->|Tech-User
Read-Only| S1[Stage 1
Source-Connector]
S1 -->|Streaming| S2[Stage 2
Transform-Pipeline]
S2 --> S3[Stage 3
Validator]
S3 -->|pass| S4[Stage 4
Sink-Loader]
S3 -.->|fail| Alert[Alert + Rollback]
S4 -->|TRUNCATE +
Bulk-INSERT| QAS[(QAS HDI
Anonymisiert)]
Salt[(HANA Secure Store
HMAC-Salt, nur in PRD)]
Salt -.->|nur zur Laufzeit| S2
style PRD fill:#fecaca,stroke:#b91c1c style QAS fill:#bbf7d0,stroke:#15803d style Salt fill:#fef3c7,stroke:#d97706 ```
Stage 1 – Source-Connector
Liest aus PRD-HDI mit @sap/hana-client, streaming statt all-in-memory.
```typescript // services/anonymizer/src/source.ts import * as cds from '@sap/cds';
export async function* streamFromPrd(entity: string) { const conn = await cds.connect.to('prd-hdi'); const stream = conn.stream(SELECT.from(entity)); for await (const row of stream) { yield row; } } ```
Wichtige Eigenschaften:
- Tech-User mit Read-Only-Scope (BAMVP_ANON_RO)
- keine temporären Files, alles im Memory-Stream
- bei Fehler: graceful abort + Alert
Stage 2 – Transform-Pipeline
Pro Feld eine Regel aus der Field-Registry:
typescript
// services/anonymizer/src/registry.ts
export const fieldRegistry = {
'BillingItems.soldToPartyName': 'faker:companyName',
'BillingItems.senderName': 'faker:companyName',
'BillingItems.senderGln': 'hmac:gln', // K2 – deterministisch
'BillingItems.invoiceeVatNo': 'hmac:vatNumber',
'BillingItems.netAmount': 'jitter:5pct', // K3 – sum-preserving
'BillingItems.description': 'ner:scrub',
'BillingItems.invoiceDate': 'dateShift:customer', // pro Customer derselbe Offset
// ...
};
Die Regeln werden über einen Switch ausgeführt:
typescript
function applyRule(rule: string, value: any, ctx: Context): any {
if (rule === 'faker:companyName') return faker.company.name();
if (rule.startsWith('hmac:')) {
const salt = await secureStore.getSalt(); // nur zur Laufzeit
return formatPreserving(hmac(salt, value), rule.split(':')[1]);
}
if (rule.startsWith('jitter:')) {
const pct = parseInt(rule.split(':')[1].replace('pct', ''));
return value * (1 + (Math.random() - 0.5) * 2 * pct / 100);
}
if (rule === 'ner:scrub') return nerScrub(value);
if (rule === 'dateShift:customer') return shiftByCustomer(value, ctx.customerId);
// ...
}
Salt kommt aus PRD HANA Secure Store, nicht aus env-Variables. Lebt nur zur Pipeline-Laufzeit im Memory.
Stage 3 – Validator
Drei Checks, alle müssen pass:
Check A – Re-ID-Test
Sample 100 Rows, hash-vergleich mit PRD: kein Match = pseudonymisiert ok.
typescript
async function reIdCheck(sample: Row[]) {
const prdHashes = await fetchPrdRowHashes(sample.map(r => r.id));
const matches = sample.filter(r => prdHashes.has(hashRow(r)));
if (matches.length > 0) throw new Error(`Re-ID-leak: ${matches.length} rows`);
}
Check B – k-Anonymity (k≥5)
Pro Quasi-Identifier-Kombination müssen mindestens 5 Datensätze existieren:
typescript
function kAnonymityCheck(rows: Row[], k = 5) {
const groups = groupBy(rows, r => `${r.zipCode}|${r.industry}|${r.invoicePeriod}`);
const violators = Object.entries(groups).filter(([, rs]) => rs.length < k);
if (violators.length > 0) throw new Error(`k<${k} for ${violators.length} groups`);
}
Check C – Aggregat-Plausibilität
Summen-erhaltend: SUM(netAmount) darf um max 2 % vom PRD-Wert abweichen.
typescript
async function sumCheck() {
const qasSum = await sumNetAmount('qas');
const prdSum = await sumNetAmount('prd');
const drift = Math.abs(qasSum - prdSum) / prdSum;
if (drift > 0.02) throw new Error(`Sum-drift ${drift*100}%`);
}
Wenn ein Check fehlschlägt: Pipeline aborts, QAS bleibt im alten Zustand, Alert geht raus.
Stage 4 – Sink-Loader
typescript
async function loadIntoQas(entity: string, rows: Row[]) {
const qas = await cds.connect.to('qas-hdi');
await qas.tx(async tx => {
await tx.run(`TRUNCATE TABLE ${entity}`);
await tx.run(INSERT.into(entity).rows(rows));
});
}
Transaktional. Bei Fehler: Rollback. QAS hat entweder den neuen Stand oder den alten – nie einen Mix.
Job-Scheduler
javascript
// mta.yaml – Auszug
- name: anonymizer
type: nodejs
parameters:
memory: 512M
requires:
- name: prd-hdi-tech-user
- name: qas-hdi-rw
- name: job-scheduler
parameters:
schedule: "0 0 1 * * *" # 01:00 CET nightly
Operations-Aufwand
- Stage-1-Schema-Drift erkennen: pro Schema-Change in PRD muss die Pipeline angepasst werden – typisch 2-4 Touchpoints pro Quartal
- Field-Registry-Updates bei neuen Entities: ~30 Min pro Entity
- Re-ID-Test halbjährlich auf Sample mit Auditor
Was beim ersten Setup typisch schief geht
- Salt-Lifecycle: Salt wird versehentlich in
.envcommittet → Compromiss - NER-Scrub zu aggressiv: löscht Material-Codes die wie Namen aussehen → Bug-Repro auf QAS scheitert
- Date-Shift inkonsistent: pro Row eigener Shift → Customer-Chronologie zerstört
- k-Anonymity nicht geprüft: bei seltenen Industrien sind k=2-3 üblich → Re-ID-Risiko bleibt
Wer das einrichten will
Ein Workshop (halbtägig) baut Ihre Anonymizer-Pipeline für die kritischsten 2-3 Entities, mit lauffähigem Code-Skeleton. Output: deploy-ready CAP-Modul + Field-Registry-Template.
Stand: 2026-05-10
