l

2022年7月26日 星期二

事件溯源(19):在InMemoryRepository實做樂觀鎖

July 26 15:52~16:39

▲圖1:樂觀鎖測試案例

 

前言

這系列文章原本是Teddy為了製作【事件溯源與命令查詢責任分離架構實作班】課程範例而撰寫,課程範例已經完成,這系列文章也已經連載結束。前幾天Teddy回頭把課程範例改成新的寫法,修改之前先跑測試,居然有一個錯誤!

仔細一看才想起來Teddy之前練習的時候注入在測試案例中InMemoryTagRepository,它沒有支援樂觀鎖,所以原本的樂觀鎖定測試案例會失敗,只要換回正常的Repository就好了,這個問題以前也遇到過。

但是這次Teddy突然想到:「為什麼InMemoryRepository不能支援樂觀鎖?」花了幾分鐘改一下Code,測試案例就通過了。今天就追加一篇,談如何讓InMemoryRepository支援樂觀鎖。 

***

實作樂觀鎖

Teddy在<事件溯源(7):樂觀鎖>中介紹過如何在關聯式資料庫與事件溯源資料庫實作樂觀鎖,基本上就是要在Aggregate身上加一個Version欄位,每次儲存Aggregate的時候比對它身上Version的數值與資料庫中的數值是否相等。如果相等,就代表這個Aggregate上次從資料庫讀出之後並沒有其他人寫入,因此它目前的版本是最新的,可以直接儲存到資料庫中。反之,則代表目前Aggregate的版本比較舊,無法儲存,系統要丟出樂觀鎖定失敗例外。

請參考圖1測試案例,從tagRepository根據相同的tagId拿出tagV1與tagV2兩個相同的物件。先把tagV1改名後儲存起來,接著再儲存tagV2,此時tagV2身上的Version數值會小於tagRepository所儲存的數值,因此會丟出RepositorySaveException例外。

 

首先修改InMemoryTageRepository的findById方法,如圖2所示。原本InMemoryTagRepository將Tag儲存在List裡面,findById回傳的記憶體中Tag的參考(reference)。這種直接回傳記憶體參考物件無法測試樂觀鎖,因為圖1中tagV1和tagV2會參考到同一個tag,也就是說改了tagV1會同時改變tagV2的值。所以findById要改成回傳一個新的Tag物件,而不是原本Tag物件的參考。


▲圖2:修改InMemoryTagRepository的findById方法以支援樂觀鎖

  

其次修改InMemoryTagRepository的save方法,如圖3所示。如果要儲存的tag已經存在InMemoryTagRepository,而且它的版本不等於記憶體中的版本,則丟出RepositorySaveException。反之,先把tag從記憶體中移除(如果不移除,相同的Tag會出現在InMemoryTagRepository兩次),然後將它的版本加1,然後把它儲存起來,最後清掉tag身上的領域事件。


▲圖3:修改InMemoryTagRepository的sava方法以支援樂觀鎖

 

就這樣,這麼簡單。

 

***

結論

本集介紹如何讓InMemoryRepository也具備樂觀鎖,但在這裡Teddy實作的InMemoryTagRepository只支援State Sourcing的儲存方式,並沒有支援Event Sourcing。如果是要實作InMemoryEventSourcingRepository,基本上也不會太困難,應該只需要:

  • 把資料結構由List改成Map<String, List<DomainEvent>>,Map的Key是Event Stream Name,Value是Aggregate的領域是件。至於Aggregate的版本就是List<DomainEvent>的大小。
  • 在儲存Aggregate的時候,不需要更新版本號碼,因為讀取(fndById)的時候Aggregate的版本號碼就是它所屬的List<DomainEvent>的大小。

InMemoryEventSourcingRepository的實作就交給鄉民自行練習。

 

***

友藏內心獨白:這一集算番外篇。

2022年7月19日 星期二

《Clean Architecture實作篇:在整潔的架構上弄髒你的手》程式碼分析

July 19 09:07~10:58

▲圖1:《Get Your Hands Dirty on Clean Architecture》程式範例目錄結構

 

前言

2021年9月Teddy幫《Get Your Hands Dirty on Clean Architecture》這本書寫了書評(請參考<【還少一本書】Get Your Hands Dirty on Clean Architecture>),前陣子博碩出版社告知這本書中文版將於今年八月上市,請Teddy幫它寫推薦序。

一年多前讀過的英文版,書中有些細節已經不太記得,這幾天花了點時間把中文版用力讀過一次。這本書中文版翻譯得很好,對Clean Architecture有興趣的朋友可以參考。昨天Teddy在部落格文章<再談Clean Architecture實作>提到《Clean Architecture》書中圖22的問題,今天要談《Get Your Hands Dirty on Clean Architecture》這本書程式範例中「不那麼乾淨」的問題。

 

***

程式範例結構

Get Your Hands Dirty on Clean Architecture》的程式範例在此:https://github.com/thombergs/buckpal,鄉民們可以自行下載閱讀。

首先分析程式結構,請參考圖1。可以看出來程式碼依照書中的建議,先package by feature再package by layer。account這個package就代表一個feature(功能),因為這本書的範例程式規模很小,只示範在不同帳戶之間的轉帳功能,所以從目錄結構上看起來會覺得package by feature的味道很薄,但它真的有package by feature。

至於account裡面的domain, application, adapter相當於Clean Architecture的entity layer, use case layer, interface adapter layer。放在package最外層的BuckPalAccplication就是Clean Architecture所說的Main Component。

