Realtime changefeeds with PostgreSQL

If you need a database feature, PostgreSQL probably has it.

Recently, in versions 9.3 and 9.4, the PostgreSQL devs have added JSON support in the form of JSON types, functions, and operators.

You may not have known this, but PostgreSQL also has publish/subscribe functionality in the form of the NOTIFY, LISTEN, and UNLISTEN commands. This is commonly used to send notifications that table rows have changed, which makes it a good fit for writing realtime apps on top of PostgreSQL.

Setting up notifications with PostgreSQL

We create a trigger function, meant to be called from a TRIGGER (note that json_build_object requires PostgreSQL 9.4 or later):

CREATE FUNCTION notify_changes() RETURNS trigger AS $$
DECLARE
BEGIN
  IF TG_OP = 'INSERT' THEN
    PERFORM pg_notify(TG_TABLE_NAME, json_build_object('new_val', row_to_json(NEW))::text);
    RETURN NEW;
  ELSIF TG_OP = 'UPDATE' THEN
    PERFORM pg_notify(TG_TABLE_NAME, json_build_object('old_val', row_to_json(OLD), 'new_val', row_to_json(NEW))::text);
    RETURN NEW;
  ELSIF TG_OP = 'DELETE' THEN
    PERFORM pg_notify(TG_TABLE_NAME, json_build_object('old_val', row_to_json(OLD))::text);
    RETURN OLD;
  END IF;
END;
$$ LANGUAGE plpgsql;

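The function handles INSERT, UPDATE and DELETE operations. Depending on the operation, it sends a JSON object with the old row, the new row, or both, on a channel named after the table. Keep in mind that a NOTIFY payload must be shorter than 8000 bytes in the default configuration, so very wide rows will not fit.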
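
To make the payload shapes concrete, here is a minimal sketch of how a consumer might tell the three operations apart by looking at which keys are present. The helper name describe_change and the sample columns (id, body) are made up for illustration; they are not part of the trigger above.

```python
import json

def describe_change(payload_text):
    """Classify a changefeed payload by which keys it carries."""
    change = json.loads(payload_text)
    old_val = change.get('old_val')
    new_val = change.get('new_val')
    if old_val is None and new_val is not None:
        return 'insert', new_val      # only new_val: a row was inserted
    if old_val is not None and new_val is not None:
        return 'update', new_val      # both keys: a row was updated
    if old_val is not None:
        return 'delete', old_val      # only old_val: a row was deleted
    raise ValueError('payload has neither old_val nor new_val')

# Hypothetical payload, shaped like what the INSERT branch would send.
sample = '{"new_val": {"id": 1, "body": "hello"}}'
print(describe_change(sample))
```

The same check works in any client language, since the payload is plain JSON text.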

To fire the function each time a row changes, we need to set up a TRIGGER on each table we want to have support for changefeed notifications:

CREATE TRIGGER changefeed AFTER INSERT OR UPDATE OR DELETE ON chat_messages
  FOR EACH ROW EXECUTE PROCEDURE notify_changes();
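
If several tables need a changefeed, the statement above can be generated rather than typed out each time. The following is only a sketch: the helper names and the chat_rooms table are assumptions for illustration, and it only builds the SQL text without executing it.

```python
def quote_ident(name):
    """Double-quote an SQL identifier, doubling any embedded quotes."""
    return '"' + name.replace('"', '""') + '"'

def changefeed_trigger_sql(table, trigger_name='changefeed'):
    """Return the CREATE TRIGGER statement for one table."""
    return (
        'CREATE TRIGGER {trigger} AFTER INSERT OR UPDATE OR DELETE ON {table}\n'
        '  FOR EACH ROW EXECUTE PROCEDURE notify_changes();'
    ).format(trigger=quote_ident(trigger_name), table=quote_ident(table))

# chat_rooms is a hypothetical second table.
for table in ('chat_messages', 'chat_rooms'):
    print(changefeed_trigger_sql(table))
```

Quoted identifiers are case-sensitive in PostgreSQL, so the names passed in should match the table names exactly as they were created.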

That’s it! Now let’s see how to subscribe to the changefeed.

Implementing subscriptions with Python

We are going to write a small example using Python and the Tornado Web framework to see how to subscribe to these notifications.

The following script implements a WebSocket server which broadcasts incoming notifications to all subscribers. This is pretty straightforward using the Psycopg driver.

import json

from tornado.ioloop import IOLoop
from tornado.httpserver import HTTPServer
from tornado.web import Application
from tornado.websocket import WebSocketHandler

import psycopg2
import psycopg2.extensions

conn = psycopg2.connect('dbname=test user=test')
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

subscribers = []

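def poll(fd, ev):
    state = conn.poll()
    if state == psycopg2.extensions.POLL_OK:
        # Drain every queued notification in arrival order; a bare pop()
        # would take only the newest one and leave the rest waiting.
        while conn.notifies:
            change = conn.notifies.pop(0)
            for subscriber in subscribers:
                subscriber.on_postgres_notification(change)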

def listen(*channels):
    cursor = conn.cursor()
    for channel in channels:
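        # Quote the channel name so it is always treated as an identifier
        # (quote_ident is available in psycopg2 2.7 and later).
        cursor.execute('LISTEN %s;' % psycopg2.extensions.quote_ident(channel, conn))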

class PubSubSocket(WebSocketHandler):
    def __init__(self, *args, **kwargs):
        super(PubSubSocket, self).__init__(*args, **kwargs)
        self.channels = []

    def open(self):
        subscribers.append(self)

    def on_postgres_notification(self, change):
        if change.channel in self.channels:
            self.write_message('{"channel":"%s","change":%s}' % (change.channel, change.payload))

    def on_message(self, message):
        try:
            payload = json.loads(message)
            if 'subscribe' in payload:
                self.channels.append(payload['subscribe'])
            elif 'unsubscribe' in payload:
                self.channels.remove(payload['unsubscribe'])
        except ValueError as e:
            # The key must be a string; a bare `error` would raise NameError.
            self.write_message({'error': str(e)})

    def on_close(self):
        subscribers.remove(self)


if __name__ == '__main__':
    app = Application([
        (r'/ws', PubSubSocket)
    ])

    http_server = HTTPServer(app)
    http_server.listen(8001)

    io_loop = IOLoop.instance()
    io_loop.add_handler(conn.fileno(), poll, io_loop.READ)

    listen('chat_messages')
    io_loop.start()
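
The small protocol used by the handler (a JSON object with either a "subscribe" or an "unsubscribe" key, and an outgoing envelope with "channel" and "change") can also be sketched as plain functions, which makes it easier to test without a running server. The names apply_message and envelope are made up for this sketch, and using a set instead of a list is a design choice that differs from the script above: it makes repeated subscriptions harmless.

```python
import json

def apply_message(channels, message):
    """Update a set of channel names from one client message.

    Returns None on success, or an error string the server could send back.
    """
    try:
        payload = json.loads(message)
    except ValueError as e:
        return str(e)
    if not isinstance(payload, dict):
        return 'expected a JSON object'
    if isinstance(payload.get('subscribe'), str):
        channels.add(payload['subscribe'])
    elif isinstance(payload.get('unsubscribe'), str):
        channels.discard(payload['unsubscribe'])  # no error if absent
    else:
        return 'expected a "subscribe" or "unsubscribe" channel name'
    return None

def envelope(channel, payload_text):
    """Build the outgoing message; json.dumps escapes the channel name."""
    return json.dumps({'channel': channel, 'change': json.loads(payload_text)})

channels = set()
apply_message(channels, '{"subscribe": "chat_messages"}')
print(sorted(channels))
print(envelope('chat_messages', '{"new_val": {"id": 1}}'))
```

Building the envelope with json.dumps instead of string formatting avoids producing invalid JSON if a channel name ever contains a quote character.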

Consuming notifications with JavaScript

Now that we have our pubsub service up and running, consuming notifications is really easy. Here’s a basic example written in a few lines of JavaScript:

// connect to endpoint
let sock = new WebSocket("ws://localhost:8001/ws")

// event listeners
sock.onmessage = event => console.log("Notification: " + event.data)
sock.onclose = event => console.warn("Connection closed.")

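// subscribe to "chat_messages" notifications once the connection is open
// (calling send() while the socket is still connecting throws an error)
sock.onopen = event => sock.send(JSON.stringify({subscribe: "chat_messages"}))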

Bottom Line

As with all things, Postgres’s publish-subscribe is not a silver bullet.

Depending on the size of your project, you may have to rely on an external queuing system such as RabbitMQ, an MQTT broker, or ZeroMQ. Those remain useful, providing ready-made functionality, proven performance, integration with other systems, and better scalability. Nonetheless, for simple cases PostgreSQL might provide just what you need.