Event Sourcing for the Poor, or One PostgreSQL Is Enough for You

Introduction

There are two ways to store data. You can store it in its final state right away or keep a list of events that, when processed sequentially, yield that final state. The latter is known as Event Sourcing.

Event Sourcing is an approach to working with events: you store a series of events representing changes and, based on them, reconstruct the state of your object.

In today's world, any business application will eventually have to answer the questions: who changed what, and when? Can the data be rolled back to a previous state? How can you view a snapshot of the data as it was a week ago? Starting something new without considering this in the architecture from the outset is, in my opinion, negligent.

However, there is a flip side: speed and experience. Even knowing how to build such systems correctly, I prefer not to start a project with infrastructure construction. A good message broker and fast data storage are essential for working with Event Sourcing; a common combination is Kafka + RabbitMQ + MongoDB/Elastic. This setup works well, but I don't want to stand up all of these complex systems locally just to start writing code. And they aren't necessary until your users generate hundreds of thousands of requests per second and you face scaling challenges.

Therefore, my initial criteria for finding a good solution were:

This is how I came to have an endless love for PostgreSQL.

Not Only Event Sourcing

Working with events alone is not enough; the approach to the API also needs to be reconsidered. There is much debate, with books, projects, and talks on how to properly build interactions between clients and applications, but in my opinion CQRS is the way to go, especially since it pairs perfectly with an event-driven architecture.

Command Query Responsibility Segregation (CQRS) is a principle, or paradigm, that separates the handling of commands and queries when working with data.

Simply put, the API is divided into two types of requests: those that mutate data and those that query it.

Combined with an event-driven approach, this boils down to essentially a single mutating request that inserts events, plus however many diverse read requests the views require.

Additionally, it's important to understand that data representation and reading do not have to be executed in a classical manner. They can and should be closer to Domain-Driven Design (DDD).

Domain-Driven Design (DDD) is a set of principles and patterns aimed at creating optimal object systems. It involves creating software abstractions called domain models.

Of course, there's no sense in rigidly adhering to the entirety of Evans' DDD and implementing it in all its details, but I will borrow some practices from this approach.

It's also important to understand that overall we will be leveraging the Event Streaming pattern, though only partially and without delving deeply into details. The point is that the combination of Event Sourcing + Event Streaming + CQRS gives the result I am aiming for in terms of the application.

Event Streaming is the process of continuously capturing and storing events as they occur in a system. These events can then be processed and analyzed in real-time or stored for later analysis.

High-Level Implementation Idea

The idea is quite simple:

[Diagram: high-level overview of the idea]

If Explained Simply:

With this approach, we have no restrictions on the data structure organization; we can store everything in JSON if we want, or fully normalize it, depending on the requirements.

Moving to Implementation

The complete working example is available in the GitHub repository.

I will implement the example in Go, using a set of libraries that I find convenient and enjoyable, but the core idea can be realized in any way you prefer and is not dependent on my specific choices.

Create a new project and use cobra by spf13 to initialize the entry point.

It's better to install cobra-cli and run:
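The exact commands aren't reproduced here; assuming the standard cobra-cli workflow, it comes down to something like this (the module path is arbitrary and only used for the sketches below):

```sh
go mod init example.com/espg   # hypothetical module path, adjust to your own
go install github.com/spf13/cobra-cli@latest
cobra-cli init
cobra-cli add run              # scaffolds the cmd/run.go we'll fill in later
```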

We won't go into detailed descriptions and command setups now; let's continue building the structure.

Configuration

First, it would be good to teach our application to read configuration parameters from a .env file. For this, viper by spf13 will help, along with a separate package within the application to simplify my life.

Place this file in the project at: internal/config/config.go.
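The original listing isn't shown here, so below is a minimal sketch of what internal/config/config.go could look like with viper; the envConfigs fields are assumptions of mine and should be adapted to your needs.

```go
package config

import (
	"log"

	"github.com/spf13/viper"
)

// envConfigs describes the settings we care about; extend it as needed.
// The field set here is an assumption used throughout these sketches.
type envConfigs struct {
	DatabaseURL string `mapstructure:"DATABASE_URL"`
	HTTPPort    string `mapstructure:"HTTP_PORT"`
}

// Env holds the parsed configuration, filled in once in init().
var Env envConfigs

func init() {
	viper.SetConfigFile(".env") // read key=value pairs from the project root
	viper.AutomaticEnv()        // let real environment variables override the file

	if err := viper.ReadInConfig(); err != nil {
		log.Printf("config: no .env file found: %v", err)
	}
	if err := viper.Unmarshal(&Env); err != nil {
		log.Fatalf("config: cannot parse settings: %v", err)
	}
}
```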

In the envConfigs type, declare the data that will be important to us and expand it as needed.

Because the configuration is read in init(), it is loaded the very first time the package is imported anywhere in the project. There are pitfalls in such an implementation, but we'll skip them for now.

Database

Next, we'll need to work with the database itself. I really like the ORM by uptrace, called bun. Moreover, I have been a fan since the days of go-pg by the same team. Therefore, I will use this library as the foundation. It doesn't bind you to anything, but it simplifies life in many aspects.

We'll also add the uuid package right away, since we will use UUIDs for typed identifiers.

 

Now it's time to create a small abstraction for working with the database.

Add a file and a new package at internal/db/db.go
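A minimal sketch of internal/db/db.go, assuming a plain DSN string coming from the config package:

```go
package db

import (
	"database/sql"

	"github.com/uptrace/bun"
	"github.com/uptrace/bun/dialect/pgdialect"
	"github.com/uptrace/bun/driver/pgdriver"
)

// New opens a PostgreSQL connection through bun's pgdriver and wraps it in *bun.DB.
// dsn looks like postgres://user:pass@localhost:5432/dbname?sslmode=disable.
func New(dsn string) *bun.DB {
	sqldb := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(dsn)))
	return bun.NewDB(sqldb, pgdialect.New())
}
```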

In the future, this file can be expanded and gain additional functions, plus it will make it easier to pass dependencies throughout the project.

Now let's address migrations right away, as I don't want to pull in any additional tools for this, and bun has everything we need.

Create a new file cmd/migrations.go with the following content:
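The full listing is in the repository; a condensed sketch based on bun's own migration examples could look like this (the db:* command names and the example.com/espg import path are my assumptions):

```go
package cmd

import (
	"github.com/spf13/cobra"
	"github.com/uptrace/bun/migrate"

	"example.com/espg/internal/config"
	"example.com/espg/internal/db"
	"example.com/espg/migrations"
)

// newMigrator wires bun's migrator to our database and the migrations package.
func newMigrator() *migrate.Migrator {
	return migrate.NewMigrator(db.New(config.Env.DatabaseURL), migrations.Migrations)
}

var dbInitCmd = &cobra.Command{
	Use:   "db:init",
	Short: "create bun's migration bookkeeping tables",
	RunE: func(cmd *cobra.Command, args []string) error {
		return newMigrator().Init(cmd.Context())
	},
}

var dbMigrateCmd = &cobra.Command{
	Use:   "db:migrate",
	Short: "apply pending migrations",
	RunE: func(cmd *cobra.Command, args []string) error {
		_, err := newMigrator().Migrate(cmd.Context())
		return err
	},
}

var dbCreateSQLCmd = &cobra.Command{
	Use:   "db:create_sql [name]",
	Short: "create new up/down SQL migration files",
	Args:  cobra.ExactArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		_, err := newMigrator().CreateSQLMigrations(cmd.Context(), args[0])
		return err
	},
}

func init() {
	rootCmd.AddCommand(dbInitCmd, dbMigrateCmd, dbCreateSQLCmd)
}
```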

Now you have a set of commands that will help with migrations. This is not an exhaustive list but sufficient for the purpose of this example. For expanding the list of commands, refer to bun's implementation examples.

You also need to create the migrations folder ./migrations and place a main.go file there.
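That main.go follows bun's standard pattern, roughly:

```go
package migrations

import "github.com/uptrace/bun/migrate"

// Migrations is the collection that the SQL migration files register into.
var Migrations = migrate.NewMigrations()

func init() {
	// Discover the *.up.sql / *.down.sql files placed next to this package.
	if err := Migrations.DiscoverCaller(); err != nil {
		panic(err)
	}
}
```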

Getting Everything Ready for Message Handling

To do this, create a new file and package internal/event_listener/event_listener.go
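The actual file is in the repository; to illustrate the idea (PostgreSQL LISTEN/NOTIFY via bun's pgdriver), it could look roughly like this. The channel name matches the trigger we'll add in the migrations section, and EventNotification/ParsePayload come from the helpers.go sketch just below.

```go
package event_listener

import (
	"context"
	"log"

	"github.com/uptrace/bun"
	"github.com/uptrace/bun/driver/pgdriver"
)

// Listen subscribes to the events:created channel and passes every parsed
// notification to handle; it blocks until ctx is cancelled.
func Listen(ctx context.Context, db *bun.DB, handle func(EventNotification)) error {
	ln := pgdriver.NewListener(db)
	if err := ln.Listen(ctx, "events:created"); err != nil {
		return err
	}

	for notif := range ln.Channel() {
		payload, err := ParsePayload(notif.Payload)
		if err != nil {
			log.Printf("event_listener: skipping malformed payload %q: %v", notif.Payload, err)
			continue
		}
		handle(payload)
	}
	return nil
}
```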

Also, place helpers.go alongside it.

This file will help us parse what the trigger function will send us later on.
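Something along these lines, assuming the three comma-separated fields are the aggregate id, an integer sequence number, and the event id; the exact meaning depends on the trigger you write later.

```go
package event_listener

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/google/uuid"
)

// EventNotification is an assumed interpretation of the "uuid,int,uuid" payload
// sent by the trigger: aggregate id, sequence number, event id.
type EventNotification struct {
	AggregateID uuid.UUID
	Seq         int64
	EventID     uuid.UUID
}

// ParsePayload splits and validates the comma-separated NOTIFY payload.
func ParsePayload(raw string) (EventNotification, error) {
	parts := strings.Split(raw, ",")
	if len(parts) != 3 {
		return EventNotification{}, fmt.Errorf("expected 3 fields, got %d", len(parts))
	}
	aggregateID, err := uuid.Parse(parts[0])
	if err != nil {
		return EventNotification{}, fmt.Errorf("aggregate id: %w", err)
	}
	seq, err := strconv.ParseInt(parts[1], 10, 64)
	if err != nil {
		return EventNotification{}, fmt.Errorf("sequence number: %w", err)
	}
	eventID, err := uuid.Parse(parts[2])
	if err != nil {
		return EventNotification{}, fmt.Errorf("event id: %w", err)
	}
	return EventNotification{AggregateID: aggregateID, Seq: seq, EventID: eventID}, nil
}
```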

HTTP Handlers

Now let's prepare everything to launch the web server. I will use gin as one of the popular routers and the one that is most convenient for me.

Create another package and file internal/router/router.go.
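A minimal sketch of what that file could contain:

```go
package router

import (
	"net/http"

	"github.com/gin-gonic/gin"
)

// New builds the gin engine that domain packages register their routes on.
func New() *gin.Engine {
	r := gin.Default() // logger + recovery middleware

	// A natural place for cross-cutting concerns: CORS, default routes, etc.
	r.GET("/healthz", func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{"status": "ok"})
	})

	return r
}
```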

Again, you can expand on this idea and add anything you need here as you see fit. CORS, default routes, etc.

Abstractions and Interfaces

Great, we're almost done with the preparatory work.

Let's add some basic interfaces and models that we will need. To do this, create a package and file internal/aggregates/models.go and place the following in it:
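The actual models and interfaces are in the repository; purely as an illustration, their shape might be something like the following. All of the names here are guesses of mine based on how they are used later in the article.

```go
package aggregates

import (
	"context"
	"encoding/json"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/google/uuid"
	"github.com/uptrace/bun"
)

// Event is the single write-side record: an immutable fact about an aggregate.
type Event struct {
	bun.BaseModel `bun:"table:events"`

	ID          uuid.UUID       `bun:"id,pk" json:"id"`
	AggregateID uuid.UUID       `bun:"aggregate_id" json:"aggregate_id"`
	Name        string          `bun:"name" json:"name"`
	Payload     json.RawMessage `bun:"payload,type:jsonb" json:"payload"`
	CreatedAt   time.Time       `bun:"created_at,nullzero,notnull,default:now()" json:"created_at"`
}

// Aggregator rebuilds a read-side view from events; the event listener calls
// Run for every new event it receives.
type Aggregator interface {
	Run(ctx context.Context, event Event) error
}

// Domain groups everything a bounded context exposes to the application:
// its HTTP routes and its aggregator.
type Domain interface {
	RegisterRoutes(r *gin.Engine)
	Aggregator() Aggregator
}
```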

Here you see:

If something seems strange for now, wait a moment; soon everything will become clear.

Now let's add a basic repository for handling event queries.

Create another package with a file internal/repositories/event_repository.go.
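As a sketch, with bun it boils down to an append-only insert plus a read of an aggregate's event stream (the example.com/espg import path is hypothetical):

```go
package repositories

import (
	"context"

	"github.com/google/uuid"
	"github.com/uptrace/bun"

	"example.com/espg/internal/aggregates"
)

// EventRepository is the single write path of the system: it only appends events.
type EventRepository struct {
	db *bun.DB
}

func NewEventRepository(db *bun.DB) *EventRepository {
	return &EventRepository{db: db}
}

// Insert appends an event; the INSERT fires the trigger that NOTIFYs subscribers.
func (r *EventRepository) Insert(ctx context.Context, e *aggregates.Event) error {
	_, err := r.db.NewInsert().Model(e).Exec(ctx)
	return err
}

// ByAggregate returns the full event stream of one aggregate, oldest first.
func (r *EventRepository) ByAggregate(ctx context.Context, id uuid.UUID) ([]aggregates.Event, error) {
	var events []aggregates.Event
	err := r.db.NewSelect().Model(&events).
		Where("aggregate_id = ?", id).
		Order("created_at ASC").
		Scan(ctx)
	return events, err
}
```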

Let's also add a common handler for events in the package internal/handlers/event_handlers.go.
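A hedged sketch of such a handler with gin: it binds an event from the request body and appends it through the repository above.

```go
package handlers

import (
	"net/http"

	"github.com/gin-gonic/gin"

	"example.com/espg/internal/aggregates"
	"example.com/espg/internal/repositories"
)

// NewEventHandler returns a gin handler that accepts an event from the client
// and appends it to the store; this is the single mutating endpoint per domain.
func NewEventHandler(repo *repositories.EventRepository) gin.HandlerFunc {
	return func(c *gin.Context) {
		var event aggregates.Event
		if err := c.ShouldBindJSON(&event); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}
		if err := repo.Insert(c.Request.Context(), &event); err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
			return
		}
		c.JSON(http.StatusAccepted, gin.H{"id": event.ID})
	}
}
```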

Domain Packages

In my worldview, someday all this code might need to be split into microservices, which means it would be logical to separate domains from each other right away. Who knows, they might end up running in different containers. This will simply facilitate refactoring in the future. This step is not mandatory and can be simplified, but I'll leave everything as it is.

A domain package is a package containing all models, database queries, route handlers, and reference information for a specific domain.

Let's imagine that for this example we currently have only one entity: users. So that's what we'll work with.

Create a package with the following structure:
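The tree itself isn't reproduced here; judging by the files referenced below, it looks roughly like this:

```
internal/domains/users/
├── aggregator/
│   └── users.go
├── models/
│   └── users.go
├── repository/
│   └── users.go
└── users.go
```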

In the file internal/domains/users/models/users.go, let's declare our data representation model:
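A sketch of the view model (the concrete fields are my assumptions):

```go
package models

import (
	"time"

	"github.com/google/uuid"
	"github.com/uptrace/bun"
)

// User is the read-side view of the user aggregate, rebuilt from events.
type User struct {
	bun.BaseModel `bun:"table:users"`

	ID        uuid.UUID `bun:"id,pk" json:"id"`
	Name      string    `bun:"name" json:"name"`
	Email     string    `bun:"email" json:"email"`
	UpdatedAt time.Time `bun:"updated_at,nullzero,notnull,default:now()" json:"updated_at"`
}
```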

In the file internal/domains/users/repository/users.go, we'll add database queries:

There are only 4 of them:

And now it's time to explain the lock table and why our Save and Delete queries are implemented this way.

The setup looks like this:

[Diagram: the lock table setup]

It protects the data from being overwritten twice if duplicate events arrive for some reason, or if multiple subscribers are running (for example, application replicas).

The logic is as follows: whenever we insert, update, or delete view data, we rely on the combination of aggregate identifier, event name, and view name (essentially the target table in this example). The transaction that modifies the view must, besides updating the table itself, insert a row into the lock table. If a row with that composite primary key already exists, the transaction is not executed, and we don't rebuild the data unnecessarily. This is just one way to implement the protection, not the best one, but sufficient for a start.

In addition, this lock table will serve as a source of truth regarding whether all events have been applied to the view.

Also, a hash of the aggregate keys can serve as a data revision key; we don't need that right now, but it may come in handy in the future.
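To make the description concrete, here is roughly how Save could be written with bun. The table and column names are assumptions, and the duplicate is handled here with ON CONFLICT DO NOTHING plus an early return rather than letting the transaction fail; either way gives the same protection.

```go
package repository

import (
	"context"

	"github.com/uptrace/bun"

	"example.com/espg/internal/aggregates"
	"example.com/espg/internal/domains/users/models"
)

// UsersRepository rebuilds and reads the users view.
type UsersRepository struct {
	db *bun.DB
}

func New(db *bun.DB) *UsersRepository {
	return &UsersRepository{db: db}
}

// Save upserts the user view and records the applied event in the lock table
// inside one transaction. If the (aggregate_id, event_name, view_name) row is
// already there, the event was applied before and the view stays untouched.
func (r *UsersRepository) Save(ctx context.Context, user *models.User, event aggregates.Event) error {
	return r.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
		lock := map[string]interface{}{
			"aggregate_id": event.AggregateID,
			"event_name":   event.Name,
			"view_name":    "users",
		}
		res, err := tx.NewInsert().Model(&lock).TableExpr("locks").
			On("CONFLICT DO NOTHING").Exec(ctx)
		if err != nil {
			return err
		}
		if n, _ := res.RowsAffected(); n == 0 {
			return nil // duplicate event: nothing to do
		}

		_, err = tx.NewInsert().Model(user).
			On("CONFLICT (id) DO UPDATE").
			Set("name = EXCLUDED.name").
			Set("email = EXCLUDED.email").
			Set("updated_at = EXCLUDED.updated_at").
			Exec(ctx)
		return err
	})
}
```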

Now let's add the event handling logic itself.

In the file internal/domains/users/aggregator/users.go, place the following code:
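A sketch of the aggregator; the event names and payload shape are assumptions of mine.

```go
package aggregator

import (
	"context"
	"encoding/json"
	"fmt"

	"example.com/espg/internal/aggregates"
	"example.com/espg/internal/domains/users/models"
	"example.com/espg/internal/domains/users/repository"
)

// Users rebuilds the users view from incoming events.
type Users struct {
	repo *repository.UsersRepository
}

func New(repo *repository.UsersRepository) *Users {
	return &Users{repo: repo}
}

// Run inspects the event name, applies the corresponding logic, and calls Save.
func (a *Users) Run(ctx context.Context, event aggregates.Event) error {
	switch event.Name {
	case "user.created", "user.updated":
		var user models.User
		if err := json.Unmarshal(event.Payload, &user); err != nil {
			return fmt.Errorf("users aggregator: %w", err)
		}
		user.ID = event.AggregateID
		return a.repo.Save(ctx, &user, event)
	case "user.deleted":
		// Delete is the counterpart to Save mentioned above; it is not shown in this sketch.
		return a.repo.Delete(ctx, event.AggregateID, event)
	default:
		// Events this view doesn't care about are simply ignored.
		return nil
	}
}
```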

Here, we're primarily concerned with the Run() function, which invokes some logic for building the view.

The essence lies in understanding which event type we received, executing some logic, and calling the Save method. In practice, it's recommended to distribute the switch case bodies across different functions, of course.

The main file left is internal/domains/users/users.go.

Essentially, it should implement the Domain interface.
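Roughly, and again with assumed names, it could look like this; GetByID stands in for the repository's read queries, which aren't shown here.

```go
package users

import (
	"net/http"

	"github.com/gin-gonic/gin"
	"github.com/google/uuid"
	"github.com/uptrace/bun"

	"example.com/espg/internal/aggregates"
	"example.com/espg/internal/domains/users/aggregator"
	"example.com/espg/internal/domains/users/repository"
	"example.com/espg/internal/handlers"
	"example.com/espg/internal/repositories"
)

// Domain wires the users read model, its aggregator, and its HTTP routes together.
type Domain struct {
	repo   *repository.UsersRepository
	events *repositories.EventRepository
	agg    *aggregator.Users
}

func New(db *bun.DB) *Domain {
	repo := repository.New(db)
	return &Domain{
		repo:   repo,
		events: repositories.NewEventRepository(db),
		agg:    aggregator.New(repo),
	}
}

// RegisterRoutes exposes one mutating endpoint (event intake) and the read endpoints.
func (d *Domain) RegisterRoutes(r *gin.Engine) {
	r.POST("/users/events", handlers.NewEventHandler(d.events))
	r.GET("/users/:id", func(c *gin.Context) {
		id, err := uuid.Parse(c.Param("id"))
		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"})
			return
		}
		// GetByID is one of the repository's read queries (not shown in this sketch).
		user, err := d.repo.GetByID(c.Request.Context(), id)
		if err != nil {
			c.JSON(http.StatusNotFound, gin.H{"error": err.Error()})
			return
		}
		c.JSON(http.StatusOK, user)
	})
}

func (d *Domain) Aggregator() aggregates.Aggregator {
	return d.agg
}
```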

Here it's all quite straightforward, except perhaps for one question: why did we put event recording on a separate route? It's not strictly necessary, but it's more correct in terms of developing the idea further: different routes will likely have different permissions, and this level of abstraction is more convenient than a single endpoint for dumping all events. Again, the final implementation is up to you :-)

Migrations

We're getting closer to working directly with the database, so it's time to set up some environment. To do this, let's add a docker-compose.yaml file to the project root, where we'll simply spin up a default PostgreSQL container.

And now, knowing the credentials, let's create a .env file in the project root.

Let's start our database container:
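For example:

```sh
docker compose up -d
```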

Next, we need to initialize the migrations themselves so that all the necessary tables are created in the database:
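With the command names from the migrations sketch above, that would be:

```sh
go run . db:init
```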

And now let's add new migrations through:
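Again using the hypothetical command name from the sketch:

```sh
go run . db:create_sql events
```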

This will add two files to the migrations folder, in the format YYYYMMDDHHmmss_events.(up|down).sql.

In the added up file, insert our structure for working with events.
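The real SQL is in the repository; as an illustration, with column names matching the Go models assumed earlier, it might contain something like this:

```sql
CREATE TABLE IF NOT EXISTS events (
    id           uuid PRIMARY KEY,
    aggregate_id uuid        NOT NULL,
    name         varchar     NOT NULL,
    payload      jsonb,
    created_at   timestamptz NOT NULL DEFAULT now(),
    seq          bigserial   NOT NULL  -- global sequence number, the "int" in the payload
);

-- Notify subscribers about every new event with an "aggregate_id,seq,event_id" payload.
CREATE OR REPLACE FUNCTION notify_event_created() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('events:created',
        NEW.aggregate_id::text || ',' || NEW.seq::text || ',' || NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER events_created
    AFTER INSERT ON events
    FOR EACH ROW EXECUTE FUNCTION notify_event_created();
```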

The crux here is the line that sends a string in the uuid,int,uuid format to the events:created channel. And it is precisely this format that our helper in the internal/event_listener/helpers.go file parses.

Again, this can be extended and modified as needed.

For the down file, the content simply reverses what the up file added.

Don't forget about the table representing the user domain.

up:

down:
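Neither file is reproduced here; for illustration, the up migration could create the users view table together with the lock table described above (names assumed), and the down migration would simply drop them:

```sql
-- up
CREATE TABLE IF NOT EXISTS users (
    id         uuid PRIMARY KEY,
    name       varchar,
    email      varchar,
    updated_at timestamptz NOT NULL DEFAULT now()
);

CREATE TABLE IF NOT EXISTS locks (
    aggregate_id uuid    NOT NULL,
    event_name   varchar NOT NULL,
    view_name    varchar NOT NULL,
    PRIMARY KEY (aggregate_id, event_name, view_name)
);

-- down
DROP TABLE IF EXISTS locks;
DROP TABLE IF EXISTS users;
```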

Now, let's add our migrations to the database:
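With the hypothetical command names from earlier:

```sh
go run . db:migrate
```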

 

Initializing the Application

And the cherry on top: now we can finish the code in the cmd/run.go file.
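The real file is in the repository; a sketch of the interesting part might be:

```go
package cmd

import (
	"github.com/spf13/cobra"
	"github.com/uptrace/bun"

	"example.com/espg/internal/aggregates"
	"example.com/espg/internal/config"
	"example.com/espg/internal/db"
	"example.com/espg/internal/domains/users"
	"example.com/espg/internal/router"
)

// getAllDomain is the only place that grows when a new domain appears.
func getAllDomain(database *bun.DB) []aggregates.Domain {
	domains := []aggregates.Domain{
		users.New(database),
		// orders.New(database), // future domains are plugged in the same way
	}
	return domains
}

var runCmd = &cobra.Command{
	Use:   "run",
	Short: "start the HTTP API and the event listener",
	RunE: func(cmd *cobra.Command, args []string) error {
		database := db.New(config.Env.DatabaseURL)
		domains := getAllDomain(database)

		r := router.New()
		for _, d := range domains {
			d.RegisterRoutes(r)
		}

		// The event listener (see internal/event_listener) is also started here,
		// dispatching every new event to the domains' aggregators.

		return r.Run(":" + config.Env.HTTPPort)
	},
}

func init() {
	rootCmd.AddCommand(runCmd)
}
```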

Here, we're interested solely in how the list of domains is formed in the getAllDomain function. When extending functionality, we simply add the initialization of another domain to the domains variable.

Now you can run the application:
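Assuming the run command name from the sketch above:

```sh
go run . run
```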

Or build it and run the binary.

If you're using a JetBrains IDE, create a test.http file in the project root and you can play around with API request examples:
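For instance (the routes and payload shape follow the sketches above, and the port assumes HTTP_PORT=8080; adjust them to the real example):

```http
### Append a "user.created" event (the client supplies both identifiers)
POST http://localhost:8080/users/events
Content-Type: application/json

{
  "id": "0c2d7d26-57d8-4b0a-9f3a-0f2a6f8a1c11",
  "aggregate_id": "7a1e2c44-9f50-4a6b-8a3e-2f64a0a8b0aa",
  "name": "user.created",
  "payload": {"name": "Alice", "email": "alice@example.com"}
}

### Read the rebuilt view
GET http://localhost:8080/users/7a1e2c44-9f50-4a6b-8a3e-2f64a0a8b0aa
```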

Explanation of Identifiers

In the above example, we let the client generate identifiers for the aggregate and for the event upon request. There's an explanation for this:

Conclusions

The idea is more than viable, but it has its nuances and peculiarities. Whether to use it or not should be a deliberate, well-considered decision. Don't take it as something inherently good or bad; it's just a variation on the theme, and not the best one in terms of code, which could certainly be written better. But I've tried to convey the idea and show how it could be done.

Have we achieved our goals?

Okay, but we're definitely not done yet.

Next, we'll develop this idea and example further: how to avoid spending time writing specifications by hand and use code generation for interacting with the frontend.