Event Sourcing для бедных или вам хватит одного PostgreSQL

Введение

Данные можно хранить в двух вариациях. Сразу в финальном их состоянии или иметь список событий, последовательно обработав которые вы получаете это состояние. Последнее называют Event Sourcing'ом.

Event Sourcing — это подход работы с событиями. Вы используете набор событий об изменениях и на их основаниях реконструируете состояние вашего объекта.

В современном мире в любом бизнес приложение рано или поздно придется найти ответ на вопрос - кто, что и когда поменял? Можно ли откатиться данные к предыдущему состоянию? или например как посмотреть срез данных актуальных на неделю назад? Поэтому начинать что-то новое и не задуматься об архитектуре на берегу, с моей точки зрения, - преступно.

Но есть обратная сторона медали - это скорость и опыт. Даже при наличии опыта и знаний, как строить такие системы правильно - я предпочту не начинать проект с инфраструктурной стройки. А иметь хороший брокер сообщений и быстрое хранилище данных - это то, что нужно, по хорошему, подготовить для работы с Event Sourcing'ом. Зачастую связка получается типа Kafka + RabbitMQ + Mongo/Elastic. Это отлично работает, но я не хочу поднимать у себя локально всех этих монстров, чтобы просто начать писать код. Да и не нужны они до тех пор пока ваши пользователи не создают сотни тысяч запросов в секунду и перед вами не стоят задачи масштабирования.

Поэтому вводные для поиска хорошего решения у меня были такие:

Так я пришел к бесконечно полной любви с PostgreSQL.

Не только лишь Event Sourcing

Но одной работы с событиями будет не достаточно, также стоит пересмотреть подход к работе с API как таковым. Можно много спорить, кидаться книжками, проектами, докладами о том, как правильно строить взаимодействие с приложением между клиентами. Но моё мнение - CQRS. Особенно, учитывая тот факт, что вместе с событийно ориентированной архитектурой он работает просто идеально.

The Command and Query Responsibility Segregation - принцип или парадигма CQRS разделяет назначение запросов и команд на обработку данных.

Проще говоря API делится на два типа запросов - мутационные и запрашивающие какие-то данные.

Вместе с событийно ориентированным подходом у нас получается буквально один мутационный запрос на вставку событий и какое-то кол-во необходимых и совершенно разных запросов на чтение данных.

Помимо этого тут стоит понимать, что представление данных и их чтение не обязано быть исполнено уже в классическом виде. А может и должно быть ближе к предметно ориентированному проектированию.

Предметно-ориентированное проектирование — набор принципов и схем, направленных на создание оптимальных систем объектов. Сводится к созданию программных абстракций, которые называются моделями предметных областей.

Конечно нет никакого смысла упираться рогом в матчасть от Эванса и тащить DDD во всех красках, но я буду заимствовать некоторые практики их этого подхода.

Также важно понимать, что в целом мы будем паразитировать около паттерна Event Streaming. Но лишь от части и не сильно погружаясь в детали. Суть в том, что совокупность связки Event Storing + Event Streaming + CQRS дает как раз тот результат, к которому я стремлюсь с точки зрения приложения.

Потоковая передача событий (Event Streaming) — это процесс непрерывного захвата и хранения событий по мере их возникновения в системе. Эти события затем можно обрабатывать и анализировать в режиме реального времени или сохранять для последующего анализа.

Верхнеуровневая идея реализации

Идея достаточно простая:

image-20240511232007413

Если на пальцах, то:

При этом подходе у нас нет никаких ограничений по организации структуры данных, хотим в json все сложим, хотим нормализуем полностью - это все зависит от задач.

Перейдем к реализации

Весь работающий пример ниже доступен в репозитории на GitHub.

Я буду реализовывать пример на Go и на том наборе библиотек, которые мне нравятся и кажутся удобными, но сама идея может быть реализована как вам будет угодно и не зависит от используемых мной решений.

Создаем новый проект и используя cobra от spf13 инициируем точку входа.

Лучше установить cobra-cli и выполнить:

Не будем сейчас останавливаться на тонком описании и настройки команд, продолжим накидывать структуру.