***

Entity Layer

接著看到Entity Layer,也就是這本書的domain layer程式碼,請參考圖2。在這一層有四個物件:Account, Activity, ActivityWindow, Mondy,書中並沒有套用Domain-Driven Design(DDD;領域驅動設計),也就是說在Entity Layer沒有Aggregate。但是從DDD的角度來看,Account似乎可以當成AggregateRoot。

但這不是重點,重點是作者在Entity Layer用了lombok這個框架用來自動產生getter/setter/constructor等。嚴格講起來在最核心的Entity Layer是不應該相依於外部工具與框架,但lombok在Java社群中是非常流行的工具,使用它可以少寫很多煩人的程式碼。lombok以annotation的形式存在程式碼中,相對而言是比較輕微的入侵。

對於框架的使用,在Clean Architecture書中有提到,使用框架之後你的系統就跟這個框架結婚。新婚的時候可能很快樂,但如果不幸日後鬧翻要離婚,那離婚手續可就很麻煩,你的財產甚至要分對方一半。以ezKanban為例,Teddy並沒有使用lombok,但為了自動序列化/反序列化將物件與JSON互轉格式,在少數Jackson無法自動判別的類別身上還是貼了Jackson annotation,如圖3所示(Jackson是一個處理JSON的工具)。

所以,如果可能在Entity Layer儘量不要使用外部框架或工具,如果真的要用,也要有廝守一輩子的心理準備。



 ▲圖2:Account程式範例

 

▲圖3:ezKanban在Entity Layer的類別上使用jackson的annotation

***

 

Use Case Layer

Use Case Layer在書中稱為application layer,請參考圖4。《Get Your Hands Dirty on Clean Architecture》書中套用六角形架構,因此在application layer底下有兩個子package:

  • port:存放application layer對外層依賴反轉的介面,又分為in port和out port。in/out的區分是從application layer的角度來看,如果這個介面是讓外層(例如web controller)用來呼叫內層,它就是一個in port(由外往內);如果它是讓application layer的物件呼叫外部服務的介面,例如用來存取資料庫的repository,這就是一個out port(由內往外)。
  • service:實作port的物件稱為service,例如實作SendMoneyUseCase介面的物件叫做SendMoneyService,你也可以把它叫做SendMonyUseCaseImpl,看你喜歡何種命名方式。



 ▲圖4:Use Case Layer (Application Layer)結構

 

接下來Teddy要開始挑毛病了,請參考圖5,SendMoneyUseCase是書中主要用來當作範例解釋的使用案例,它只有一個sendMoney方法,輸入參數是SendMoneyCommand(請參考圖6),輸出為boolean。

 

▲圖5:SendMoneyUseCase程式碼


 

▲圖6:SendMoneyCommand程式碼

 

把圖5與圖6對照到Teddy昨天<再談Clean Architecture實作>畫過的圖7:

  • Input Port:SendMoneyUseCas
  • Input Data:SendMoneyCommand
  • Output Data:boolean

 

看到這裡鄉民有沒有發現什麼問題?


▲圖7:Teddy修正《Clean Architecture》書中圖22之後的結果

 

請參考圖8,在《Clean Architecture》書中提到跨層的資料結構通常是簡單的資料結構,SendMoneyCommand是跨越application layer(use case layer)與adapter layer的物件,但是它身上的屬性卻有AccoundId, Money這兩個位於Entity Layer的Value Object。也就是說Entity Layer的物件透過SendMoneyCommand傳遞到第三層,這麼做雖然沒有違反《Clean Architecture》的依賴原則(相依性由外往內),但是卻違反了跨層原則,這個由SendMoneyUseCase所形成的Input Port(Input Boundary),不是一個完整的雙向介面。

在《Clean Architecture》書中提到理想上介面應該是雙向隔離,一開始要採用單向隔離的介面也可以,日後再隨需要調整成雙向介面。在目前的ezKanban中,Entity Layer的物件傳遞離開Use Case Layer之前一定都經過轉換,往UI層轉成DTO物件,往資料庫層轉成Data物件,領域事件傳遞到其他Bounded Context則是轉成RemoteDomainEvent物件,

 

▲圖8:《Clean Architecture》中文版第172頁

***

Interface Adapter Layer

最後看到位於Interface Adapter Layer的SendMoneyController程式碼,如圖9所示。可以很清楚看出來,SendMoneyController在第26行產生一個SendMoneyCommand物件,然後位於Entity Layer的AccountId與Money物件也被第三層(Interface Adapter Layer)的SendMoneyController給參考到。如同Teddy在上一小節所提到的,SendMoneyCommand是Input Port介面上的資料結構,它應該使用基本資料型別就好,不要使用Entity Layer的物件,以免造成系統架構不乾淨

 

▲圖9:SendMoneyController程式碼

***

結論

Get Your Hands Dirty on Clean Architecture》是一本好書,但如同所有好書一樣,讀書時必須抱持著「盡信書不如無書」的態度,如此才可深入閱讀並增進自己的思考能力。

