l

2022年6月30日 星期四

事件溯源(3):將Aggregate儲存至EventStoreDB

June 29 05:39~09:08


▲ezKanban團隊討論Repository設計的記錄 

 

前言

今天要將上一集寫好的Tag Aggregate儲存到EventStoreDB (https://eventstore.com) 這個專門為Event Sourcing所設計的特殊用途開源資料庫。

***

 

準備環境

首先安裝EventStoreDB,官方已經製作好現成的Docker影像檔,無須安裝執行以下指令便可直接使用:

 

docker run --name esdb-node -it -p 2113:2113 -p 1113:1113 eventstore/eventstore:latest --insecure --run-projections=All --enable-external-tcp --enable-atom-pub-over-http --start-standard-projections

 

如圖1所示,打開瀏覽器輸入以下網址連上EventStoreDB管理畫面: http://localhost:2113/web/index.html#/dashboard

 

▲圖1:EventStoreDB管理畫面 

 

切換到Stream Browser頁面,如圖2所示。之後儲存至EventStoreDB的Aggregate將會出現在這個頁面。


▲圖2:EventStoreDB的Stream Browser頁面

 

接著安裝EventStoreDB的客戶端驅動程式,Teddy採用Maven管理專案依賴,在pom.xml檔案加入以下依賴:

<dependency>
    < groupId>com.eventstore</groupId>
    < artifactId>db-client-java</artifactId>
    <version>2.0.0</version>
</dependency>

目前EventStoreDB最新版的Java驅動程式是3.0.0,Teddy範例程式使用的是2.0.0版本。

***

 

Repository設計與實作

在領域驅動設計中(Domain-Driven Design;DDD)透過Repository設計模式儲存與讀取Aggregate,圖3是Teddy設計的AbstractRepository介面,所有Concrete Repository皆會實作此介面。

 

▲圖3:AbstractRepository介面   

 

圖4為TagRepository介面,它實作AbstractRepository。


▲圖4:TagRepository介面

 

接下來討論TagRepository的實作。雖然本系列的主題是Event Sourcing,但實務上很多開發人員可能因為公司規定或是個人偏好,還是習慣將資料儲存在關聯式資料庫。因此Teddy希望TagRepository的實作能夠同時支援Event Sourcing以及State Sourcing加上Transactional Outbox

最後的設計圖如圖5,TagRepository有兩個實作,分別是TagEventSourcingRepository與TagOutboxRepository。前者支援Event Sourcing,後者支援State Sourcing加Transactional Outbox。今天先討論TagEventSourcingRepository的實作,下一集再來看TagOutboxRepository。

 

▲圖5:支援Event Sourcing與State Sourcing加Transactional Outbox的Repository架構設計圖   

***

 

TagEventSourcingRepository實作

 

TagEventSourcingRepository程式如圖6所示,基本上它把要做的事情都委託給GenericEventSourcingRepository來完成。


▲圖6:TagEventSourcingRepository程式碼

 

圖7為GenericEventSourcingRepository實作,首先看到26行的findById,它透過eventSourcingStore依據Aggregate stream name從資料庫中讀出所有的領域事件(第28~29行)。由於Teddy套用Clean Architecture,所以寫入與讀取自事件溯源資料庫中的資料有一個統一的介面AggregateRootData

第33~34行將AggregateRootData身上的DomainEventData轉成DomainEvent,然後第36行使用Java Reflection技術產生所指定的Aggregate instance,並將剛剛從資料庫中所讀取的領域事件傳給它。Aggregate的建構函數收到領域事件之後會重播(replay)這些事件以便計算出最新的狀態。最後回傳Aggregate instance,如此便成功從事件溯源資料庫中讀取Aggregate。

第48行的save方法比較簡單,首先在51行將傳入的Aggregate轉成AggregateData,然後第52行透過eventSourcingStore儲存這個AggregateData。儲存完畢第53行後重設Aggregate版本,然後在54行清除Aggregate身上的領域事件(因為Aggregate的狀態已經儲存到資料庫,所以要清除它身上的領域事件,否則相同Aggregate若再儲存一次會儲存重複的領域事件)。


▲圖7:GenericEventSourcingRepository程式碼

 

***

 

EsdbStore實作

以上看了老半天鄉民們可能會問:「我還是沒看到領域事件到底如何存到EventStoreDB啊?」因為ezKanban支援不同的事件溯源資料庫,所以採取事件溯源儲存方式的Repository實作是透過EventSourcingStore介面來操作資料庫(請參考圖5)。

圖8中的EsdbStore類別是ezKanban針對EventStoreDB所提供的EventSourcingStore實作,第14行的EventStoreDBClient類別就是EventStoreDB官方提供的EventStoreDB驅動程式,開發人員最終要透過它跟EventStoreDB連線與讀、寫資料。如果鄉民們沒有ezKanban套用Clean Architecture與支援多種事件溯源資料庫的需求,就可以直接在Repository實作類別中使用EventStoreDBClient操作EventStoreDB,這樣子會簡單很多。

接下來看第16行的save方法,它接受AggregateRootData當作輸入參數。這個類別Teddy剛剛介紹過,它是ezKanban為了符合Clean Architecture的跨層原則,設計用來將Aggregate轉成AggregateRootData再傳給資料庫的類別。第21~28行將AggregateRootData身上的DomainEventData轉成EventDataEventData是EventStoreDB驅動程式所設計的資料結構,它是真正存到EventStoreDB的物件。

在操作關聯式資料庫的時候,開發人員要指定Table(資料表),EventStoreDB操作的對象不是Table,而是event stream(事件流,可以想像成它就是一個append only的檔案)。開啟event stream的時候可以指定開啟模式,在EventStoreDB的驅動程式中以AppendToStreamOptions類別表示,如圖8的第31~38行。

EventStoreDB支援樂觀鎖定(optimistic locking),若第32行aggregateRootData.getVersion() 回傳-1代表產生一個新的Aggregate,第34行expectedRevision的參數設定為ExpectedRevision.ANY即可。在此模式底下寫入資料的時候EventStoreDB不會做樂觀鎖定的檢查。如果不是產生新的Aggregate,如37行所示則將expectedRevision的參數設定為new StreamRevision(aggregateRootData.getVersion())。StreamRevision是EventStoreDB驅動程式用來代表event stream中事件版本的物件,在此直接把aggregateRootData.getVersion() 當成參數傳給它即可。設定了expectedRevision之後,寫入資料的時候EventStoreDB就會啟動樂觀鎖定檢查。

最後,第39~41行透過EventStoreDBClient類別的appendToStream方法把資料寫入資料庫中。


▲圖8:EsdbStore程式碼

 

接下來看圖8第57行的load方法,它從Aggregate instance所屬的event stream中讀取資料(每一個Aggregate instance都有一個專屬的event stream儲存它自己的領域事件)。第63~71行呼叫getResolvedEvents方法(參考圖9)讀取資料,讀出的資料格式是EventStoreDB驅動程式所定義的ResolvedEvent類別。

第76行新增aggregateRootData,然後在77~79行將剛剛從資料庫讀出的ResolvedEvent轉成DomainEventData,最後在第84行回傳aggregateRootData。

 


▲圖9:EsdbStore類別的getResolvedEvents方法

 

***

 

執行測試案例

完成TagRepository的整個實作之後,改寫上一集<事件溯源(2):實作Event Sourced Aggregate>的測試案例,把原本InMemoryTagRepository換成TagEventSourcingRepository,如圖10所示。

 


▲圖10:將測試案例中的tagRepository換成TagEventSourcingRepository實作 

 

執行完測試案例打開EventStoreDB管理畫面,看到新增一筆event stream,如圖11所示。

 


▲圖11:從EventStoreDB管理畫面看到剛剛新增的event stream

 

點開這筆event stream,看到裡面有一筆資料,它的型態的TagEvents$TagCreated事件,資料內容採用JSON格式儲存,如圖12所示。



▲圖12:用EventStoreDB管理畫面觀看event stream的內容 

 

***

下集預告

下一集介紹用State Sourcing加Transactional Outbox方式實作TagRepository。

 

***

友藏內心獨白:這一篇居然寫了這麼久。

沒有留言:

張貼留言