Конфигурация

Для начала неплохо было бы научить наше приложение читать какие-то параметры конфигурации из .env. Тут мне поможет viper от тех же spf13 и отдельный пакетик внутри приложения, который упростит мне жизнь.

Положим этот файлик в проекте по адресу internal/config/config.go.

в типе envConfigs объявим те данные, которые нам будут важны и будем расширять его по мере необходимости.

Благодаря тому, что мы читаем конфигурацию в init() первый же подключение нашего файла в проекте заставит его прочитать конфигурацию. Есть подводные камни в такой реализации, но сейчас опустим их.

База данных

Дальше нам понадобится сама работа с базой. Я очень люблю ORM от uptrace, которая называется bun. Более того, я фанат еще со времен go-pg от них же. Поэтому в качестве основы буду отталкиваться от этой библиотеки. Она ни к чему вас не обяжет, но жизнь сильно упростит во многих аспектах.

Добавим сразу еще uuid так как будем их использовать для типизации идентификаторов

 

Теперь пора создать небольшую абстракцию, для работы с базой.

Добавим файл и новый пакет по адресу internal/db/db.go

В перспективе этот файл может расширяться и обрастать дополнительными функциями, плюс так проще прокидывать зависимость по проекту будет.

Теперь сразу закроем вопрос с миграциями, ибо я не хочу тащить никакой дополнительный инструмент для этого, а в bun все есть.

Создайте новый файл cmd/migrations.go со следующим содержанием:

Теперь у вас есть ряд команд, которые помогут в работе с миграциями. Это не исчерпывающий перечень, но достаточный для работы этого примера. Для расширения перечня команд посмотрите примеры реализаций у bun.

Еще нужно создать саму папку с миграциями ./migrations и положить туда файл main.go

Подготовим все к работе с сообщениями

Для этого создадим новый файл и пакет internal/event_listener/event_listener.go

и положим рядом еще helpers.go

Этот файл поможет нам разбирать то, что будет отправлять нам функция из триггера далее.

Роутер

Теперь подготовим все к тому, что бы запустить веб сервер. Я буду использовать gin как один из популярных роутеров и наиболее мне удобный.

Создадим еще один пакет и файл internal/router/router.go

Опять таки, можете эту идею развить и добавить сюда все что понадобится и как вам будет угодно. CORS, дефолтные роуты и т.д.

Абстракции и интерфейсы

Отлично, мы почти закончили с подготовительными работами.

Давайте добавим немного базовых интерфейсов и моделей, которые нам пригодятся. Для этого создадим пакет и файл internal/aggregates/models.go и положим в него следующее:

Тут вы видите

Если пока что-то кажется странным, подождите, скоро все станет понятным.

Теперь давайте добавим базовый репозиторий для работы с запросами событий.

Создадим еще один пакет с файлом internal/repositories/event_repository.go

А также давайте добавим общий хендлер для событий в пакет internal/handlers/event_handlers.go

Доменные пакеты

В моей картинке мира, когда нибудь весь этот код может понадобится растащить на микросервисы, а значит будет логичнее если я сразу домены буду отделять друг от друга. Вдруг они будут крутиться в разных контейнерах. Это просто упростит рефакторинг в будущем. Шаг не обязательный и может быть упрощен, но я оставлю все так как есть.

Доменный пакет - это пакет содержащий в себе все модели, запросы к бд, хендлеры для роутов и справочную информацию по конкретной предметной области.

Представим, что у нас есть пока только одна сущность для примера - это пользователи. Поэтому будем оперировать ей.

Создадим пакет со следующей структурой:

В файле internal/domains/users/models/users.go объявим нашу модель представления данных:

В файл internal/domains/users/repository/users.go добавим запросы к базе даных:

Их всего 4:

И вот пришла пора пояснить за таблицу блокировок и почему наши запросы Save и Delete реализованы таким образом.

Связка выглядит так:

image-20240512001004871

А нужна для того, что бы обезопасить данные от двойной перезаписи в случае дублирования события по каким-то причинам или в случае запуска двух или более подписчиков (например копий приложения).

