Note: The blog is copied over(used for my reference) here https://citizen428.net/blog/asynchronous-notifications-in-postgres/
I’m fascinated by Postgres: the more I learn about it, the more I realize how much I still don’t know. Recently I discovered its asynchronous communication capabilities, which apparently have been around for a long time ¯\(ツ)/¯
Let’s look at the two most interesting commands related to this topic,
NOTIFY
and LISTEN
. Here’s what the documentation has to say on them:NOTIFY provides a simple interprocess communication mechanism for a collection of processes accessing the same PostgreSQL database. A payload string can be sent along with the notification, and higher-level mechanisms for passing structured data can be built by using tables in the database to pass additional data from notifier to listener(s).Whenever the command NOTIFY channel is invoked, either by this session or another one connected to the same database, all the sessions currently listening on that notification channel are notified, and each will in turn notify its connected client application.LISTEN registers the current session as a listener on the notification channel named channel. If the current session is already registered as a listener for this notification channel, nothing is done.
Sounds like publish-subscribe on the database level, interesting! I learn best by trying things out and writing some code, so let’s dive in.
Setting up Postgres for notifications
For testing purposes, let’s create an overly simplified
orders
table, that except for the primary key also contains an email address to identify the person who placed the order and a bigint
field to store the total order amount in cents:CREATE TABLE orders (
id SERIAL PRIMARY KEY,
email TEXT NOT NULL,
total BIGINT NOT NULL
);
Next we need to define a function which returns a trigger:
CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$
DECLARE
record RECORD;
payload JSON;
BEGIN
IF (TG_OP = 'DELETE') THEN
record = OLD;
ELSE
record = NEW;
END IF;
payload = json_build_object('table', TG_TABLE_NAME,
'action', TG_OP,
'data', row_to_json(record));
PERFORM pg_notify('events', payload::text);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
The above is pretty straightforward:
- Declare some variables for later use.
- Switch on the
TG_OP
special variable to decide which version of the row we want to serialize. - Use
json_build_object
androw_to_json
to generate the notification payload. - Use
pg_notify
to broadcast a message on theevents
channel. - Return
NULL
since this is anAFTER
trigger.
Now we can create a
notify_order_event
trigger, which will call this function after we perform a CRUD operation on the orders
table:CREATE TRIGGER notify_order_event
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE PROCEDURE notify_event();
With this in place we should now be able to receive events. Let’s inform Postgres that we’re interested in notifications on the
events
channel:LISTEN events;
Now whenever we insert, update or delete a record we will receive a notification:
INSERT into orders (email, total) VALUES ('test@example.com', 10000);
INSERT 0 1
Asynchronous notification "events" with payload "{"table" : "orders", "action" : "INSERT", "data" : {"id":1,"email":"test@example.com","total":10000}}" received from server process with PID 5315.
Great, we just received our first asynchronous notification, though admittedly that’s not particularly useful within the same
psql
session, so let’s add another listener.Listening from another process
For the following example we’ll once again use Jeremy Evan’s excellent Sequel gem:
require 'sequel'
DB = Sequel.connect('postgres://user@localhost/notify-test')
puts 'Listening for DB events...'
DB.listen(:events, loop: true) do |_channel, _pid, payload|
puts payload
end
The above code first connects to the database and then uses
Sequel::Postgres::Database#listen
to listen for events in a loop.
If we start this script and insert a record in our database the JSON payload will get output to the console:
→ ruby test.rb
Listening for DB events...
{"table" : "orders", "action" : "INSERT", "data" : {"id":2,"email":"test@example.com","total":10000}}