l

2022年6月30日 星期四

事件溯源(3):將Aggregate儲存至EventStoreDB

June 29 05:39~09:08


▲ezKanban團隊討論Repository設計的記錄 

 

前言

今天要將上一集寫好的Tag Aggregate儲存到EventStoreDB (https://eventstore.com) 這個專門為Event Sourcing所設計的特殊用途開源資料庫。

***

 

準備環境

首先安裝EventStoreDB,官方已經製作好現成的Docker影像檔,無須安裝執行以下指令便可直接使用:

 

docker run --name esdb-node -it -p 2113:2113 -p 1113:1113 eventstore/eventstore:latest --insecure --run-projections=All --enable-external-tcp --enable-atom-pub-over-http --start-standard-projections

 

如圖1所示,打開瀏覽器輸入以下網址連上EventStoreDB管理畫面: http://localhost:2113/web/index.html#/dashboard

 

▲圖1:EventStoreDB管理畫面 

 

切換到Stream Browser頁面,如圖2所示。之後儲存至EventStoreDB的Aggregate將會出現在這個頁面。


▲圖2:EventStoreDB的Stream Browser頁面

 

接著安裝EventStoreDB的客戶端驅動程式,Teddy採用Maven管理專案依賴,在pom.xml檔案加入以下依賴:

<dependency>
    < groupId>com.eventstore</groupId>
    < artifactId>db-client-java</artifactId>
    <version>2.0.0</version>
</dependency>

目前EventStoreDB最新版的Java驅動程式是3.0.0,Teddy範例程式使用的是2.0.0版本。

***

 

Repository設計與實作

在領域驅動設計中(Domain-Driven Design;DDD)透過Repository設計模式儲存與讀取Aggregate,圖3是Teddy設計的AbstractRepository介面,所有Concrete Repository皆會實作此介面。

 

▲圖3:AbstractRepository介面   

 

圖4為TagRepository介面,它實作AbstractRepository。


▲圖4:TagRepository介面

 

接下來討論TagRepository的實作。雖然本系列的主題是Event Sourcing,但實務上很多開發人員可能因為公司規定或是個人偏好,還是習慣將資料儲存在關聯式資料庫。因此Teddy希望TagRepository的實作能夠同時支援Event Sourcing以及State Sourcing加上Transactional Outbox

最後的設計圖如圖5,TagRepository有兩個實作,分別是TagEventSourcingRepository與TagOutboxRepository。前者支援Event Sourcing,後者支援State Sourcing加Transactional Outbox。今天先討論TagEventSourcingRepository的實作,下一集再來看TagOutboxRepository。

 

▲圖5:支援Event Sourcing與State Sourcing加Transactional Outbox的Repository架構設計圖   

***

 

TagEventSourcingRepository實作

 

TagEventSourcingRepository程式如圖6所示,基本上它把要做的事情都委託給GenericEventSourcingRepository來完成。


▲圖6:TagEventSourcingRepository程式碼

 

圖7為GenericEventSourcingRepository實作,首先看到26行的findById,它透過eventSourcingStore依據Aggregate stream name從資料庫中讀出所有的領域事件(第28~29行)。由於Teddy套用Clean Architecture,所以寫入與讀取自事件溯源資料庫中的資料有一個統一的介面AggregateRootData

第33~34行將AggregateRootData身上的DomainEventData轉成DomainEvent,然後第36行使用Java Reflection技術產生所指定的Aggregate instance,並將剛剛從資料庫中所讀取的領域事件傳給它。Aggregate的建構函數收到領域事件之後會重播(replay)這些事件以便計算出最新的狀態。最後回傳Aggregate instance,如此便成功從事件溯源資料庫中讀取Aggregate。

第48行的save方法比較簡單,首先在51行將傳入的Aggregate轉成AggregateData,然後第52行透過eventSourcingStore儲存這個AggregateData。儲存完畢第53行後重設Aggregate版本,然後在54行清除Aggregate身上的領域事件(因為Aggregate的狀態已經儲存到資料庫,所以要清除它身上的領域事件,否則相同Aggregate若再儲存一次會儲存重複的領域事件)。


▲圖7:GenericEventSourcingRepository程式碼

 

***

 

EsdbStore實作

以上看了老半天鄉民們可能會問:「我還是沒看到領域事件到底如何存到EventStoreDB啊?」因為ezKanban支援不同的事件溯源資料庫,所以採取事件溯源儲存方式的Repository實作是透過EventSourcingStore介面來操作資料庫(請參考圖5)。

圖8中的EsdbStore類別是ezKanban針對EventStoreDB所提供的EventSourcingStore實作,第14行的EventStoreDBClient類別就是EventStoreDB官方提供的EventStoreDB驅動程式,開發人員最終要透過它跟EventStoreDB連線與讀、寫資料。如果鄉民們沒有ezKanban套用Clean Architecture與支援多種事件溯源資料庫的需求,就可以直接在Repository實作類別中使用EventStoreDBClient操作EventStoreDB,這樣子會簡單很多。

接下來看第16行的save方法,它接受AggregateRootData當作輸入參數。這個類別Teddy剛剛介紹過,它是ezKanban為了符合Clean Architecture的跨層原則,設計用來將Aggregate轉成AggregateRootData再傳給資料庫的類別。第21~28行將AggregateRootData身上的DomainEventData轉成EventDataEventData是EventStoreDB驅動程式所設計的資料結構,它是真正存到EventStoreDB的物件。

在操作關聯式資料庫的時候,開發人員要指定Table(資料表),EventStoreDB操作的對象不是Table,而是event stream(事件流,可以想像成它就是一個append only的檔案)。開啟event stream的時候可以指定開啟模式,在EventStoreDB的驅動程式中以AppendToStreamOptions類別表示,如圖8的第31~38行。

EventStoreDB支援樂觀鎖定(optimistic locking),若第32行aggregateRootData.getVersion() 回傳-1代表產生一個新的Aggregate,第34行expectedRevision的參數設定為ExpectedRevision.ANY即可。在此模式底下寫入資料的時候EventStoreDB不會做樂觀鎖定的檢查。如果不是產生新的Aggregate,如37行所示則將expectedRevision的參數設定為new StreamRevision(aggregateRootData.getVersion())。StreamRevision是EventStoreDB驅動程式用來代表event stream中事件版本的物件,在此直接把aggregateRootData.getVersion() 當成參數傳給它即可。設定了expectedRevision之後,寫入資料的時候EventStoreDB就會啟動樂觀鎖定檢查。

最後,第39~41行透過EventStoreDBClient類別的appendToStream方法把資料寫入資料庫中。


▲圖8:EsdbStore程式碼

 

接下來看圖8第57行的load方法,它從Aggregate instance所屬的event stream中讀取資料(每一個Aggregate instance都有一個專屬的event stream儲存它自己的領域事件)。第63~71行呼叫getResolvedEvents方法(參考圖9)讀取資料,讀出的資料格式是EventStoreDB驅動程式所定義的ResolvedEvent類別。

第76行新增aggregateRootData,然後在77~79行將剛剛從資料庫讀出的ResolvedEvent轉成DomainEventData,最後在第84行回傳aggregateRootData。

 


▲圖9:EsdbStore類別的getResolvedEvents方法

 

***

 

執行測試案例

完成TagRepository的整個實作之後,改寫上一集<事件溯源(2):實作Event Sourced Aggregate>的測試案例,把原本InMemoryTagRepository換成TagEventSourcingRepository,如圖10所示。

 


▲圖10:將測試案例中的tagRepository換成TagEventSourcingRepository實作 

 

執行完測試案例打開EventStoreDB管理畫面,看到新增一筆event stream,如圖11所示。

 


▲圖11:從EventStoreDB管理畫面看到剛剛新增的event stream

 

點開這筆event stream,看到裡面有一筆資料,它的型態的TagEvents$TagCreated事件,資料內容採用JSON格式儲存,如圖12所示。



▲圖12:用EventStoreDB管理畫面觀看event stream的內容 

 

***

下集預告

下一集介紹用State Sourcing加Transactional Outbox方式實作TagRepository。

 

***

友藏內心獨白:這一篇居然寫了這麼久。

2022年6月29日 星期三

事件溯源(2):實作Event Sourced Aggregate

June 28 10:19~12:19


   ▲圖1:ezKanban簡化版的domain model

 

練習題目:Tag Aggregate

在領域驅動設計(Domain-Driven Design)中Aggregate(聚合)是一群物件的集合,它們的狀態改變必須在同一個交易中完成。也就是說Aggregate形成交易邊界,也是DDD中儲存狀態的最小物件單位。Aggregate的狀態交由Repository設計模式來儲存與讀取,一個Aggregate type對應到一個Repository

圖1是ezKanban系統core domain的領域模型,一共有四個aggregate,今天要用Event Sourcing的方式實作最簡單的Tag aggregate。

***

CreateTagUseCase Test

Teddy上【領域驅動設計與簡潔架構入門實作班】的時候採用TDD串起DDD、Event Storming與Clean Architecture的開發流程,要撰寫Tag aggregate依照慣例先幫它寫驗收測試案例,從CreateTagUseCaseTest開始。

圖2為CreateTagUseCase的驗收測試案例,Tag的屬性有tag id, board id, name, color這四個欄位。第37行執行完畢後一個新增的Tag會透過TagRepository被加入資料庫中。第29行透過CreateTagUseCase的建構函數將TagRepository注入給CreateTagUseCase。此時尚不需要決定TagRepository的實作細節,因此先用一個InMemoryTagRepository來「欺騙」CreateTagUseCase。

測試案例剛寫好的時候會有語法錯誤無法編譯,這是因為production code還沒寫,接下來撰寫CreateTagUseCase來消除語法錯誤。

 

▲圖2:CreateTagUseCase驗收測試案例 

***

CreateTagUseCase程式碼如圖3所示,很簡單就是新增一個Tag然後把它存入repository。同樣的,此時由於Tag尚未撰寫因此會有語法錯誤無法編譯程式。

 


▲圖3:CreateTagUseCase 

***

接著撰寫Tag單元測試,如圖4。

 

▲圖4:Tag單元測試 

 

最後撰寫Tag,圖圖5所示,它繼承ezKanban內建的AggregateRoot類別,第22行程式產生TagCreated領域事件代表Tag狀態改變。如果不管Event Sourcing,CreateTagUseCase使用案例就寫好了。但是因為Tag想要套用Event Sourcing,所以程式撰寫方式要加以修正。


▲圖5:沒有支援Event Sourcing的Tag aggregate 

 

***

Event Sourced Tag

要讓Aggregate支援Event Sourcing,只要把握以下原則即可:

首先,Aggregate的public Command(改變系統狀態的操作)只負責產生領域事件,然後呼叫定義在AggregateRoot身上的apply方法去套用這個領域事件,如圖6所示。

 

▲圖6:支援Event Sourcing的Tag aggregate建構函數 

 

其次,如圖7所示AggregateRoot的apply方法是一個Template Method,它依次呼叫ensureInvairant、when、ensureInvairant、addDomainEvent。在此先忽略ensureInvairant,接下來的when method是一個Template Method設計模式裡面的primitive operation, 子類別必須實作when用來撰寫事件處理程式(event handler)。addDomainEvnet則是將領域事件先保存在AggregateRoot身上,最後Repository可讀取AggregateRoot的領域事件來儲存狀態。

 

▲圖7:AggregateRoot的apply method 

 

最後,如圖8所示,Tag實作when方法來處理TagCreated領域事件。處理方式很簡單,只要把圖5中原本Tag建構函數的初始化程式碼移到when裡面即可。

 

▲圖8:Tag實作when方法 

***

實作rename方法

可以新增Tag之後,接下來看Tag的rename method,如圖9所示,首先判斷傳入的newName和Tag現有名字是否相等,如果是就直接離開(因為狀態沒有改變),若否則apply TagRenamed領域事件。

 

▲圖9:Tag的rename方法 

***

 

刪除Tag

傳統關聯式資料庫的刪除最簡單的做法就是把該筆資料直接從資料庫刪除(delete),但在Event Sourcing系統中基本上只會append事件不會把事件刪除,所以實作刪除的方式就是在AggregateRoot定義markAsDeleted方法,然後讓它的子類別去實作該方法。如圖10所示,Tag的markAsDeleted方法apply TagDeleted領域事件。Tag最後的when方法處哩TagCreated、TagRenamed、TagDeleted三個領域事件,如圖11所示。

 

▲圖10:Tag的rename方法 

 


▲圖11:Tag的when方法,處理三個領域事件 

 

***

從領域事件回復狀態

將Tag改成上述寫法之後,要透過領域事件回復狀態就非常簡單。請參考圖12單元測試案例,第69與71行分別新增TagCreated與TagRenamed領域事件,模擬從資料庫中讀取Tag的領域事件。第74行新增Tag,將剛剛產生的兩個領域事件傳給Tag建構函數。最後驗證tag的狀態是否正確。


▲圖12:Tag的when方法,處理三個領域事件 

 

如圖13所示,Tag建構函數接受一個,List<DomainEvent>,它直接呼叫super,也就是圖14。


▲圖13:Tag建構函數 

 

如圖14所示,AggregateRoot建構函數收到List<DomainEvent>之後,第38行跑一個for each逐一將領域事件傳給apply方法。也就是說「將事件重播(replay)以計算出目前狀態」


▲圖14:AggregateRoot建構函數 

***

下集預告

撰寫好支援事件溯源的Tag Aggregate之後,下集介紹如何將狀態儲存到EventStoreDB (https://eventstore.com) 資料庫中。

***

友藏內心獨白:是不是很簡單。

2022年6月28日 星期二

事件溯源(1):Event Sourcing的好處

June 28 06:30~09:30

▲採用Event Sourcing的系統狀態由重播(重新套用)事件所計算而來 

 

前言

今年下半年Teddy準備開新課程—事件溯源與命令查詢責任分離架構實作班,開課之前先把教材內容整理成文章。這系列文章分成Part 1與Part 2,先介紹Event Sourcing在介紹CQRS。

 

***

 

兩種常見儲存狀態的方式

Event Sourcing,翻譯成「事件溯源」或「事件來源」,是一種儲存狀態的方式。在Event Sourcing流行之前,大部分的開發人員儲存系統狀態的方式稱為State Sourcing(狀態來源)或Domain Sourcing(領域來源)。如圖1所示,State Sourcing採用將系統目前狀態儲存至資料庫中。這種儲存狀態的重點是資料庫中只儲存目前狀態,系統的狀態是直接從資料庫中取得,至於所使用的資料庫是關聯式資料庫或是NoSQL資料庫都可以。

在State Sourcing系統中,狀態改變會直接覆寫物件現有的狀態,例如圖1中Teddy的email如果改成ted@gmail.com則在資料庫中id=001的這一筆資料,其email欄位的值就被改成ted@gmail.com,舊有的值則被覆蓋掉。


▲圖1:以State Sourcing保存系統狀態 

 

***

 

如圖2所示,Event Sourcing在資料庫中保存的不是系統的目前狀態,而是儲存曾經造成系統狀態改變的所有事件。至於系統的目前狀態,則是透過把這些事件從頭到尾重新執行過一次(稱為replay events)所計算出來。儲存事件的資料庫一般稱為Event Store,它可以是關聯式資料庫,例如message-db (https://github.com/message-db/message-db)、NoSQL資料庫,或是專門為Event Sourcing所設計的特殊用途資料庫,例如EventStoreDB (https://eventstore.com)。

在Event Sourcing系統中,基本上採取append only的方式來儲存事件。事件只可寫入Event Store,不可刪除或修改。例如圖2中Teddy的email如果改成ted@gmail.com,則在資料庫中代表Teddy帳戶的event stream會被寫入一筆新的事件:EmailChanged { id=001, email=ted@gmail.com}


▲圖2:以Event Sourcing保存系統狀態 

 

***

稽核

Event Sourcing並不是新的技術,像是銀行的存款帳戶儲存客戶存款金額的方式就是採用Event Sourcing,存摺上面一筆、一筆的交易紀錄(transaction log)就是系統所儲存的事件。.

由於所有針對系統所造成的狀態改變皆以事件的形式儲存起來,因此可以達到稽核的效果。以銀行存款為例,如果銀行採用State Sourcing的方式儲存你的存款餘額,你認為自己在銀行存款餘額還有一百萬,但是銀行的資料紀錄你只剩下一百塊,怎麼辦,以誰的紀錄為準?為了稽核,銀行必須儲存每筆交易紀錄,既使是錯誤的交易紀錄也不能直接刪除,而是要用另外一筆紀錄去沖銷。例如,銀行帳務系統錯誤,不小心轉了五萬到你的戶頭。銀行不能直接從你的戶頭扣掉五萬然後刪除這筆交易紀錄,而是要新增一筆負五萬的交易紀錄來抵銷原本錯誤的交易。

 

***

 

狀態同步

既然Event Sourcing不是新技術,在特定領域中也應用了很長一段時間,為什麼近幾年來這個名詞變得越來越流行?它流行的原因和DDD(領域驅動設計)流行的原因相同,主要受惠於微服務架構的熱潮。

在DDD中,Aggregate狀態改變會產生領域事件(Domain Event),透過領域事件可以在不同Aggregate之間達到狀態最終一致性,這個特性正好可以應用在分散式或微服務架構中作為狀態同步之用。

如圖3所示,如果DDD的Aggregate採用State Sourcing,在儲存物件狀態的時候,除了原本物件的目前狀態以外,還需要儲存領域事件,這兩個動作必須要在同一個交易(transaction)中一起完成,系統狀態才不會錯誤。在微服務物架構中Transactional Outbox設計模式就是用來解決這個問題。

 

▲圖3:State Sourcing保存系統狀態與領域事件必須在同一個交易中 

 

如圖4所示,採用Event Sourcing的系統在儲存領域事件的同時就等於儲存系統狀態,也就避免圖3的問題。

 

▲圖4:Event Sourcing保存領域事件也就同時保存系統狀態 

 

在這裡有一點要注意,不管是採用Transactional Outbox或是Event Sourcing,在這種情況下Event Store同時也扮演簡易Message Bus或Message Broker的功能。也就是Event Store需要支援客戶端去資料庫中讀取領域事件,然後再將領域事件轉發給其他「下游」的Event Handler或微服務,如圖五。

 

▲圖5:Event Store也是Event Bus/Event Broker 

***

 

偏好寫入

Event Sourcing還有寫入快速、簡單、方便的好處。在DDD的情境下採用Event Sourcing,每一個Aggregate的instance在Event Store中會有一個專屬的event stream用來儲存它的領域事件 ,這個event stream通常以Aggregate Type-Aggregate ID的格式來命名,例如一個Account Aggregate instance,它的id等於3104ca15-df4a-4878-9342-b6d6d650b4cc,那麼在Event Store中就會新增一個Account-3104ca15-df4a-4878-9342-b6d6d650b4cc的event stream,用來儲存該Account instance的領域事件,如圖6所示。

 

▲圖6:EventStoreDB的Stream Browser畫面

 

相較於使用關聯式資料庫需要將物件結構轉成關聯式表格(Object Relational Mapping;ORM),Event Sourcing只需儲存領域事件,省去ORM繁瑣的設定。另外,由於寫入資料一次只針對一個Aggregate,所以也不會有採用關聯式資料庫在寫入時可能需要鎖定多個表格的現象,因此「理論上」Event Sourcing的寫入效能會比較高。

 

***

下集預告

關於Event Sourcing的基本觀念先介紹到這裡,下集就要開始寫程式,以ezKanban的Tag Aggregate為例子,說明撰寫Event Sourced Aggregate以及其相對應Repository的步驟。

 

***

友藏內心獨白:終於又開工了。