Логика такая, что во время обновления, вставки или удаления данных мы опираемся на связку в виде идентификаторов аггрегата, события и названия представления данных домена (таблицы для вставки по сути в текущем примере). И транзакция изменения данных в представлении, помимо обновления самой таблицы, требует вставки данных в таблицу блокировки. Если же составной основной ключ из трех полей выше уже существует, то транзакция не выполнится и мы не будем лишний раз перестраивать данные. Это лишь один из вариантов реализации защиты, не самый лучший, но достаточный для начала.

Попутно эта таблица блокировок будет являться некоторым источником истины на предмет, - а все ли события были применены к представлению?

А также хеш сумма ключей по аггрегату может являть ключом ревизии данных, нам это сейчас не нужно, но в будущем может пригодиться :-)

Теперь же давайте добавим саму логику обработки событий.

В файле internal/domains/users/aggregator/users.go разместим следущий код:

Тут нас больше всего волнует сама функция Run() , которая вызывает некоторую логику сборки представления.

Суть заключается в понимании того, что за тип события мы получили выполнения какой-то логики и вызывание метода Save. В реальность рекомендуется разносить тела case в switch по разным функциям конечно же.

Остался основной файл internal/domains/users/users.go

Который по сути должен реализовать интерфейс Domain

Тут все предельно просто. Не считая того, зачем мы отделили запись событий в отдельный роут? Это не обязательно, но опять таки более правильно с точки зрения развития идеи дальше, т.к. скорее всего разные роуты могут иметь разные права и вероятно иметь этот уровнь абстрации будет удобнее, нежели же просто одну точку для вброса всех событий. Опять таки конечная реализация все равно на ваших плечах :-)

Миграции

Мы подходим к непосредственной работе с базой, а значит пора обзавестись каким-либо окружением. Для этого добавим docker-compose.yaml в корень проекта в котором просто поднимем дефолтный контейнер с postgres.

И теперь, зная явки и пароли, создадим в корне проекта файл .env

Запустим наш контейнер с базой данных:

Далее нужно инициализировать миграции как таковые, что бы создались все необходимые таблицы в базе:

Ну и переходим к добавим новых миграций через:

В папке migrations добавится два файлы, формата YYYYMMDDHHmmss_events.(up|down).sql

В добавленный файл up вставим нашу структуру для работы с событиям

Самая мякотка тут это строчка

В которой мы отправляем в канал events:created строку формата uuid,int,uuid. И как раз этот формат разбирается нашим хелпером в файле internal/event_listener/helpers.go.

Опять таки это можно расширять и модицифировать как угодно.

Для down файла содержание будет обратным добавлению

Надо не забыть про таблицу с представлением домена пользователей.

up:

down:

Ну и добавляем наши миграции к базе:

 

Инициализация приложения

Ну и вишенка на торте. Теперь мы можем исправить код файла cmd/run.go

Тут нам интересен исключительно момент формирования списка доменов в функции getAllDomain. При расширении функционала мы просто напросто должны добавить в переменную domains инициализацию другого домена.

Теперь можете запустить приложение:

Ну или собрать его и запускать бинарь.

Если вы работаете в JetBrains'овских IDE. То создайте файл test.http в корне проекта и можете поиграться с примерами API запросов:

Пояснения за идентификаторы

В примере выше мы отдаем клиенту на откуп генерацию идентификаторов для аггрегата и для события при запросе. Тому есть такое объяснение:

Выводы

Идея более чем жизнеспособна, но имеет свои нюансы и особенности. Использовать её или нет нужно принимать решение взвешенно и обдуманно. Не стоит воспринимать это как что-то хорошее или плохое. Это просто вариация на тему и при том не самая лучшая с точки зрения кода, можно написать лучше. Но я старался донести лишь саму идею и показать как это может быть.

Получилось ли достичь целей?

Хорошо, но это явно не финишная прямая.

Далее разовьем эту идею и пример с точки зрения того, как не тратиться на написание спецификаций и использовать кодогенерацию для взаимодействия с фронтендом.