書中範例其實還有其他問題Teddy也沒時間逐一指出,例如範例程式包含一個GetAccountBalanceService程式,請參考圖10。Teddy以為可以看到CQRS裡面的Query範例,但是這個程式有介面(GetAccountBalanceQuery)也有實作(GetAccountBalanceService),但卻沒有使用它的Controller(範例程式中沒有任何人使用到GetAccountBalanceService)。請注意它的介面回傳Money,一個位於Entity Layer的物件。它有被轉成DTO往UI傳遞嗎?誰來做這個轉換?《Clean Architecture》書中的Presenter怎麼實作?這些問題都沒有包含在程式範例裡面,鄉民們看完之後可能還是不知道怎麼做。

 

▲圖10:GetAccountBalanceService程式碼

 

最後打個廣告,如果想知道完整又乾淨的《Clean Architecture》架構與實作方法,歡迎參加【領域驅動設計與簡潔架構入門實作班】。

***

友藏內心獨白:程式碼很少的時候都看不出問題。

2022年7月18日 星期一

再談Clean Architecture實作

July 18 18:20~19:41

▲圖1:《Clean Architecture》書中最近接實作的一張圖

 

前言

圖1是《Clean Architecture》書中最近接實作的一張圖,雖然Teddy一開始實作Clean Architecture主要就是參考這張圖,但Teddy一直覺得這張圖畫得有問題。一開始只看出幾個很明顯的問題,例如:

  • 分層不清楚:Data Access到底在第三還是第四層?如果是第三層,為什麼違反依賴原則直接存取資料庫?如果是第四層,為什麼直接跨層實作在第二層的Data Access Interface?
  • 誰來維持跨層原則:Data Access Interface直接參考最內層的Entity,那麼當Entity離開Use Case Layer往外傳的時後是誰負責轉換?
  • 用語不一致:這張圖過於強調 Use Case Interactor與Input Boundary, Output Boundary以及Controller和Presenter彼此之間的關係,忽略了其他Boundary。例如,Data Access Interface應該也是一種Output Boundary,但卻沒有用Output Boundary這個名稱而是用Data Access Interface這種很特定的稱呼。
    這個現象導致讀者可能以為Output Boundary與Data Access Interface是兩種不同的概念,但實際上依據六角形架構的講法,他們應該都是Output Port。

***

因為要幫《Clean Architecture實作篇:在整潔的架構上弄髒你的手》這本翻譯的新書(預計八月出版)寫推薦序,Teddy昨天用力把這本書看完。這本書英文版一年多前Teddy就讀過,還寫了書評:<【還少一本書】Get Your Hands Dirty on Clean Architecture>。但過了一年多書中有些細節已經不復記憶,昨晚將書中的做法與圖1仔細對照,突然覺得Teddy之前被圖1給制約了,ezKanban目前套用Clean Architecture的方式還可以進一步優化。今天就來談如何簡化Clean Architecture的實作。

***

 

更多地雷

除了上述提到的問題以外,在《Clean Architecture》中,關於Use Case Layer與外界溝通的介面,使用了如圖1中的Input BoundaryOutput Boundary的術語,但是在書中圖22.1(請參考圖2)右下角又使用Use Case Input PortUse Case Output Port這個與六角形架構中比較接近的用語(使用port)。但不管圖1還是圖2,這種Use Case Interactor與Use Case Input Port和Use Case Output Port之間關係的表達方式,很容易讓人誤會以下幾點:

  • 看圖1與圖2會讓人以為實作Use Case Output Port的物件就只有Presenter,但實際上一個Use Case Interactor可以使用任意個Output Port。例如,將物件儲存到資料庫中也是一種Output Port。
  • Use Case Input Port到底是什麼?是Command嗎?還是Use Case本身的介面?
  • 在圖1中,感覺Input Boundary只需要定義Input Data,Output Boundary則是定義Output Data。但以一個介面而言,介面上會包含輸入資料(Input Data)與回傳資料(Output Data),例如 int getFileSize(File file)這個介面,其中File就是Input Data,int就是Output Data。

 

▲圖2:《Clean Architecture》書中圖22.1

 

***

看圖說故事

圖3是Teddy將圖1重新修正後的版本,重點如下:

  • 介面就只有兩種:Input Port(Input Boundary)與Output Port(Output Boundary)。
  • 每一個Input Port與Output Port都有Input Data與Output Data(宣告在介面上的輸入與輸出資料)。
  • Presenter不需要實作Output Port,它只要接收Input Port的Output Data就可以建出Read Model。讓Controller直接與Presenter產生耦合,將Input Port的回傳資料(Output Data)直接傳給Presenter,Use Case Interactor不需要注入Presenter。
  • 在套用CQRS的情況下,Projection (一種Output Port) 可以直接產出前端所需要的 View Model,不需再經過Presenter。
     

 

▲圖3:修正後的圖1

***

程式範例

