In one of my previous posts: PostgreSQL CDC Best Practices: Managing WAL Growth and Replication Slots, I explored the mechanics of Change Data Capture (CDC) and included a teaser:
Whether choosing WAL-based CDC or the Outbox Pattern, understanding these underlying mechanics helps make informed architectural decisions and build more resilient data pipelines. Apache NiFi can be an option to consume CDC or Outbox Pattern data.
This post delivers on that promise. I show how to implement both CDC and the Outbox pattern with Apache NiFi 2.7, describe the trade-offs I’d consider, and provide monitoring scripts.
Executive Summary
The Outbox Pattern offers simple operations and explicit control over event schemas, making it well-suited for event-driven architectures where you control the source application.
Implementation: a dedicated events table is populated by database triggers and polled incrementally by NiFi’s QueryDatabaseTable processor.
The CDC Pattern is ideal when you need the complete change history or must capture changes from applications you can’t modify. However, it requires more operational overhead (compared to outbox) to manage replication slots and WAL growth.
Implementation: changes captured directly from PostgreSQL’s Write-Ahead Log (WAL) using pg_logical_slot_get_changes().
Why Apache NiFi?
Apache NiFi 2.0/2.7 brings significant improvements: a modernized architecture (goodbye ZooKeeper), enhanced security, and better cloud-native deployment options.
Also, Snowflake Openflow is built on Apache NiFi. It’s Snowflake’s native data integration service that became generally available in November 2025. Snowflake describes it as:
an integration service that connects any data source and any destination with hundreds of processors supporting structured and unstructured text, images, audio, video and sensor data. Built on Apache NiFi.
Skills developed with NiFi 2.7 can be transferred to Snowflake’s managed offering. Whether you’re running self-hosted NiFi or Snowflake Openflow, the approaches demonstrated below apply.
What you’ll learn
- Hands-on implementation of CDC using PostgreSQL logical replication
- Hands-on implementation of the Outbox Pattern with table polling
- Monitoring strategies for both approaches
- When to choose which pattern
Architecture
Let’s revisit the architectural differences between the CDC and Outbox patterns. I covered this in CDC and Outbox Pattern: Same Same but Different; here’s a quick recap:
Change Data Capture (CDC)

Outbox Pattern

Implementation
The complete code, including setup scripts, monitoring tools, and Apache NiFi 2.7 flow configurations, is available on Codeberg codeberg.org/lotharschulz/nifi-postgresql and GitHub github.com/lotharschulz/nifi-postgresql.
Please note the GitHub Copilot agent definitions in case you consider forking and adapting the code.
Docker Compose Base
The implementation uses two Docker containers, one for PostgreSQL and one for Apache NiFi, orchestrated by Docker Compose:
services:
  postgres:
    image: postgres:17.7-alpine3.23
    container_name: nifi_database
    environment:
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_replication_slots=4"
      - "-c"
      - "max_wal_senders=4"

  nifi:
    image: apache/nifi:2.7.2
    container_name: nifi_cdc
    ports:
      - "8443:8443"
    environment:
      - SINGLE_USER_CREDENTIALS_USERNAME=${NIFI_SINGLE_USER_CREDENTIALS_USERNAME}
      - SINGLE_USER_CREDENTIALS_PASSWORD=${NIFI_SINGLE_USER_CREDENTIALS_PASSWORD}
      - NIFI_WEB_HTTPS_PORT=8443
    volumes:
      - nifi_state:/opt/nifi/nifi-current/state
      - nifi_conf:/opt/nifi/nifi-current/conf
      - nifi_logs:/opt/nifi/nifi-current/logs
      # Mount the PostgreSQL JDBC driver directly
      - ./jdbc-driver/postgresql-42.7.8.jar:/opt/nifi/nifi-current/lib/postgresql-42.7.8.jar
    depends_on:
      - postgres

volumes:
  postgres_data:
  nifi_state:
  nifi_conf:
  nifi_logs:
Key PostgreSQL configuration
The wal_level=logical setting is important — it instructs PostgreSQL to add the information necessary to support logical decoding. The max_replication_slots and max_wal_senders parameters determine how many concurrent subscriptions (in this case, CDC consumers) can be supported.
The initialization script creates both the business table and the outbox infrastructure:
-- Create the business/order table
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_name VARCHAR(255) NOT NULL,
    product VARCHAR(255) NOT NULL,
    quantity INTEGER NOT NULL,
    total_amount DECIMAL(10, 2) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create the outbox table
CREATE TABLE outbox (
    id SERIAL PRIMARY KEY,
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create index for efficient CDC
CREATE INDEX idx_outbox_created_at ON outbox(created_at);

-- Function to automatically create outbox events
CREATE OR REPLACE FUNCTION create_order_event()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
    VALUES (
        'Order',
        NEW.id::TEXT,
        TG_OP,
        jsonb_build_object(
            'id', NEW.id,
            'customer_name', NEW.customer_name,
            'product', NEW.product,
            'quantity', NEW.quantity,
            'total_amount', NEW.total_amount,
            'created_at', NEW.created_at
        )
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Trigger to capture order changes
CREATE TRIGGER order_outbox_trigger
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW
EXECUTE FUNCTION create_order_event();

-- Create publication for logical replication
CREATE PUBLICATION outbox_publication FOR TABLE outbox;
CDC Deep Dive
Replication Slots and Write-Ahead Logging (WAL)
PostgreSQL’s logical replication uses replication slots to track which WAL log records have been consumed. The pg_logical_slot_get_changes() function is the key interface:
SELECT * FROM pg_logical_slot_get_changes(
    'nifi_cdc_slot',          -- slot name
    NULL,                     -- up to LSN (NULL = all available)
    NULL,                     -- max changes (NULL = unlimited)
    'include-timestamp', 'on' -- optional decoder options (omitted in the NiFi flow below)
);
Critical behavior: this function consumes changes from the WAL. Once the changes are read, they’re gone from the slot. This is both a feature (automatic cleanup) and a responsibility: you must process them reliably.
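To make that responsibility concrete, here is a minimal Python sketch of an at-least-once consumption loop: peek the changes non-destructively, hand them downstream, and only then consume them up to the last processed LSN. It assumes the slot nifi_cdc_slot already exists (e.g. created via pg_create_logical_replication_slot with the test_decoding plugin); run_query is a hypothetical stand-in for any SQL executor, such as a thin psycopg2 wrapper.

```python
def consume_slot_changes(run_query, slot_name="nifi_cdc_slot"):
    """At-least-once CDC consumption sketch: peek, process, then consume.

    run_query is any callable that executes SQL and returns rows. Peeking
    before getting means a crash during processing does not lose changes
    that were never handed downstream.
    """
    # Non-destructive read: the changes remain in the slot.
    changes = run_query(
        f"SELECT lsn, xid, data "
        f"FROM pg_logical_slot_peek_changes('{slot_name}', NULL, NULL)"
    )
    # Hand each decoded change off to downstream processing here.
    processed = [data for _lsn, _xid, data in changes]
    if changes:
        last_lsn = changes[-1][0]
        # Destructive read up to the last processed LSN: frees the WAL.
        run_query(
            f"SELECT pg_logical_slot_get_changes('{slot_name}', '{last_lsn}', NULL)"
        )
    return processed
```

If processing fails before the final get call, the next poll simply sees the same changes again, trading duplicate delivery for zero loss.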
NiFi Flow
The CDC flow is a collection of these steps:
ExecuteSQL → ConvertRecord → SplitJson → EvaluateJsonPath → RouteOnAttribute → LogAttribute
Execute SQL Processor Configuration
"SQL Query":
"SELECT * FROM
pg_logical_slot_get_changes('nifi_cdc_slot', NULL, NULL);"
This property value is executed every 10 seconds using the TIMER_DRIVEN scheduling strategy.
NiFi 2.7 gotcha: If you’re migrating from NiFi 1.x or following older tutorials, note that several property names changed to use spaces and title case. Key examples include:
- record-reader → Record Reader
- record-writer → Record Writer
- Log prefix → Log Prefix
- SQL select query → SQL Query
- putsql-sql-statement → SQL Statement
CDC Monitoring Script
The CDC monitoring script checks the CDC data flow. Note that this NiFi setup polls on a schedule rather than holding a streaming connection, while test data is inserted into the PostgreSQL database every 5 seconds.
# Summary -> Slot Status and WAL Lag (overview of all slots)
echo -e "${YELLOW}Slot Status and WAL Lag:${NC}"
docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
"SELECT
slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size,
pg_size_pretty(COALESCE(safe_wal_size, 0)) AS safe_wal_size,
restart_lsn,
confirmed_flush_lsn
FROM pg_replication_slots
ORDER BY slot_name;"
# Health status/alerts -> Inactive Slots Analysis (identifies problematic slots)
echo -e "\n${YELLOW}Inactive Slots Analysis:${NC}"
local inactive=$(docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
"SELECT
slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size,
pg_size_pretty(COALESCE(safe_wal_size, 0)) AS safe_wal_size
FROM pg_replication_slots
WHERE NOT active;" 2>/dev/null)
if echo "$inactive" | grep -q "(0 rows)"; then
    echo -e "${GREEN}✓ No inactive slots - all slots are active${NC}"
else
    echo "$inactive"
    # Check if any inactive slots have large lag (indicating a real problem)
    local large_lag_count=$(docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -At -c \
        "SELECT COUNT(*) FROM pg_replication_slots
         WHERE NOT active
         AND pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) > ${LAG_THRESHOLD_BYTES};" 2>/dev/null || echo "0")
    if [ "$large_lag_count" -gt 0 ]; then
        echo -e "\n${RED}⚠ Warning: Inactive slots with large lag (>500 MB) detected!${NC}"
        echo -e "${YELLOW}This indicates slots are not being consumed and WAL is accumulating.${NC}"
        echo -e "${YELLOW}Action: Investigate why consumers are not active or remove unused slots.${NC}"
    else
        echo -e "\n${BLUE}🔍 Debug: Slots are inactive but lag is low.${NC}"
        echo -e "${CYAN}Context:${NC}"
        echo -e "${CYAN} - This is appearing because flows like './test-cdc.sh --continuous' use${NC}"
        echo -e "${CYAN}   pg_logical_slot_get_changes() via ExecuteSQL on a schedule (e.g., every 10 seconds)${NC}"
        echo -e "${CYAN} - This doesn't maintain a persistent connection${NC}"
        echo -e "${CYAN} - Inactive slots with low lag are very likely being consumed periodically${NC}"
        echo -e "${CYAN} - Slots are only 'active' when a consumer is actively connected and reading${NC}"
    fi
fi
# Breakdowns -> WAL Configuration (system settings)
echo -e "\n${YELLOW}WAL Configuration:${NC}"
docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
"SELECT
name,
setting,
unit
FROM pg_settings
WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders', 'max_slot_wal_keep_size')
ORDER BY name;"
# Sample data -> Slots Ordered by Lag (shows which slots have most lag)
if [ "$slot_count" -gt 0 ]; then
echo -e "\n${YELLOW}Slots Ordered by Lag Size:${NC}"
docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
"SELECT
slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size,
CASE
WHEN active THEN 'consuming'
ELSE 'idle'
END as status,
restart_lsn
FROM pg_replication_slots
ORDER BY pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) DESC;" 2>/dev/null
fi
# Technical metrics -> WAL Directory Information (storage usage)
echo -e "\n${YELLOW}WAL Directory Information:${NC}"
docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
"SELECT
pg_size_pretty(SUM(size)) as total_wal_size,
COUNT(*) as wal_file_count
FROM pg_ls_waldir();" 2>/dev/null || echo -e "${YELLOW}Note: pg_ls_waldir() requires superuser privileges${NC}"
The Inactive Slot “False Alarm” Explained
Polling-based CDC consumers will always show active = false between polls. This is fundamentally different from streaming consumers, which maintain persistent connections. Therefore, monitor lag size, not active status. A slot with active = false and lag_size = 8KB is healthy; a slot with active = false and lag_size = 5GB needs investigation.
The key insight: inactive + low lag = normal polling behavior.
Only inactive + high lag indicates a problem.
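The lag numbers in the monitoring script can be reproduced outside the database. A PostgreSQL LSN like 0/16B3748 encodes a 64-bit byte position as two hex halves, and pg_wal_lsn_diff is simply the subtraction of those positions. A small sketch of that arithmetic plus the "inactive + high lag" health rule (function names and the 500 MB threshold are mine, mirroring the script above):

```python
def lsn_to_bytes(lsn: str) -> int:
    """Convert an LSN like '0/16B3748' into an absolute byte position.
    The part before the slash is the high 32 bits, the part after the low 32."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

def slot_lag_bytes(current_wal_lsn: str, restart_lsn: str) -> int:
    """Equivalent of pg_wal_lsn_diff(current_wal_lsn, restart_lsn)."""
    return lsn_to_bytes(current_wal_lsn) - lsn_to_bytes(restart_lsn)

def slot_is_healthy(active: bool, lag_bytes: int,
                    threshold: int = 500 * 1024 * 1024) -> bool:
    """Inactive + low lag = normal polling; inactive + high lag = problem."""
    return active or lag_bytes <= threshold
```

For example, a slot at restart_lsn 0/1000000 while the server is at 0/2000000 lags by 16 MB, which is harmless for a polling consumer.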
Outbox Pattern Deep Dive
The Outbox Pattern decouples event generation from event consumption. When a business event occurs (an insert or update on orders), a trigger automatically creates a corresponding row in the outbox table.
CREATE TRIGGER order_outbox_trigger
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW
EXECUTE FUNCTION create_order_event();
Benefits:
- Application controls the event schema — you decide exactly what goes into payload
- Transactional consistency — the business operation and event creation are atomic
- Simpler consumer logic — just poll a table, no replication slot management
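In this setup the trigger in init.sql provides the transactional consistency automatically. For comparison, here is a sketch of the equivalent application-level pattern, where the application writes the business row and the outbox row inside one transaction; execute is a hypothetical callable that runs a statement on an already-open transaction and returns the RETURNING value.

```python
import json

def create_order_with_event(execute, order):
    """Application-level outbox write: business row and event row in the same
    transaction (the trigger in init.sql achieves the same automatically)."""
    # Insert the business row and capture its generated id.
    order_id = execute(
        "INSERT INTO orders (customer_name, product, quantity, total_amount) "
        "VALUES (%s, %s, %s, %s) RETURNING id",
        (order["customer_name"], order["product"],
         order["quantity"], order["total_amount"]),
    )
    # Insert the outbox event in the same transaction, so both commit
    # or roll back together.
    execute(
        "INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload) "
        "VALUES (%s, %s, %s, %s)",
        ("Order", str(order_id), "INSERT", json.dumps({**order, "id": order_id})),
    )
    return order_id
```

The trigger-based variant keeps this logic out of the application entirely; the application-level variant gives you full control over the payload shape.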
NiFi Flow
QueryDatabaseTable → ConvertRecord → SplitJson → EvaluateJsonPath → LogAttribute
The QueryDatabaseTable Processor uses incremental loading based on the id column:
- Maximum-value Columns: id
- Table Name: outbox
- Scheduling: Every 10 seconds
This processor tracks the highest id it has seen and only fetches new records on subsequent runs.
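One polling cycle of that max-value tracking can be sketched as follows; fetch_rows and the state dict are stand-ins for the processor's database query and its managed state, not NiFi APIs.

```python
def poll_incremental(fetch_rows, state):
    """One cycle of max-value-column tracking: fetch only rows whose id
    exceeds the highest id seen so far, then advance the stored maximum."""
    last_id = state.get("max_id", 0)
    # e.g. SELECT * FROM outbox WHERE id > last_id ORDER BY id
    new_rows = fetch_rows(last_id)
    if new_rows:
        state["max_id"] = max(row["id"] for row in new_rows)
    return new_rows
```

The first run fetches everything; each later run returns only rows inserted since the previous poll, which is exactly why the outbox table can stay small.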
Delete After Processing
After successfully processing an event, we delete it from the outbox database table:
DELETE FROM outbox WHERE id = ${event.id}
This keeps the outbox table small and provides effectively exactly-once semantics, under the working assumption that downstream processing is idempotent or that the delete only happens after successful delivery.
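That ordering constraint (delete only after successful delivery) can be made explicit in a sketch; deliver and delete_event are hypothetical stand-ins for the downstream publish step and the DELETE statement above.

```python
def process_and_delete(events, deliver, delete_event):
    """Delete each outbox row only after its event is delivered. A delivery
    failure leaves the row in place, so the next poll retries it
    (at-least-once; effectively exactly-once if delivery is idempotent)."""
    delivered = []
    for event in events:
        try:
            deliver(event)               # publish the event downstream
        except Exception:
            break                        # keep the row for the next poll
        delete_event(event["id"])        # DELETE FROM outbox WHERE id = ...
        delivered.append(event["id"])
    return delivered
```

Reversing the order (delete first, then deliver) would turn every failure into a lost event, which is the failure mode this pattern exists to avoid.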
Outbox Monitoring Script
The outbox monitoring script follows the same pattern as above:
- Summary
- Health status/alerts
- Breakdowns
- Sample Data
- Technical Metrics
# Summary -> Get Pending Events Count
local pending_count=$(docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -At -c \
"SELECT COUNT(*) FROM outbox;" 2>/dev/null || echo "0")
if [ "$pending_count" -eq 0 ]; then
    echo -e "${GREEN}✓ No pending events in outbox table${NC}"
    echo -e " Generate test data with: ${CYAN}./test-outbox.sh${NC}\n"
else
    echo -e "${YELLOW}⚠ Pending events in outbox: ${pending_count}${NC}\n"
fi
# Summary -> Event Type Distribution (event type overview)
echo -e "${YELLOW}Event Type Distribution:${NC}"
docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
"SELECT
event_type,
COUNT(*) as count,
MIN(created_at) as oldest_event,
MAX(created_at) as newest_event
FROM outbox
GROUP BY event_type
ORDER BY count DESC;" 2>/dev/null
# Health status/alerts -> Event Age Analysis (identifies problems/warnings)
# Check for old events
echo -e "\n${YELLOW}Event Age Analysis:${NC}"
local old_count=$(docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -At -c \
"SELECT COUNT(*) FROM outbox
WHERE created_at < NOW() - INTERVAL '${AGE_THRESHOLD_SECONDS} seconds';" 2>/dev/null || echo "0")
if [ "$old_count" -gt 0 ]; then
    echo -e "${RED}⚠ Warning: ${old_count} events older than $((AGE_THRESHOLD_SECONDS / 60)) minutes detected!${NC}"
    echo -e "${YELLOW}This may indicate that NiFi consumers are not processing events.${NC}"
    echo -e "${YELLOW}Action: Check NiFi flow status and processor state.${NC}\n"
    # Show oldest events
    echo -e "${YELLOW}Oldest Unprocessed Events:${NC}"
    docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
        "SELECT
            id,
            aggregate_type,
            event_type,
            aggregate_id,
            created_at,
            EXTRACT(EPOCH FROM (NOW() - created_at))::INTEGER as age_seconds
        FROM outbox
        ORDER BY created_at ASC
        LIMIT 5;" 2>/dev/null
else
    echo -e "${GREEN}✓ No old events - all events are recent (< $((AGE_THRESHOLD_SECONDS / 60)) minutes)${NC}"
fi
# Breakdowns -> Aggregate Type Distribution (breakdown by aggregate type)
echo -e "\n${YELLOW}Aggregate Type Distribution:${NC}"
docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
"SELECT
aggregate_type,
COUNT(*) as count
FROM outbox
GROUP BY aggregate_type
ORDER BY count DESC;" 2>/dev/null
# Sample data -> Recent Events (actual event records)
# Show recent events
if [ "$pending_count" -gt 0 ]; then
    echo -e "\n${YELLOW}Recent Events (Last 5):${NC}"
    docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
        "SELECT
            id,
            aggregate_type,
            event_type,
            aggregate_id,
            created_at
        FROM outbox
        ORDER BY created_at DESC
        LIMIT 5;" 2>/dev/null
fi
# Technical metrics -> Table Statistics (infrastructure/sizing info)
# Show table statistics
echo -e "\n${YELLOW}Outbox Table Statistics:${NC}"
docker exec nifi_database psql -U "${POSTGRES_USER}" -d "${POSTGRES_DB}" -c \
"SELECT
pg_size_pretty(pg_total_relation_size('outbox')) as total_size,
pg_size_pretty(pg_relation_size('outbox')) as table_size,
pg_size_pretty(pg_indexes_size('outbox')) as indexes_size
FROM pg_class
WHERE relname = 'outbox';" 2>/dev/null
Comparison: CDC vs. Outbox
| Aspect | CDC (Logical Replication) | Outbox |
|---|---|---|
| Setup complexity | Higher (replication slots, WAL config) | Lower (just a table + trigger) |
| Schema control | Database-driven (captures all columns) | Application-driven (you design the payload) |
| Exactly-once delivery | Harder (slot management, consumer tracking) | Easier (delete after process) |
| WAL impact | Direct dependency | Indirect (normal table operations) |
| NiFi processor | ExecuteSQL with pg_logical_slot_get_changes() | QueryDatabaseTable |
| Failure recovery | Restart from restart_lsn | Resume from max(id) |
| Operational overhead | Monitor slot lag, WAL growth | Monitor table size, event age |
Software Architecture Implementation Mapping
CDC

Outbox

| Diagram Component | This Implementation |
|---|---|
| Message Relay | Apache NiFi 2.7 |
| Outbox Table | outbox table with trigger |
| Transaction Log View | pg_logical_slot_get_changes() |
| Message Broker | LogAttribute NiFi processor |
Production Considerations
Monitoring – WAL Management
Ensure stalled consumers do not fill your disk, e.g. by setting max_slot_wal_keep_size:
-- Set max_slot_wal_keep_size to prevent WAL file bloat
ALTER SYSTEM SET max_slot_wal_keep_size = '20GB';
See init.sql and my previous post about the WAL size limit.
When to Choose: CDC or Outbox
Choose CDC (Logical Replication) when:
- You need to capture all changed data, including from legacy applications you can’t modify
- Your schema changes are database-driven
- You’re building a data lake, or an audit log with the complete change history is required
Choose Outbox Pattern when:
- You prefer simpler operations
- You manage and implement the application that is the source of data changes
- You need explicit control over event schema and content
- You implement an event-driven architecture with well-defined domain events
The code in Action
Note the time spent per task: it’s in the millisecond range, and some tasks take even less than 100 milliseconds. I consider this near real time, though both logical replication (CDC) and database polling (outbox) are asynchronous processes.
Conclusion
Both CDC and Outbox patterns are viable approaches with Apache NiFi 2.7:
- CDC via logical replication gives you comprehensive change capture with minimal application changes, at the cost of more complex operational requirements
- The Outbox Pattern provides controlled event publishing with simpler operations, however requires application-level integration
This post is part of a series on Change Data Capture (CDC) and Outbox pattern. See also: PostgreSQL CDC Best Practices: Managing WAL Growth and Replication Slots and CDC and Outbox Pattern: Same Same but Different.
2026-01-06 update: PostgreSQL JDBC driver version bumped to 42.7.8 (1,2)