Sneak peek at Immutable’s event liberation pipeline, Part 1

Wei
Immutable Engineering
9 min readJun 27, 2023

--

This is the first blog of a 3-part blog series.

Introduction

We take platform scalability seriously at Immutable, so we have chosen the event-driven microservices (EDM) pattern as our architectural language. This approach requires our backend services to model state updates and communicate with each other regarding events, which are lightweight data structures carrying the details of the new states.

Like many start-up companies, we initially didn’t start on the event-driven path. Our first version was built as a monolithic system with high coupling. We took several months to transition from the monolithic architecture to the microservices. This blog post discusses a crucial concept that secured our successful transition, event liberation, and how we implement this concept in our backend using PostgreSQL, A concurrent task queue, and AWS EventBridge.

The transition from Monolith to Microservices

Back in the days when our platform was built upon a monolithic system, a single transaction could touch many components, updating several tables (the execution of the transaction is single-threaded).

HandleRequest
Query1
Update1
Query2
Update2
Query3
Update3
...
Done

Maintaining and testing this system was hard. Our system could be easily overloaded if there were many requests to handle. It took a lot of work to parallelize the request handling to utilize modern cloud infrastructure; even worse, the database became a bottleneck for our business growth because it could not deal with infinite read/write activities.

We adopted a method called tactical forking to decompose these components inside the monolith, making them independent systems with their own databases.

Handle the same request in the new architecture looks like this:

// microservice 1
System1-HandleRequest
Query1
BeginTransaction
Update1
EmitEvent1
EndTransaction
Done

// microservive 2
System2-ConsumeEvent1
Query2
BeginTransaction
Update2
EmitEvent2
EndTransaction
Done

// microservice 3
System2-ConsumeEvent2

... // other microservices repeating the same pattern

Those systems (microservices) communicate with each other to complete the saga — a set of fully distributed activities running concurrently to serve a request end-to-end. Because each system has its own computing resources and databases, we can scale it up and down based on the volume of requests it’s serving. Their implementation tends to be much simpler than the monolithic version.

It looked like a big win. However, we realized we also introduced a new challenge: how could we guarantee distributed transactional safety?

To illustrate this challenge, let’s zoom in to microservice 1 (I have added some additional comments):

// microservice 1
System1-HandleRequest
Query1 // validate request
BeginTransaction // update states then send out an event
Update1 // update sender and receiver's accounts
EmitEvent1 // emit an event to the downstream services
EndTransaction
Done

Imagine this system needs to check whether the user is eligible to transfer some assets to another user (done by Query1), and only if so can the system move on to remove the assets from her account, update the receiver’s account, then emits an event containing both the sender and receiver user’s id and the asset details (done by BeginTransaction..EndTransactionblock)

The special notation BeginTransction, and EndTransaction, remind us that the operations within this block must succeed or fail atomically. In more concise terms, 1) when they fail, it is as if none of these operations happened. Conversely, 2) if both activities succeed, the system guarantees that Update1 happened before EmitEvent1.

Keen readers may notice that this resembles the database transaction guarantee. We indeed took inspiration from this guarantee, but we couldn’t just wrap these two steps in a database transaction because EmitEvent involves a small subsystem written with custom backend code, not understood by the database. More importantly, to mimic the behavior of the database transaction, we added another key requirement:

3) The step EmitEvent must ensure the success of the event delivery. The entire transaction may only fail if the first step Update1 fails; the second step should have much stronger error resilience or a self-healing property.

We did extensive R&D for the third requirement. We decided to give the event system the self-healing property, so it will recover automatically in the case of event-emission failures and ensure delivery (with an acceptable amount of delay). We land our eyes on the transactional outbox pattern. It’s worth giving a shoutout to Chris Richardson, who explained this pattern in his book.

The gist of this pattern is:

A service that uses a relational database inserts messages/events into an outbox table (e.g. MESSAGE) as part of the local transaction.

A separate Message Relay process publishes the events inserted into database to a message broker.

(here is the outbox pattern diagram borrowed from Chris’ website)

In our case, the Database is PostgreSQL; the Message Broker is AWS EventBridge. The only missing piece is the Message Relay — let’s introduce the Liberator.

Liberator

Why the name Liberator?

In event-driven architecture (EDA), event liberation refers to the act (or system) that frees data from traditional data storage (relational databases), allowing business services to consume the data without separate data-retrieval mechanisms.

We borrowed this term from the book Building Event-Driven Microservices: Leveraging Organizational Data at Scale, which dedicates a chapter on event liberation.

The Liberator is a daemon service we developed that connects to the PostgreSQL database, receiving the logical replication objects, aka WAL objects (we will talk about them in a second), and publishing them to the AWS EventBridge. We took inspiration from Kafka CDC connector — a similar service that streams WAL objects from PostgreSQL to the Kafka cluster.

Logical replication objects

For simplicity and ease of memorization, we are going to use these two terms interchangeably throughout this blog post:

  • replication object
  • WAL object

Both refer to the binary representation of the event that modifies a table (can be insertion, deletion, or column modification). WAL stands for write-ahead-log, a decades-old technology invented for database replication.

