# `pg_flo`
Move and transform data between PostgreSQL databases using Logical Replication.
1. [TL;DR](#tldr)
1. [How this works](#how-this-works)
1. [State Management](#state-management)
1. [Further readings](#further-readings)
1. [Sources](#sources)
## TL;DR
Leverages PostgreSQL's logical replication to capture changes, then applies transformations and filters to the data
before streaming it to the destination.
Decouples the _replicator_ and _worker_ processes by using [NATS] as the message broker.
The NATS server **must have JetStream** enabled (`nats-server -js`).
The _replicator_ component captures PostgreSQL changes via logical replication and publishes them to NATS.
The _worker_ component consumes changes from NATS, processes them and routes them to the configured sinks.
Setup: check the requirements on the source DB.
```sql
sourceDb=> SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication');
          name           | setting
-------------------------+---------
 rds.logical_replication | on
 wal_level               | logical
(2 rows)
```
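The `wal_level` check can also be scripted before starting the _replicator_.
This is a minimal sketch, assuming `psql` is available locally; `check_logical_replication` is a hypothetical helper, not part of `pg_flo`.
It only looks at `wal_level`, since on AWS RDS turning `rds.logical_replication` on is what sets `wal_level` to `logical`.

```sh
# Hypothetical helper: succeeds only if the "name|setting" rows read from stdin
# (as printed by `psql -At`) report wal_level = logical.
check_logical_replication() {
  awk -F'|' '$1 == "wal_level" && $2 == "logical" { ok = 1 } END { exit !ok }'
}

# Usage against the source DB (connection values are the placeholders used in this note):
# psql -At -h 'source-db.fqdn' -U 'pgflo' -d 'sales' \
#   -c "SELECT name, setting FROM pg_settings WHERE name = 'wal_level'" \
#   | check_logical_replication && echo 'logical replication available'
```

`-A` (unaligned) and `-t` (tuples only) make `psql` print bare `name|setting` rows, which keeps the parsing trivial.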
```sh
docker pull 'nats' && docker pull 'shayonj/pg_flo'
```
Configuration file ([reference][configuration file reference]):
```yaml
# Replicator settings
host: "localhost"
port: 5432
dbname: "myapp"
user: "replicator"
password: "secret"
group: "users"
tables:
- "users"
# Worker settings (postgres sink)
target-host: "dest-db"
target-dbname: "myapp"
target-user: "writer"
target-password: "secret"
# Common settings
nats-url: "nats://localhost:4222"
```
Usage
```sh
# Open a shell
# For debugging purposes, mostly
docker run --rm --name 'pg_flo' --network 'host' --entrypoint 'sh' -ti 'shayonj/pg_flo'
# Start the replicator
# Using the config file failed for some reason at the time of writing
docker run --rm --name 'replicator' --network 'host' 'shayonj/pg_flo' \
replicator \
--host 'source-db.fqdn' --dbname 'sales' --user 'pgflo' --password '1q2w3e4r' \
--group 'whatevah' --nats-url 'nats://localhost:4222'
# Start the worker, printing changes to stdout
docker run --rm --name 'pg_flo_worker' --network 'host' 'shayonj/pg_flo' \
worker stdout --group 'whatevah' --nats-url 'nats://localhost:4222'
# Alternatively, start the worker writing changes to a PostgreSQL sink
docker run … \
worker postgres --group 'whatevah' --nats-url 'nats://localhost:4222' \
--target-host 'dest-db.fqdn' --target-dbname 'sales' --target-user 'pgflo' --target-password '1q2w3e4r'
```
Real-world use cases:
```sh
# Example: basic replication to stdout.
# NATS runs detached (-d); in the foreground it would block the chain and never start the replicator.
docker run -d --rm --name 'pg_flo_nats' -p '4222:4222' 'nats' -js \
&& docker run -d --name 'pg_flo_replicator' --network 'host' 'shayonj/pg_flo' \
replicator \
--host 'source-db.fqdn' --port '6001' --dbname 'sales' --user 'pgflo' --password '1q2w3e4r' \
--copy-and-stream --group 'whatevah' --nats-url 'nats://localhost:4222' \
&& docker run -d --name 'pg_flo_worker' --network 'host' 'shayonj/pg_flo' \
worker stdout --group 'whatevah' --nats-url 'nats://localhost:4222'
```
## How this works
Refer to [How it Works].
1. The _replicator_ creates a PostgreSQL **publication** in the source DB for the replicated tables.
1. The _replicator_ creates a **replication slot** in the source DB.
This ensures no data is lost between streaming sessions.
1. The _replicator_ starts streaming changes from the source DB and publishes them to NATS:
   - **After** performing an initial bulk copy, if in _Copy-and-Stream_ mode.
     If no valid LSN is found in NATS, `pg_flo` performs an initial bulk copy of existing data.
     The process is parallelized for fast data sync:
     1. A snapshot is taken to ensure consistency.
     1. Each table is divided into page ranges.
     1. Multiple workers copy different ranges concurrently.
   - **Immediately**, from the last known position, if in _Stream-Only_ mode.

   It also stores the last processed LSN in NATS, allowing the _worker_ to resume operations from where it left off in
   case of interruptions.
1. The _worker_ processes messages from NATS:

   | Message type                                     | Summary                             |
   | ------------------------------------------------ | ----------------------------------- |
   | `Relation`                                       | Describe table structures           |
   | `Insert`, `Update`, `Delete`                     | Contain the actual data changes     |
   | `Begin`, `Commit`                                | Mark transaction boundaries         |
   | DDL changes (e.g. `ALTER TABLE`, `CREATE INDEX`) | Contain structural (schema) changes |

1. The _worker_ converts received data into a structured format with type-aware conversions for different PostgreSQL
data types.
1. If any rules are configured, the _worker_ applies them to transform and filter the data.

   Transform rules:

   - Regex: apply regular expression transformations to string values.
   - Mask: hide sensitive data, keeping the first and last characters visible.

   Filter rules:

   - Comparison: filter based on equality, inequality, greater than, less than, etc.
   - Contains: filter string values based on whether they contain a specific substring.

   Rules _can_ be applied selectively to `insert`, `update`, or `delete` operations.

1. The _worker_ buffers processed data.
1. The _worker_ flushes data periodically from the buffer to the configured _sinks_.
Currently, _sinks_ can be `stdout`, files, PostgreSQL DBs or webhooks.
Flushed data is written to DB sinks in batches to optimize write operations.
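
The parallel bulk copy (snapshot, page ranges, concurrent copies) can be pictured with `ctid` page ranges.
This is a minimal sketch, not necessarily how `pg_flo` splits tables: it assumes the page count comes from `pg_class.relpages`, and that each range would be read on its own connection inside a `REPEATABLE READ` transaction importing a shared snapshot (`pg_export_snapshot()` plus `SET TRANSACTION SNAPSHOT`).
`page_ranges` is a hypothetical helper.

```sh
# Hypothetical helper: split a table of TOTAL_PAGES heap pages into at most
# WORKERS contiguous page ranges and print one SELECT per range.
# Usage: page_ranges TABLE TOTAL_PAGES WORKERS
page_ranges() {
  table="$1" total_pages="$2" workers="$3"
  if [ "$workers" -lt 1 ]; then workers=1; fi
  # Ceiling division, so that WORKERS ranges of this size cover every page.
  size=$(( (total_pages + workers - 1) / workers ))
  if [ "$size" -lt 1 ]; then size=1; fi
  start=0
  # Bounded ranges: pages [start, start + size).
  while [ $(( start + size )) -lt "$total_pages" ]; do
    printf "SELECT * FROM %s WHERE ctid >= '(%d,0)'::tid AND ctid < '(%d,0)'::tid;\n" \
      "$table" "$start" $(( start + size ))
    start=$(( start + size ))
  done
  # The last range is open-ended: relpages is only an estimate, so rows on
  # pages added after it was last updated are still copied.
  printf "SELECT * FROM %s WHERE ctid >= '(%d,0)'::tid;\n" "$table" "$start"
}
```

For example, `page_ranges users 10 3` prints two bounded ranges (pages 0 to 3 and 4 to 7) and an open-ended one starting at page 8.
On PostgreSQL 14 or later, such `ctid` predicates can be served by a TID Range Scan instead of a full sequential scan.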
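
The semantics of the _Mask_ transform and the _Contains_ filter described above can be illustrated with two small functions.
This is only a sketch of the behaviour, assuming plain string values; `mask_value` and `contains_filter` are hypothetical helpers, and the actual rule syntax is documented in the [Transformation rules] reference.

```sh
# Hypothetical helpers illustrating two of the rule types above;
# they are not pg_flo's implementation.

# Mask: keep the first and last characters, replace every character in between with '*'.
mask_value() {
  value="$1"
  len=${#value}
  if [ "$len" -le 2 ]; then
    # Nothing lies between the first and last character, so nothing to hide.
    printf '%s\n' "$value"
    return 0
  fi
  first=${value%"${value#?}"}   # first character
  last=${value#"${value%?}"}    # last character
  middle=''
  i=2
  while [ "$i" -lt "$len" ]; do
    middle="${middle}*"
    i=$(( i + 1 ))
  done
  printf '%s%s%s\n' "$first" "$middle" "$last"
}

# Contains: keep a value only if it includes the given substring.
# The quoted "$2" inside the pattern is matched literally, not as a glob.
contains_filter() {
  case "$1" in
    *"$2"*) return 0 ;;
    *) return 1 ;;
  esac
}
```

For example, `mask_value 'secret'` prints `s****t`.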
### State Management
The _replicator_ keeps track of its progress by updating the _Last LSN_ in NATS.
The _worker_ maintains its progress to ensure data consistency.
This allows for resumable operations across multiple runs.
The _replicator_ periodically sends status updates to the source DB to keep the replication connection alive.
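Resuming from the _Last LSN_ boils down to comparing WAL positions.
PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash (the high and low 32 bits of a 64-bit position), so a plain string comparison gets the order wrong (`9/0` sorts after `10/0`).
This is a minimal sketch of a numeric comparison, assuming a shell with 64-bit arithmetic (e.g. Bash or dash); `lsn_to_int` and `is_new_change` are hypothetical helpers, not `pg_flo` functions.

```sh
# Hypothetical helper: convert a PostgreSQL LSN such as '16/B374D848' into a
# 64-bit integer (high 32 bits before the slash, low 32 bits after it).
lsn_to_int() {
  hi=${1%/*}
  lo=${1#*/}
  echo $(( (0x$hi << 32) + 0x$lo ))
}

# Hypothetical helper: succeeds if CHANGE_LSN ($1) is past LAST_LSN ($2),
# i.e. the change was not processed before the restart.
is_new_change() {
  [ "$(lsn_to_int "$1")" -gt "$(lsn_to_int "$2")" ]
}

# Usage: skip changes at or before the stored position.
# is_new_change '0/16B3750' '0/16B3748' && echo 'apply'
```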
## Further readings
- [PostgreSQL]
- [Website]
- [Main repository]
- [Transformation rules]
- [NATS]
### Sources
- [How to set the wal_level in AWS RDS Postgresql?]
- [Configuration file reference]
- [How it Works]
[NATS]: ../nats.md
[PostgreSQL]: README.md
[configuration file reference]: https://github.com/shayonj/pg_flo/blob/main/internal/pg-flo.yaml
[how it works]: https://github.com/shayonj/pg_flo/blob/main/internal/how-it-works.md
[main repository]: https://github.com/shayonj/pg_flo
[transformation rules]: https://github.com/shayonj/pg_flo/blob/main/pkg/rules/README.md
[website]: https://www.pgflo.io/
[How to set the wal_level in AWS RDS Postgresql?]: https://dba.stackexchange.com/questions/238686/how-to-set-the-wal-level-in-aws-rds-postgresql#243576