Данные можно хранить в двух вариациях. Сразу в финальном их состоянии или иметь список событий, последовательно обработав которые вы получаете это состояние. Последнее называют Event Sourcing'ом.
Event Sourcing — это подход работы с событиями. Вы используете набор событий об изменениях и на их основаниях реконструируете состояние вашего объекта.
В современном мире в любом бизнес приложение рано или поздно придется найти ответ на вопрос - кто, что и когда поменял? Можно ли откатиться данные к предыдущему состоянию? или например как посмотреть срез данных актуальных на неделю назад? Поэтому начинать что-то новое и не задуматься об архитектуре на берегу, с моей точки зрения, - преступно.
Но есть обратная сторона медали - это скорость и опыт. Даже при наличии опыта и знаний, как строить такие системы правильно - я предпочту не начинать проект с инфраструктурной стройки. А иметь хороший брокер сообщений и быстрое хранилище данных - это то, что нужно, по хорошему, подготовить для работы с Event Sourcing'ом. Зачастую связка получается типа Kafka + RabbitMQ + Mongo/Elastic. Это отлично работает, но я не хочу поднимать у себя локально всех этих монстров, чтобы просто начать писать код. Да и не нужны они до тех пор пока ваши пользователи не создают сотни тысяч запросов в секунду и перед вами не стоят задачи масштабирования.
Поэтому вводные для поиска хорошего решения у меня были такие:
Хочу иметь минимум инфраструктуры
Хочу иметь максимально взрослую архитектуру приложения сразу
Это должно быть быстро разрабатываемо
Это должно быть понятно и просто, т.е. перечень технологий должен быть минимален
Так я пришел к бесконечно полной любви с PostgreSQL.
Но одной работы с событиями будет не достаточно, также стоит пересмотреть подход к работе с API как таковым. Можно много спорить, кидаться книжками, проектами, докладами о том, как правильно строить взаимодействие с приложением между клиентами. Но моё мнение - CQRS. Особенно, учитывая тот факт, что вместе с событийно ориентированной архитектурой он работает просто идеально.
The Command and Query Responsibility Segregation - принцип или парадигма CQRS разделяет назначение запросов и команд на обработку данных.
Проще говоря API делится на два типа запросов - мутационные и запрашивающие какие-то данные.
Вместе с событийно ориентированным подходом у нас получается буквально один мутационный запрос на вставку событий и какое-то кол-во необходимых и совершенно разных запросов на чтение данных.
Помимо этого тут стоит понимать, что представление данных и их чтение не обязано быть исполнено уже в классическом виде. А может и должно быть ближе к предметно ориентированному проектированию.
Предметно-ориентированное проектирование — набор принципов и схем, направленных на создание оптимальных систем объектов. Сводится к созданию программных абстракций, которые называются моделями предметных областей.
Конечно нет никакого смысла упираться рогом в матчасть от Эванса и тащить DDD во всех красках, но я буду заимствовать некоторые практики их этого подхода.
Также важно понимать, что в целом мы будем паразитировать около паттерна Event Streaming. Но лишь от части и не сильно погружаясь в детали. Суть в том, что совокупность связки Event Storing + Event Streaming + CQRS дает как раз тот результат, к которому я стремлюсь с точки зрения приложения.
Потоковая передача событий (Event Streaming) — это процесс непрерывного захвата и хранения событий по мере их возникновения в системе. Эти события затем можно обрабатывать и анализировать в режиме реального времени или сохранять для последующего анализа.
Идея достаточно простая:
Если на пальцах, то:
У нас есть таблица событий.
На таблице событий висит триггер выполняющий функцию отправки сообщения.
На стороне приложения есть слушатель топика сообщений из таблицы событий. Да, в PG есть функционал notify/listen.
На основе полученного сообщения некоторая логика обработки события запрашивает из базы данных по идентификатору события его payload и выполняет обновление представления или нескольких представлений, если это того требует.
Также на стороне приложения все методы чтения данных работают только с таблицами представления данных.
При этом подходе у нас нет никаких ограничений по организации структуры данных, хотим в json все сложим, хотим нормализуем полностью - это все зависит от задач.
Весь работающий пример ниже доступен в репозитории на GitHub.
Я буду реализовывать пример на Go и на том наборе библиотек, которые мне нравятся и кажутся удобными, но сама идея может быть реализована как вам будет угодно и не зависит от используемых мной решений.
Создаем новый проект и используя cobra от spf13 инициируем точку входа.
31mkdir example-event-sourcing && cd example-event-sourcing
2go mod init example-event-sourcing
3go get github.com/spf13/cobra
Лучше установить cobra-cli и выполнить:
41go install github.com/spf13/cobra-cli@latest
2
3cobra-cli init
4cobra-cli add run
Не будем сейчас останавливаться на тонком описании и настройки команд, продолжим накидывать структуру.
Для начала неплохо было бы научить наше приложение читать какие-то параметры конфигурации из .env
. Тут мне поможет viper от тех же spf13 и отдельный пакетик внутри приложения, который упростит мне жизнь.
11go get github.com/spf13/viper
Положим этот файлик в проекте по адресу internal/config/config.go
.
471package config
2
3import (
4 "github.com/spf13/viper"
5 "log"
6)
7
8// EnvConfigs Initialize this variable to access the env values
9var EnvConfigs *envConfigs
10
11// InitEnvConfigs We will call this in main.go to load the env variables
12func InitEnvConfigs() {
13 EnvConfigs = loadEnvVariables()
14}
15
16// struct to map env values
17type envConfigs struct {
18 AppServerPort string `mapstructure:"APP_SERVER_PORT"`
19 DbDsn string `mapstructure:"DB_DSN"`
20}
21
22// Call to load the variables from env
23func loadEnvVariables() (config *envConfigs) {
24 // Tell viper the path/location of your env file. If it is root just add "."
25 viper.AddConfigPath(".")
26
27 // Tell viper the name of your file
28 viper.SetConfigName(".env")
29
30 // Tell viper the type of your file
31 viper.SetConfigType("env")
32
33 // Viper reads all the variables from env file and log error if any found
34 if err := viper.ReadInConfig(); err != nil {
35 log.Fatal("Error reading env file", err)
36 }
37
38 // Viper unmarshal the loaded env variables into the struct
39 if err := viper.Unmarshal(&config); err != nil {
40 log.Fatal(err)
41 }
42 return
43}
44
45func init() {
46 InitEnvConfigs()
47}
в типе envConfigs объявим те данные, которые нам будут важны и будем расширять его по мере необходимости.
Благодаря тому, что мы читаем конфигурацию в init() первый же подключение нашего файла в проекте заставит его прочитать конфигурацию. Есть подводные камни в такой реализации, но сейчас опустим их.
Дальше нам понадобится сама работа с базой. Я очень люблю ORM от uptrace, которая называется bun. Более того, я фанат еще со времен go-pg от них же. Поэтому в качестве основы буду отталкиваться от этой библиотеки. Она ни к чему вас не обяжет, но жизнь сильно упростит во многих аспектах.
31go get github.com/uptrace/bun
2go get github.com/uptrace/bun/dialect/pgdialect
3go get github.com/uptrace/bun/driver/pgdriver
Добавим сразу еще uuid так как будем их использовать для типизации идентификаторов
11go get github.com/google/uuid
Теперь пора создать небольшую абстракцию, для работы с базой.
Добавим файл и новый пакет по адресу internal/db/db.go
151package db
2
3import (
4 "database/sql"
5 "github.com/uptrace/bun"
6 "github.com/uptrace/bun/dialect/pgdialect"
7 "github.com/uptrace/bun/driver/pgdriver"
8 "example-event-sourcing/internal/config"
9)
10
11func Connect() *bun.DB {
12 sqlDB := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(config.EnvConfigs.DbDsn)))
13 db := bun.NewDB(sqlDB, pgdialect.New())
14 return db
15}
В перспективе этот файл может расширяться и обрастать дополнительными функциями, плюс так проще прокидывать зависимость по проекту будет.
Теперь сразу закроем вопрос с миграциями, ибо я не хочу тащить никакой дополнительный инструмент для этого, а в bun все есть.
Создайте новый файл cmd/migrations.go
со следующим содержанием:
1571package cmd
2
3import (
4 "context"
5 "example-event-sourcing/internal/db"
6 "example-event-sourcing/migrations"
7 "fmt"
8 "github.com/spf13/cobra"
9 "github.com/uptrace/bun"
10 "github.com/uptrace/bun/migrate"
11 "log"
12)
13
14const MigrationsGroup = "migrations"
15
16var migrationNameSql *string
17
18func getMigrator(db *bun.DB) *migrate.Migrator {
19 return migrate.NewMigrator(db, migrations.Migrations)
20}
21
22var migrationGroup = &cobra.Group{
23 ID: MigrationsGroup,
24 Title: "Migrations",
25}
26
27var migrateInit = &cobra.Command{
28 Use: "migrate/init",
29 Short: "create migration tables",
30 GroupID: MigrationsGroup,
31 Run: func(cmd *cobra.Command, args []string) {
32 database := db.Connect()
33 defer func(database *bun.DB) {
34 err := database.Close()
35 if err != nil {
36 panic(err)
37 }
38 }(database)
39 migrator := getMigrator(database)
40 c := context.Background()
41 err := migrator.Init(c)
42 if err != nil {
43 log.Fatalln(err.Error())
44 }
45 },
46}
47
48var migrateUp = &cobra.Command{
49 Use: "migrate/up",
50 Short: "migrate database",
51 GroupID: MigrationsGroup,
52 Run: func(cmd *cobra.Command, args []string) {
53 database := db.Connect()
54 defer func(database *bun.DB) {
55 err := database.Close()
56 if err != nil {
57 panic(err)
58 }
59 }(database)
60 migrator := getMigrator(database)
61 c := context.Background()
62
63 if err := migrator.Lock(c); err != nil {
64 log.Fatalln(err.Error())
65 }
66 defer func(migrator *migrate.Migrator, ctx context.Context) {
67 err := migrator.Unlock(ctx)
68 if err != nil {
69 log.Fatalln(err.Error())
70 }
71 }(migrator, c)
72
73 group, err := migrator.Migrate(c)
74 if err != nil {
75 log.Fatalln(err.Error())
76 }
77 if group.IsZero() {
78 fmt.Printf("there are no new migrations to run (database is up to date)\n")
79 }
80 fmt.Printf("migrated to %s\n", group)
81 },
82}
83
84var migrateDown = &cobra.Command{
85 Use: "migrate/down",
86 Short: "rollback the last migration group",
87 GroupID: MigrationsGroup,
88 Run: func(cmd *cobra.Command, args []string) {
89 database := db.Connect()
90 defer func(database *bun.DB) {
91 err := database.Close()
92 if err != nil {
93 panic(err)
94 }
95 }(database)
96 migrator := getMigrator(database)
97 c := context.Background()
98
99 if err := migrator.Lock(c); err != nil {
100 log.Fatalln(err.Error())
101 }
102 defer func(migrator *migrate.Migrator, ctx context.Context) {
103 err := migrator.Unlock(ctx)
104 if err != nil {
105 log.Fatalln(err.Error())
106 }
107 }(migrator, c)
108
109 group, err := migrator.Rollback(c)
110 if err != nil {
111 log.Fatalln(err.Error())
112 }
113 if group.IsZero() {
114 fmt.Printf("there are no groups to roll back\n")
115 }
116 fmt.Printf("rolled back %s\n", group)
117 },
118}
119
120var migrateCreateSql = &cobra.Command{
121 Use: "migrate/create_sql",
122 Short: "create up and down SQL migrations",
123 GroupID: MigrationsGroup,
124 Run: func(cmd *cobra.Command, args []string) {
125 database := db.Connect()
126 defer func(database *bun.DB) {
127 err := database.Close()
128 if err != nil {
129 panic(err)
130 }
131 }(database)
132 migrator := getMigrator(database)
133 c := context.Background()
134 if migrationNameSql == nil {
135 log.Fatalln("name not specified")
136 }
137 files, err := migrator.CreateSQLMigrations(c, *migrationNameSql)
138 if err != nil {
139 log.Fatalln(err.Error())
140 }
141
142 for _, mf := range files {
143 fmt.Printf("created migration %s (%s)\n", mf.Name, mf.Path)
144 }
145
146 },
147}
148
149func init() {
150 rootCmd.AddGroup(migrationGroup)
151 rootCmd.AddCommand(migrateInit)
152 rootCmd.AddCommand(migrateUp)
153 rootCmd.AddCommand(migrateDown)
154 rootCmd.AddCommand(migrateCreateSql)
155
156 migrationNameSql = migrateCreateSql.Flags().StringP("name", "n", "", "migration name")
157}
Теперь у вас есть ряд команд, которые помогут в работе с миграциями. Это не исчерпывающий перечень, но достаточный для работы этого примера. Для расширения перечня команд посмотрите примеры реализаций у bun.
Еще нужно создать саму папку с миграциями ./migrations
и положить туда файл main.go
111package migrations
2
3import "github.com/uptrace/bun/migrate"
4
5var Migrations = migrate.NewMigrations()
6
7func init() {
8 if err := Migrations.DiscoverCaller(); err != nil {
9 panic(err)
10 }
11}
Для этого создадим новый файл и пакет internal/event_listener/event_listener.go
541package event_listener
2
3import (
4 "context"
5 "github.com/uptrace/bun"
6 "github.com/uptrace/bun/driver/pgdriver"
7 "log"
8 "strings"
9)
10
11const (
12 EventsCreated = "events:created"
13)
14
15type EventListener struct {
16 db *bun.DB
17 listener *pgdriver.Listener
18 retryCounter map[string]int
19}
20
21func New(db *bun.DB) *EventListener {
22 return &EventListener{db: db, listener: pgdriver.NewListener(db), retryCounter: make(map[string]int)}
23}
24
25func (el *EventListener) Listen(ctx context.Context, channel string, callback func(string) error) {
26 if err := el.listener.Listen(ctx, channel); err != nil {
27 panic(err)
28 }
29
30 for notification := range el.listener.Channel() {
31 err := callback(notification.Payload)
32 if err != nil && !strings.Contains(err.Error(), `duplicate key value violates unique constraint "locker_pk"`) {
33 log.Println(err)
34 if _, ok := el.retryCounter[notification.Payload]; !ok {
35 el.retryCounter[notification.Payload] = 0
36 }
37 if el.retryCounter[notification.Payload] < 3 {
38 el.retryCounter[notification.Payload]++
39 el.Notify(ctx, channel, notification.Payload)
40 }
41 } else {
42 // fixme unsafe, but will see how it goes
43 go func() {
44 delete(el.retryCounter, notification.Payload)
45 }()
46 }
47 }
48}
49
50func (el *EventListener) Notify(ctx context.Context, channel string, payload string) {
51 if err := pgdriver.Notify(ctx, el.db, channel, payload); err != nil {
52 panic(err)
53 }
54}
и положим рядом еще helpers.go
181package event_listener
2
3import (
4 "strconv"
5 "strings"
6)
7
8func ExtractIdAndTypeFromPayload(payload string) (string, int, string) {
9 data := strings.Split(payload, ",")
10 if len(data) == 3 {
11 eventTypeID, err := strconv.Atoi(data[1])
12 if err != nil {
13 return "", 0, ""
14 }
15 return data[0], eventTypeID, data[2]
16 }
17 return "", 0, ""
18}
Этот файл поможет нам разбирать то, что будет отправлять нам функция из триггера далее.
Теперь подготовим все к тому, что бы запустить веб сервер. Я буду использовать gin как один из популярных роутеров и наиболее мне удобный.
11go get github.com/gin-gonic/gin
Создадим еще один пакет и файл internal/router/router.go
301package router
2
3import (
4 "github.com/gin-gonic/gin"
5 "example-event-sourcing/internal/config"
6)
7
8type Router struct {
9 router *gin.Engine
10}
11
12func New() *Router {
13 return &Router{router: gin.Default()}
14}
15
16func (r *Router) AddMutationRoutes(routes map[string]gin.HandlerFunc) {
17 for path, handler := range routes {
18 r.router.POST(path, handler)
19 }
20}
21
22func (r *Router) AddQueryRoutes(routes map[string]gin.HandlerFunc) {
23 for path, handler := range routes {
24 r.router.GET(path, handler)
25 }
26}
27
28func (r *Router) Run() {
29 r.router.Run(config.EnvConfigs.AppServerPort)
30}
Опять таки, можете эту идею развить и добавить сюда все что понадобится и как вам будет угодно. CORS, дефолтные роуты и т.д.
Отлично, мы почти закончили с подготовительными работами.
Давайте добавим немного базовых интерфейсов и моделей, которые нам пригодятся. Для этого создадим пакет и файл internal/aggregates/models.go
и положим в него следующее:
421package aggregates
2
3import (
4 "context"
5 "encoding/json"
6 "github.com/gin-gonic/gin"
7 "github.com/google/uuid"
8 "github.com/uptrace/bun"
9 "time"
10)
11
12type Domain interface {
13 Run()
14 QueryRoutes() map[string]gin.HandlerFunc
15 MutationRoutes() map[string]gin.HandlerFunc
16}
17
18type Aggregate interface {
19 GetID() string
20}
21
22type Event struct {
23 bun.BaseModel `bun:"table:events,alias:e"`
24 ID string `bun:"id" json:"id"`
25 TypeID int `bun:"type_id" json:"type_id"`
26 AggregateID string `bun:"aggregate_id" json:"aggregate_id"`
27 Payload json.RawMessage `bun:"payload" json:"payload"`
28 CreatedAt time.Time `bun:"created_at" json:"created_at"`
29}
30
31type Locker struct {
32 bun.BaseModel `bun:"table:locker,alias:loc"`
33 EventID uuid.UUID `bun:"event_id,type:uuid" json:"event_id"`
34 AggregateID uuid.UUID `bun:"aggregate_id,type:uuid" json:"aggregate_id"`
35 LockDomain string `bun:"lock_domain" json:"lock_domain"`
36}
37
38type AggregateEvent int
39
40type EventDelete struct {
41 ID string `json:"id"`
42}
Тут вы видите
интерфейс для доменного пакета, который должен будет реализовывать несколько функций, о них будет ниже.
интерфейс для агрегата.
базовую модель для события.
структурка для таблицы блокировки, о ней расскажу тоже чуть позже.
и заготовку для типов событий.
а также базовый тип события для удаления.
Если пока что-то кажется странным, подождите, скоро все станет понятным.
Теперь давайте добавим базовый репозиторий для работы с запросами событий.
Создадим еще один пакет с файлом internal/repositories/event_repository.go
261package repositories
2
3import (
4 "context"
5 "github.com/uptrace/bun"
6 "example-event-sourcing/internal/aggregates"
7)
8
9type EventRepository struct {
10 db *bun.DB
11}
12
13func NewEventRepository(db *bun.DB) *EventRepository {
14 return &EventRepository{db: db}
15}
16
17func (r *EventRepository) GetEventByIdAndType(ctx context.Context, id string, eventType int) (*aggregates.Event, error) {
18 events := &aggregates.Event{}
19 err := r.db.NewSelect().Model(events).Where("id = ? and type_id = ?", id, eventType).Scan(ctx)
20 return events, err
21}
22
23func (r *EventRepository) SaveEvent(ctx context.Context, event *aggregates.Event) (*aggregates.Event, error) {
24 _, err := r.db.NewInsert().Model(event).Exec(ctx)
25 return event, err
26}
А также давайте добавим общий хендлер для событий в пакет internal/handlers/event_handlers.go
321package handlers
2
3import (
4 "github.com/gin-gonic/gin"
5 "example-event-sourcing/internal/aggregates"
6 "example-event-sourcing/internal/repositories"
7)
8
9type EventHandlers struct {
10 eventRepository *repositories.EventRepository
11}
12
13func NewEventHandlers(eventRepository *repositories.EventRepository) *EventHandlers {
14 return &EventHandlers{
15 eventRepository: eventRepository,
16 }
17}
18
19func (e *EventHandlers) SaveEvent(c *gin.Context) {
20 event := &aggregates.Event{}
21 err := c.BindJSON(event)
22 if err != nil {
23 c.JSON(500, gin.H{
24 "error": err.Error(),
25 })
26 return
27 }
28 _, err = e.eventRepository.SaveEvent(c, event)
29 c.JSON(200, gin.H{
30 "result": "Event saved",
31 })
32}
В моей картинке мира, когда нибудь весь этот код может понадобится растащить на микросервисы, а значит будет логичнее если я сразу домены буду отделять друг от друга. Вдруг они будут крутиться в разных контейнерах. Это просто упростит рефакторинг в будущем. Шаг не обязательный и может быть упрощен, но я оставлю все так как есть.
Доменный пакет - это пакет содержащий в себе все модели, запросы к бд, хендлеры для роутов и справочную информацию по конкретной предметной области.
Представим, что у нас есть пока только одна сущность для примера - это пользователи. Поэтому будем оперировать ей.
Создадим пакет со следующей структурой:
41internal/domains/users/aggregator/users.go
2internal/domains/users/models/users.go
3internal/domains/users/repository/users.go
4internal/domains/users/users.go
В файле internal/domains/users/models/users.go
объявим нашу модель представления данных:
201package models
2
3import (
4 "github.com/google/uuid"
5 "github.com/uptrace/bun"
6)
7
8type User struct {
9 bun.BaseModel `bun:"table:users,alias:u"`
10
11 ID uuid.UUID `bun:"id,type:uuid" json:"id"`
12 Username string `bun:"username" json:"username"`
13 Password string `bun:"password" json:"-"`
14 Email string `bun:"email" json:"email"`
15
16}
17
18func (u *User) GetID() string {
19 return u.ID.String()
20}
В файл internal/domains/users/repository/users.go
добавим запросы к базе даных:
991package repository
2
3import (
4 "context"
5 "github.com/google/uuid"
6 "github.com/uptrace/bun"
7 "example-event-sourcing/internal/aggregates"
8 "example-event-sourcing/internal/domains/users/models"
9)
10
11type Repository struct {
12 db *bun.DB
13}
14
15func New(db *bun.DB) *Repository {
16 return &Repository{db: db}
17}
18
19func (r *Repository) Find(ctx context.Context, id string) (aggregates.Aggregate, error) {
20 user := &models.User{}
21 err := r.db.NewSelect().Model(user).Where("id = ?", id).Scan(ctx)
22 if err != nil {
23 return nil, err
24 }
25 return user, nil
26}
27
28func (r *Repository) Save(ctx context.Context, aggregate aggregates.Aggregate, eventID string) error {
29 user := aggregate.(*models.User)
30 tx, err := r.db.BeginTx(ctx, nil)
31 if err != nil {
32 tx.Rollback()
33 return err
34 }
35 _, err = r.Find(ctx, user.GetID())
36 if err != nil {
37 _, err := tx.NewInsert().Model(user).Exec(ctx)
38 if err != nil {
39 tx.Rollback()
40 return err
41 }
42 }
43 _, err = tx.NewUpdate().Model(user).Where("id = ?", user.ID).Exec(ctx)
44 if err != nil {
45 tx.Rollback()
46 return err
47 }
48
49 _, err = tx.NewInsert().Model(&aggregates.Locker{
50 EventID: uuid.MustParse(eventID),
51 AggregateID: uuid.MustParse(aggregate.GetID()),
52 LockDomain: "users",
53 }).Exec(ctx)
54
55 if err != nil {
56 tx.Rollback()
57 return err
58 }
59
60 return tx.Commit()
61}
62
63func (r *Repository) Delete(ctx context.Context, id, eventID string) error {
64 tx, err := r.db.BeginTx(ctx, nil)
65 if err != nil {
66 tx.Rollback()
67 return err
68 }
69 _, err = tx.NewDelete().Model(&models.User{}).Where("id = ?", id).Exec(ctx)
70 if err != nil {
71 tx.Rollback()
72 return err
73 }
74 _, err = tx.NewInsert().Model(&aggregates.Locker{
75 EventID: uuid.MustParse(eventID),
76 AggregateID: uuid.MustParse(id),
77 LockDomain: "users",
78 }).Exec(ctx)
79
80 if err != nil {
81 tx.Rollback()
82 return err
83 }
84 return tx.Commit()
85}
86
87func (r *Repository) All(ctx context.Context) ([]aggregates.Aggregate, error) {
88 data := make([]aggregates.Aggregate, 0)
89 users := make([]*models.User, 0)
90 err := r.db.NewSelect().Model(&users).Scan(ctx)
91 if err != nil {
92 return nil, err
93 }
94 for _, user := range users {
95 data = append(data, user)
96 }
97
98 return data, nil
99}
Их всего 4:
Получить запись по идентифкатору
Получить все записи
Сохранить данные
Удалить данные
И вот пришла пора пояснить за таблицу блокировок и почему наши запросы Save и Delete реализованы таким образом.
Связка выглядит так:
А нужна для того, что бы обезопасить данные от двойной перезаписи в случае дублирования события по каким-то причинам или в случае запуска двух или более подписчиков (например копий приложения).
Логика такая, что во время обновления, вставки или удаления данных мы опираемся на связку в виде идентификаторов аггрегата, события и названия представления данных домена (таблицы для вставки по сути в текущем примере). И транзакция изменения данных в представлении, помимо обновления самой таблицы, требует вставки данных в таблицу блокировки. Если же составной основной ключ из трех полей выше уже существует, то транзакция не выполнится и мы не будем лишний раз перестраивать данные. Это лишь один из вариантов реализации защиты, не самый лучший, но достаточный для начала.
Попутно эта таблица блокировок будет являться некоторым источником истины на предмет, - а все ли события были применены к представлению?
А также хеш сумма ключей по аггрегату может являть ключом ревизии данных, нам это сейчас не нужно, но в будущем может пригодиться :-)
Теперь же давайте добавим саму логику обработки событий.
В файле internal/domains/users/aggregator/users.go
разместим следущий код:
1541package aggregator
2
3import (
4 "context"
5 "encoding/json"
6 "github.com/uptrace/bun"
7 "example-event-sourcing/internal/aggregates"
8 "example-event-sourcing/internal/domains/users/models"
9 "example-event-sourcing/internal/domains/users/repository"
10 "example-event-sourcing/internal/event_listener"
11 "example-event-sourcing/internal/repositories"
12)
13
14const Factor = 1000
15
16const (
17 UsersCreated aggregates.AggregateEvent = iota + Factor
18 UsersUpdatePassword
19 UsersUpdateUsername
20 UsersUpdateEmail
21 UsersDeleted
22
23 UsersUnusedEvent
24)
25
26type EventUpdatePassword struct {
27 Password string `json:"password"`
28}
29
30type EventUpdateUsername struct {
31 Username string `json:"username"`
32}
33
34type EventUpdateEmail struct {
35 Email string `json:"email"`
36}
37
38func ConvertFromInt(in int) aggregates.AggregateEvent {
39 return aggregates.AggregateEvent(in)
40}
41
42func IsEventTypeValid(eventType aggregates.AggregateEvent) bool {
43 return eventType >= UsersCreated && eventType < UsersUnusedEvent
44}
45
46type Aggregator struct {
47 db *bun.DB
48 eventListener *event_listener.EventListener
49 repository *repository.Repository
50 eventRepository *repositories.EventRepository
51}
52
53func New(
54 db *bun.DB,
55 eventListener *event_listener.EventListener,
56 repository *repository.Repository,
57 eventRepository *repositories.EventRepository,
58) *Aggregator {
59 return &Aggregator{
60 db: db,
61 eventListener: eventListener,
62 repository: repository,
63 eventRepository: eventRepository,
64 }
65}
66
67func (a *Aggregator) Run() {
68 ctx := context.Background()
69 a.eventListener.Listen(ctx, event_listener.EventsCreated, a.buildUsers)
70}
71
72func (a *Aggregator) GetUserByID(ctx context.Context, id string) (*models.User, error) {
73 userAggregate, err := a.repository.Find(ctx, id)
74 if err != nil {
75 return nil, err
76 }
77 user := userAggregate.(*models.User)
78 return user, nil
79}
80
81func (a *Aggregator) ExtractUserAndUpdatedModel(ctx context.Context, eventID string, eventType int, aggregateID string, eventUpdate interface{}) (*models.User, interface{}, error) {
82 event, err := a.eventRepository.GetEventByIdAndType(ctx, eventID, eventType)
83 if err != nil {
84 return nil, nil, err
85 }
86
87 user, err := a.GetUserByID(ctx, aggregateID)
88 if err != nil {
89 return nil, nil, err
90 }
91
92 payloadJson := event.Payload
93 err = json.Unmarshal(payloadJson, eventUpdate)
94 if err != nil {
95 return nil, nil, err
96 }
97
98 return user, eventUpdate, nil
99}
100
101func (a *Aggregator) buildUsers(payload string) error {
102 ctx := context.Background()
103 id, eventType, aggregateID := event_listener.ExtractIdAndTypeFromPayload(payload)
104 convertedType := ConvertFromInt(eventType)
105 if !IsEventTypeValid(convertedType) {
106 return nil
107 }
108 switch convertedType {
109 case UsersCreated:
110 event, err := a.eventRepository.GetEventByIdAndType(ctx, id, eventType)
111 if err != nil {
112 return err
113 }
114 user := &models.User{}
115 payloadJson := event.Payload
116 err = json.Unmarshal(payloadJson, user)
117 if err != nil {
118 return err
119 }
120 return a.repository.Save(ctx, user, id)
121
122 case UsersUpdatePassword:
123 user, userUpdate, err := a.ExtractUserAndUpdatedModel(ctx, id, eventType, aggregateID, &EventUpdatePassword{})
124 if err != nil {
125 return err
126 }
127 user.Password = userUpdate.(*EventUpdatePassword).Password
128 return a.repository.Save(ctx, user, id)
129 case UsersUpdateEmail:
130 user, userUpdate, err := a.ExtractUserAndUpdatedModel(ctx, id, eventType, aggregateID, &EventUpdateEmail{})
131 if err != nil {
132 return err
133 }
134 user.Email = userUpdate.(*EventUpdateEmail).Email
135 return a.repository.Save(ctx, user, id)
136 case UsersUpdateUsername:
137 user, userUpdate, err := a.ExtractUserAndUpdatedModel(ctx, id, eventType, aggregateID, &EventUpdateUsername{})
138 if err != nil {
139 return err
140 }
141 user.Username = userUpdate.(*EventUpdateUsername).Username
142 return a.repository.Save(ctx, user, id)
143 case UsersDeleted:
144 user, _, err := a.ExtractUserAndUpdatedModel(ctx, id, eventType, aggregateID, &aggregates.EventDelete{})
145 if err != nil {
146 return err
147 }
148 return a.repository.Delete(ctx, user.ID.String(), id)
149 default:
150 panic("unhandled default case")
151 }
152
153 return nil
154}
Тут нас больше всего волнует сама функция Run()
, которая вызывает некоторую логику сборки представления.
Суть заключается в понимании того, что за тип события мы получили выполнения какой-то логики и вызывание метода Save. В реальность рекомендуется разносить тела case в switch по разным функциям конечно же.
Остался основной файл internal/domains/users/users.go
Который по сути должен реализовать интерфейс Domain
681package users
2
3import (
4 "github.com/gin-gonic/gin"
5 "github.com/uptrace/bun"
6 "example-event-sourcing/internal/aggregates"
7 "example-event-sourcing/internal/domains/users/aggregator"
8 "example-event-sourcing/internal/domains/users/models"
9 "example-event-sourcing/internal/domains/users/repository"
10 "example-event-sourcing/internal/event_listener"
11 "example-event-sourcing/internal/handlers"
12 "example-event-sourcing/internal/repositories"
13)
14
15type UserDomain struct {
16 db *bun.DB
17 eventListener *event_listener.EventListener
18 aggregator *aggregator.Aggregator
19 repository *repository.Repository
20 eventRepository *repositories.EventRepository
21 eventHandlers *handlers.EventHandlers
22}
23
24func New(
25 db *bun.DB,
26 eventListener *event_listener.EventListener,
27 eventRepository *repositories.EventRepository,
28 eventHandlers *handlers.EventHandlers,
29) *UserDomain {
30 repo := repository.New(db)
31 agg := aggregator.New(db, eventListener, repo, eventRepository)
32
33 return &UserDomain{
34 db: db,
35 eventListener: eventListener,
36 repository: repo,
37 aggregator: agg,
38 eventRepository: eventRepository,
39 eventHandlers: eventHandlers,
40 }
41}
42
43func (u *UserDomain) Run() {
44 u.aggregator.Run()
45}
46
47func (u *UserDomain) QueryRoutes() map[string]gin.HandlerFunc {
48 routes := make(map[string]gin.HandlerFunc)
49 routes["/users"] = func(c *gin.Context) {
50 data, err := u.repository.All(c)
51 if err != nil {
52 c.JSON(500, gin.H{
53 "error": err.Error(),
54 })
55 return
56 }
57 c.JSON(200, gin.H{
58 "result": data,
59 })
60 }
61 return routes
62}
63
64func (u *UserDomain) MutationRoutes() map[string]gin.HandlerFunc {
65 routes := make(map[string]gin.HandlerFunc)
66 routes["/users"] = u.eventHandlers.SaveEvent
67 return routes
68}
Тут все предельно просто. Не считая того, зачем мы отделили запись событий в отдельный роут? Это не обязательно, но опять таки более правильно с точки зрения развития идеи дальше, т.к. скорее всего разные роуты могут иметь разные права и вероятно иметь этот уровнь абстрации будет удобнее, нежели же просто одну точку для вброса всех событий. Опять таки конечная реализация все равно на ваших плечах :-)
Мы подходим к непосредственной работе с базой, а значит пора обзавестись каким-либо окружением. Для этого добавим docker-compose.yaml
в корень проекта в котором просто поднимем дефолтный контейнер с postgres.
171version'3.1'
2
3volumes
4 example-event-sourcing_pg_db
5
6services
7 example-event-sourcing_db
8 image postgres
9 restart always
10 environment
11 POSTGRES_PASSWORD=postgres
12 POSTGRES_USER=postgres
13 POSTGRES_DB=example
14 volumes
15 example-event-sourcing_pg_db:/var/lib/postgresql/data
16 ports
17 $ POSTGRES_PORT:-5434 :5432
И теперь, зная явки и пароли, создадим в корне проекта файл .env
21DB_DSN=postgres://postgres:postgres@localhost:5434/example?sslmode=disable
2APP_SERVER_PORT=":7755"
Запустим наш контейнер с базой данных:
11docker-compose up -d
Далее нужно инициализировать миграции как таковые, что бы создались все необходимые таблицы в базе:
11go run main.go migrate/init
Ну и переходим к добавим новых миграций через:
11go run main.go migrate/create_sql -n events
В папке migrations
добавится два файлы, формата YYYYMMDDHHmmss_events.(up|down).sql
В добавленный файл up вставим нашу структуру для работы с событиям
321CREATE TABLE IF NOT EXISTS events
2(
3 id uuid not null,
4 type_id smallserial not null,
5 aggregate_id uuid not null,
6 payload json not null,
7 created_at timestamp default CURRENT_TIMESTAMP not null,
8 constraint events_pk
9 primary key (id, aggregate_id, type_id)
10);
11
12CREATE FUNCTION events_after_insert_trigger()
13 RETURNS TRIGGER AS $$
14BEGIN
15 PERFORM pg_notify('events:created', CONCAT(NEW.id::text, ',', NEW.type_id::text, ',', NEW.aggregate_id::text));
16 RETURN NULL;
17END;
18$$
19 LANGUAGE plpgsql;
20
21CREATE TRIGGER events_after_insert_trigger
22 AFTER INSERT ON events
23 FOR EACH ROW EXECUTE PROCEDURE events_after_insert_trigger();
24
25CREATE TABLE IF NOT EXISTS locker
26(
27 event_id uuid not null,
28 aggregate_id uuid not null,
29 lock_domain varchar(50) not null,
30 constraint locker_pk
31 primary key (event_id, aggregate_id, lock_domain)
32);
Самая мякотка тут это строчка
11pg_notify('events:created', CONCAT(NEW.id::text, ',', NEW.type_id::text, ',', NEW.aggregate_id::text));
В которой мы отправляем в канал events:created
строку формата uuid,int,uuid
. И как раз этот формат разбирается нашим хелпером в файле internal/event_listener/helpers.go
.
Опять таки это можно расширять и модицифировать как угодно.
Для down файла содержание будет обратным добавлению
41DROP TRIGGER IF EXISTS events_after_insert_trigger ON events;
2DROP FUNCTION IF EXISTS events_after_insert_trigger();
3DROP TABLE IF EXISTS locker;
4DROP TABLE IF EXISTS events;
Надо не забыть про таблицу с представлением домена пользователей.
11go run main.go migrate/create_sql -n users
up:
91CREATE TABLE IF NOT EXISTS users
2(
3 id uuid
4 constraint users_pk
5 primary key,
6 username varchar(100),
7 password varchar(255),
8 email varchar(100)
9);
down:
11DROP TABLE IF EXISTS users;
Ну и добавляем наши миграции к базе:
11go run main.go migrate/up
Ну и вишенка на торте. Теперь мы можем исправить код файла cmd/run.go
471package cmd
2
3import (
4 "example-event-sourcing/internal/aggregates"
5 "example-event-sourcing/internal/db"
6 "example-event-sourcing/internal/domains/users"
7 "example-event-sourcing/internal/event_listener"
8 "example-event-sourcing/internal/handlers"
9 "example-event-sourcing/internal/repositories"
10 "example-event-sourcing/internal/router"
11 "github.com/spf13/cobra"
12 "github.com/uptrace/bun"
13)
14
15func getAllDomain(dbConnection *bun.DB) []aggregates.Domain {
16 eventRepository := repositories.NewEventRepository(dbConnection)
17 eventHandlers := handlers.NewEventHandlers(eventRepository)
18 eventListener := event_listener.New(dbConnection)
19 domains := []aggregates.Domain{
20 users.New(dbConnection, eventListener, eventRepository, eventHandlers),
21 }
22
23 return domains
24}
25
26// runCmd represents the run command
27var runCmd = &cobra.Command{
28 Use: "run",
29 Short: "Run application",
30 Run: func(cmd *cobra.Command, args []string) {
31 dbConnection := db.Connect()
32 defer dbConnection.Close()
33 domains := getAllDomain(dbConnection)
34 server := router.New()
35 for _, domain := range domains {
36 go domain.Run()
37 server.AddQueryRoutes(domain.QueryRoutes())
38 server.AddMutationRoutes(domain.MutationRoutes())
39 }
40
41 server.Run()
42 },
43}
44
45func init() {
46 rootCmd.AddCommand(runCmd)
47}
Тут нам интересен исключительно момент формирования списка доменов в функции getAllDomain
. При расширении функционала мы просто напросто должны добавить в переменную domains инициализацию другого домена.
Теперь можете запустить приложение:
11go run main.go run
Ну или собрать его и запускать бинарь.
Если вы работаете в JetBrains'овских IDE. То создайте файл test.http
в корне проекта и можете поиграться с примерами API запросов:
361### GET request to example server to featch all users
2GET http://localhost:7755/users
3
4###
5
6### POST request to example server to create a new user
7POST http://localhost:7755/users
8Content-Type: application/json
9
10{
11 "id": "000000aa-1111-2222-3333-000000000001",
12 "aggregate_id": "100000bb-1111-2222-3333-000000000001",
13 "type_id": 1000,
14 "payload": {
15 "id": "100000bb-1111-2222-3333-000000000001",
16 "username": "test",
17 "email": "test@test.test"
18 }
19}
20
21###
22
23### POST request to example server to update the password for existing aggregate
24POST http://localhost:7755/users
25Content-Type: application/json
26
27{
28 "id": "000000aa-1111-2222-3333-000000000002",
29 "aggregate_id": "100000bb-1111-2222-3333-000000000001",
30 "type_id": 1001,
31 "payload": {
32 "password": "test_password"
33 }
34}
35
36###
В примере выше мы отдаем клиенту на откуп генерацию идентификаторов для аггрегата и для события при запросе. Тому есть такое объяснение:
Генерация автоматических идентификаторов на стороне бекенда черевата дублированием при повторе одного и того же запроса. Поэтому при выполнении запросов клиент должен сам позаботиться об их генерации. Это немного усложняет логику на клиенте, но буквально на пару функций. Но бенефитов от этого больше.
В частности по этому я использую в примере uuid'ы, что в целом должно свести практически к нулю коллизии по пересечению идентификаторов. Использование serial и автоинкрементов в бд приведет к необходимости добавить больше логики для фильтрации дублей событий.
Идея более чем жизнеспособна, но имеет свои нюансы и особенности. Использовать её или нет нужно принимать решение взвешенно и обдуманно. Не стоит воспринимать это как что-то хорошее или плохое. Это просто вариация на тему и при том не самая лучшая с точки зрения кода, можно написать лучше. Но я старался донести лишь саму идею и показать как это может быть.
Получилось ли достичь целей?
Хочу иметь минимум инфраструктуры ✅
Хочу иметь максимально взрослую архитектуру приложения сразу ✅
Это должно быть быстро разрабатываемо ✅
Это должно быть понятно и просто, т.е. перечень технологий должен быть минимален ✅
Хорошо, но это явно не финишная прямая.
Далее разовьем эту идею и пример с точки зрения того, как не тратиться на написание спецификаций и использовать кодогенерацию для взаимодействия с фронтендом.