Данные можно хранить в двух вариациях. Сразу в финальном их состоянии или иметь список событий, последовательно обработав которые вы получаете это состояние. Последнее называют Event Sourcing'ом.
Event Sourcing — это подход работы с событиями. Вы используете набор событий об изменениях и на их основаниях реконструируете состояние вашего объекта.
В современном мире в любом бизнес приложение рано или поздно придется найти ответ на вопрос - кто, что и когда поменял? Можно ли откатиться данные к предыдущему состоянию? или например как посмотреть срез данных актуальных на неделю назад? Поэтому начинать что-то новое и не задуматься об архитектуре на берегу, с моей точки зрения, - преступно.
Но есть обратная сторона медали - это скорость и опыт. Даже при наличии опыта и знаний, как строить такие системы правильно - я предпочту не начинать проект с инфраструктурной стройки. А иметь хороший брокер сообщений и быстрое хранилище данных - это то, что нужно, по хорошему, подготовить для работы с Event Sourcing'ом. Зачастую связка получается типа Kafka + RabbitMQ + Mongo/Elastic. Это отлично работает, но я не хочу поднимать у себя локально всех этих монстров, чтобы просто начать писать код. Да и не нужны они до тех пор пока ваши пользователи не создают сотни тысяч запросов в секунду и перед вами не стоят задачи масштабирования.
Поэтому вводные для поиска хорошего решения у меня были такие:
Хочу иметь минимум инфраструктуры
Хочу иметь максимально взрослую архитектуру приложения сразу
Это должно быть быстро разрабатываемо
Это должно быть понятно и просто, т.е. перечень технологий должен быть минимален
Так я пришел к бесконечно полной любви с PostgreSQL.
Но одной работы с событиями будет не достаточно, также стоит пересмотреть подход к работе с API как таковым. Можно много спорить, кидаться книжками, проектами, докладами о том, как правильно строить взаимодействие с приложением между клиентами. Но моё мнение - CQRS. Особенно, учитывая тот факт, что вместе с событийно ориентированной архитектурой он работает просто идеально.
The Command and Query Responsibility Segregation - принцип или парадигма CQRS разделяет назначение запросов и команд на обработку данных.
Проще говоря API делится на два типа запросов - мутационные и запрашивающие какие-то данные.
Вместе с событийно ориентированным подходом у нас получается буквально один мутационный запрос на вставку событий и какое-то кол-во необходимых и совершенно разных запросов на чтение данных.
Помимо этого тут стоит понимать, что представление данных и их чтение не обязано быть исполнено уже в классическом виде. А может и должно быть ближе к предметно ориентированному проектированию.
Предметно-ориентированное проектирование — набор принципов и схем, направленных на создание оптимальных систем объектов. Сводится к созданию программных абстракций, которые называются моделями предметных областей.
Конечно нет никакого смысла упираться рогом в матчасть от Эванса и тащить DDD во всех красках, но я буду заимствовать некоторые практики их этого подхода.
Также важно понимать, что в целом мы будем паразитировать около паттерна Event Streaming. Но лишь от части и не сильно погружаясь в детали. Суть в том, что совокупность связки Event Storing + Event Streaming + CQRS дает как раз тот результат, к которому я стремлюсь с точки зрения приложения.
Потоковая передача событий (Event Streaming) — это процесс непрерывного захвата и хранения событий по мере их возникновения в системе. Эти события затем можно обрабатывать и анализировать в режиме реального времени или сохранять для последующего анализа.
Идея достаточно простая:

