PgOutput2Json library for .NET
Overview
The PgOutput2Json library for .NET uses PostgreSQL logical replication to push changes, as JSON messages, from database tables to a .NET application or to a message broker, such as RabbitMQ (implemented) or Redis (to do).
Possible use cases for this library:
- Performing background tasks when a row is inserted/updated in a database table
- Asynchronously updating application caches
- Asynchronous logging of database changes to a file or a separate database
- Asynchronous data aggregation
- Low-latency ETL for data warehouse (DWH) solutions
All of the examples above can be implemented with low latency, because the PgOutput2Json library dispatches a change message as soon as the transaction is committed in PostgreSQL.
Background information
Logical replication is a means to stream messages generated by PostgreSQL logical decoding plugins to a client. pgoutput is the standard logical decoding output plugin in PostgreSQL 10+. It is maintained by the PostgreSQL community and used by PostgreSQL itself for logical replication. This plugin is always present, so no additional libraries need to be installed.
The PgOutput2Json library converts pgoutput messages to JSON, which can then be consumed by any .NET application. One example of such an application is the PgOutput2Json.RabbitMq library, which forwards these JSON messages to a RabbitMQ server.
⚠️ PgOutput2Json is in the early stages of development, but it is usable for testing purposes.
⚠️ Npgsql is used for connecting to PostgreSQL. Replication support is new in Npgsql and is considered a bit experimental.
1. Quick Start
Configure postgresql.conf
First, set the following option in postgresql.conf:
wal_level = logical
The other required settings (such as max_wal_senders and max_replication_slots) have default values that are sufficient for a basic setup. PostgreSQL must be restarted after changing wal_level.
Create a user that will be used to start the replication
Log in to PostgreSQL as a user with privileges to create users, and execute:
CREATE USER pgoutput2json WITH
PASSWORD '_your_password_here_'
REPLICATION;
If you will be connecting to PostgreSQL as this user from a different machine (a non-local connection), don't forget to modify pg_hba.conf. After modifying pg_hba.conf, reload the configuration for the changes to take effect: send SIGHUP to the server, run pg_ctl reload, or execute SELECT pg_reload_conf().
Create publication
Connect to the database that holds the tables you want to track and create a publication that specifies the tables and actions for publishing:
CREATE PUBLICATION my_publication
FOR TABLE my_table1, my_table2
WITH (publish = 'insert, update, delete, truncate');
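If the list of tracked tables comes from configuration instead of being typed by hand, the statement above can be generated. The following C# sketch is not part of PgOutput2Json, and the helpers QuoteIdent and BuildCreatePublication are hypothetical. It wraps each identifier in double quotes (doubling any embedded quote) and rejects unknown publish operations. Keep in mind that quoted identifiers are case-sensitive in PostgreSQL, so "My_Table" and my_table refer to different tables.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: quote a PostgreSQL identifier by wrapping it in double
// quotes and doubling any embedded double quote.
static string QuoteIdent(string name) => "\"" + name.Replace("\"", "\"\"") + "\"";

// Hypothetical helper: build a CREATE PUBLICATION statement for (schema, table) pairs.
static string BuildCreatePublication(
    string publication,
    IEnumerable<(string Schema, string Table)> tables,
    IEnumerable<string> operations)
{
    // Only the operations accepted by the publish option are allowed through,
    // so nothing arbitrary ends up inside the string literal.
    var allowed = new HashSet<string> { "insert", "update", "delete", "truncate" };
    var ops = operations.ToList();
    foreach (var op in ops)
        if (!allowed.Contains(op))
            throw new ArgumentException($"Unsupported publish operation: {op}");

    string tableList = string.Join(", ",
        tables.Select(t => QuoteIdent(t.Schema) + "." + QuoteIdent(t.Table)));
    return $"CREATE PUBLICATION {QuoteIdent(publication)} FOR TABLE {tableList} " +
           $"WITH (publish = '{string.Join(", ", ops)}');";
}

string sql = BuildCreatePublication(
    "my_publication",
    new[] { ("public", "my_table1"), ("public", "my_table2") },
    new[] { "insert", "update", "delete", "truncate" });
Console.WriteLine(sql);
```

The generated statement can be run with any PostgreSQL client, connected to the database that holds the tables.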
In the C# code examples below, we assume the database name is my_database.
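The C# examples below embed the connection string, password included, as a literal. As a hedged alternative, the string can be assembled with DbConnectionStringBuilder from System.Data.Common, which ships with .NET and quotes values containing special characters such as ';'. Npgsql's own NpgsqlConnectionStringBuilder derives from this class, so the format is compatible. The environment variable name PGOUTPUT2JSON_PASSWORD and the helper BuildConnectionString are hypothetical choices for this sketch.

```csharp
using System;
using System.Data.Common;

// Hypothetical helper: assemble the connection string from its parts instead of
// hard-coding it. DbConnectionStringBuilder escapes values that need quoting.
static string BuildConnectionString(string host, string database, string user, string password)
{
    var builder = new DbConnectionStringBuilder
    {
        ["server"] = host,
        ["database"] = database,
        ["username"] = user,
        ["password"] = password,
    };
    return builder.ConnectionString;
}

// PGOUTPUT2JSON_PASSWORD is a hypothetical variable name; the fallback mirrors
// the placeholder password used elsewhere in this document.
string pgPassword = Environment.GetEnvironmentVariable("PGOUTPUT2JSON_PASSWORD") ?? "_your_password_here_";
string connectionString = BuildConnectionString("localhost", "my_database", "pgoutput2json", pgPassword);
// Pass connectionString to .WithPgConnectionString(...) instead of a string literal.
```

If you already reference Npgsql, NpgsqlConnectionStringBuilder offers typed properties for the same purpose.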
Create .NET Worker Service
Finally, create a .NET Worker Service and add the following package reference:
dotnet add package PgOutput2Json
Add using PgOutput2Json; to Worker.cs, and use this code in the Worker class:
public class Worker : BackgroundService
{
    private readonly ILoggerFactory _loggerFactory;
    public Worker(ILoggerFactory loggerFactory)
    {
        _loggerFactory = loggerFactory;
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // This code assumes PostgreSQL is on localhost
        using var pgOutput2Json = PgOutput2JsonBuilder.Create()
            .WithLoggerFactory(_loggerFactory)
            .WithPgConnectionString("server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_")
            .WithPgPublications("my_publication")
            .WithMessageHandler((json, table, key, partition) =>
            {
                Console.WriteLine($"{table}: {json}");
            })
            .Build();
        await pgOutput2Json.Start(stoppingToken);
    }
}
Run the code, and you should see table names and JSON messages printed to the console every time you insert, update, or delete a row in the tables included in my_publication.
Note that this quick-start version uses a temporary replication slot, which means it will not see any database changes made while the worker was stopped. See section 3 if you want to work with a permanent replication slot.
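Printing to the console is only a starting point. For use cases such as running a background task when a row changes, the message handler can route each message to table-specific logic. Below is a minimal, self-contained sketch of such a router; it is not part of PgOutput2Json. It assumes the handler's table argument is a string such as public.my_table1 (check what your handler actually receives), and the handlers dictionary and Dispatch function are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical routing table: table name -> action to run with the JSON payload.
// The "schema.table" naming is an assumption about the handler's table argument.
var handlers = new Dictionary<string, Action<string>>(StringComparer.Ordinal)
{
    ["public.my_table1"] = json => Console.WriteLine($"my_table1 changed: {json}"),
    ["public.my_table2"] = json => Console.WriteLine($"my_table2 changed: {json}"),
};

// Runs the action registered for the table; returns false (and does nothing)
// for tables without a registered action.
bool Dispatch(string table, string json)
{
    if (!handlers.TryGetValue(table, out var handler)) return false;
    handler(json);
    return true;
}

// Inside .WithMessageHandler((json, table, key, partition) => ...) one could call
// Dispatch(table, json) instead of Console.WriteLine.
Dispatch("public.my_table1", "{\"id\":1}");
```

Keeping the routing table separate from the handler lambda makes it easy to add tables to the publication and register their logic in one place.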
2. Using RabbitMQ
This section assumes you have a running RabbitMQ instance listening on the default port, with the default guest/guest user allowed to connect from localhost.
⚠️ First, set up the database as described in the Quick Start section above.
Create topic exchange
In RabbitMQ, create a durable exchange of type topic, to which PgOutput2Json will publish the JSON messages. We will assume the exchange is named my_exchange and lives in the / virtual host.
Create and bind a queue to hold the JSON messages
Create a durable queue named my_queue and bind it to the exchange created in the previous step, using public.# as the routing key of the binding. That way, the queue will receive JSON messages for all tables in the public schema. Use a different schema name if your tables are in a different schema.
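In a RabbitMQ topic exchange, routing keys are lists of words separated by dots, and in a binding pattern * matches exactly one word while # matches zero or more words. The sketch below reimplements that rule in C# only to illustrate which routing keys the public.# binding catches; it is not RabbitMQ's implementation, and TopicMatches and MatchFrom are hypothetical names. The sample keys follow the schema.table.key_partition form used by PgOutput2Json.RabbitMq.

```csharp
using System;

// Illustrative topic matching (not RabbitMQ's code): keys are dot-separated
// words; '*' matches exactly one word, '#' matches zero or more words.
static bool TopicMatches(string pattern, string routingKey) =>
    MatchFrom(pattern.Split('.'), 0, routingKey.Split('.'), 0);

static bool MatchFrom(string[] pat, int pi, string[] words, int wi)
{
    // Pattern used up: the key must be used up too.
    if (pi == pat.Length) return wi == words.Length;
    if (pat[pi] == "#")
    {
        // '#' may absorb any number of the remaining words; try each split.
        for (int n = wi; n <= words.Length; n++)
            if (MatchFrom(pat, pi + 1, words, n)) return true;
        return false;
    }
    // A '*' or a literal word needs exactly one more word in the key.
    if (wi == words.Length) return false;
    return (pat[pi] == "*" || pat[pi] == words[wi]) && MatchFrom(pat, pi + 1, words, wi + 1);
}

foreach (var key in new[] { "public.my_table1.0", "public.my_table2.0", "sales.orders.0" })
    Console.WriteLine($"{key}: {TopicMatches("public.#", key)}");
```

Note that public.* would not match these keys, because the routing key has three words; a pattern such as public.*.* or public.# is needed.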
Create .NET Worker Service
Create a .NET Worker Service and add the following package reference:
dotnet add package PgOutput2Json.RabbitMq
Add using PgOutput2Json.RabbitMq; to Worker.cs, and use this code in the Worker class:
public class Worker : BackgroundService
{
    private readonly ILoggerFactory _loggerFactory;
    public Worker(ILoggerFactory loggerFactory)
    {
        _loggerFactory = loggerFactory;
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // This code assumes PostgreSQL and RabbitMQ are on the localhost
        using var pgOutput2Json = PgOutput2JsonBuilder.Create()
            .WithLoggerFactory(_loggerFactory)
            .WithPgConnectionString("server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_")
            .WithPgPublications("my_publication")
            .UseRabbitMq(options =>
            {
                options.HostNames = new[] { "localhost" };
                options.Username = "guest";
                options.Password = "guest";
                options.VirtualHost = "/";
                options.ExchangeName = "my_exchange";
            })
            .Build();
        await pgOutput2Json.Start(stoppingToken);
    }
}
Run the code, and with a little luck, you should see JSON messages arriving in my_queue in RabbitMQ whenever you change a row in the tables specified in my_publication. The routing key has the form schema.table.key_partition. Since we did not configure partitioning in PgOutput2Json, key_partition will always be 0.
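A consumer reading my_queue (for example through the RoutingKey property of RabbitMQ.Client's BasicDeliverEventArgs) may want to recover the schema, table, and partition from the routing key. A minimal sketch, assuming the schema.table.key_partition form described above, a non-negative numeric partition, and schema and table names without dots (quoted PostgreSQL identifiers may contain dots, which would make the split ambiguous). ParseRoutingKey is a hypothetical helper, not part of PgOutput2Json.RabbitMq.

```csharp
using System;
using System.Globalization;

// Hypothetical helper: split "schema.table.key_partition" into its parts.
// Assumes exactly three dot-separated segments and a numeric partition.
static (string Schema, string Table, int Partition) ParseRoutingKey(string routingKey)
{
    string[] parts = routingKey.Split('.');
    if (parts.Length != 3 ||
        !int.TryParse(parts[2], NumberStyles.None, CultureInfo.InvariantCulture, out int partitionNumber))
        throw new FormatException($"Unexpected routing key: {routingKey}");
    return (parts[0], parts[1], partitionNumber);
}

var (schema, table, partition) = ParseRoutingKey("public.my_table1.0");
Console.WriteLine($"schema={schema}, table={table}, partition={partition}");
```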
3. Working with permanent replication slots
In the context of logical replication, a slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database.
To create a replication slot, call the pg_create_logical_replication_slot function in the same database that holds the tables being tracked:
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');
The first parameter is the name of the slot, which must be unique across all databases in the PostgreSQL cluster. Make sure you specify pgoutput as the second parameter: it selects the correct logical decoding plugin.
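PostgreSQL only accepts replication slot names made of lowercase letters, digits, and underscores, at most 63 characters long (names are stored in the 64-byte name type, including the terminator). If the slot name comes from configuration, a quick check before passing it to WithPgReplicationSlot can produce a clearer error than a failed replication start. This is a minimal sketch; IsValidSlotName is a hypothetical helper.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper mirroring PostgreSQL's slot-name rules: 1 to 63 characters,
// each a lowercase ASCII letter, a digit, or an underscore.
// \z (not $) is used so a trailing newline is not accepted.
static bool IsValidSlotName(string name) =>
    Regex.IsMatch(name, @"^[a-z0-9_]{1,63}\z");

foreach (var candidate in new[] { "my_slot", "My-Slot", "" })
    Console.WriteLine($"'{candidate}': {IsValidSlotName(candidate)}");
```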
The current position of each slot is persisted only at checkpoint, so in the case of a crash the slot may return to an earlier log sequence number (LSN), which will then cause recent changes to be sent again when the server restarts. It is the responsibility of the receiving client to avoid ill effects from handling the same message more than once.
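One common way to tolerate redelivery is to make the consumer idempotent. The sketch below assumes, hypothetically, that each message you consume lets you obtain the row key and an LSN that strictly increases for successive changes to the same row; this document does not describe PgOutput2Json's JSON layout, so mapping those fields is up to you. It parses PostgreSQL's textual LSN format (two hexadecimal numbers, the high and low 32 bits, separated by a slash) and skips any change that is not newer than the last one applied for that row. ParseLsn, TryApply, and lastApplied are hypothetical names, and in a real system the per-row state must be persisted together with the applied change rather than kept in memory.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Parses PostgreSQL's textual LSN "XXXXXXXX/YYYYYYYY" (high/low 32 bits in hex)
// into one 64-bit number that can be compared.
static ulong ParseLsn(string text)
{
    int slash = text.IndexOf('/');
    if (slash <= 0) throw new FormatException($"Invalid LSN: {text}");
    uint high = uint.Parse(text.Substring(0, slash), NumberStyles.AllowHexSpecifier, CultureInfo.InvariantCulture);
    uint low = uint.Parse(text.Substring(slash + 1), NumberStyles.AllowHexSpecifier, CultureInfo.InvariantCulture);
    return ((ulong)high << 32) | low;
}

// In-memory stand-in for persistent state: last applied LSN per (table, key).
var lastApplied = new Dictionary<(string Table, string Key), ulong>();

// Applies a change only if it is newer than the last one applied for that row.
// Returns false for duplicates or stale redeliveries (e.g. replays after a crash).
bool TryApply(string table, string key, string lsnText, Action apply)
{
    ulong lsn = ParseLsn(lsnText);
    if (lastApplied.TryGetValue((table, key), out ulong seen) && lsn <= seen)
        return false;
    apply();
    lastApplied[(table, key)] = lsn;
    return true;
}

bool first = TryApply("public.my_table1", "42", "0/16B3748", () => Console.WriteLine("applied"));
bool again = TryApply("public.my_table1", "42", "0/16B3748", () => Console.WriteLine("applied"));
Console.WriteLine($"first={first}, again={again}"); // first=True, again=False
```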
⚠️ Replication slots persist across crashes and know nothing about the state of their consumer(s). They will prevent removal of required resources even when there is no connection using them. This consumes storage because neither required WAL nor required rows from the system catalogs can be removed by VACUUM as long as they are required by a replication slot. In extreme cases this could cause the database to shut down to prevent transaction ID wraparound. So if a slot is no longer required, it should be dropped. To drop a replication slot, use: SELECT * FROM pg_drop_replication_slot('my_slot');
Once the replication slot is created, use it by specifying its name in the PgOutput2JsonBuilder:
// ...
using var pgOutput2Json = PgOutput2JsonBuilder.Create()
    .WithLoggerFactory(_loggerFactory)
    .WithPgConnectionString("server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_")
    .WithPgPublications("my_publication")
    .WithPgReplicationSlot("my_slot") // <-- slot specified here
    //...
4. Batching confirmations to RabbitMQ
TODO (implemented, missing documentation)
5. Partitions by key
TODO (implemented, missing documentation)
6. JSON options
TODO (implemented, missing documentation)