Thursday 28 November 2019

PostgreSQL Pub/Sub

Note: This post is copied over (for my own reference) from https://citizen428.net/blog/asynchronous-notifications-in-postgres/


I’m fascinated by Postgres: the more I learn about it, the more I realize how much I still don’t know. Recently I discovered its asynchronous communication capabilities, which apparently have been around for a long time ¯\_(ツ)_/¯
Let’s look at the two most interesting commands related to this topic, NOTIFY and LISTEN. Here’s what the documentation has to say on them:
NOTIFY provides a simple interprocess communication mechanism for a collection of processes accessing the same PostgreSQL database. A payload string can be sent along with the notification, and higher-level mechanisms for passing structured data can be built by using tables in the database to pass additional data from notifier to listener(s).
Whenever the command NOTIFY channel is invoked, either by this session or another one connected to the same database, all the sessions currently listening on that notification channel are notified, and each will in turn notify its connected client application.
LISTEN registers the current session as a listener on the notification channel named channel. If the current session is already registered as a listener for this notification channel, nothing is done.
Sounds like publish-subscribe on the database level, interesting! I learn best by trying things out and writing some code, so let’s dive in.
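Before touching the database, the semantics quoted above can be modelled in a few lines of Ruby. This is only a toy, in-memory sketch and says nothing about how Postgres implements the feature; the class ToyNotifier and the session names are made up for illustration. It shows the two properties the documentation describes: a repeated LISTEN is a no-op, and a NOTIFY reaches every session listening on the channel.

```ruby
require 'set'

# Toy in-memory model of LISTEN/NOTIFY semantics (hypothetical, not Postgres itself).
class ToyNotifier
  def initialize
    # channel name => set of session names listening on it
    @listeners = Hash.new { |hash, channel| hash[channel] = Set.new }
    # session name => list of [channel, payload] pairs it has received
    @inbox = Hash.new { |hash, session| hash[session] = [] }
  end

  # LISTEN: registering twice changes nothing because a Set ignores duplicates.
  def listen(session, channel)
    @listeners[channel].add(session)
  end

  # NOTIFY: deliver the payload to every session listening on the channel.
  def notify(channel, payload = '')
    @listeners[channel].each { |session| @inbox[session] << [channel, payload] }
  end

  # Everything a session has received so far.
  def received(session)
    @inbox[session]
  end
end

bus = ToyNotifier.new
bus.listen('session_a', 'events')
bus.listen('session_a', 'events') # already registered, nothing is done
bus.listen('session_b', 'events')
bus.notify('events', 'hello')

p bus.received('session_a') # a single copy, despite the duplicate listen
p bus.received('session_b')
```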

Setting up Postgres for notifications

For testing purposes, let’s create an overly simplified orders table. Besides the primary key, it contains an email address to identify the person who placed the order and a bigint field to store the order total in cents:
CREATE TABLE orders (
  id SERIAL PRIMARY KEY,
  email TEXT NOT NULL,
  total BIGINT NOT NULL
);
Next we need to define a function which returns a trigger:
CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$
  DECLARE
    record RECORD;
    payload JSON;
  BEGIN
    IF (TG_OP = 'DELETE') THEN
      record = OLD;
    ELSE
      record = NEW;
    END IF;

    payload = json_build_object('table', TG_TABLE_NAME,
                                'action', TG_OP,
                                'data', row_to_json(record));

    PERFORM pg_notify('events', payload::text);

    RETURN NULL;
  END;
$$ LANGUAGE plpgsql;
The above is pretty straightforward:
  1. Declare some variables for later use.
  2. Check the TG_OP special variable to decide which version of the row we want to serialize: OLD for a DELETE, NEW otherwise.
  3. Use json_build_object and row_to_json to generate the notification payload.
  4. Use pg_notify to broadcast a message on the events channel.
  5. Return NULL, since the return value of an AFTER trigger is ignored.
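To make the row-selection and payload steps concrete, here is a rough Ruby mirror of the same logic. It is only a sketch for illustration: build_payload is a hypothetical helper, the real work happens inside PL/pgSQL, and JSON.generate does not reproduce the exact whitespace that json_build_object emits.

```ruby
require 'json'

# Hypothetical Ruby mirror of notify_event(); not part of the original setup.
def build_payload(table, operation, old_row, new_row)
  # A DELETE has no NEW row, so fall back to OLD, just as the trigger does.
  row = operation == 'DELETE' ? old_row : new_row
  # Same three keys the trigger puts into the notification payload.
  JSON.generate('table' => table, 'action' => operation, 'data' => row)
end

deleted = { 'id' => 1, 'email' => 'test@example.com', 'total' => 10_000 }
puts build_payload('orders', 'DELETE', deleted, nil)
```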
Now we can create a notify_order_event trigger, which will call this function after every insert, update or delete on the orders table:
CREATE TRIGGER notify_order_event
AFTER INSERT OR UPDATE OR DELETE ON orders
  FOR EACH ROW EXECUTE PROCEDURE notify_event();
With this in place we should now be able to receive events. Let’s inform Postgres that we’re interested in notifications on the events channel:
LISTEN events;
Now whenever we insert, update or delete a record we will receive a notification:
INSERT INTO orders (email, total) VALUES ('test@example.com', 10000);
INSERT 0 1
Asynchronous notification "events" with payload "{"table" : "orders", "action" : "INSERT", "data" : {"id":1,"email":"test@example.com","total":10000}}" received from server process with PID 5315.
Great, we just received our first asynchronous notification! Admittedly that’s not particularly useful within the same psql session, so let’s add another listener.

Listening from another process

For the following example we’ll once again use Jeremy Evans’s excellent Sequel gem:
require 'sequel'

DB = Sequel.connect('postgres://user@localhost/notify-test')

puts 'Listening for DB events...'
DB.listen(:events, loop: true) do |_channel, _pid, payload|
  puts payload
end
The above code first connects to the database and then uses Sequel::Postgres::Database#listen to listen for events in a loop.
If we start this script and insert a record into our database, the JSON payload is printed to the console:
→ ruby test.rb
Listening for DB events...
{"table" : "orders", "action" : "INSERT", "data" : {"id":2,"email":"test@example.com","total":10000}}
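Since the payload arrives as a plain JSON string, a listener will usually want to decode it before acting on it. The following is a minimal sketch of that step; describe_event is a hypothetical helper, and it assumes the row carries the id, email and total columns of the orders table above.

```ruby
require 'json'

# Hypothetical helper: turn a notification payload into a readable log line.
# Assumes the "data" object has the columns of the orders table.
def describe_event(payload)
  event = JSON.parse(payload)
  data = event.fetch('data')
  # total is stored in cents, so convert it for display.
  amount = format('%.2f', data.fetch('total') / 100.0)
  "#{event.fetch('action')} on #{event.fetch('table')}: " \
    "id=#{data.fetch('id')} email=#{data.fetch('email')} total=#{amount}"
end

payload = '{"table" : "orders", "action" : "INSERT", ' \
          '"data" : {"id":2,"email":"test@example.com","total":10000}}'
puts describe_event(payload)
```

Inside the DB.listen block, replacing puts payload with puts describe_event(payload) would print the decoded line instead of the raw JSON.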
