Building Real Time Data Pipelines with CDC, Outbox, PostgreSQL, and Apache NiFi


In one of my previous posts: PostgreSQL CDC Best Practices: Managing WAL Growth and Replication Slots, I explored the mechanics of Change Data Capture (CDC) and included a teaser:

Whether choosing WAL-based CDC or the Outbox Pattern, understanding these underlying mechanics helps make informed architectural decisions and build more resilient data pipelines. Apache NiFi can be an option to consume CDC or Outbox Pattern data.

This post delivers on that promise. I show how to implement both the CDC and Outbox patterns with Apache NiFi 2.7, describe the trade-offs I'd consider, and provide monitoring scripts.


Executive Summary

The Outbox Pattern offers simple operations and explicit control over event schemas, making it well-suited for event-driven architectures where you control the source application.
Implementation: a dedicated database events table is populated by database triggers and NiFi’s QueryDatabaseTable processor for incremental polling.

The CDC Pattern is ideal when you need the complete change history or must capture changes from applications you can’t modify. However, it requires more operational overhead (compared to outbox) to manage replication slots and WAL growth.
Implementation: changes captured directly from PostgreSQL’s Write-Ahead Log (WAL) using pg_logical_slot_get_changes().


Why Apache NiFi?

Apache NiFi 2.0/2.7 brings significant improvements: a modernized architecture (goodbye ZooKeeper), enhanced security, and better cloud-native deployment options.

Snowflake Openflow is also built on Apache NiFi. It's Snowflake's native data integration service that became generally available in November 2025. It is:

an integration service that connects any data source and any destination with hundreds of processors supporting structured and unstructured text, images, audio, video and sensor data. Built on Apache NiFi.

About Openflow

Skills developed with NiFi 2.7 can be transferred to Snowflake’s managed offering. Whether you’re running self-hosted NiFi or Snowflake Openflow, the approaches demonstrated below apply.

What you’ll learn

  • Hands-on implementation of CDC using PostgreSQL logical replication
  • Hands-on implementation of the Outbox Pattern with table polling
  • Monitoring strategies for both approaches
  • When to choose which pattern

Architecture

Let’s revisit the architectural differences between the CDC and Outbox patterns. I covered this in CDC and Outbox Pattern: Same Same but Different; here is a quick recap:

Change Data Capture (CDC)

Change Data Capture (CDC) Software Architecture Pattern

Outbox Pattern

Outbox Software Architecture Pattern

Implementation

The complete code, including setup scripts, monitoring tools, and Apache NiFi 2.7 flow configurations, is available on Codeberg codeberg.org/lotharschulz/nifi-postgresql and GitHub gh.com/lotharschulz/nifi-postgresql.
Please note the GitHub Copilot agent definitions in case you consider forking and adapting the code.

Docker Compose Base

The implementation uses two Docker containers, one for PostgreSQL and one for Apache NiFi, orchestrated by Docker Compose:

services:
  postgres:
    image: postgres:17.7-alpine3.23
    container_name: nifi_database
    environment:
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_replication_slots=4"
      - "-c"
      - "max_wal_senders=4"

  nifi:
    image: apache/nifi:2.7.2
    container_name: nifi_cdc
    ports:
      - "8443:8443"
    environment:
      - SINGLE_USER_CREDENTIALS_USERNAME=${NIFI_SINGLE_USER_CREDENTIALS_USERNAME}
      - SINGLE_USER_CREDENTIALS_PASSWORD=${NIFI_SINGLE_USER_CREDENTIALS_PASSWORD}
      - NIFI_WEB_HTTPS_PORT=8443
    volumes:
      - nifi_state:/opt/nifi/nifi-current/state
      - nifi_conf:/opt/nifi/nifi-current/conf
      - nifi_logs:/opt/nifi/nifi-current/logs
      # Mount the PostgreSQL JDBC driver directly
      - ./jdbc-driver/postgresql-42.7.8.jar:/opt/nifi/nifi-current/lib/postgresql-42.7.8.jar
    depends_on:
      - postgres

volumes:
  postgres_data:
  nifi_state:
  nifi_conf:
  nifi_logs:

docker-compose.yml

Key PostgreSQL configuration

The wal_level=logical setting is important — it instructs PostgreSQL to add information necessary to support logical decoding. The max_replication_slots and max_wal_senders parameters determine how many concurrent subscriptions, in this case CDC consumers, can be supported.
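Once the containers are up, you can verify from any psql session that the settings from docker-compose.yml took effect:

```sql
-- Verify the logical replication settings passed via the postgres command flags
SHOW wal_level;              -- expected: logical
SHOW max_replication_slots;  -- expected: 4
SHOW max_wal_senders;        -- expected: 4
```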

The initialization script creates both the business table and the outbox infrastructure:

-- Create the business/order table
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_name VARCHAR(255) NOT NULL,
    product VARCHAR(255) NOT NULL,
    quantity INTEGER NOT NULL,
    total_amount DECIMAL(10, 2) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create the outbox table
CREATE TABLE outbox (
    id SERIAL PRIMARY KEY,
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create index for efficient CDC
CREATE INDEX idx_outbox_created_at ON outbox(created_at);

-- Function to automatically create outbox events
CREATE OR REPLACE FUNCTION create_order_event()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
    VALUES (
        'Order',
        NEW.id::TEXT,
        TG_OP,
        jsonb_build_object(
            'id', NEW.id,
            'customer_name', NEW.customer_name,
            'product', NEW.product,
            'quantity', NEW.quantity,
            'total_amount', NEW.total_amount,
            'created_at', NEW.created_at
        )
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Trigger to capture order changes
CREATE TRIGGER order_outbox_trigger
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW
EXECUTE FUNCTION create_order_event();

-- Create publication for logical replication
CREATE PUBLICATION outbox_publication FOR TABLE outbox;

init.sql

CDC Deep Dive

Replication Slots and Write Ahead Logging (WAL)

PostgreSQL’s logical replication uses replication slots to track which WAL log records have been consumed. The pg_logical_slot_get_changes() function is the key interface:

SELECT * FROM pg_logical_slot_get_changes(
    'nifi_cdc_slot',  -- slot name
    NULL,             -- up to LSN (NULL = all available)
    NULL,             -- max changes (NULL = unlimited)
    'include-timestamp', 'on'  -- optional decoder options (omitted in the NiFi query below)
);

Critical behavior: This function consumes changes from the WAL. Once the changes are read, they’re gone from the slot. This is both a feature (automatic cleanup) and a responsibility (you must process them reliably).
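Before polling, the slot has to exist. A minimal sketch, assuming the built-in test_decoding output plugin (the repository may use a different plugin); note that pg_logical_slot_peek_changes() is the non-destructive sibling, useful for debugging without consuming the backlog:

```sql
-- Create the logical replication slot once (plugin choice is an assumption here)
SELECT pg_create_logical_replication_slot('nifi_cdc_slot', 'test_decoding');

-- Inspect pending changes WITHOUT consuming them (same signature as _get_changes)
SELECT * FROM pg_logical_slot_peek_changes('nifi_cdc_slot', NULL, NULL);
```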

NiFi Flow

The CDC flow is a collection of these steps:

ExecuteSQL → ConvertRecord → SplitJson → EvaluateJsonPath → RouteOnAttribute → LogAttribute

Execute SQL Processor Configuration

"SQL Query":
"SELECT * FROM pg_logical_slot_get_changes('nifi_cdc_slot', NULL, NULL);"

This query, defined as a processor property, runs every 10 seconds using the TIMER_DRIVEN scheduling strategy.

SQL Processor Configuration

NiFi 2.7 gotcha: If you’re migrating from NiFi 1.x or following older tutorials, note that several property names changed to use spaces and title case. Key examples include:

  • record-reader → Record Reader
  • record-writer → Record Writer
  • Log prefix → Log Prefix
  • SQL select query → SQL Query
  • putsql-sql-statement → SQL Statement

CDC Monitoring Script

The CDC monitoring script checks the CDC data flow. Note that this NiFi setup polls instead of holding a streaming connection; the test data generator pushes updates into the PostgreSQL database every 5 seconds.

    # Summary -> Slot Status and WAL Lag (overview of all slots)
    echo -e "${YELLOW}Slot Status and WAL Lag:${NC}"
    docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
        "SELECT
            slot_name,
            active,
            pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size,
            pg_size_pretty(COALESCE(safe_wal_size, 0)) AS safe_wal_size,
            restart_lsn,
            confirmed_flush_lsn
        FROM pg_replication_slots
        ORDER BY slot_name;"
    
    # Health status/alerts -> Inactive Slots Analysis (identifies problematic slots)
    echo -e "\n${YELLOW}Inactive Slots Analysis:${NC}"
    local inactive=$(docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
        "SELECT
            slot_name,
            pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size,
            pg_size_pretty(COALESCE(safe_wal_size, 0)) AS safe_wal_size
        FROM pg_replication_slots
        WHERE NOT active;" 2>/dev/null)
    
    if echo "$inactive" | grep -q "(0 rows)"; then
        echo -e "${GREEN}✓ No inactive slots - all slots are active${NC}"
    else
        echo "$inactive"
        
        # Check if any inactive slots have large lag (indicating a real problem)
        local large_lag_count=$(docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -At -c \
            "SELECT COUNT(*) FROM pg_replication_slots 
             WHERE NOT active 
             AND pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) > ${LAG_THRESHOLD_BYTES};" 2>/dev/null || echo "0")
        
        if [ "$large_lag_count" -gt 0 ]; then
            echo -e "\n${RED}⚠ Warning: Inactive slots with large lag (>500 MB) detected!${NC}"
            echo -e "${YELLOW}This indicates slots are not being consumed and WAL is accumulating.${NC}"
            echo -e "${YELLOW}Action: Investigate why consumers are not active or remove unused slots.${NC}"
        else
            echo -e "\n${BLUE}🔍 Debug: Slots are inactive but lag is low.${NC}"
            echo -e "${CYAN}Context:${NC}"
            echo -e "${CYAN}  - This is appearing because flows like './test-cdc.sh --continuous' use${NC}"
            echo -e "${CYAN}    pg_logical_slot_get_changes() via ExecuteSQL on a schedule (e.g., every 10 seconds)${NC}"
            echo -e "${CYAN}  - This doesn't maintain a persistent connection${NC}"
            echo -e "${CYAN}  - Inactive slots with low lag are very likely being consumed periodically${NC}"
            echo -e "${CYAN}  - Slots are only 'active' when a consumer is actively connected and reading${NC}"
        fi
    fi
    
    # Breakdowns -> WAL Configuration (system settings)
    echo -e "\n${YELLOW}WAL Configuration:${NC}"
    docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
        "SELECT
            name,
            setting,
            unit
        FROM pg_settings
        WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders', 'max_slot_wal_keep_size')
        ORDER BY name;"
    
    # Sample data -> Slots Ordered by Lag (shows which slots have most lag)
    if [ "$slot_count" -gt 0 ]; then
        echo -e "\n${YELLOW}Slots Ordered by Lag Size:${NC}"
        docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
            "SELECT
                slot_name,
                active,
                pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size,
                CASE 
                    WHEN active THEN 'consuming'
                    ELSE 'idle'
                END as status,
                restart_lsn
            FROM pg_replication_slots
            ORDER BY pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) DESC;" 2>/dev/null
    fi

    # Technical metrics -> WAL Directory Information (storage usage)
    echo -e "\n${YELLOW}WAL Directory Information:${NC}"
    docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
        "SELECT
            pg_size_pretty(SUM(size)) as total_wal_size,
            COUNT(*) as wal_file_count
        FROM pg_ls_waldir();" 2>/dev/null || echo -e "${YELLOW}Note: pg_ls_waldir() requires superuser privileges${NC}"

monitor-cdc-slot.sh

The Inactive Slot “False Alarm” Explained

Polling-based CDC consumers will always show active = false between polls. This is fundamentally different from streaming consumers that maintain persistent connections. Therefore, monitor lag size, not active status. A slot with active = false and lag_size = 8KB is healthy. A slot with active = false and lag_size = 5GB needs investigation.

The key insight: inactive + low lag = normal polling behavior.
Only inactive + high lag indicates a problem.
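That insight translates directly into an alert query; a sketch using the same 500 MB threshold as the monitoring script:

```sql
-- Alert only on inactive slots whose retained WAL exceeds ~500 MB
SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size
FROM pg_replication_slots
WHERE NOT active
  AND pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) > 500 * 1024 * 1024;
```

An empty result means all slots are either active or being drained by the polling flow.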

Outbox Pattern Deep Dive

The Outbox Pattern decouples event generation from event consumption. When a business event occurs (upsert on orders), a trigger automatically creates a corresponding item in the outbox table.

CREATE TRIGGER order_outbox_trigger
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW
EXECUTE FUNCTION create_order_event();

init.sql

Benefits:

  • Application controls the event schema — you decide exactly what goes into payload
  • Transactional consistency — the business operation and event creation are atomic
  • Simpler consumer logic — just poll a table, no replication slot management
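The transactional consistency is easy to see interactively: the order insert and the trigger-created outbox row commit (or roll back) together. The sample values below are illustrative:

```sql
BEGIN;
INSERT INTO orders (customer_name, product, quantity, total_amount)
VALUES ('Ada Lovelace', 'Widget', 2, 19.98);
COMMIT;  -- order and its outbox event become visible atomically

-- The trigger wrote a matching event in the same transaction
SELECT aggregate_id, event_type, payload->>'product' AS product
FROM outbox ORDER BY id DESC LIMIT 1;
```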

NiFi Flow

QueryDatabaseTable → ConvertRecord → SplitJson → EvaluateJsonPath → LogAttribute

The QueryDatabaseTable Processor uses incremental loading based on the id column:

  • Maximum-value Columns: id
  • Table Name: outbox
  • Scheduling: Every 10 seconds

This processor tracks the highest id it has seen and only fetches new records on subsequent runs.
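Conceptually, each run issues the equivalent of the following query, where :last_max_id stands for the maximum id NiFi persisted in its local state after the previous run (a placeholder for illustration, not NiFi syntax):

```sql
-- First run: no stored maximum, fetch everything
SELECT * FROM outbox ORDER BY id;

-- Subsequent runs: only rows newer than the stored maximum
SELECT * FROM outbox WHERE id > :last_max_id ORDER BY id;
```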

Delete After Processing

After successfully processing an event, we delete it from the outbox database table:

DELETE FROM outbox WHERE id = ${event.id}

This keeps the outbox table small and provides exactly-once semantics. The underlying working assumption is: the downstream processing is idempotent or the delete only happens after successful delivery.
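One way to tie the delete to successful delivery is to hold it in an open transaction and commit only after the downstream system has acknowledged the event; a sketch with an illustrative id:

```sql
BEGIN;
DELETE FROM outbox WHERE id = 42 RETURNING id, event_type, payload;
-- ... deliver the returned payload downstream ...
COMMIT;    -- on delivery success
-- ROLLBACK; -- on delivery failure, the event stays in the outbox
```

Keep such transactions short: an open transaction holds locks on the deleted row and delays vacuum on the outbox table.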

Outbox Monitoring Script

The monitoring script follows the same structure as the CDC script above:

  1. Summary
  2. Health status/alerts
  3. Breakdowns
  4. Sample Data
  5. Technical Metrics

    # Summary -> Get Pending Events Count
    local pending_count=$(docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -At -c \
        "SELECT COUNT(*) FROM outbox;" 2>/dev/null || echo "0")
    
    if [ "$pending_count" -eq 0 ]; then
        echo -e "${GREEN}✓ No pending events in outbox table${NC}"
        echo -e "  Generate test data with: ${CYAN}./test-outbox.sh${NC}\n"
    else
        echo -e "${YELLOW}⚠ Pending events in outbox: ${pending_count}${NC}\n"
    fi
    
    # Summary -> Event Type Distribution (event type overview)
    echo -e "${YELLOW}Event Type Distribution:${NC}"
    docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
        "SELECT
            event_type,
            COUNT(*) as count,
            MIN(created_at) as oldest_event,
            MAX(created_at) as newest_event
        FROM outbox
        GROUP BY event_type
        ORDER BY count DESC;" 2>/dev/null
    
    # Health status/alerts -> Event Age Analysis (identifies problems/warnings)
    # Check for old events
    echo -e "\n${YELLOW}Event Age Analysis:${NC}"
    local old_count=$(docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -At -c \
        "SELECT COUNT(*) FROM outbox 
         WHERE created_at < NOW() - INTERVAL '${AGE_THRESHOLD_SECONDS} seconds';" 2>/dev/null || echo "0")
    
    if [ "$old_count" -gt 0 ]; then
        echo -e "${RED}⚠ Warning: ${old_count} events older than $((AGE_THRESHOLD_SECONDS / 60)) minutes detected!${NC}"
        echo -e "${YELLOW}This may indicate that NiFi consumers are not processing events.${NC}"
        echo -e "${YELLOW}Action: Check NiFi flow status and processor state.${NC}\n"
        
        # Show oldest events
        echo -e "${YELLOW}Oldest Unprocessed Events:${NC}"
        docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
            "SELECT
                id,
                aggregate_type,
                event_type,
                aggregate_id,
                created_at,
                EXTRACT(EPOCH FROM (NOW() - created_at))::INTEGER as age_seconds
            FROM outbox
            ORDER BY created_at ASC
            LIMIT 5;" 2>/dev/null
    else
        echo -e "${GREEN}✓ No old events - all events are recent (< $((AGE_THRESHOLD_SECONDS / 60)) minutes)${NC}"
    fi
    
    # Breakdowns -> Aggregate Type Distribution (breakdown by aggregate type)
    echo -e "\n${YELLOW}Aggregate Type Distribution:${NC}"
    docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
        "SELECT
            aggregate_type,
            COUNT(*) as count
        FROM outbox
        GROUP BY aggregate_type
        ORDER BY count DESC;" 2>/dev/null
    
    # Sample data -> Recent Events (actual event records)
    # Show recent events
    if [ "$pending_count" -gt 0 ]; then
        echo -e "\n${YELLOW}Recent Events (Last 5):${NC}"
        docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
            "SELECT
                id,
                aggregate_type,
                event_type,
                aggregate_id,
                created_at
            FROM outbox
            ORDER BY created_at DESC
            LIMIT 5;" 2>/dev/null
    fi
    
    # Technical metrics -> Table Statistics (infrastructure/sizing info)
    # Show table statistics
    echo -e "\n${YELLOW}Outbox Table Statistics:${NC}"
    docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
        "SELECT
            pg_size_pretty(pg_total_relation_size('outbox')) as total_size,
            pg_size_pretty(pg_relation_size('outbox')) as table_size,
            pg_size_pretty(pg_indexes_size('outbox')) as indexes_size
        FROM pg_class
        WHERE relname = 'outbox';" 2>/dev/null

monitor-outbox-slot.sh

Comparison: CDC vs. Outbox

| Aspect | CDC (Logical Replication) | Outbox |
|---|---|---|
| Setup complexity | Higher (replication slots, WAL config) | Lower (just a table + trigger) |
| Schema control | Database-driven (captures all columns) | Application-driven (you design the payload) |
| Exactly-once delivery | Harder (slot management, consumer tracking) | Easier (delete after process) |
| WAL impact | Direct dependency | Indirect (normal table operations) |
| NiFi processor | ExecuteSQL with pg_logical_slot_get_changes() | QueryDatabaseTable |
| Failure recovery | Restart from restart_lsn | Resume from max(id) |
| Operational overhead | Monitor slot lag, WAL growth | Monitor table size, event age |

Software Architecture Implementation Mapping

CDC

Change Data Capture (CDC) Software Architecture Pattern

Outbox

Outbox Software Architecture Pattern

| Diagram Component | Your Implementation |
|---|---|
| Message Relay | Apache NiFi 2.7 |
| Outbox Table | outbox table with trigger |
| Transaction Log View | pg_logical_slot_get_changes() |
| Message Broker | LogAttribute NiFi processor |

Production Considerations

Monitoring – WAL Management

Ensure stalled consumers do not fill your disk, e.g. by capping how much WAL a replication slot may retain:

-- Set max_slot_wal_keep_size to prevent WAL file bloat
ALTER SYSTEM SET max_slot_wal_keep_size = '20GB';

init.sql and my previous post about the WAL size limit.
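With max_slot_wal_keep_size set, PostgreSQL reports per slot how close it is to the cap; wal_status turns 'lost' once a slot has been invalidated and must be recreated:

```sql
-- safe_wal_size is NULL while max_slot_wal_keep_size is -1 (unlimited)
SELECT slot_name,
       wal_status,                       -- reserved | extended | unreserved | lost
       pg_size_pretty(safe_wal_size) AS headroom
FROM pg_replication_slots;
```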

When to Choose: CDC or Outbox

Choose CDC (Logical Replication) when:

  • You must capture changes from applications you can’t modify, including legacy systems
  • Your schema changes are database-driven
  • You are building a data lake or need an audit log with the complete change history

Choose Outbox Pattern when:

  • You prioritize simpler operations
  • You own and implement the application that is the source of data changes
  • You need explicit control over event schema and content
  • You implement an event-driven architecture with well-defined domain events

The code in Action

Note the time spent per task: it is in the milliseconds range, with some tasks taking less than 100 milliseconds. I consider this real time, although both logical replication (CDC) and table polling (Outbox) are asynchronous processes.

Conclusion

Both CDC and Outbox patterns are viable approaches with Apache NiFi 2.7:

  • CDC via logical replication gives you comprehensive change capture with minimal application changes, at the cost of more complex operational requirements
  • The Outbox Pattern provides controlled event publishing with simpler operations, however requires application-level integration

This post is part of a series on Change Data Capture (CDC) and Outbox pattern. See also: PostgreSQL CDC Best Practices: Managing WAL Growth and Replication Slots and CDC and Outbox Pattern: Same Same but Different.


2026 01 06 update: PostgreSQL jdbc driver version bump to 42.7.8 (1,2)
