pipeline
production

ELT Reconciliation Pipeline — Airflow

An ELT pipeline that consolidates data from three sources: an ERP, a payments portal, and a compliance registry. It runs daily via Airflow, normalizes entity names across the sources, links them with fuzzy matching (pg_trgm + recordlinkage), and loads the results into a conformed dimension table in the data warehouse (DWH).

Apache Airflow, Python, PostgreSQL, pg_trgm, ELT, Entity Matching

Architecture

ERP + Portal + Registry -> Normalize Entities -> Fuzzy Matching -> Conformed DWH Dim
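
To make the fuzzy-matching stage concrete, here is a minimal pure-Python sketch of trigram similarity in the style of pg_trgm. It is an approximation for illustration, not the pipeline's actual matching code: the function names (`trigrams`, `similarity`, `best_match`) are hypothetical, the input is assumed to be already-cleaned ASCII names, and the 0.3 cutoff simply mirrors pg_trgm's default similarity threshold.

```python
import re


def trigrams(text: str) -> set[str]:
    """Return the set of trigrams for text, approximating pg_trgm."""
    grams: set[str] = set()
    # Lowercase, then treat every run of non-alphanumeric characters as a word break.
    for word in re.findall(r"[a-z0-9]+", text.lower()):
        # pg_trgm pads each word with two leading spaces and one trailing space.
        padded = f"  {word} "
        grams.update(padded[i:i + 3] for i in range(len(padded) - 2))
    return grams


def similarity(a: str, b: str) -> float:
    """Shared trigrams divided by all distinct trigrams (0.0 to 1.0)."""
    ta, tb = trigrams(a), trigrams(b)
    if not ta or not tb:
        return 0.0
    return len(ta & tb) / len(ta | tb)


def best_match(name: str, candidates: list[str], threshold: float = 0.3):
    """Return (candidate, score) for the best candidate at or above threshold, else None."""
    scored = [(c, similarity(name, c)) for c in candidates]
    scored = [pair for pair in scored if pair[1] >= threshold]
    return max(scored, key=lambda pair: pair[1], default=None)


if __name__ == "__main__":
    # Hypothetical names, used only to exercise the functions.
    print(similarity("ACME CORP", "ACME CORPORATION"))
    print(best_match("BANCO NACIONAL SA", ["BANCO NACIONAL", "ACME CORP"]))
```

In the database, the same comparison would typically be done with pg_trgm's `similarity()` function or `%` operator; a Python version like this is mainly useful for unit-testing thresholds on sample names.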

Code Snippet

-- Normalize entity names across three sources.
-- Requires the unaccent extension: CREATE EXTENSION IF NOT EXISTS unaccent;
CREATE OR REPLACE VIEW staging.normalized_entities AS

-- ERP
SELECT
    'erp'        AS source,
    "ClientCode" AS code,
    "ClientName" AS original_name,
    UPPER(TRIM(unaccent("ClientName"))) AS entity_std,
    regexp_replace(UPPER(TRIM(unaccent("ClientName"))),
        '[^A-Z0-9 ]', '', 'g')         AS entity_clean
FROM staging.erp_collections

UNION ALL

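-- Payments Portal: "Entity" is assumed to hold '<code>-<name>'. split_part keeps only
-- the text between the first and second hyphen, so a hyphenated name is truncated.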
SELECT
    'payments_portal'                               AS source,
    split_part("Entity", '-', 1)                    AS code,
    split_part("Entity", '-', 2)                    AS original_name,
    UPPER(TRIM(unaccent(split_part("Entity",'-',2)))) AS entity_std,
    regexp_replace(UPPER(TRIM(unaccent(
        split_part("Entity",'-',2)))), '[^A-Z0-9 ]','','g') AS entity_clean
FROM staging.payments_portal_pending
WHERE "Entity" != 'no entity'

UNION ALL

-- Compliance Registry
SELECT
    'compliance_registry'               AS source,
    NULL                                AS code,
    "institution"                       AS original_name,
    UPPER(TRIM(unaccent("institution"))) AS entity_std,
    regexp_replace(UPPER(TRIM(unaccent("institution"))),
        '[^A-Z0-9 ]', '', 'g')         AS entity_clean
FROM staging.compliance_registry_records;
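
The normalization rules in the view can also be mirrored in Python, for example to unit-test them against sample names before they reach the database. The sketch below is an approximation under stated assumptions: the helper names are hypothetical, and `strip_accents` uses Unicode decomposition as a stand-in for PostgreSQL's `unaccent()`, which relies on its own rules file and can differ for some characters (ligatures, for instance).

```python
import re
import unicodedata


def strip_accents(text: str) -> str:
    """Rough stand-in for unaccent(): decompose, then drop combining marks."""
    decomposed = unicodedata.normalize("NFKD", text)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))


def entity_std(name: str) -> str:
    """Mirror UPPER(TRIM(unaccent(name))); TRIM removes spaces only."""
    return strip_accents(name).strip(" ").upper()


def entity_clean(name: str) -> str:
    """Mirror the regexp_replace step: keep only A-Z, 0-9, and spaces."""
    return re.sub(r"[^A-Z0-9 ]", "", entity_std(name))


def split_portal_entity(entity: str) -> tuple[str, str]:
    """Mirror split_part(entity, '-', 1) and split_part(entity, '-', 2).

    Like split_part, a missing second field yields an empty string, and
    anything after a second hyphen is dropped.
    """
    parts = entity.split("-")
    code = parts[0]
    name = parts[1] if len(parts) > 1 else ""
    return code, name


if __name__ == "__main__":
    # Hypothetical sample values, not taken from the real sources.
    print(entity_std("  Café Münster S.A. "))
    print(entity_clean("  Café Münster S.A. "))
    print(split_portal_entity("1042-Banco Río"))
```

Note that removing punctuation can leave double spaces behind (for example around an ampersand), in both the view and this sketch; collapsing whitespace would be a separate, optional step.
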
Detailed write-up, screenshots, and metrics coming in Phase 4.