Если на пальцах, то:
У нас есть таблица событий.
На таблице событий висит триггер выполняющий функцию отправки сообщения.
На стороне приложения есть слушатель топика сообщений из таблицы событий. Да, в PG есть функционал notify/listen.
На основе полученного сообщения некоторая логика обработки события запрашивает из базы данных по идентификатору события его payload и выполняет обновление представления или нескольких представлений, если это того требует.
Также на стороне приложения все методы чтения данных работают только с таблицами представления данных.
При этом подходе у нас нет никаких ограничений по организации структуры данных, хотим в json все сложим, хотим нормализуем полностью - это все зависит от задач.
Весь работающий пример ниже доступен в репозитории на GitHub.
Я буду реализовывать пример на Go и на том наборе библиотек, которые мне нравятся и кажутся удобными, но сама идея может быть реализована как вам будет угодно и не зависит от используемых мной решений.
Создаем новый проект и используя cobra от spf13 инициируем точку входа.
31mkdir example-event-sourcing && cd example-event-sourcing2go mod init example-event-sourcing3go get github.com/spf13/cobra
Лучше установить cobra-cli и выполнить:
41go install github.com/spf13/cobra-cli@latest23cobra-cli init4cobra-cli add run
Не будем сейчас останавливаться на тонком описании и настройки команд, продолжим накидывать структуру.
Для начала неплохо было бы научить наше приложение читать какие-то параметры конфигурации из .env. Тут мне поможет viper от тех же spf13 и отдельный пакетик внутри приложения, который упростит мне жизнь.
11go get github.com/spf13/viper
Положим этот файлик в проекте по адресу internal/config/config.go.
471package config2
3import (4 "github.com/spf13/viper"5 "log"6)7
8// EnvConfigs Initialize this variable to access the env values9var EnvConfigs *envConfigs10
11// InitEnvConfigs We will call this in main.go to load the env variables12func InitEnvConfigs() {13 EnvConfigs = loadEnvVariables()14}15
16// struct to map env values17type envConfigs struct {18 AppServerPort string `mapstructure:"APP_SERVER_PORT"`19 DbDsn string `mapstructure:"DB_DSN"`20}21
22// Call to load the variables from env23func loadEnvVariables() (config *envConfigs) {24 // Tell viper the path/location of your env file. If it is root just add "."25 viper.AddConfigPath(".")26
27 // Tell viper the name of your file28 viper.SetConfigName(".env")29
30 // Tell viper the type of your file31 viper.SetConfigType("env")32
33 // Viper reads all the variables from env file and log error if any found34 if err := viper.ReadInConfig(); err != nil {35 log.Fatal("Error reading env file", err)36 }37
38 // Viper unmarshal the loaded env variables into the struct39 if err := viper.Unmarshal(&config); err != nil {40 log.Fatal(err)41 }42 return43}44
45func init() {46 InitEnvConfigs()47}в типе envConfigs объявим те данные, которые нам будут важны и будем расширять его по мере необходимости.
Благодаря тому, что мы читаем конфигурацию в init() первый же подключение нашего файла в проекте заставит его прочитать конфигурацию. Есть подводные камни в такой реализации, но сейчас опустим их.
Дальше нам понадобится сама работа с базой. Я очень люблю ORM от uptrace, которая называется bun. Более того, я фанат еще со времен go-pg от них же. Поэтому в качестве основы буду отталкиваться от этой библиотеки. Она ни к чему вас не обяжет, но жизнь сильно упростит во многих аспектах.
31go get github.com/uptrace/bun2go get github.com/uptrace/bun/dialect/pgdialect3go get github.com/uptrace/bun/driver/pgdriver
Добавим сразу еще uuid так как будем их использовать для типизации идентификаторов
11go get github.com/google/uuid
Теперь пора создать небольшую абстракцию, для работы с базой.
Добавим файл и новый пакет по адресу internal/db/db.go
151package db2
3import (4 "database/sql"5 "github.com/uptrace/bun"6 "github.com/uptrace/bun/dialect/pgdialect"7 "github.com/uptrace/bun/driver/pgdriver"8 "example-event-sourcing/internal/config"9)10
11func Connect() *bun.DB {12 sqlDB := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(config.EnvConfigs.DbDsn)))13 db := bun.NewDB(sqlDB, pgdialect.New())14 return db15}В перспективе этот файл может расширяться и обрастать дополнительными функциями, плюс так проще прокидывать зависимость по проекту будет.
Теперь сразу закроем вопрос с миграциями, ибо я не хочу тащить никакой дополнительный инструмент для этого, а в bun все есть.
Создайте новый файл cmd/migrations.go со следующим содержанием:
1571package cmd2
3import (4 "context"5 "example-event-sourcing/internal/db"6 "example-event-sourcing/migrations"7 "fmt"8 "github.com/spf13/cobra"9 "github.com/uptrace/bun"10 "github.com/uptrace/bun/migrate"11 "log"12)13
14const MigrationsGroup = "migrations"15
16var migrationNameSql *string17
18func getMigrator(db *bun.DB) *migrate.Migrator {19 return migrate.NewMigrator(db, migrations.Migrations)20}21
22var migrationGroup = &cobra.Group{23 ID: MigrationsGroup,24 Title: "Migrations",25}26
27var migrateInit = &cobra.Command{28 Use: "migrate/init",29 Short: "create migration tables",30 GroupID: MigrationsGroup,31 Run: func(cmd *cobra.Command, args []string) {32 database := db.Connect()33 defer func(database *bun.DB) {34 err := database.Close()35 if err != nil {36 panic(err)37 }38 }(database)39 migrator := getMigrator(database)40 c := context.Background()41 err := migrator.Init(c)42 if err != nil {43 log.Fatalln(err.Error())44 }45 },46}47
48var migrateUp = &cobra.Command{49 Use: "migrate/up",50 Short: "migrate database",51 GroupID: MigrationsGroup,52 Run: func(cmd *cobra.Command, args []string) {53 database := db.Connect()54 defer func(database *bun.DB) {55 err := database.Close()56 if err != nil {57 panic(err)58 }59 }(database)60 migrator := getMigrator(database)61 c := context.Background()62
63 if err := migrator.Lock(c); err != nil {64 log.Fatalln(err.Error())65 }66 defer func(migrator *migrate.Migrator, ctx context.Context) {67 err := migrator.Unlock(ctx)68 if err != nil {69 log.Fatalln(err.Error())70 }71 }(migrator, c)72
73 group, err := migrator.Migrate(c)74 if err != nil {75 log.Fatalln(err.Error())76 }77 if group.IsZero() {78 fmt.Printf("there are no new migrations to run (database is up to date)\n")79 }80 fmt.Printf("migrated to %s\n", group)81 },82}83
84var migrateDown = &cobra.Command{85 Use: "migrate/down",86 Short: "rollback the last migration group",87 GroupID: MigrationsGroup,88 Run: func(cmd *cobra.Command, args []string) {89 database := db.Connect()90 defer func(database *bun.DB) {91 err := database.Close()92 if err != nil {93 panic(err)94 }95 }(database)96 migrator := getMigrator(database)97 c := context.Background()98
99 if err := migrator.Lock(c); err != nil {100 log.Fatalln(err.Error())101 }102 defer func(migrator *migrate.Migrator, ctx context.Context) {103 err := migrator.Unlock(ctx)104 if err != nil {105 log.Fatalln(err.Error())106 }107 }(migrator, c)108
109 group, err := migrator.Rollback(c)110 if err != nil {111 log.Fatalln(err.Error())112 }113 if group.IsZero() {114 fmt.Printf("there are no groups to roll back\n")115 }116 fmt.Printf("rolled back %s\n", group)117 },118}119
120var migrateCreateSql = &cobra.Command{121 Use: "migrate/create_sql",122 Short: "create up and down SQL migrations",123 GroupID: MigrationsGroup,124 Run: func(cmd *cobra.Command, args []string) {125 database := db.Connect()126 defer func(database *bun.DB) {127 err := database.Close()128 if err != nil {129 panic(err)130 }131 }(database)132 migrator := getMigrator(database)133 c := context.Background()134 if migrationNameSql == nil {135 log.Fatalln("name not specified")136 }137 files, err := migrator.CreateSQLMigrations(c, *migrationNameSql)138 if err != nil {139 log.Fatalln(err.Error())140 }141
142 for _, mf := range files {143 fmt.Printf("created migration %s (%s)\n", mf.Name, mf.Path)144 }145
146 },147}148
149func init() {150 rootCmd.AddGroup(migrationGroup)151 rootCmd.AddCommand(migrateInit)152 rootCmd.AddCommand(migrateUp)153 rootCmd.AddCommand(migrateDown)154 rootCmd.AddCommand(migrateCreateSql)155
156 migrationNameSql = migrateCreateSql.Flags().StringP("name", "n", "", "migration name")157}Теперь у вас есть ряд команд, которые помогут в работе с миграциями. Это не исчерпывающий перечень, но достаточный для работы этого примера. Для расширения перечня команд посмотрите примеры реализаций у bun.
Еще нужно создать саму папку с миграциями ./migrations и положить туда файл main.go
111package migrations2
3import "github.com/uptrace/bun/migrate"4
5var Migrations = migrate.NewMigrations()6
7func init() {8 if err := Migrations.DiscoverCaller(); err != nil {9 panic(err)10 }11}Для этого создадим новый файл и пакет internal/event_listener/event_listener.go
541package event_listener2
3import (4 "context"5 "github.com/uptrace/bun"6 "github.com/uptrace/bun/driver/pgdriver"7 "log"8 "strings"9)10
11const (12 EventsCreated = "events:created"13)14
15type EventListener struct {16 db *bun.DB17 listener *pgdriver.Listener18 retryCounter map[string]int19}20
21func New(db *bun.DB) *EventListener {22 return &EventListener{db: db, listener: pgdriver.NewListener(db), retryCounter: make(map[string]int)}23}24
25func (el *EventListener) Listen(ctx context.Context, channel string, callback func(string) error) {26 if err := el.listener.Listen(ctx, channel); err != nil {27 panic(err)28 }29
30 for notification := range el.listener.Channel() {31 err := callback(notification.Payload)32 if err != nil && !strings.Contains(err.Error(), `duplicate key value violates unique constraint "locker_pk"`) {33 log.Println(err)34 if _, ok := el.retryCounter[notification.Payload]; !ok {35 el.retryCounter[notification.Payload] = 036 }37 if el.retryCounter[notification.Payload] < 3 {38 el.retryCounter[notification.Payload]++39 el.Notify(ctx, channel, notification.Payload)40 }41 } else {42 // fixme unsafe, but will see how it goes43 go func() {44 delete(el.retryCounter, notification.Payload)45 }()46 }47 }48}49
50func (el *EventListener) Notify(ctx context.Context, channel string, payload string) {51 if err := pgdriver.Notify(ctx, el.db, channel, payload); err != nil {52 panic(err)53 }54}и положим рядом еще helpers.go
181package event_listener2
3import (4 "strconv"5 "strings"6)7
8func ExtractIdAndTypeFromPayload(payload string) (string, int, string) {9 data := strings.Split(payload, ",")10 if len(data) == 3 {11 eventTypeID, err := strconv.Atoi(data[1])12 if err != nil {13 return "", 0, ""14 }15 return data[0], eventTypeID, data[2]16 }17 return "", 0, ""18}Этот файл поможет нам разбирать то, что будет отправлять нам функция из триггера далее.
Теперь подготовим все к тому, что бы запустить веб сервер. Я буду использовать gin как один из популярных роутеров и наиболее мне удобный.
11go get github.com/gin-gonic/gin
Создадим еще один пакет и файл internal/router/router.go
301package router2
3import (4 "github.com/gin-gonic/gin"5 "example-event-sourcing/internal/config"6)7
8type Router struct {9 router *gin.Engine10}11
12func New() *Router {13 return &Router{router: gin.Default()}14}15
16func (r *Router) AddMutationRoutes(routes map[string]gin.HandlerFunc) {17 for path, handler := range routes {18 r.router.POST(path, handler)19 }20}21
22func (r *Router) AddQueryRoutes(routes map[string]gin.HandlerFunc) {23 for path, handler := range routes {24 r.router.GET(path, handler)25 }26}27
28func (r *Router) Run() {29 r.router.Run(config.EnvConfigs.AppServerPort)30}Опять таки, можете эту идею развить и добавить сюда все что понадобится и как вам будет угодно. CORS, дефолтные роуты и т.д.
Отлично, мы почти закончили с подготовительными работами.
Давайте добавим немного базовых интерфейсов и моделей, которые нам пригодятся. Для этого создадим пакет и файл internal/aggregates/models.go и положим в него следующее:
421package aggregates2
3import (4 "context"5 "encoding/json"6 "github.com/gin-gonic/gin"7 "github.com/google/uuid"8 "github.com/uptrace/bun"9 "time"10)11
12type Domain interface {13 Run()14 QueryRoutes() map[string]gin.HandlerFunc15 MutationRoutes() map[string]gin.HandlerFunc16}17
18type Aggregate interface {19 GetID() string20}21
22type Event struct {23 bun.BaseModel `bun:"table:events,alias:e"`24 ID string `bun:"id" json:"id"`25 TypeID int `bun:"type_id" json:"type_id"`26 AggregateID string `bun:"aggregate_id" json:"aggregate_id"`27 Payload json.RawMessage `bun:"payload" json:"payload"`28 CreatedAt time.Time `bun:"created_at" json:"created_at"`29}30
31type Locker struct {32 bun.BaseModel `bun:"table:locker,alias:loc"`33 EventID uuid.UUID `bun:"event_id,type:uuid" json:"event_id"`34 AggregateID uuid.UUID `bun:"aggregate_id,type:uuid" json:"aggregate_id"`35 LockDomain string `bun:"lock_domain" json:"lock_domain"`36}37
38type AggregateEvent int39
40type EventDelete struct {41 ID string `json:"id"`42}Тут вы видите
интерфейс для доменного пакета, который должен будет реализовывать несколько функций, о них будет ниже.
интерфейс для агрегата.
базовую модель для события.
структурка для таблицы блокировки, о ней расскажу тоже чуть позже.
и заготовку для типов событий.
а также базовый тип события для удаления.
Если пока что-то кажется странным, подождите, скоро все станет понятным.
Теперь давайте добавим базовый репозиторий для работы с запросами событий.
Создадим еще один пакет с файлом internal/repositories/event_repository.go
261package repositories2
3import (4 "context"5 "github.com/uptrace/bun"6 "example-event-sourcing/internal/aggregates"7)8
9type EventRepository struct {10 db *bun.DB11}12
13func NewEventRepository(db *bun.DB) *EventRepository {14 return &EventRepository{db: db}15}16
17func (r *EventRepository) GetEventByIdAndType(ctx context.Context, id string, eventType int) (*aggregates.Event, error) {18 events := &aggregates.Event{}19 err := r.db.NewSelect().Model(events).Where("id = ? and type_id = ?", id, eventType).Scan(ctx)20 return events, err21}22
23func (r *EventRepository) SaveEvent(ctx context.Context, event *aggregates.Event) (*aggregates.Event, error) {24 _, err := r.db.NewInsert().Model(event).Exec(ctx)25 return event, err26}А также давайте добавим общий хендлер для событий в пакет internal/handlers/event_handlers.go
321package handlers2
3import (4 "github.com/gin-gonic/gin"5 "example-event-sourcing/internal/aggregates"6 "example-event-sourcing/internal/repositories"7)8
9type EventHandlers struct {10 eventRepository *repositories.EventRepository11}12
13func NewEventHandlers(eventRepository *repositories.EventRepository) *EventHandlers {14 return &EventHandlers{15 eventRepository: eventRepository,16 }17}18
19func (e *EventHandlers) SaveEvent(c *gin.Context) {20 event := &aggregates.Event{}21 err := c.BindJSON(event)22 if err != nil {23 c.JSON(500, gin.H{24 "error": err.Error(),25 })26 return27 }28 _, err = e.eventRepository.SaveEvent(c, event)29 c.JSON(200, gin.H{30 "result": "Event saved",31 })32}В моей картинке мира, когда нибудь весь этот код может понадобится растащить на микросервисы, а значит будет логичнее если я сразу домены буду отделять друг от друга. Вдруг они будут крутиться в разных контейнерах. Это просто упростит рефакторинг в будущем. Шаг не обязательный и может быть упрощен, но я оставлю все так как есть.
Доменный пакет - это пакет содержащий в себе все модели, запросы к бд, хендлеры для роутов и справочную информацию по конкретной предметной области.
Представим, что у нас есть пока только одна сущность для примера - это пользователи. Поэтому будем оперировать ей.
Создадим пакет со следующей структурой:
41internal/domains/users/aggregator/users.go2internal/domains/users/models/users.go3internal/domains/users/repository/users.go4internal/domains/users/users.go
В файле internal/domains/users/models/users.go объявим нашу модель представления данных:
201package models2
3import (4 "github.com/google/uuid"5 "github.com/uptrace/bun"6)7
8type User struct {9 bun.BaseModel `bun:"table:users,alias:u"`10
11 ID uuid.UUID `bun:"id,type:uuid" json:"id"`12 Username string `bun:"username" json:"username"`13 Password string `bun:"password" json:"-"`14 Email string `bun:"email" json:"email"`15
16}17
18func (u *User) GetID() string {19 return u.ID.String()20}В файл internal/domains/users/repository/users.go добавим запросы к базе даных:
991package repository2
3import (4 "context"5 "github.com/google/uuid"6 "github.com/uptrace/bun"7 "example-event-sourcing/internal/aggregates"8 "example-event-sourcing/internal/domains/users/models"9)10
11type Repository struct {12 db *bun.DB13}14
15func New(db *bun.DB) *Repository {16 return &Repository{db: db}17}18
19func (r *Repository) Find(ctx context.Context, id string) (aggregates.Aggregate, error) {20 user := &models.User{}21 err := r.db.NewSelect().Model(user).Where("id = ?", id).Scan(ctx)22 if err != nil {23 return nil, err24 }25 return user, nil26}27
28func (r *Repository) Save(ctx context.Context, aggregate aggregates.Aggregate, eventID string) error {29 user := aggregate.(*models.User)30 tx, err := r.db.BeginTx(ctx, nil)31 if err != nil {32 tx.Rollback()33 return err34 }35 _, err = r.Find(ctx, user.GetID())36 if err != nil {37 _, err := tx.NewInsert().Model(user).Exec(ctx)38 if err != nil {39 tx.Rollback()40 return err41 }42 }43 _, err = tx.NewUpdate().Model(user).Where("id = ?", user.ID).Exec(ctx)44 if err != nil {45 tx.Rollback()46 return err47 }48
49 _, err = tx.NewInsert().Model(&aggregates.Locker{50 EventID: uuid.MustParse(eventID),51 AggregateID: uuid.MustParse(aggregate.GetID()),52 LockDomain: "users",53 }).Exec(ctx)54
55 if err != nil {56 tx.Rollback()57 return err58 }59
60 return tx.Commit()61}62
63func (r *Repository) Delete(ctx context.Context, id, eventID string) error {64 tx, err := r.db.BeginTx(ctx, nil)65 if err != nil {66 tx.Rollback()67 return err68 }69 _, err = tx.NewDelete().Model(&models.User{}).Where("id = ?", id).Exec(ctx)70 if err != nil {71 tx.Rollback()72 return err73 }74 _, err = tx.NewInsert().Model(&aggregates.Locker{75 EventID: uuid.MustParse(eventID),76 AggregateID: uuid.MustParse(id),77 LockDomain: "users",78 }).Exec(ctx)79
80 if err != nil {81 tx.Rollback()82 return err83 }84 return tx.Commit()85}86
87func (r *Repository) All(ctx context.Context) ([]aggregates.Aggregate, error) {88 data := make([]aggregates.Aggregate, 0)89 users := make([]*models.User, 0)90 err := r.db.NewSelect().Model(&users).Scan(ctx)91 if err != nil {92 return nil, err93 }94 for _, user := range users {95 data = append(data, user)96 }97
98 return data, nil99}Их всего 4:
Получить запись по идентифкатору
Получить все записи
Сохранить данные
Удалить данные
И вот пришла пора пояснить за таблицу блокировок и почему наши запросы Save и Delete реализованы таким образом.
Связка выглядит так:

А нужна для того, что бы обезопасить данные от двойной перезаписи в случае дублирования события по каким-то причинам или в случае запуска двух или более подписчиков (например копий приложения).
Логика такая, что во время обновления, вставки или удаления данных мы опираемся на связку в виде идентификаторов аггрегата, события и названия представления данных домена (таблицы для вставки по сути в текущем примере). И транзакция изменения данных в представлении, помимо обновления самой таблицы, требует вставки данных в таблицу блокировки. Если же составной основной ключ из трех полей выше уже существует, то транзакция не выполнится и мы не будем лишний раз перестраивать данные. Это лишь один из вариантов реализации защиты, не самый лучший, но достаточный для начала.
Попутно эта таблица блокировок будет являться некоторым источником истины на предмет, - а все ли события были применены к представлению?
А также хеш сумма ключей по аггрегату может являть ключом ревизии данных, нам это сейчас не нужно, но в будущем может пригодиться :-)
Теперь же давайте добавим саму логику обработки событий.
В файле internal/domains/users/aggregator/users.go разместим следущий код:
1541package aggregator2
3import (4 "context"5 "encoding/json"6 "github.com/uptrace/bun"7 "example-event-sourcing/internal/aggregates"8 "example-event-sourcing/internal/domains/users/models"9 "example-event-sourcing/internal/domains/users/repository"10 "example-event-sourcing/internal/event_listener"11 "example-event-sourcing/internal/repositories"12)13
14const Factor = 100015
16const (17 UsersCreated aggregates.AggregateEvent = iota + Factor18 UsersUpdatePassword19 UsersUpdateUsername20 UsersUpdateEmail21 UsersDeleted22
23 UsersUnusedEvent24)25
26type EventUpdatePassword struct {27 Password string `json:"password"`28}29
30type EventUpdateUsername struct {31 Username string `json:"username"`32}33
34type EventUpdateEmail struct {35 Email string `json:"email"`36}37
38func ConvertFromInt(in int) aggregates.AggregateEvent {39 return aggregates.AggregateEvent(in)40}41
42func IsEventTypeValid(eventType aggregates.AggregateEvent) bool {43 return eventType >= UsersCreated && eventType < UsersUnusedEvent44}45
46type Aggregator struct {47 db *bun.DB48 eventListener *event_listener.EventListener49 repository *repository.Repository50 eventRepository *repositories.EventRepository51}52
53func New(54 db *bun.DB,55 eventListener *event_listener.EventListener,56 repository *repository.Repository,57 eventRepository *repositories.EventRepository,58) *Aggregator {59 return &Aggregator{60 db: db,61 eventListener: eventListener,62 repository: repository,63 eventRepository: eventRepository,64 }65}66
67func (a *Aggregator) Run() {68 ctx := context.Background()69 a.eventListener.Listen(ctx, event_listener.EventsCreated, a.buildUsers)70}71
72func (a *Aggregator) GetUserByID(ctx context.Context, id string) (*models.User, error) {73 userAggregate, err := a.repository.Find(ctx, id)74 if err != nil {75 return nil, err76 }77 user := userAggregate.(*models.User)78 return user, nil79}80
81func (a *Aggregator) ExtractUserAndUpdatedModel(ctx context.Context, eventID string, eventType int, aggregateID string, eventUpdate interface{}) (*models.User, interface{}, error) {82 event, err := a.eventRepository.GetEventByIdAndType(ctx, eventID, eventType)83 if err != nil {84 return nil, nil, err85 }86
87 user, err := a.GetUserByID(ctx, aggregateID)88 if err != nil {89 return nil, nil, err90 }91
92 payloadJson := event.Payload93 err = json.Unmarshal(payloadJson, eventUpdate)94 if err != nil {95 return nil, nil, err96 }97
98 return user, eventUpdate, nil99}100
101func (a *Aggregator) buildUsers(payload string) error {102 ctx := context.Background()103 id, eventType, aggregateID := event_listener.ExtractIdAndTypeFromPayload(payload)104 convertedType := ConvertFromInt(eventType)105 if !IsEventTypeValid(convertedType) {106 return nil107 }108 switch convertedType {109 case UsersCreated:110 event, err := a.eventRepository.GetEventByIdAndType(ctx, id, eventType)111 if err != nil {112 return err113 }114 user := &models.User{}115 payloadJson := event.Payload116 err = json.Unmarshal(payloadJson, user)117 if err != nil {118 return err119 }120 return a.repository.Save(ctx, user, id)121
122 case UsersUpdatePassword:123 user, userUpdate, err := a.ExtractUserAndUpdatedModel(ctx, id, eventType, aggregateID, &EventUpdatePassword{})124 if err != nil {125 return err126 }127 user.Password = userUpdate.(*EventUpdatePassword).Password128 return a.repository.Save(ctx, user, id)129 case UsersUpdateEmail:130 user, userUpdate, err := a.ExtractUserAndUpdatedModel(ctx, id, eventType, aggregateID, &EventUpdateEmail{})131 if err != nil {132 return err133 }134 user.Email = userUpdate.(*EventUpdateEmail).Email135 return a.repository.Save(ctx, user, id)136 case UsersUpdateUsername:137 user, userUpdate, err := a.ExtractUserAndUpdatedModel(ctx, id, eventType, aggregateID, &EventUpdateUsername{})138 if err != nil {139 return err140 }141 user.Username = userUpdate.(*EventUpdateUsername).Username142 return a.repository.Save(ctx, user, id)143 case UsersDeleted:144 user, _, err := a.ExtractUserAndUpdatedModel(ctx, id, eventType, aggregateID, &aggregates.EventDelete{})145 if err != nil {146 return err147 }148 return a.repository.Delete(ctx, user.ID.String(), id)149 default:150 panic("unhandled default case")151 }152
153 return nil154}Тут нас больше всего волнует сама функция Run() , которая вызывает некоторую логику сборки представления.
Суть заключается в понимании того, что за тип события мы получили выполнения какой-то логики и вызывание метода Save. В реальность рекомендуется разносить тела case в switch по разным функциям конечно же.
Остался основной файл internal/domains/users/users.go
Который по сути должен реализовать интерфейс Domain
681package users2
3import (4 "github.com/gin-gonic/gin"5 "github.com/uptrace/bun"6 "example-event-sourcing/internal/aggregates"7 "example-event-sourcing/internal/domains/users/aggregator"8 "example-event-sourcing/internal/domains/users/models"9 "example-event-sourcing/internal/domains/users/repository"10 "example-event-sourcing/internal/event_listener"11 "example-event-sourcing/internal/handlers"12 "example-event-sourcing/internal/repositories"13)14
15type UserDomain struct {16 db *bun.DB17 eventListener *event_listener.EventListener18 aggregator *aggregator.Aggregator19 repository *repository.Repository20 eventRepository *repositories.EventRepository21 eventHandlers *handlers.EventHandlers22}23
24func New(25 db *bun.DB,26 eventListener *event_listener.EventListener,27 eventRepository *repositories.EventRepository,28 eventHandlers *handlers.EventHandlers,29) *UserDomain {30 repo := repository.New(db)31 agg := aggregator.New(db, eventListener, repo, eventRepository)32
33 return &UserDomain{34 db: db,35 eventListener: eventListener,36 repository: repo,37 aggregator: agg,38 eventRepository: eventRepository,39 eventHandlers: eventHandlers,40 }41}42
43func (u *UserDomain) Run() {44 u.aggregator.Run()45}46
47func (u *UserDomain) QueryRoutes() map[string]gin.HandlerFunc {48 routes := make(map[string]gin.HandlerFunc)49 routes["/users"] = func(c *gin.Context) {50 data, err := u.repository.All(c)51 if err != nil {52 c.JSON(500, gin.H{53 "error": err.Error(),54 })55 return56 }57 c.JSON(200, gin.H{58 "result": data,59 })60 }61 return routes62}63
64func (u *UserDomain) MutationRoutes() map[string]gin.HandlerFunc {65 routes := make(map[string]gin.HandlerFunc)66 routes["/users"] = u.eventHandlers.SaveEvent67 return routes68}Тут все предельно просто. Не считая того, зачем мы отделили запись событий в отдельный роут? Это не обязательно, но опять таки более правильно с точки зрения развития идеи дальше, т.к. скорее всего разные роуты могут иметь разные права и вероятно иметь этот уровнь абстрации будет удобнее, нежели же просто одну точку для вброса всех событий. Опять таки конечная реализация все равно на ваших плечах :-)
Мы подходим к непосредственной работе с базой, а значит пора обзавестись каким-либо окружением. Для этого добавим docker-compose.yaml в корень проекта в котором просто поднимем дефолтный контейнер с postgres.
171version'3.1'2
3volumes4 example-event-sourcing_pg_db5
6services7 example-event-sourcing_db8 imagepostgres9 restartalways10 environment11POSTGRES_PASSWORD=postgres12POSTGRES_USER=postgres13POSTGRES_DB=example14 volumes15example-event-sourcing_pg_db:/var/lib/postgresql/data16 ports17$POSTGRES_PORT:-5434:5432И теперь, зная явки и пароли, создадим в корне проекта файл .env
21DB_DSN=postgres://postgres:postgres@localhost:5434/example?sslmode=disable2APP_SERVER_PORT=":7755"
Запустим наш контейнер с базой данных:
11docker-compose up -d
Далее нужно инициализировать миграции как таковые, что бы создались все необходимые таблицы в базе:
11go run main.go migrate/init
Ну и переходим к добавим новых миграций через:
11go run main.go migrate/create_sql -n events
В папке migrations добавится два файлы, формата YYYYMMDDHHmmss_events.(up|down).sql
В добавленный файл up вставим нашу структуру для работы с событиям
321CREATE TABLE IF NOT EXISTS events2(3 id uuid not null,4 type_id smallserial not null,5 aggregate_id uuid not null,6 payload json not null,7 created_at timestamp default CURRENT_TIMESTAMP not null,8 constraint events_pk9 primary key (id, aggregate_id, type_id)10);11
12CREATE FUNCTION events_after_insert_trigger()13 RETURNS TRIGGER AS $$14BEGIN15 PERFORM pg_notify('events:created', CONCAT(NEW.id::text, ',', NEW.type_id::text, ',', NEW.aggregate_id::text));16 RETURN NULL;17END;18$$19 LANGUAGE plpgsql;20
21CREATE TRIGGER events_after_insert_trigger22 AFTER INSERT ON events23 FOR EACH ROW EXECUTE PROCEDURE events_after_insert_trigger();24
25CREATE TABLE IF NOT EXISTS locker26(27 event_id uuid not null,28 aggregate_id uuid not null,29 lock_domain varchar(50) not null,30 constraint locker_pk31 primary key (event_id, aggregate_id, lock_domain)32);Самая мякотка тут это строчка
11pg_notify('events:created', CONCAT(NEW.id::text, ',', NEW.type_id::text, ',', NEW.aggregate_id::text));
В которой мы отправляем в канал events:created строку формата uuid,int,uuid. И как раз этот формат разбирается нашим хелпером в файле internal/event_listener/helpers.go.
Опять таки это можно расширять и модицифировать как угодно.
Для down файла содержание будет обратным добавлению
41DROP TRIGGER IF EXISTS events_after_insert_trigger ON events;2DROP FUNCTION IF EXISTS events_after_insert_trigger();3DROP TABLE IF EXISTS locker;4DROP TABLE IF EXISTS events;Надо не забыть про таблицу с представлением домена пользователей.
11go run main.go migrate/create_sql -n users
up:
91CREATE TABLE IF NOT EXISTS users2(3 id uuid4 constraint users_pk5 primary key,6 username varchar(100),7 password varchar(255),8 email varchar(100)9);down:
11DROP TABLE IF EXISTS users;Ну и добавляем наши миграции к базе:
11go run main.go migrate/up
Ну и вишенка на торте. Теперь мы можем исправить код файла cmd/run.go
471package cmd2
3import (4 "example-event-sourcing/internal/aggregates"5 "example-event-sourcing/internal/db"6 "example-event-sourcing/internal/domains/users"7 "example-event-sourcing/internal/event_listener"8 "example-event-sourcing/internal/handlers"9 "example-event-sourcing/internal/repositories"10 "example-event-sourcing/internal/router"11 "github.com/spf13/cobra"12 "github.com/uptrace/bun"13)14
15func getAllDomain(dbConnection *bun.DB) []aggregates.Domain {16 eventRepository := repositories.NewEventRepository(dbConnection)17 eventHandlers := handlers.NewEventHandlers(eventRepository)18 eventListener := event_listener.New(dbConnection)19 domains := []aggregates.Domain{20 users.New(dbConnection, eventListener, eventRepository, eventHandlers),21 }22
23 return domains24}25
26// runCmd represents the run command27var runCmd = &cobra.Command{28 Use: "run",29 Short: "Run application",30 Run: func(cmd *cobra.Command, args []string) {31 dbConnection := db.Connect()32 defer dbConnection.Close()33 domains := getAllDomain(dbConnection)34 server := router.New()35 for _, domain := range domains {36 go domain.Run()37 server.AddQueryRoutes(domain.QueryRoutes())38 server.AddMutationRoutes(domain.MutationRoutes())39 }40
41 server.Run()42 },43}44
45func init() {46 rootCmd.AddCommand(runCmd)47}Тут нам интересен исключительно момент формирования списка доменов в функции getAllDomain. При расширении функционала мы просто напросто должны добавить в переменную domains инициализацию другого домена.
Теперь можете запустить приложение:
11go run main.go run
Ну или собрать его и запускать бинарь.
Если вы работаете в JetBrains'овских IDE. То создайте файл test.http в корне проекта и можете поиграться с примерами API запросов:
361### GET request to example server to featch all users2GET http://localhost:7755/users3
4###5
6### POST request to example server to create a new user7POST http://localhost:7755/users8Content-Type: application/json9
10{11 "id": "000000aa-1111-2222-3333-000000000001",12 "aggregate_id": "100000bb-1111-2222-3333-000000000001",13 "type_id": 1000,14 "payload": {15 "id": "100000bb-1111-2222-3333-000000000001",16 "username": "test",17 "email": "test@test.test"18 }19}20
21###22
23### POST request to example server to update the password for existing aggregate24POST http://localhost:7755/users25Content-Type: application/json26
27{28 "id": "000000aa-1111-2222-3333-000000000002",29 "aggregate_id": "100000bb-1111-2222-3333-000000000001",30 "type_id": 1001,31 "payload": {32 "password": "test_password"33 }34}35
36###В примере выше мы отдаем клиенту на откуп генерацию идентификаторов для аггрегата и для события при запросе. Тому есть такое объяснение:
Генерация автоматических идентификаторов на стороне бекенда черевата дублированием при повторе одного и того же запроса. Поэтому при выполнении запросов клиент должен сам позаботиться об их генерации. Это немного усложняет логику на клиенте, но буквально на пару функций. Но бенефитов от этого больше.
В частности по этому я использую в примере uuid'ы, что в целом должно свести практически к нулю коллизии по пересечению идентификаторов. Использование serial и автоинкрементов в бд приведет к необходимости добавить больше логики для фильтрации дублей событий.
Идея более чем жизнеспособна, но имеет свои нюансы и особенности. Использовать её или нет нужно принимать решение взвешенно и обдуманно. Не стоит воспринимать это как что-то хорошее или плохое. Это просто вариация на тему и при том не самая лучшая с точки зрения кода, можно написать лучше. Но я старался донести лишь саму идею и показать как это может быть.
Получилось ли достичь целей?
Хочу иметь минимум инфраструктуры ✅
Хочу иметь максимально взрослую архитектуру приложения сразу ✅
Это должно быть быстро разрабатываемо ✅
Это должно быть понятно и просто, т.е. перечень технологий должен быть минимален ✅
Хорошо, но это явно не финишная прямая.
Далее разовьем эту идею и пример с точки зрения того, как не тратиться на написание спецификаций и использовать кодогенерацию для взаимодействия с фронтендом.