PgOutput2Json.NET

.NET library for building PostgreSQL CDC solutions

PgOutput2Json library for .NET

Overview

The PgOutput2Json library for .NET uses PostgreSQL logical replication to push changes, as JSON messages, from database tables to a .NET application or to a message broker such as RabbitMQ (implemented) or Redis (todo).

Possible use cases for this library:

All of the examples above could be implemented with low latency because the PgOutput2Json library will dispatch a change message immediately when a transaction is committed in PostgreSQL.

Background information

Logical replication is a means to stream messages generated by PostgreSQL logical decoding plugins to a client. pgoutput is the standard logical decoding output plug-in in PostgreSQL 10+. It is maintained by the PostgreSQL community and used by PostgreSQL itself for logical replication. This plug-in is always present so no additional libraries need to be installed.

The PgOutput2Json library converts pgoutput messages to JSON, which can then be used by any .NET application. One example of such an application is the PgOutput2Json.RabbitMq library, which forwards these JSON messages to a RabbitMQ server.

⚠️ PgOutput2Json is in the early stages of development, but it is usable for testing purposes.
⚠️ Npgsql is used for connecting to PostgreSQL. Replication support is new in Npgsql and is considered a bit experimental.

1. Quick Start

Configure postgresql.conf

First set the configuration options in postgresql.conf:

wal_level = logical

The other required settings have default values that are sufficient for a basic setup. PostgreSQL must be restarted after this change.

Create a user that will be used to start the replication

Log in to PostgreSQL with privileges to create users, and execute:

CREATE USER pgoutput2json WITH
	PASSWORD '_your_password_here_'
	REPLICATION;

If you will connect to PostgreSQL as this user from a different machine (a non-local connection), remember to modify pg_hba.conf. After modifying pg_hba.conf, reload the configuration for the changes to take effect: send SIGHUP to the server, run pg_ctl reload, or execute SELECT pg_reload_conf().

Create publication

Connect to the database that holds the tables you want to track and create a publication that specifies the tables and actions for publishing:

CREATE PUBLICATION my_publication
    FOR TABLE my_table1, my_table2
    WITH (publish = 'insert, update, delete, truncate');

In the C# code examples below, we will assume the database name is my_database.

Create .NET Worker Service

Finally, create a .NET Worker Service and add the following package reference:

dotnet add package PgOutput2Json

Add using PgOutput2Json; to Worker.cs, and use this code in the Worker class:

public class Worker : BackgroundService
{
    private readonly ILoggerFactory _loggerFactory;

    public Worker(ILoggerFactory loggerFactory)
    {
        _loggerFactory = loggerFactory;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // This code assumes PostgreSQL is on localhost
        using var pgOutput2Json = PgOutput2JsonBuilder.Create()
            .WithLoggerFactory(_loggerFactory)
            .WithPgConnectionString("server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_")
            .WithPgPublications("my_publication")
            .WithMessageHandler((json, table, key, partition) =>
            {
                Console.WriteLine($"{table}: {json}");
            })
            .Build();

        await pgOutput2Json.Start(stoppingToken);
    }
}

Run the code, and you should see table names and JSON messages printed in the console every time you insert, update, or delete a row in the tables you specified in the previous step.

Note that this “quick start” version uses a temporary replication slot. That means it will not see any database changes made while the worker was stopped. See section 3 of this document if you want to work with a permanent replication slot.

2. Using RabbitMQ

This section assumes you have a RabbitMQ instance running on the default port, with the default guest/guest user allowed from localhost.

⚠️ First, set up the database, as described in the Quick Start section above.

Create topic exchange

In RabbitMQ, create a durable exchange of type topic, to which PgOutput2Json will push JSON messages. We will assume the exchange is named my_exchange and lives in the / virtual host.

Create and bind a queue to hold the JSON messages

Create a durable queue named my_queue and bind it to the exchange created in the previous step. Use public.# as the routing key. That way, the queue will receive JSON messages for all tables in the public schema. Use a different schema name if your tables are in a different schema.
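The exchange, queue, and binding can be created in the RabbitMQ management UI, or from code. The following is a minimal sketch, not part of PgOutput2Json. It assumes the RabbitMQ.Client NuGet package in a 6.x version (the synchronous API; 7.x replaced these calls with async variants) and a broker on localhost with the default guest/guest user:

```csharp
using RabbitMQ.Client;

// Assumption: RabbitMQ.Client 6.x, broker on localhost, default guest/guest user.
var factory = new ConnectionFactory
{
    HostName = "localhost",
    UserName = "guest",
    Password = "guest",
    VirtualHost = "/"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Durable topic exchange that PgOutput2Json will publish to.
channel.ExchangeDeclare("my_exchange", ExchangeType.Topic, durable: true);

// Durable, non-exclusive queue that survives broker restarts.
channel.QueueDeclare("my_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

// "#" matches zero or more dot-separated words,
// so this binding receives messages for every table in the public schema.
channel.QueueBind("my_queue", "my_exchange", "public.#");
```

Declaring an exchange or queue that already exists with the same settings is harmless, so a sketch like this can be run more than once.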

Create .NET Worker Service

Create a .NET Worker Service and add the following package reference:

dotnet add package PgOutput2Json.RabbitMq

Add using PgOutput2Json.RabbitMq; to Worker.cs, and use this code in the Worker class:

public class Worker : BackgroundService
{
    private readonly ILoggerFactory _loggerFactory;

    public Worker(ILoggerFactory loggerFactory)
    {
        _loggerFactory = loggerFactory;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // This code assumes PostgreSQL and RabbitMQ are on localhost

        using var pgOutput2Json = PgOutput2JsonBuilder.Create()
            .WithLoggerFactory(_loggerFactory)
            .WithPgConnectionString("server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_")
            .WithPgPublications("my_publication")
            .UseRabbitMq(options =>
            {
                options.HostNames = new[] { "localhost" };
                options.Username = "guest";
                options.Password = "guest";
                options.VirtualHost = "/";
                options.ExchangeName = "my_exchange";
            })
            .Build();

        await pgOutput2Json.Start(stoppingToken);
    }
}

Run the code, and with a little luck, you should see JSON messages arriving in my_queue in RabbitMQ when you make a change in the tables specified in my_publication. The routing key has the form schema.table.key_partition. Since we did not configure anything specific in PgOutput2Json, key_partition will always be 0.
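To illustrate the routing key format, here is a small self-contained sketch of how a consumer might split a key back into its parts. RoutingKeyParts and RoutingKey.Parse are hypothetical names, not part of PgOutput2Json, and the sketch assumes that neither the schema name nor the table name contains a dot:

```csharp
using System;

// Hypothetical helper types, not part of PgOutput2Json.
public record RoutingKeyParts(string Schema, string Table, int KeyPartition);

public static class RoutingKey
{
    // Splits "schema.table.key_partition" into its three parts.
    // Assumes neither the schema nor the table name contains a dot.
    public static RoutingKeyParts Parse(string routingKey)
    {
        var parts = routingKey.Split('.');
        if (parts.Length != 3 || !int.TryParse(parts[2], out var partition))
        {
            throw new FormatException($"Unexpected routing key: '{routingKey}'");
        }
        return new RoutingKeyParts(parts[0], parts[1], partition);
    }
}
```

For example, RoutingKey.Parse("public.my_table1.0") yields the schema public, the table my_table1, and key partition 0. The same structure can be used for narrower bindings: in a topic exchange, * matches exactly one word, so a queue bound with public.my_table1.* would receive only changes to my_table1.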

3. Working with permanent replication slots

In the context of logical replication, a slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database.

To create a replication slot, call the pg_create_logical_replication_slot function in the same database that holds the tables being tracked:

SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');

The first parameter is the name of the slot, which must be unique across all databases in a PostgreSQL cluster. Make sure you specify pgoutput as the second parameter, because it selects the logical decoding plugin.

The current position of each slot is persisted only at checkpoint, so in the case of a crash the slot may return to an earlier log sequence number (LSN), which will then cause recent changes to be sent again when the server restarts. It is the responsibility of the receiving client to avoid ill effects from handling the same message more than once.
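One common way to tolerate redelivery is to make the handler idempotent, so that applying the same message twice leaves the same result. The following is a minimal sketch under stated assumptions: it assumes the key passed to the message handler is a string that uniquely identifies a row within a table, which is not confirmed here. LatestRowStore is a hypothetical class, not part of PgOutput2Json:

```csharp
using System.Collections.Concurrent;

// Hypothetical helper: keeps the most recent JSON message per (table, key).
public class LatestRowStore
{
    private readonly ConcurrentDictionary<(string Table, string Key), string> _rows = new();

    // Overwrites any previous value, so applying the same message twice
    // leaves the store in the same state as applying it once.
    public void Apply(string table, string key, string json)
    {
        _rows[(table, key)] = json;
    }

    public bool Contains(string table, string key)
    {
        return _rows.ContainsKey((table, key));
    }

    public int Count => _rows.Count;
}
```

Because replayed changes arrive in their original order, a store like this ends up in the same final state after a replay. Handlers with side effects that cannot simply be overwritten, such as sending an email, need a different deduplication strategy.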

⚠️ Replication slots persist across crashes and know nothing about the state of their consumer(s). They will prevent removal of required resources even when there is no connection using them. This consumes storage because neither required WAL nor required rows from the system catalogs can be removed by VACUUM as long as they are required by a replication slot. In extreme cases this could cause the database to shut down to prevent transaction ID wraparound. So if a slot is no longer required it should be dropped. To drop a replication slot, use:

SELECT * FROM pg_drop_replication_slot('my_slot');
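Because an unused slot keeps retaining WAL, it can be worth checking periodically how much WAL each slot is holding back. The following is a minimal sketch using Npgsql, assuming PostgreSQL 10 or later, a primary (non-standby) server, and the connection string from the examples in this document. The pg_replication_slots view and the pg_wal_lsn_diff and pg_current_wal_lsn functions are standard PostgreSQL; the surrounding program is only an illustration:

```csharp
using System;
using Npgsql;

// Assumption: same server and credentials as in the Quick Start section.
var connectionString = "server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_";

await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();

// Bytes of WAL between the current write position and the oldest WAL each slot still needs.
const string sql = @"
    SELECT slot_name, active, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes
    FROM pg_replication_slots";

await using var command = new NpgsqlCommand(sql, connection);
await using var reader = await command.ExecuteReaderAsync();

while (await reader.ReadAsync())
{
    var slotName = reader.GetString(0);
    var active = reader.GetBoolean(1);
    // restart_lsn can be NULL, in which case the difference is NULL as well.
    var retained = reader.IsDBNull(2) ? "unknown" : reader.GetDecimal(2).ToString();
    Console.WriteLine($"{slotName}: active={active}, retained WAL bytes={retained}");
}
```

A slot that is inactive and whose retained byte count keeps growing is a candidate for dropping.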

Once the replication slot is created, specify its name in the PgOutput2JsonBuilder to use it:

        // ...
        using var pgOutput2Json = PgOutput2JsonBuilder.Create()
            .WithLoggerFactory(_loggerFactory)
            .WithPgConnectionString("server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_")
            .WithPgPublications("my_publication")
            .WithPgReplicationSlot("my_slot")      // <-- slot specified here
            //...

4. Batching confirmations to RabbitMQ

TODO (implemented, missing documentation)

5. Partitions by key

TODO (implemented, missing documentation)

6. JSON options

TODO (implemented, missing documentation)