Building a Queuing System with Postgres - why we did it and when you shouldn't use it

Introduction

Plenty of recent articles claim that Postgres should just be used for everything (best practices be damned). Many of them make convincing arguments:

  • https://www.amazingcto.com/postgres-for-everything
  • https://www.timescale.com/blog/postgres-for-everything
  • https://www.rudderstack.com/blog/why-rudderstack-used-postgres-over-apache-kafka-for-streaming-engine
  • https://dagster.io/blog/skip-kafka-use-postgres-message-queue

The arguments typically fall into the same categories:

  • Reduce overall architecture complexity
  • Save engineer time by using familiar tech
  • Lower cost
  • Licensing concerns

With that, I shall add one of my own, but with a twist - use Postgres for everything, sometimes!

What is Kestra

The primary focus of this article is on using Postgres to manage queues. However if you are not familiar with Kestra, it is the unified orchestration platform designed to automate any workflow with a simple declarative YAML framework. The project is open source and you can check it out here.

Kestra and Queues

When Kestra was first built, the queuing system was built on Apache Kafka for messaging, along with either Elasticsearch or OpenSearch as a persistent data store.

Apache Kafka was chosen because it is widely considered to be the standard bearer when it comes to robust, scalable distributed messaging systems. It has a thriving community and boasts a large ecosystem, allowing you to easily consume or produce data from/to many data sources.

Using it with other tools and services such as Apache Flink, Kafka Streams, and Spark allows you to build a near-real-time event processing system. At Kestra, with our Kafka implementation, we rely on the Kafka Streams library to manage state.

While Kafka excels at processing in-flight data, it should not be considered a persistent data store. This is where OpenSearch and Elasticsearch come into play. We leverage these technologies to store logs, execution state, and so on. This data store can then be used to restore our state should anything happen to our orchestration system.

So why offer Postgres as an option with Kestra?

The community is at the heart of Kestra. While building it we received feedback that for a lot of our users, the architecture was too complex.

Some of our users were using Kestra for personal projects or at a smaller scale at various companies. While Apache Kafka is a fantastic service, it can be an operational nightmare for smaller teams, especially if they have no prior experience.

Furthermore, to truly get the value out of distributed systems such as Apache Kafka, OpenSearch and Elasticsearch, you want to run each service on a minimum of 3 nodes (ideally distributed across different availability zones if you are on the cloud). This inevitably leads to a larger infrastructure bill.

With that in mind, we set about building Postgres as an option for both queuing and storage to better support the needs of our users. Note that while the rest of this article focuses on Postgres, we also support H2 and MySQL in this context.

Why Postgres

It's cliché but true - Postgres is the Swiss Army Knife of databases.

First and foremost, it is a reliable, performant relational database. It has been battle-tested over many years and, thanks to its devoted community, is continually improved through regular releases and extensions.

Looking to build the next hyped AI application? There's pgvector. Want to churn through millions of time-series IoT data points? That's what Timescale is for.

Now, what about building a queuing system…?

How we built a queuing system with Postgres

First, let's introduce the queues table. This is the central database table that mimics message broker queues.

The columns are defined as follows:

  • offset: Serial identifier of the message.
  • type: The type of the message, like a queue name (we mimic our Kafka implementation where there is one topic per type of message).
  • key: The domain identifier of the message (for example, the flow identifier if the message is a flow). Like in Kafka, there can be multiple messages with the same key.
  • value: The message itself, serialized in JSON.
  • updated: The last updated time.
  • consumer_*: The different consumers that can consume from the queues table. Our other database implementations use a single column, but in PostgreSQL we use separate columns for indexing optimization. Thanks to this, a message can be consumed by different consumers, but only once by each consumer.
  • consumer_group: This supports Worker Group, an Enterprise Edition feature.

With this table, processing messages can be summarised as follows:

  1. To "produce" a message, a new record will be written to the table.
  2. To "consume" a message, we use a polling mechanism.
  3. Each queue will periodically query the queues table for itself (filter on the type column), locking the rows found.
  4. For each available message, send it to all consumers.
  5. Consumers are responsible for handling the message. They can send back a message in the queue table in the same transaction if needed.
  6. The queue will commit the transaction, releasing the lock.
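
The steps above can be sketched in a few lines of Python. This is only an illustration, not Kestra's implementation: it uses an in-memory SQLite database as a stand-in for Postgres (so there is no row locking), a single hypothetical consumer_executor flag instead of the full set of consumer_* columns, and made-up message types and helper names.

```python
import json
import sqlite3

# Simplified, hypothetical schema: one consumer flag column only.
SCHEMA = """
CREATE TABLE queues (
    "offset"          INTEGER PRIMARY KEY AUTOINCREMENT,
    "type"            TEXT NOT NULL,
    "key"             TEXT NOT NULL,
    "value"           TEXT NOT NULL,
    consumer_executor INTEGER NOT NULL DEFAULT 0
)
"""

def insert_message(conn, msg_type, key, payload):
    """Write a message row without committing (usable inside a poll transaction)."""
    conn.execute(
        'INSERT INTO queues ("type", "key", "value") VALUES (?, ?, ?)',
        (msg_type, key, json.dumps(payload)),
    )

def produce(conn, msg_type, key, payload):
    """Step 1: producing a message is a committed insert."""
    with conn:
        insert_message(conn, msg_type, key, payload)

def poll(conn, msg_type, handlers, poll_size=100):
    """Steps 3-6: one poll cycle for a single queue, in one transaction."""
    with conn:  # commits on success, rolls back if a handler raises
        rows = conn.execute(
            'SELECT "offset", "key", "value" FROM queues '
            'WHERE "type" = ? AND consumer_executor = 0 '
            'ORDER BY "offset" ASC LIMIT ?',
            (msg_type, poll_size),
        ).fetchall()
        for offset, key, value in rows:
            message = json.loads(value)
            for handler in handlers:
                handler(conn, key, message)
            # Mark the row as consumed by this consumer.
            conn.execute(
                'UPDATE queues SET consumer_executor = 1 WHERE "offset" = ?',
                (offset,),
            )
    return len(rows)

conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
produce(conn, "execution", "flow-1", {"state": "CREATED"})
produce(conn, "execution", "flow-2", {"state": "CREATED"})

seen = []

def record_and_reply(conn, key, message):
    seen.append((key, message["state"]))
    # Step 5: send a message back within the same transaction.
    insert_message(conn, "worker_job", key, {"run": True})

first = poll(conn, "execution", [record_and_reply])
second = poll(conn, "execution", [record_and_reply])
```

The second poll finds nothing because the consumer flag was set in the same transaction that handled the messages, which is what lets each consumer see a message only once.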

Queueing tradeoff

Do you want throughput or latency? Because you cannot have both.

Depending on how you design your flows and which kind of tasks you execute, a Kestra instance can handle millions of small tasks lasting a few milliseconds … or only a few tasks lasting tens of minutes! To be able to optimize for both, the Kestra queuing mechanism can be tuned:

  • min-poll-interval: Interval between two poll queries, default 25ms. This supports lower latency but potentially can add excessive load on the database if you have a lot of Kestra instances active.
  • max-poll-interval: When no messages are available to consume, the poll interval increases to the max poll interval (1 second by default). This limits the load on the database. The delay before switching from the min to the max interval is configured by poll-switch-interval, which is 5s by default.
  • poll-size: The number of rows fetched by each poll query, default to 100.
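
To make the interaction between these settings concrete, here is a minimal sketch of how a poller could choose its next sleep interval. It assumes a simple two-state switch (min interval while messages have arrived recently, max interval once the queue has been idle for poll-switch-interval); Kestra's actual logic may ramp up more gradually. PollConfig and next_poll_interval are hypothetical names.

```python
from dataclasses import dataclass

@dataclass
class PollConfig:
    # Defaults taken from the values quoted above, expressed in seconds.
    min_poll_interval: float = 0.025
    max_poll_interval: float = 1.0
    poll_switch_interval: float = 5.0

def next_poll_interval(now, last_message_at, config):
    """Return how long to wait before the next poll query.

    Assumption: a plain two-state switch based on how long the queue
    has been idle, not Kestra's exact algorithm.
    """
    idle_for = now - last_message_at
    if idle_for >= config.poll_switch_interval:
        return config.max_poll_interval
    return config.min_poll_interval

config = PollConfig()
busy_interval = next_poll_interval(now=10.0, last_message_at=9.0, config=config)
idle_interval = next_poll_interval(now=10.0, last_message_at=4.0, config=config)
```

With the defaults, a queue that received a message one second ago keeps polling every 25ms, while one that has been idle for six seconds backs off to one poll per second.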

So in summary:

  • You optimize for throughput by increasing min-poll-interval and poll-size
  • You optimize for latency by decreasing min-poll-interval
  • You can distribute the load by decreasing poll-size and starting more Kestra instances.

The default configuration provides a good baseline, but like many technologies your mileage will vary. Test different values of these parameters based on your usage to find what works best for your system.

Storing as JSON

Since we implemented this queuing concept in Kestra, the queues table structure has undergone almost no changes.

Because the value column is JSONB, we can evolve Kestra quickly without altering the queues table, and therefore without touching the existing indexes that are crucial for performance. We use JSON columns almost everywhere in our table schema so that our domain model can change without any database changes. When we need to filter on a piece of that data, we either store it in a separate column or use JSON queries backed by a GIN index. This keeps our database schema simple while still giving good query performance.

forUpdate().skipLocked()

To consume messages, we need to be transactional and lock the rows so that only one Kestra instance consumes a given message. For that, we use a classic FOR UPDATE clause in our poll query and specify SKIP LOCKED, so that rows currently locked by one Kestra instance do not delay the poll query of another instance. This is important for performance and to avoid deadlocks. In plain SQL, it looks like the following (using the "queues" definition above):

-- "offset" is quoted because OFFSET is a reserved word in PostgreSQL
SELECT *
FROM queues
ORDER BY "offset" ASC
LIMIT 100
FOR UPDATE SKIP LOCKED;
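
To see why SKIP LOCKED matters, here is a small in-memory analogy in Python. It is not Postgres and not Kestra code: each hypothetical Row carries its own lock, and a poller takes the first unlocked rows in offset order instead of waiting on rows another poller already holds.

```python
import threading

class Row:
    """Stand-in for a queues row; the lock plays the role of a row lock."""
    def __init__(self, offset):
        self.offset = offset
        self.lock = threading.Lock()

def poll_skip_locked(rows, limit):
    """Claim up to `limit` rows in offset order, skipping locked ones."""
    claimed = []
    for row in rows:
        if len(claimed) == limit:
            break
        # A non-blocking acquire mirrors SKIP LOCKED: never wait on a held row.
        if row.lock.acquire(blocking=False):
            claimed.append(row)
    return claimed

def commit(claimed):
    """Committing the transaction releases the row locks."""
    for row in claimed:
        row.lock.release()

rows = [Row(offset) for offset in range(1, 7)]

# Two Kestra instances poll before either has committed.
instance_a = poll_skip_locked(rows, limit=3)
instance_b = poll_skip_locked(rows, limit=3)
offsets_a = [row.offset for row in instance_a]
offsets_b = [row.offset for row in instance_b]

commit(instance_a)
commit(instance_b)
```

The second poller gets the next batch straight away rather than blocking behind the first. In the real table, the consumer_* flags are what stop committed rows from being picked up again; this sketch leaves that part out.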

Alternatives Considered

During our evaluation, we looked at some other implementations and common queuing and streaming use cases involving Postgres.

The Listen/Notify approach, as outlined in this article, was one. It's another interesting concept in Postgres. Unfortunately, it did not fit our use case:

  • Unlike Kafka, it does not provide a similar concept of a consumer group. As you may recall from earlier we require this concept to handle worker groups in our Enterprise Edition.
  • If a client disconnects, messages can be lost (unlike Kafka which will pick up from the last committed offset).

Change Data Capture (commonly referred to as CDC) via Debezium was another option on the table. However this would have put additional operational burden on our users as the database would need to be specifically configured for it. As the main purpose of the PostgreSQL implementation was to reduce operational burden with respect to Kafka, we didn't think it would be a good idea to use CDC - even if the technology itself is very good.

Why we recommend Apache Kafka at scale

The system we built around Postgres allows us to provide a reliable queuing mechanism to our users as they focus on building out their data orchestration flows.

For hobbyists and companies just starting out on their first flows, the PostgreSQL implementation is perfect. It provides a simple, robust solution that nearly all teams, large and small, can deploy and maintain.

There are trade-offs. Some were discussed above, particularly around latency and throughput. However, for larger-scale users there are other considerations.

Scalability

With Postgres, the operation involves a poll and then fetching the results. At a small scale this is not a concern. As users ramp up their workloads there is a potential to see performance degradation leading to delays in flow execution.
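
A rough back-of-the-envelope calculation shows how polling load grows with the size of a deployment. The instance and queue counts below are made-up inputs, and the formula is an upper bound that ignores the time each query takes.

```python
def polls_per_second(instances, queues_per_instance, poll_interval_ms):
    """Upper bound on poll queries per second hitting the database.

    Assumes every queue on every instance polls at a fixed interval
    and that each query returns instantly.
    """
    return instances * queues_per_instance * (1000 / poll_interval_ms)

# Hypothetical deployment: 10 instances, each polling 5 queues.
busy_load = polls_per_second(10, 5, poll_interval_ms=25)    # at min-poll-interval
idle_load = polls_per_second(10, 5, poll_interval_ms=1000)  # at max-poll-interval
```

Under these assumptions the database sees up to 2,000 poll queries per second while messages are flowing and about 50 per second when idle, before any of the actual work is counted.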

Unless you are using a managed provider, scaling up Postgres is a non-trivial task. Adding more powerful hardware will require a migration of the existing database, and if this is a business critical system you'll want to do this with near-zero downtime.

On the other hand Apache Kafka is built to scale. Firstly, with our Kafka solution there is no polling operation which right away reduces load. If more compute is required in the cluster to handle the increased traffic, you can add new nodes. I am really simplifying this process but it is a well established practice that is documented in many places. This can happen seamlessly with no downtime to your cluster.

Overcoming Complexity with Apache Kafka

Earlier I mentioned some of the concerns our users had with Apache Kafka at the heart of our messaging system. While these are valid concerns, there are multiple remediations users and companies can take.

Managed Providers

One of the easiest ways to offload the operational complexity of Apache Kafka is to partner with a managed service provider. There are lots of managed providers in this space. Aiven, Confluent and AWS MSK are some of the better known, but there are plenty of others offering varying degrees of features and functionality.

I will concede, though, that using a managed provider does not fully absolve you of all operational overhead. There is a level of shared responsibility in which you should still be aware of your cluster's health, security, etc.

KRaft

A common criticism of Kafka over the years has been the ZooKeeper component. While necessary for managing the cluster's metadata, it added complexity to the overall architecture. However, ZooKeeper will soon be going into retirement, to be replaced by KRaft. This new architecture greatly simplifies the metadata store, moving it into the broker nodes themselves. Confluent provides a good description of the changes in this post.

Leverage Cloud Object Stores for Reduced TCO

One great feature common to cloud implementations of Apache Kafka, Elasticsearch and OpenSearch is tiered storage. This feature, simple in concept but not so much in implementation, allows users to move data onto cheaper storage solutions such as S3 and GCS.

For Kafka, this can mean leaner compute instances, as staler data can remain on the object store until the retention period expires, with the latest "hot" data residing on fast, local NVMe. For the NoSQL databases, the older logs and other artefacts can be stored there too, with the added bonus that they are still readily searchable. This is especially important in highly regulated industries (such as finance or health) where data retention can be of the order of years.

Conclusion

At Kestra we offer a simple, declarative orchestration framework. We want our users to be confident that they can quickly build robust data flows without having to worry about the internals of how their orchestrator is working.

For users who want to get up and running quickly and are not yet worried about high availability or scalability at this point in their journey, we have our fully open-source version of Kestra, which is backed by Postgres. For customers looking to expand and ensure high availability and ease of scalability, we offer our enterprise solution backed by Apache Kafka and your choice of OpenSearch or Elasticsearch.

As you've gotten this far, you likely have these burning questions - when and at what scale should I consider using the Kafka backend versus Postgres? Well, stick around for my next post, where I will benchmark both solutions! Until then, star Kestra on GitHub to stay in touch :)

Written by paulgrainger85