Recall that the transactional outbox pattern advocates using an outbox table (yes, it’s literally named outbox) as part of the local transaction to store the eventified state updates.

Suppose a balance service needs to update the balance table and emits a balance-updated event to the other components; it would execute the following pseudo-SQL code:

BEGIN TRANSACTION;
UPDATE balance with values (id, name, value)
INSERT INTO outbox (event_id, event_type, name, value)
values (new_event_id(), "balance-updated", name, value)
COMMIT TRANSACTION;

Once the new row is inserted into the outbox table, PostgreSQL will create a special logical replication object — a compact binary representation that describes the insertion.

This behavior must be enabled in the PostgreSQL configure file (or in the AWS RDS Configuration Group), but we will leave this as an exercise for the readers. For more information, please refer to its official document: Chapter 31. Logical Replication

It’s worth noting that PostgreSQL creates replication objects for all types of changes: insert, update, and delete, but it is up to the replication slot (explained in the following section) to decide what type(s) of change events to send out.

Similar features exist in other SQL databases such as MySQL (where the bin-file, bin-object is equivalent to the WAL file, WAL object).

For curious readers, logical replication was originally invented many years ago to replicate database changes across a cluster. We use this feature to create read/write replicas for performance purposes. It also allows us to migrate databases cross-region and even between different versions of PostgreSQL.

The receiver unit

The Liberator has a receiver unit that listens to the insertion of new rows to the outbox table via a special construct called the replication slot. We configured it to only deliver the replication objects for the insertions to the outbox table; all the other events, such as row-updates, row-deletions, or events not originating from the outbox table, are kept hidden from the Liberator.

The receiver unit then decodes these replication objects (which are binary blobs, remember?), transforming them into the shape of schema-conforming event objects, then publishes them to AWS EventBridge. Again, for readers from the Kafka school, this is conceptually similar to the inner working of the Kafka CDC connector (and you may wonder whether there is a mechanism similar to the topic offset. You are right. We will explain this in the Logical Replication Stream section below).

Putting these together, we can illustrate the logical replication stream in the following illustrative way. We also want to thank Pat Wright for his excellent explanation of logical replication.

Logical Replication Stream Illustrated

Let us start with the initial state: on the left-hand side, we have the replication slot (and the WAL file); on the right-hand side, we have the receiver unit, which consists of a buffer, a couple of functions called processor, and LSN-confirm. In the middle, we have 5 events (aka replication objects) pending arrival.

LSN stands for log-sequence-number. It is an unsigned 64-bit integer used to determine a position in the WAL file. You can think of it as an array index. Every WAL object (aka change event) is associated with an LSN, just like every array element is associated with an index.

Note that the replication slot (the square labeled with WAL) has LSN=0, which means the replication slot will deliver any events with LSN higher than 0.

The initial state of the logical replication stream. Note the WAL has LSN=0.

After some time, the receiver unit has received events 1 to 4, which are kept in the buffer for processing.

Note that, at this moment, the PostgreSQL replication slot is unaware of our progress (i.e., it doesn’t know that the receiver unit has received events 1 to 4). It still thinks the LSN is 0 and would redeliver events 1 to 4, which brings some implications and subtleties. We will look at them in a second.

The receiver has received and processed events 1 to 4.

Once the receiver unit has processed events 1 to 4, it invokes the LSN-Confirm function that writes LSN=4 (the highest LSN among all the newly processed events) to the replication slot. Also, the buffer is emptied, ready to hold new events.

The LSN-confirm function writes the new LSN (4) to the replication slot, updating its internal LSN (4).

This step is crucial in securing the replication’s reliability and consistency. Once the WAL unit gets the updated LSN (4), it refreshes its internal state. This means the replication slot will only deliver events with an LSN higher than 4. Consequentially, this ensures:

  • The replication stream will not redeliver old events whose LSN is lower than the confirmed one.
  • If the replication stream goes down due to a network outage, system restarts, or any unpredictable disastrous events, PostgreSQL and the receiver unit can trivially re-establish the logical replication stream to deliver events from the last confirmed LSN.
  • PostgreSQL will keep the undelivered events (as long as it has enough disk space).
  • PostgreSQL will delete all the old replication objects with lower LSN from the WAL file, freeing the disk space.

Continue this streaming process: more and more events arrive at the receiver unit as replication objects. The receiver processes these events and returns the LSN number to the replication slot, enabling more deliveries.

In the next blog, we will look closely at the event-processing and -publishing mechanism, where we will see an interesting concurrency pattern in action: the concurrent task queue pattern.

At Immutable, we quickly test and iterate; we take ownership and strive to improve our work. We are currently preparing for the new improvements to our event pipeline. Join us if you like system programming, networking, and solving distributed system problems.

References

Transactional output pattern.

Chris Richardson’s microservice architecture website.

AWS Event-driven architecture overview.

Building Event-Driven Microservices — Leveraging Organizational Data at Scale, O’Reilly Media (2020)

Replication slots.

Change Data Capture, The Complete Guide.

--

--

Techie@Stealth Mode. I do tech things for art people. My career spans across system programming, blockchain, video games, film vfx, and cybersecurity.