diff --git a/.vscode/settings.json b/.vscode/settings.json
index 3bf76a5..f67a02b 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -231,6 +231,7 @@
"mountpoint",
"mpiexec",
"multiarch",
+ "nats",
"netcat",
"nfsmount",
"nindent",
diff --git a/knowledge base/nats.md b/knowledge base/nats.md
new file mode 100644
index 0000000..979dcfe
--- /dev/null
+++ b/knowledge base/nats.md
@@ -0,0 +1,175 @@
+# NATS
+
+Messaging system.
+
+1. [TL;DR](#tldr)
+1. [Troubleshooting](#troubleshooting)
+ 1. [Context deadline exceeded](#context-deadline-exceeded)
+ 1. [No responders available for request](#no-responders-available-for-request)
+1. [Further readings](#further-readings)
+ 1. [Sources](#sources)
+
+## TL;DR
+
+
+ Setup
+
+
+ Server
+
+```sh
+# Install.
+brew install 'nats-server'
+choco install 'nats-server'
+docker pull 'nats'
+go install 'github.com/nats-io/nats-server/v2@latest'
+yay 'nats-server'
+
+# Validate the configuration file.
+nats-server -c '/etc/nats/nats-server.conf' -t
+docker run --rm --name 'pg_flo_nats' -v "$PWD/config/nats-server.conf:/etc/nats/nats-server.conf" 'nats' \
+ -c '/etc/nats/nats-server.conf' -t
+```
+
+
+
+
+ Client
+
+```sh
+# Install.
+brew install 'nats-io/nats-tools/nats'
+```
+
+
+
+
+
+
+ Usage
+
+
+ Server
+
+```sh
+# Get help.
+docker run --rm --name 'pg_flo_nats' 'nats' --help
+
+# Run.
+nats-server -V
+nats-server -config 'nats-server.conf'
+docker run --name 'nats' -p '4222:4222' -ti 'nats:latest' -js
+
+# Run as cluster.
+docker run --name 'nats-0' --network 'nats' -p '4222:4222' -p '8222:8222' \
+ 'nats' --http_port '8222' --cluster_name 'NATS' --cluster 'nats://0.0.0.0:6222' \
+&& docker run --name 'nats-1' --network 'nats' \
+ 'nats' --cluster_name 'NATS' --cluster 'nats://0.0.0.0:6222' --routes='nats://ruser:T0pS3cr3t@nats:6222' \
+&& curl -fs 'http://localhost:8222/routez'
+
+# Reload the configuration.
+nats-server --signal 'reload'
+```
+
+
+
+
+ Client
+
+```sh
+# Get help.
+nats cheat server
+
+# Check connection to the server.
+nats server check connection --server 'nats://0.0.0.0:4222'
+nats server check connection -s 'nats://localhost:4222'
+
+# Request a configuration reload.
+nats --user 'sys' --password 'sys' request '$SYS.REQ.SERVER..RELOAD' ""
+
+# Start subscribers.
+nats subscribe '>' -s '0.0.0.0:4222'
+nats subscribe -s 'nats://demo.nats.io' '>'
+
+# Publish messages.
+nats pub 'hello' 'world' -s '0.0.0.0:4222'
+
+# Start listeners for Request-Reply patterns.
+nats reply 'subject' 'message'
+
+# Send requests for Request-Reply patterns.
+nats request 'help.please' 'I need help!'
+
+# List configuration contexts
+nats context ls
+nats context ls --all
+
+# List available JetStream streams.
+nats stream ls
+nats stream ls --all
+
+# List available JetStream consumer.
+nats consumer ls
+nats consumer ls --all
+```
+
+
+
+
+
+
+ Real world use cases
+
+```sh
+# Try out the Request-Reply pattern.
+# The listener will hang waiting, run the second in another shell session.
+nats reply 'help.please' 'OK, I CAN HELP!!!'
+nats request 'help.please' 'I need help!'
+```
+
+
+
+## Troubleshooting
+
+### Context deadline exceeded
+
+Error message example:
+
+```plaintext
+FTL Failed to create NATS client error="failed to create main stream: context deadline exceeded"
+```
+
+Root cause: the client cannot reach the server.
+
+### No responders available for request
+
+Error message example:
+
+```plaintext
+FTL Failed to create NATS client error="failed to create main stream: nats: no responders available for request"
+```
+
+Root cause: a request is sent to a subject that has no subscribers.
+
+## Further readings
+
+- [Website]
+- [Codebase]
+- [Documentation]
+
+### Sources
+
+
+
+
+
+
+
+[codebase]: https://github.com/nats-io
+[documentation]: https://docs.nats.io
+[website]: https://nats.io/
+
+
diff --git a/knowledge base/pg_flo.md b/knowledge base/pg_flo.md
index 1e87b3e..8a4f1bf 100644
--- a/knowledge base/pg_flo.md
+++ b/knowledge base/pg_flo.md
@@ -3,11 +3,22 @@
Move and transform data between PostgreSQL databases using Logical Replication.
1. [TL;DR](#tldr)
+1. [How this works](#how-this-works)
+ 1. [State Management](#state-management)
1. [Further readings](#further-readings)
1. [Sources](#sources)
## TL;DR
+Leverages PostgreSQL's logical replication system to capture changes and apply transformations and filtrations to the
+data before streaming it to the destination.
+
+Decouples the _replicator_ and _worker_ processes using [NATS] as message broker.
+The NATS server **must have JetStream** enabled (`nats-server -js`).
+
+The _replicator_ component captures PostgreSQL changes via logical replication.
+The _worker_ component processes and routes changes through NATS.
+
Setup
@@ -63,41 +74,130 @@ nats-url: "nats://localhost:4222"
Usage
```sh
-# Start NATS server
-docker run -d --name 'pg_flo_nats' --network 'host' -v "$PWD/config/nats-server.conf:/etc/nats/nats-server.conf" \
- 'nats' -c '/etc/nats/nats-server.conf'
+# Open a shell
+# For debugging purposes, mostly
+docker run --rm --name 'pg_flo' --network 'host' --entrypoint 'sh' -ti 'shayonj/pg_flo'
-# Start replicator (using config file)
-docker run -d --name 'pg_flo_replicator' --network 'host' -v "$PWD/config/pg_flo.yaml:/etc/pg_flo/config.yaml" \
- 'shayonj/pg_flo' replicator --config '/etc/pg_flo/config.yaml'
+# Start the replicator
+# Using the config file failed for some reason at the time of writing
+docker run --rm --name 'replicator' --network 'host' 'shayonj/pg_flo' \
+ replicator \
+ --host 'source-db.fqdn' --dbname 'sales' --user 'pgflo' --password '1q2w3e4r' \
+ --group 'whatevah' --nats-url 'nats://localhost:4222'
-# Start worker
-docker run -d --name 'pg_flo_worker' --network 'host' -v "$PWD/config/pg_flo.yaml:/etc/pg_flo/config.yaml" \
- 'shayonj/pg_flo' worker postgres --config '/etc/pg_flo/config.yaml'
+# Start the worker
+docker run --rm --name 'pg_flo_worker' --network 'host' 'shayonj/pg_flo' \
+ worker stdout --group 'whatevah' --nats-url 'nats://localhost:4222'
+docker run … \
+ worker postgres --group 'whatevah' --nats-url 'nats://localhost:4222' \
+ --target-host 'dest-db.fqdn' --target-dbname 'sales' --target-user 'pgflo' --target-password '1q2w3e4r'
```
-
+
+## How this works
+
+Refer [How it Works].
+
+1. The _replicator_ creates a PostgreSQL **publication** in the source DB for the replicated tables.
+1. The _replicator_ creates a **replication slot** in the source DB.
+ This ensures no data is lost between streaming sessions.
+1. The _replicator_ starts streaming changes from the source DB and publishes them to NATS:
+
+ - **After** performing an initial bulk copy, if in _Copy-and-Stream_ mode.
+
+
+
+ If no valid LSN is found in NATS, `pg_flo` performs an initial bulk copy of existing data.
+
+ The process is parallelized for fast data sync:
+
+ 1. A snapshot is taken to ensure consistency.
+ 1. Each table is divided into page ranges.
+ 1. Multiple workers copy different ranges concurrently.
+
+
+
+ - **Immediately**, from the last known position, if in _Stream-Only_ mode.
+
+ It also stores the last processed LSN in NATS, allowing the _worker_ to resume operations from where it left off in
+ case of interruptions.
+
+1. The _worker_ processes messages from NATS.
+
+
+
+ | Message type | Summary |
+ | ------------------------------------------------ | ------------------------------------ |
+ | Relation | Allow understanding table structures |
+ | `Insert`, `Update`, `Delete` | Contain actual data changes |
+ | `Begin`, `Commit` | Enable transaction boundaries |
+ | DDL changes (e.g. `ALTER TABLE`, `CREATE INDEX`) | Contain actual structural changes |
+
+
+
+1. The _worker_ converts received data into a structured format with type-aware conversions for different PostgreSQL
+ data types.
+1. \[If any rule is configured] The _worker_ applies transformation and filtering rules to the data.
+
+
+
+ Transform Rules:
+
+ - Regex: apply regular expression transformations to string values.
+ - Mask: hide sensitive data, keeping the first and last characters visible.
+
+ Filter Rules:
+
+ - Comparison: filter based on equality, inequality, greater than, less than, etc.
+ - Contains: filter string values based on whether they contain a specific substring.
+
+ Rules _can_ be applied selectively to `insert`, `update`, or `delete` operations.
+
+
+
+1. The _worker_ buffers processed data.
+1. The _worker_ flushes data periodically from the buffer to the configured _sinks_.
+ Currently, _sinks_ can be `stdout`, files, PostgreSQL DBs or webhooks.
+ Flushed data is written to DB sinks in batches to optimize write operations.
+
+### State Management
+
+The _replicator_ keeps track of its progress by updating the _Last LSN_ in NATS.
+
+The _worker_ maintains its progress to ensure data consistency.
+This allows for resumable operations across multiple runs.
+
+Periodic status updates are sent to the source DB to maintain the replication's connection.
## Further readings
- [Website]
- [Main repository]
- [Transformation rules]
+- [NATS]
### Sources
- [How to set the wal_level in AWS RDS Postgresql?]
- [Configuration file reference]
+- [How it Works]
+[nats]: nats.md
+
[configuration file reference]: https://github.com/shayonj/pg_flo/blob/main/internal/pg-flo.yaml
+[how it works]: https://github.com/shayonj/pg_flo/blob/main/internal/how-it-works.md
[main repository]: https://github.com/shayonj/pg_flo
[transformation rules]: https://github.com/shayonj/pg_flo/blob/main/pkg/rules/README.md
[website]: https://www.pgflo.io/
diff --git a/snippets/nats.fish b/snippets/nats.fish
new file mode 100644
index 0000000..4f6497c
--- /dev/null
+++ b/snippets/nats.fish
@@ -0,0 +1,51 @@
+#!/usr/bin/env fish
+
+###
+# Server
+# ------------------
+###
+
+# Install
+brew install 'nats-server'
+choco install 'nats-server'
+docker pull 'nats'
+go install 'github.com/nats-io/nats-server/v2@latest'
+yay 'nats-server'
+
+# Validate the configuration file
+nats-server -c '/etc/nats/nats-server.conf' -t
+docker run --rm --name 'pg_flo_nats' -v "$PWD/config/nats-server.conf:/etc/nats/nats-server.conf" 'nats' \
+ -c '/etc/nats/nats-server.conf' -t
+
+# Get help
+docker run --rm --name 'pg_flo_nats' 'nats' --help
+
+# Run
+nats-server -V
+docker run --name 'nats' -p '4222:4222' -ti 'nats:latest'
+
+# Run as cluster
+docker run --name 'nats-0' --network 'nats' -p '4222:4222' -p '8222:8222' \
+ 'nats' --http_port '8222' --cluster_name 'NATS' --cluster 'nats://0.0.0.0:6222' \
+&& docker run --name 'nats-1' --network 'nats' \
+ 'nats' --cluster_name 'NATS' --cluster 'nats://0.0.0.0:6222' --routes='nats://ruser:T0pS3cr3t@nats:6222' \
+&& curl -fs 'http://localhost:8222/routez'
+
+###
+# Client
+# ------------------
+###
+
+# Install
+brew install 'nats-io/nats-tools/nats'
+
+# Check connection to the server
+nats server check connection --server 'nats://0.0.0.0:4222'
+nats server check connection -s 'nats://localhost:4222'
+
+# Start subscribers
+nats subscribe '>' -s '0.0.0.0:4222'
+nats subscribe -s 'nats://demo.nats.io' '>'
+
+# Publish messages
+nats pub 'hello' 'world' -s '0.0.0.0:4222'