l

2022年7月14日 星期四

事件溯源(17):讀取手刻Event Store所儲存的事件

July 7 18::09~19:21;21:15~23:23;July 8 13:45~16:27

▲在Event Store儲存Checkpoint

 

前言

雖然使用EventStoreDB這種專為Event Sourcing與CQRS所設計的資料庫可以減少許多開發工作,但實務上開發人員可能因為公司要求或專案限制,只能使用關聯式資料庫。在這種情況下,就必須要自己用關聯式資料庫模擬Event Store。

Teddy在<事件溯源(4):將Aggregate儲存至Outbox Store>介紹過如何使用關聯式資料庫同時儲存傳統ORM的表格資料與領域事件,但還沒談到如何讀取這些事件的方法,今天就來介紹這個議題。

 

***

保證事件的儲存順序

Teddy以ezKanbna使用的Message DB這個開源軟體(https://github.com/message-db/message-db)為例,介紹它如何在寫入時確保所有事件的順序。圖1是Message DB用來建立儲存事件表格的指令,第7行global_position欄位在每次新增一筆資料的時候帶入該欄位目前最大的數值加1(簡單想成這是一個自動增加的欄位),透過這個欄位來維持事件的順序。

沒了,就這麼簡單。

 

▲圖1:Message DB產生儲存事件表格的指令

 

***

至少一次(At least once)

Message DB本身只包含使用PostgreSQL當成Event Store所需的程式碼,並沒有提供客戶端程式,使用者必須要自行撰寫,也就沒有直接支援at least once。Message DB原本屬於Eventide Project裡面的一個模組(子專案),Eventide是支援Ruby語言的Event Sourcing與Pub/Sub開源軟體,其使用方法可參考Eventide官方文件

Eventide的Consumer程式是用Ruby開發,Teddy沒有用過,從它的官方文件(圖2)也看不出來是否有提供at least once的功能。圖2第4點提到:「Consumer會定期將客戶端讀取的位置自動寫入到後端」,在這種情況下,假設後端已讀取資料但尚未處理,但Consumer卻將客戶端讀取的資料位置寫入後端(代表資料被讀走),這樣可能會造成訊息遺失。

 

▲圖2:從Eventide的文件看不出來是否有支援at lease once

 

***

 

先不管Message DB的「原生家庭」Eventide提供的Ruby客戶端程式否有支援at least once,Teddy在此說明如何做到at least once的常見方法。在《Enterprise Integration Patterns》提到的方式是使用Transactional Client設計模式,它的概念就是在上一集<事件溯源(16):分散式系統的事件語意與Idempotent>中Teddy介紹的Pulsar做法,客戶端確定事件處理完畢之後要向伺服器發出ack,類似資料庫的commit指令,完成這筆交易,如圖3所示。

 

▲圖3:Consumer向Server發出ack之後被讀出的事件才會從Topic刪除

 

***

 

如果是自己實作讀取資料庫中事件表格的驅動程式,要怎麼做出類似效果?做法也不難,只要針對每一個Consumer儲存一個checkpoint代表它目前讀取到第幾個事件。當Consumer把事件讀走且處理完畢之後,再把這個checkpoint加1(如果是批次處理事件則可以一次加N),代表它已經讀走某個事件了。事件並沒有真的從資料庫中被刪除,而是用checkpoint的數值代表每一個Consumer讀取的位置。

基本上checkpoint就好像讀取陣列時所使用的index,一個event stream同時間可支援多個Consumer讀取,每一個Consumer讀取的進度(位置)都不相同。所以每個Consumer要取唯一的名字,用這個名字當作checkpoint的名字來記錄每個Consumer的讀取進度

採用這種作法,如果要重讀整個stream,只要把checkpoint刪掉就可以了。很簡單,對不對。

現在問題來了,這個checkpoint要存在哪裡?參考EventStoreDB的官方文件作法,可以把checkpoint存在server端或client端,形成兩種不同的subscription(Consumer):

  • Persistent Subscription:在Server端儲存checkpoint,如圖4所示第113行呼叫ack方法就可以標註哪一個事件已經處理完畢並更新Server端checkpoint的數值。但是因為EventStoreDB的Persistent Subscription支援上一集介紹過的Competing Consumer(競爭消費者)且有自動rerety(重送事件)的功能,所以並不保證事件的順序(Consumer收到的事件順序可能和資料庫中儲存的順序不一樣)。EventStoreDB的文件建議,如果要保證順序請使用它的Catch-up Subscription。
  • Catch-up Subscription:Catch-up Subscription並不會在Server端儲存checkpoint,要由Consumer自行保存checkpoint。Consumer在連線到資料庫的時候告訴資料庫要讀取哪一個event stream,以及要從哪一個位置開始讀起,如圖5所示。

 

▲圖4:EventStoreDB的Persistent Subscription在Server端保存checkpoint。

 

▲圖5:EventStoreDB官方網站的Catch-up Subscription指定讀取位置程式範例

 

***


實作Persistent Consumer

講了這麼多,接下來要寫Persistent Consumer將checkpoint存在Message DB。因為是Event Sourcing的系統,所以在資料庫端checkpoint也會存在某個代表Consumer的event stream裡面。請參考圖6,stream_name欄位的值是$$Checkpoint-ezkanban-11,其中「$$Checkpoint-」這個前置字串代表它是一個系統產生用來儲存checkpoint的stream,ezKanban-11則是Consumer的名字。type欄位的值是$System$Checkpointed,代表它是一個產生或更新checkpoint的事件。最後,data欄位儲存 {“position”: 5},代表checkpoint的數值,目前讀到event stream第5個位置。

理論上,因為是Event Sourcing系統,所以每次更新checkpoint的數值應該是寫入一筆新的事件,然後這個checkpoint stream的最後一筆資料就是目前最新的讀取位置。在這裡Teddy採用傳統CRUD的做法,只儲存最新的一筆checkpoint資料。也就是說,更新checkpoint不會寫入一筆新的事件,而是直接更新原有事件的data欄位。

 

       

▲圖6:Checkpoint儲存在代表Consumer的event stream裡面

 

 

圖7是產生PresistentConsumer的程式碼,第50行判斷這個Consumer是否是第一次建立,如果是在第51行呼叫_writeMessage方法產生一個新的event stream並寫入一筆checkpoint=0的資料。

 

▲圖7:Checkpoint儲存在代表Consumer的event stream裡面

 

為了從event stream讀取事件,PresistentConsumer必須定期向Event Store查詢,如圖8所示。第35行到41行取得checkpoint的值,第43行執行一個while(true)迴圈,在45行從指定的checkpoint位置讀取$all stream的事件(這個Consumer的用途是用來監聽所有系統事件)。讀到資料之後就可以處理它們,處理完畢之後第51行呼叫ack方法寫入新的checkpoint位置到資料庫中。這裡有一個實作細節要注意:正常情況下第44行程式在讀取事件的時候會多設定「一次最多讀幾筆資料」的batch size參數,以免萬一event stream裡面有非常巨量的資料,造成Consumer卡住甚至當機(可能用光記憶體)的問題。

若監聽的event stream裡面沒有新的事件,則會直接sleep一段時間(polling interval)再重查一次。

 

▲圖8:PresistentConsumer的run方法

 

圖9為ack程式碼,首先判斷event stream是否存在(第76~79),以及checkpoint的位置是否大於event stream裡面最後一個事件的位置(第81~84),最後檢查stream name不是系統內建的stream。如果都沒問題,就寫入新的checkpoint(第90行)。



▲圖9:ack方法

 

***

 

下集預告

介紹完在如何在伺服器端紀錄checkpoint以達到事件至少傳遞一次,下一集介紹如何實做Idempotent。

***

友藏內心獨白:連載已經接近尾聲了。



沒有留言:

張貼留言