
pg_flo

Move and transform data between PostgreSQL databases using Logical Replication.

  1. TL;DR
  2. How this works
    1. State Management
  3. Further readings
    1. Sources

TL;DR

Leverages PostgreSQL's logical replication system to capture changes, then applies transformations and filters to the data before streaming it to the destination.

Decouples the replicator and worker processes by using NATS as the message broker.
The NATS server must have JetStream enabled (nats-server -js).

The replicator component captures PostgreSQL changes via logical replication.
The worker component processes and routes changes through NATS.

Setup
Check requirements
sourceDb=> SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication');
          name           | setting
-------------------------+---------
 rds.logical_replication | on
 wal_level               | logical
(2 rows)
docker pull 'nats' && docker pull 'shayonj/pg_flo'
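If wal_level is not logical, it must be changed (a PostgreSQL restart is required), and the replicator needs a user with replication privileges. A minimal sketch; the user name and password match the examples below, and on RDS one would set rds.logical_replication in the parameter group instead of using ALTER SYSTEM:

```sql
-- Enable logical replication (takes effect after a PostgreSQL restart).
ALTER SYSTEM SET wal_level = 'logical';

-- Create a dedicated replication user; name and password are examples.
CREATE USER pgflo WITH REPLICATION PASSWORD '1q2w3e4r';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO pgflo;
```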
Configuration file

Reference

# Replicator settings
host: "localhost"
port: 5432
dbname: "myapp"
user: "replicator"
password: "secret"
group: "users"
tables:
  - "users"

# Worker settings (postgres sink)
target-host: "dest-db"
target-dbname: "myapp"
target-user: "writer"
target-password: "secret"

# Common settings
nats-url: "nats://localhost:4222"
Usage
# Open a shell
# For debugging purposes, mostly
docker run --rm --name 'pg_flo' --network 'host' --entrypoint 'sh' -ti 'shayonj/pg_flo'

# Start the replicator
# Using the config file failed for some reason at the time of writing
docker run --rm --name 'replicator' --network 'host' 'shayonj/pg_flo' \
  replicator \
    --host 'source-db.fqdn' --dbname 'sales' --user 'pgflo' --password '1q2w3e4r' \
    --group 'whatevah' --nats-url 'nats://localhost:4222'

# Start the worker
docker run --rm --name 'pg_flo_worker' --network 'host' 'shayonj/pg_flo' \
  worker stdout --group 'whatevah' --nats-url 'nats://localhost:4222'
docker run … \
  worker postgres --group 'whatevah' --nats-url 'nats://localhost:4222' \
    --target-host 'dest-db.fqdn' --target-dbname 'sales' --target-user 'pgflo' --target-password '1q2w3e4r'
Real world use cases
# Start a basic replication to stdout as example.
docker run --rm -d --name 'pg_flo_nats' -p '4222:4222' 'nats' -js \
&& docker run -d --name 'pg_flo_replicator' --network 'host' 'shayonj/pg_flo' \
    replicator \
      --host 'source-db.fqdn' --port '6001' --dbname 'sales' --user 'pgflo' --password '1q2w3e4r' \
      --copy-and-stream --group 'whatevah' --nats-url 'nats://localhost:4222' \
&& docker run -d --name 'pg_flo_worker' --network 'host' 'shayonj/pg_flo' \
    worker stdout --group 'whatevah' --nats-url 'nats://localhost:4222'

How this works

Refer to How it Works.

  1. The replicator creates a PostgreSQL publication in the source DB for the replicated tables.

  2. The replicator creates a replication slot in the source DB.
    This ensures no data is lost between streaming sessions.

  3. The replicator starts streaming changes from the source DB and publishes them to NATS:

    • After performing an initial bulk copy, if in Copy-and-Stream mode.

      If no valid LSN is found in NATS, pg_flo performs an initial bulk copy of existing data.

      The process is parallelized for fast data sync:

      1. A snapshot is taken to ensure consistency.
      2. Each table is divided into page ranges.
      3. Multiple workers copy different ranges concurrently.
    • Immediately, from the last known position, if in Stream-Only mode.

    It also stores the last processed LSN in NATS, allowing the worker to resume operations from where it left off in case of interruptions.

  4. The worker processes messages from NATS.

    Message type                                  Summary
    --------------------------------------------  -------------------------------
    Relation                                      Describes table structures
    Insert, Update, Delete                        Contain the actual data changes
    Begin, Commit                                 Mark transaction boundaries
    DDL changes (e.g. ALTER TABLE, CREATE INDEX)  Contain structural changes
  5. The worker converts received data into a structured format with type-aware conversions for different PostgreSQL data types.

  6. [If any rule is configured] The worker applies transformation and filtering rules to the data.

    Transform Rules:

    • Regex: apply regular expression transformations to string values.
    • Mask: hide sensitive data, keeping the first and last characters visible.

    Filter Rules:

    • Comparison: filter based on equality, inequality, greater than, less than, etc.
    • Contains: filter string values based on whether they contain a specific substring.

    Rules can be applied selectively to insert, update, or delete operations.

  7. The worker buffers processed data.

  8. The worker flushes data periodically from the buffer to the configured sinks.
    Currently, sinks can be stdout, files, PostgreSQL DBs, or webhooks.
    Flushed data is written to DB sinks in batches to optimize write operations.
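The publication and replication slot created in steps 1 and 2 can be inspected on the source DB as a sanity check; the slot names themselves are chosen by pg_flo:

```sql
-- List publications and replication slots on the source DB.
SELECT pubname FROM pg_publication;
SELECT slot_name, plugin, confirmed_flush_lsn FROM pg_replication_slots;
```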

State Management

The replicator keeps track of its progress by updating the Last LSN in NATS.

The worker maintains its progress to ensure data consistency.
This allows for resumable operations across multiple runs.

Periodic status updates are sent to the source DB to keep the replication connection alive.
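Those status updates are visible in the source DB's replication statistics; a way to check the connection and the positions it has acknowledged:

```sql
-- Check active replication connections and their acknowledged LSNs.
SELECT application_name, state, sent_lsn, replay_lsn
FROM pg_stat_replication;
```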

Further readings

Sources