Teddy花了四個小時把【領域驅動設計與簡潔架構入門實作班】(https://teddysoft.tw/courses/clean-architecture/) 的課程範例改成新的寫法,覺得程式碼又更乾淨了一些。接下來將程式範例與圖3對照,首先參考圖4,它是ezKanban中CreateCardUseCase的介面宣告,相當於圖3的Input Port。其中CreateCardInput是Input Data,CqrsCommandOutput是Output Data。



 ▲圖4:Input Port

 

圖5為package結構,Input Port的檔案放在in package中。CqrsCommandOutput因為是系統核心共用物件,所以沒有出現在圖5裡面。另外,CreateCardUseCaseImpl就是圖3的Use Case Interactor,也有人會取CreateCardUseCaseService這樣的名稱。


▲圖5:Package結構

 

CreateCardUseCaseImpl程式碼如圖6所示,它實作Input Port(CreateCardUseCase),也使用了一個Output Port(CardRepository)。


▲圖6:CreateCardUseCaseImpl程式碼

 

參考圖3的架構,再搭配程式範例,Teddy覺得比原本《Clean Architecture》書中提到的做法要具體很多。


***

 

工商服務

領域驅動設計與簡潔架構入門實作班】課程招生中,本梯次內容將採用最新簡化過的Clean Architecture實作方式做為課程範例,歡迎舊雨新知多加利用。

***

友藏內心獨白:持續改善不是嘴巴講講而已。

2022年7月15日 星期五

事件溯源(18):實做Idempotent

July 9 14:20~16:20


      ▲NotifyBoard實做Idempotent架構圖

 

前言

Teddy在<事件溯源(16):分散式系統的事件語意與Idempotent>介紹過為什麼Event Handler需要具備Idempotent,這集以ezKanban系統中產生GetBoardContent的Event Handler—NotifyBoard為例(請參考<事件溯源(10):實作Projector>),介紹如何實做Idempotent。

 

***

記住你做過的事

實做Idempotent可以從兩個方向著手:

  • 操作本身即是Idempotent:如果一個系統的操作本身就是Idempotent,哪麼Event Handler就不需要特別處理看過的事件,只要確定事件順序正確,收到事件之後閉著眼睛執行一次即可。例如,delete操作本身是Idempotent,收到CardDeleted事件(卡片被刪除)直接套用一次即可。就算重複執行相同的CardDeleted也不會造成系統狀態錯誤。
  • 記憶已處理過的事件:在一般通用系統中,不太容易把所有系統操作都設計成具備Idempotent特性,因此Event Handler需要紀錄它曾經處理過的事件代號,然後每次收到事件之後要去查看該事件是否已經處理過了。如過是,則丟棄該事件;若否,才處理該事件然後把事件代號紀錄下來。這裡有兩個地方要注意,首先事件代號需要唯一,不可重複,才可判斷是否曾經處理過。其次,處理事件造成的系統狀態改變和儲存事件這兩件事,必須要在同一個交易(transaction)中完成,否則可能造成系統狀態改變但卻沒有把處理過的事件紀錄下來,這樣下次再收到相同事件變會重新執行一次,就沒有達到Idempotent。

在本文中Teddy要採用第二種方式實做Idempotent。

 

***

先看測試結果

鄉民們讀到「在同一個交易中儲存系統狀態改變與事件代號」這句話,是不是有種似曾相似的感覺?沒錯,這和Teddy在<事件溯源(4):將Aggregate儲存至Outbox Store>介紹過的方法是類似的。

圖1是NotifyBoardContent的測試案例,產生一個Board,一個Workflow,三個Stage,然後新增一張卡片。

 

▲圖1:NotifyBoardContent測試案例

 

圖1測試案例所投影出的BoardContentViewModel如圖2所示。

 

▲圖2:BoardContentViewModel(JSON檔案)

 

除了在資料庫投影出BoardContentViewModel,因為NotifyBoardContent支援Idempotent,所以資料庫的Idempotent表格同時也紀錄了NotifyBoardContent所處理過由測試案例所產生的七個事件,如圖3所示。這七個事件分別是:BoardCreated、BoardMemberAdded、WorkflowCreated、StageCreated、StageCreated、StageCreated、CardCreated。

 


▲圖3:資料庫Idempotent表格紀錄Event Handler讀過哪些事件


***

 

實作

NotifyBoardContent的project方法如圖4所示,57行縮起來的switch敘述是負責投影的程式邏輯,這邊要關注的是:

  • 第53~54行:呼叫boardContentStateRepository的isEventHandled方法判斷領域事件是否已經處理過,如果已經處理過就直接return。
  • 第241行:更新boardContentState身上的IdempotentData資料結構,它用來記錄Event Handler正在處理哪一個領域事件,如圖5所示。
  • 第242行:把boardContentState儲存到資料庫。boardContentStateRepository.save方法會在同一個交易中將boardContentState儲存在board_content表格中(圖2的那個JSON檔案),以及將IdempotentData儲存在idempotent表格中,如圖6所示。

 

▲圖4:NotifyBoardContent的project方法

 

 

▲圖5:IdempotentData類別

 

 

▲圖6:儲存boardContentViewData與IdempotentData要在同一個交易中完成

 

***


好像沒有很難?

看完上面實作方法,感覺要讓Event Handler達到Idempotent好像沒有很難。但是,還是有一些實做細節需要考慮。在ezKanban中,boardContentViewData與IdempotentData都被存放在PostgreSQL資料庫,因此可以用關聯式資料庫的transaction確保這兩個操作的狀態一致性。但如果鄉民將read model儲存在NoSQL資料庫,例如document-based NoSQL資料庫,這種資料庫不一定會提供「跨document」的交易控制功能,這時候就可能需要把處理過的事件編號儲存在代表read model狀態的document身上。

 

***

 

第一季結束

這一系列寫到這裡也就差不多了,Event Sourcing與CQRS的重點還有實做細節都交代過,之後如果還有想到什麼再補充說明。

 

***

友藏內心獨白:感謝收看。

2022年7月14日 星期四

事件溯源(17):讀取手刻Event Store所儲存的事件

July 7 18::09~19:21;21:15~23:23;July 8 13:45~16:27

▲在Event Store儲存Checkpoint

 

前言

雖然使用EventStoreDB這種專為Event Sourcing與CQRS所設計的資料庫可以減少許多開發工作,但實務上開發人員可能因為公司要求或專案限制,只能使用關聯式資料庫。在這種情況下,就必須要自己用關聯式資料庫模擬Event Store。

Teddy在<事件溯源(4):將Aggregate儲存至Outbox Store>介紹過如何使用關聯式資料庫同時儲存傳統ORM的表格資料與領域事件,但還沒談到如何讀取這些事件的方法,今天就來介紹這個議題。

 

***

保證事件的儲存順序

Teddy以ezKanbna使用的Message DB這個開源軟體(https://github.com/message-db/message-db)為例,介紹它如何在寫入時確保所有事件的順序。圖1是Message DB用來建立儲存事件表格的指令,第7行global_position欄位在每次新增一筆資料的時候帶入該欄位目前最大的數值加1(簡單想成這是一個自動增加的欄位),透過這個欄位來維持事件的順序。

沒了,就這麼簡單。

 

▲圖1:Message DB產生儲存事件表格的指令

 

***

至少一次(At least once)

Message DB本身只包含使用PostgreSQL當成Event Store所需的程式碼,並沒有提供客戶端程式,使用者必須要自行撰寫,也就沒有直接支援at least once。Message DB原本屬於Eventide Project裡面的一個模組(子專案),Eventide是支援Ruby語言的Event Sourcing與Pub/Sub開源軟體,其使用方法可參考Eventide官方文件

Eventide的Consumer程式是用Ruby開發,Teddy沒有用過,從它的官方文件(圖2)也看不出來是否有提供at least once的功能。圖2第4點提到:「Consumer會定期將客戶端讀取的位置自動寫入到後端」,在這種情況下,假設後端已讀取資料但尚未處理,但Consumer卻將客戶端讀取的資料位置寫入後端(代表資料被讀走),這樣可能會造成訊息遺失。

 

▲圖2:從Eventide的文件看不出來是否有支援at lease once

 

***

 

先不管Message DB的「原生家庭」Eventide提供的Ruby客戶端程式否有支援at least once,Teddy在此說明如何做到at least once的常見方法。在《Enterprise Integration Patterns》提到的方式是使用Transactional Client設計模式,它的概念就是在上一集<事件溯源(16):分散式系統的事件語意與Idempotent>中Teddy介紹的Pulsar做法,客戶端確定事件處理完畢之後要向伺服器發出ack,類似資料庫的commit指令,完成這筆交易,如圖3所示。

 

▲圖3:Consumer向Server發出ack之後被讀出的事件才會從Topic刪除

 

***

 

如果是自己實作讀取資料庫中事件表格的驅動程式,要怎麼做出類似效果?做法也不難,只要針對每一個Consumer儲存一個checkpoint代表它目前讀取到第幾個事件。當Consumer把事件讀走且處理完畢之後,再把這個checkpoint加1(如果是批次處理事件則可以一次加N),代表它已經讀走某個事件了。事件並沒有真的從資料庫中被刪除,而是用checkpoint的數值代表每一個Consumer讀取的位置。

基本上checkpoint就好像讀取陣列時所使用的index,一個event stream同時間可支援多個Consumer讀取,每一個Consumer讀取的進度(位置)都不相同。所以每個Consumer要取唯一的名字,用這個名字當作checkpoint的名字來記錄每個Consumer的讀取進度

採用這種作法,如果要重讀整個stream,只要把checkpoint刪掉就可以了。很簡單,對不對。

現在問題來了,這個checkpoint要存在哪裡?參考EventStoreDB的官方文件作法,可以把checkpoint存在server端或client端,形成兩種不同的subscription(Consumer):

  • Persistent Subscription:在Server端儲存checkpoint,如圖4所示第113行呼叫ack方法就可以標註哪一個事件已經處理完畢並更新Server端checkpoint的數值。但是因為EventStoreDB的Persistent Subscription支援上一集介紹過的Competing Consumer(競爭消費者)且有自動rerety(重送事件)的功能,所以並不保證事件的順序(Consumer收到的事件順序可能和資料庫中儲存的順序不一樣)。EventStoreDB的文件建議,如果要保證順序請使用它的Catch-up Subscription。
  • Catch-up Subscription:Catch-up Subscription並不會在Server端儲存checkpoint,要由Consumer自行保存checkpoint。Consumer在連線到資料庫的時候告訴資料庫要讀取哪一個event stream,以及要從哪一個位置開始讀起,如圖5所示。

 

▲圖4:EventStoreDB的Persistent Subscription在Server端保存checkpoint。

 

▲圖5:EventStoreDB官方網站的Catch-up Subscription指定讀取位置程式範例

 

***


實作Persistent Consumer

講了這麼多,接下來要寫Persistent Consumer將checkpoint存在Message DB。因為是Event Sourcing的系統,所以在資料庫端checkpoint也會存在某個代表Consumer的event stream裡面。請參考圖6,stream_name欄位的值是$$Checkpoint-ezkanban-11,其中「$$Checkpoint-」這個前置字串代表它是一個系統產生用來儲存checkpoint的stream,ezKanban-11則是Consumer的名字。type欄位的值是$System$Checkpointed,代表它是一個產生或更新checkpoint的事件。最後,data欄位儲存 {“position”: 5},代表checkpoint的數值,目前讀到event stream第5個位置。

理論上,因為是Event Sourcing系統,所以每次更新checkpoint的數值應該是寫入一筆新的事件,然後這個checkpoint stream的最後一筆資料就是目前最新的讀取位置。在這裡Teddy採用傳統CRUD的做法,只儲存最新的一筆checkpoint資料。也就是說,更新checkpoint不會寫入一筆新的事件,而是直接更新原有事件的data欄位。

 

       

▲圖6:Checkpoint儲存在代表Consumer的event stream裡面

 

 

圖7是產生PresistentConsumer的程式碼,第50行判斷這個Consumer是否是第一次建立,如果是在第51行呼叫_writeMessage方法產生一個新的event stream並寫入一筆checkpoint=0的資料。

 

▲圖7:Checkpoint儲存在代表Consumer的event stream裡面

 

為了從event stream讀取事件,PresistentConsumer必須定期向Event Store查詢,如圖8所示。第35行到41行取得checkpoint的值,第43行執行一個while(true)迴圈,在45行從指定的checkpoint位置讀取$all stream的事件(這個Consumer的用途是用來監聽所有系統事件)。讀到資料之後就可以處理它們,處理完畢之後第51行呼叫ack方法寫入新的checkpoint位置到資料庫中。這裡有一個實作細節要注意:正常情況下第44行程式在讀取事件的時候會多設定「一次最多讀幾筆資料」的batch size參數,以免萬一event stream裡面有非常巨量的資料,造成Consumer卡住甚至當機(可能用光記憶體)的問題。

若監聽的event stream裡面沒有新的事件,則會直接sleep一段時間(polling interval)再重查一次。

 

▲圖8:PresistentConsumer的run方法

 

圖9為ack程式碼,首先判斷event stream是否存在(第76~79),以及checkpoint的位置是否大於event stream裡面最後一個事件的位置(第81~84),最後檢查stream name不是系統內建的stream。如果都沒問題,就寫入新的checkpoint(第90行)。



▲圖9:ack方法

 

***

 

下集預告

介紹完在如何在伺服器端紀錄checkpoint以達到事件至少傳遞一次,下一集介紹如何實做Idempotent。

***

友藏內心獨白:連載已經接近尾聲了。



2022年7月13日 星期三

事件溯源(16):分散式系統的事件語意與Idempotent

July 6 23:14~24:00;July 7 00:00~01:39

▲快寫到體力不支了 XD

 

前言

鄉民們之所以要採用Event Sourcing與CQRS,很多情況都是為了開發微服務。微服務架構屬於分散式系統,相較於集中式系統,分散式系統具有異質性、容易擴展、比較強健(robust)、容錯性高且系統模組之間的耦合性較低等特性。但相對地,分散式系統的開發也比較複雜且一不小心就容易「出錯」。

在DDD中,Aggregate之間的狀態透過領域事件達到最終一致性(eventual consistency)。相似地,在微服務架構下,各個微服務之間的狀態同步也是透過事件或是訊息達到最終一致性。但在分散式系統中,事件傳遞本身也可能發生遺失,導致接收者收不到事件,也就做不到最終一致性。

本集介紹在分散式系統中為了正確做到最終一致性,事件傳遞與事件處理器(event handler)須具備那些特性。

 

***

保證事件的順序

首先,事件本身的順序不能亂掉,否則事件接收者的狀態一定會出錯。當事件被寫入Event Store的當下,Event Store本身必須保證在同一個event stream裡面事件必須依據發生(寫入時間點)的先後順序排序,這點基本上沒有問題。看到這裡鄉民們可能會想:「事件在Event Store中既然已經排序過,為什麼還會發生順序亂掉的情況?」

請參考圖1,事件在Event Store中一開始順序是正確的,假設這個事件是ezKanban裡面User Management Bounded Context所產生的事件,像是UserCreated、UserRenamed、UserEmailChanged等。在ezKanban中,Kanban Board Bounded Context(ezKanban的Core Domain)需要在畫面上顯示使用者名稱,所以它會聽UserCreated、UserRenamed等領域事件,然後在自己本地端建立一份User資料的快取。因此,User Management Bounded Context會把內部的UserCreated這些領域事件往外傳,寫到Pulsar的Topic A裡面。

假設在Kanban Board Bounded Context裡面,為了「求快」啟動兩隻consumer(event handler)程式同時間去讀取Topic A的資料。這種consumer在訊息導向架構中稱為Competing Consumer(競爭消費者),它們會搶著處理Topic裡面的資料。圖1中的Consumer A和Consumer B從Topic A處理完事件之後會寫另外一筆事件到Topic B。現在Consumer A拿到了1和3這兩個訊息,Consumer B拿到了2和4這兩個訊息,因為它們執行的速度不同,所以處理完之後最後Topic B裡面事件的順序變成2, 1, 4, 3,和原始事件產生的順序不同。



 ▲圖1:Event out of order示意圖

 

如果今天Topic A存放的是轉檔需求,而Topic B存放的是轉檔完成的結果,在這種情況下通常來說事件順序並不重要。但是如果是要透過Event Broker傳遞事件然後希望接收者達到最終一致性,就要注意在事件傳遞的過程中,不要不小心造成事件順序亂掉的情況。

***

至少一次(At least once)

除了事件順序不能錯,另一個關於事件傳遞的要求,就是事件要保證至少會被接收者看到一次(at least once)

為什麼要這麼麻煩?怎麼不規定恰好一次(exactly once)就好?因為做不到。請參考圖2,Consumer把Event 1從Topic A讀出之後,它事情還沒做完就當掉。Consumer重啟之後,因為Event 1已經被讀走,Topic A裡面最新的事件變成Event 2。但是Event 1還沒被Consumer處理,也就是說Event 1從此就從地球上消失。

 

▲圖2:Consumer讀取事件之後,事情還沒做完就當掉

 

***

要解決事件消失的問題也很簡單,就是Consumer讀出事件之後,Topic不會立刻把事件刪除,一直到Consumer處理完畢並通知(ack)Topic,此時Topic才會把事件刪除,如圖3所示。在圖3中,情況1,Consumer讀出事件1後立刻當掉。但因為Consumer沒有ack,所以事件1還存在Topic A。情況2當Consumer重啟之後,又可以讀到一次事件1(這就是at least once,至少一次,至多不限)。情況3當Consumer執行完畢並且ack,事件1才會從Topic A移除。


▲圖3:Consumer通知Topic之後被讀出的事件才會從Topic刪除

 

***

 

Idempotent

現在新的問題來了,ack的作法雖然可以確保事件至少被Consumer收到一次,但當Consumer重複收到同一個事件怎麼辦?例如收到重複的扣款請求?那就真的變成「詐騙集團」常用的台詞:「系統設定錯誤造成重複扣款XD」。

請參考圖4,在情況4中Consumer收到事件1並且已經把事情做完(更新系統狀態),就在它要ack之前,它當掉了,所以沒有ack成功。情況5當它下次重啟,事件1又被處理一次,這樣顯然不OK。因此Consumer(Event Handler)需要具備Idempotent。


▲圖4:事件1被Consumer處理2次

 

Idempotent意指相同操作就算重複執行也不會影響系統狀態。例如,把任何數字乘以1,最後結果還是不變,因此「乘以1」這個操作就是idempotent。在軟體開發中,傳統的CRUD操作,基本上RUD都是idempotent。R不用說,讀取資料N次也不會改變系統狀態,用固定值更新與刪除同一筆資料N次也不會改變系統狀態。但是C(新增)就不是idempotent。

Consumer需要具備idempotent的意思,就是說Consumer可以很神奇地讓兩次相同新增的效果變成一次。怎麼做到?原則上就是讓Consumer把它讀過的事件編號(event id)記錄下來。每次從Topic讀取事件之後,先到自己本地端資料庫查詢這筆事件以前有沒有看過?如果有就直接ack繼續處理下一筆,如果沒有就正常處理,如圖5所示。

圖5中有一個細節要注意,Consumer儲存處理過的領域事件與因為處理該事件所造成的系統狀態更新,這兩的操作必須要放在同一個交易中,否則又可能會發生Consumer改變狀態但來不及紀錄事件,或是先記錄事件但是來不及保存狀態的錯誤狀況

 

▲圖5:儲存處理過的事件以達到idempotent


 

***


好多細節

看到這理請鄉民們回頭看<Consumer事件溯源(10):實作Projector>,Projector是一種Consumer,因此在實作Projector的時候就必須考慮到本集所提到event ordering、at least once以及Idempotent的問題。

▲圖6:Read Model的Projector需要具備Idempotent

 

***

 

下集預告

如果鄉民使用EventStoreDB,它本身既是Event Store也是一個簡易的Event Broker,因此支援event ordering與at least once。Kafka與Pulsar更是強大的Event Broker,當然也有支援。但是,如果鄉民是自己用Rational Database或是NoSQL「手刻Event Store」,那麼就需要自己確保event ordering與at least once。下集談在手刻Event Store的情況下,要如何做到event ordering與at least once。

***

友藏內心獨白:身為Maker一定要自幹Event Store的啦XD。

2022年7月12日 星期二

事件溯源(15):Apache Kafka可以當做Event Store嗎?

July 5 19:02~20:09

圖片擷取自維基百科

 

前言

Teddy剛開始接觸Event Sourcing時,就是使用EventStoreDB,後來在ezKanban中也支援用PostgreSQL當作Event Store。但是在YouTube聽演講或是看網路文章,有時會發現有人把Apache Kafka做為Event Store。前陣子Teddy在看Apache Pulsar的書,Pulsar是和Kafka類似的Event Broker軟體但感覺好像架構又設計的更好一些。當時Teddy就想:「可以拿Pulsar當Event Store嗎?」

Kafka與Pulsar在message-oriented system中被大量使用,它們具有可靠、高效、可拓展與可永久儲存訊息的能力,很自然地會讓人想把它們當成Event Store使用。Teddy在網路上沒有找到Pulsar是否可做為Event Store的相關討論,倒是看到一篇文章<Apache Kafka Is Not for Event Sourcing>。因為不少作Event Sourcing的人可能也都會有這樣的疑問,Teddy今天就介紹這篇文章中提到不適合的兩點理由。

 

***

載入狀態

在Kafka中,訊息放在Topic裡面,然後Topic通常以entity type做為分類。以ezKanban的Core Domain為例,如果用Kafka當Event Store,會有Boards, Workflows, Cards, Tags這四個Topic。假設要讀取出某張Card,需要從Cards Topic讀出所有卡片的所有訊息,然後依據card id過濾出所需卡片的相關訊息。雖然這樣做是可行的,但有點不切實際,可能會有效率低落的疑慮。

如果學習EventStoreDB的做法,一個Aggregate instance享有一個Topic,這麼一來就會有為數非常可觀的Topic在Kafka伺服器上面。姑且不論Kafka能不能支撐大量的Topic,對於subscriber而言,要如何接收的想要的訊息(領域事件)?舉個例子,想要知道所有的CardMoved事件,在EventStoreDB中,只需要從$et-CardMoved這個系統自動投影出來的event stream就可以拿到。但在Kafka的情況下,subscriber需要去註冊所有Card instance的Topic才能拿到所有的CardMoved事件,這有點窒礙難行。

***


一致性寫入

文章中提到第二點不適合的理由是Teddy在<事件溯源(7):樂觀鎖>介紹過樂觀鎖的問題。在併行處理的系統中,為了避免寫入衝突,通常會採用樂觀鎖的機制,在一般的關聯式資料庫或是EventStoreDB都支援。但依據該文章的說法,Kakfa的Topic寫入沒有支援樂觀鎖,因此也不合適做為Event Store使用。

 

***

使用時機

Kafka或Pulsar這種Event Broker不是說在Event Sourcing系統中都不能使用,只是不要拿來當作Event Store。如下圖所示,領域事件還是存在EventStoreDB或是其他類似的資料庫,至於要往外(下游,downstream)傳遞的領域事件,可以轉發至Kafka或Pulsar。

換句話說,把Kafka或Pulsar拿來當作跨Bounded Context或是跨服務的訊息傳遞,不要拿來做Event Store。

 

 

***

 

下集預告

這一集內容雖然比較簡短,但卻是很重要個觀念,因為選對Event Store當你上天堂,選錯Event Store會讓你住套房。下一集回頭介紹一組相關的基礎觀念:在最終一致性(eventual consistency)的情況下,事件必須滿足event ordering與at least once這兩個條件,以及event handler要如何達到idempotent。

***

友藏內心獨白:很想用一個工具打通關。

2022年7月11日 星期一

事件溯源(14):行為版本控制

July 05 21:59~23:12

▲圖1:行為無法控制

 

前言

上一集討論事件版本控制的議題,這一集要討論行為版本控制(Behavior Versioning)。這個問題Teddy也是讀了Gregory Young的《Versioning in an Event Sourced System》才發現,如果沒注意到程式的「行為版本」,在Event Sourcing系統中,可能會因為程式改版(領域事件不變)導致系統狀態錯誤。

 

***

問題範例

Gregory Young的在《Versioning in an Event Sourced System》書中有一個很簡單的例子來說明行為版本控制,在ezKanban中也有類似的例子但比較複雜一些,在這裡Teddy直接借用Gregory Young的例子。如圖2所示,銷售系統的POS Aggregate身上有sell()方法,它apply一個ItemSold領域事件,其中事件的最後一個參數是這筆銷售的金額小計。

 


▲圖2:sell method

 

圖3是ItemSold的event handler,它從領域事件拿出subTotal並將其乘上0.08以計算營業稅(tax是POS Aggregate身上的屬性);程式這樣寫看起來沒什麼問題。

 

▲圖3:計算稅金,1.0版程式

 

有一天政府把營業稅調高,從8%調整成10%,於是你把程式改成圖4。現在問題來了,當你重新載入POS之後,它會replay所有事件,然後假設原本有一個項目subTotal是100,tax採用1.0版程式計算出來的tax是8。但是程式改版之後,用2.0版程式計算出的tax變成了10。但是這筆交易產生的時間點,營業稅還是8%,所以不應該因為程式碼改變,就造成「竄改歷史」的情況。

 

▲圖4:計算稅金,2.0版程式

 

***


解決方法

如圖5所示,解決方法其實很簡單,就是把tax算好並當成領域事件內容的一部分,這樣就可以了。回到領域事件原始定義:「代表系統狀態改變」,所以領域事件內容應該「至少」要儲存「能夠代表本次狀態改變的所有資料。」在這個例子中,「稅率」是會隨著時間改變的數值,會影響tax的金額。因此ItemSold領域事件應該包含計算後的tax,或是只包含taxRate(稅率),之後再依據taxRate去計算tax也是可以。

 

▲圖5:修改後的版本,不會受到程式行為調整而改變聚合狀態

 

***

以上這個例子算是簡單的,在某些情況底下,如果Aggregate呼叫外部服務,也很可能會造成行為版本控制的問題,因而導致無法replay領域事件重現系統狀態。例如,向第三方金流API請款,如果replay領域事件會不會導致重複請款?這些都是要注意的細節。呼叫外部服務導致程式行為在replay變得不可決定(nondeterministic)的解決方法和上面計算稅金的例子類似,在產生領域事件時呼叫外部服務,並且把外部服務的回傳值儲存在領域事件上。如此可以達到確定性重播(deterministic replay)。總而言之,重點就是重建系統狀態所需的所有資料都要儲存在event stream裡面,如此每次replay才會出現相同的結果,如圖6所示。

 

▲圖6:將外部服務回傳結果存入領域事件中

 

Gregory Young在《Versioning in an Event Sourced System》書中還提到更多關於行為版本控制的細節,有興趣的鄉民請自行參考。

***

 

下集預告

在這一系列的文章中,Teddy使用EventStoreDB與PostgreSQL當作Event Store。早期採用Event Sourcing的開發人員有不少採用Apache Kafka當作Event Store。下一集要談是否合適把Apache Kafka當做Event Store?

***

友藏內心獨白:重建犯罪現場真的沒有那麼簡單啊。