[BEE-19053] Outbox 模式與交易式訊息傳遞
INFO
Outbox 模式解決了雙重寫入問題——無法在單一操作中同時原子性地更新資料庫並向訊息代理發佈訊息——其方式是將事件與業務資料寫入同一筆資料庫交易中的「outbox」資料表,再由獨立的中繼行程(relay)將其發佈至代理。
背景
事件驅動微服務中一個反覆出現的故障模式:服務成功更新資料庫,然後嘗試向訊息代理發佈事件。代理呼叫失敗。資料庫現在反映的狀態沒有任何下游消費者知曉。事件就此遺失。反之,發佈成功但資料庫寫入失敗或被回滾。下游服務已處理了一個實際上不存在的交易的事件。兩種結果都會破壞系統狀態的一致性。
這就是雙重寫入問題:服務必須寫入兩個獨立的系統——資料庫和訊息代理——而沒有任何分散式交易能同時跨越這兩者提供 ACID 保證。「重試」這個直覺解法並不奏效:若服務在發佈後、資料庫提交前崩潰,重試將產生重複事件;若在資料庫提交後、發佈前崩潰,事件將被靜默遺失。
Martin Kleppmann 在《Designing Data-Intensive Applications》(2017 年)第 11 章中明確點出此故障模式:在沒有協調機制的情況下對獨立系統進行雙重寫入,會產生難以重現、幾乎無法透過標準監控偵測的微妙一致性錯誤。
Outbox 模式由 Chris Richardson 在其 microservices.io 模式目錄中正式整理並推廣,透過將兩系統寫入轉為單系統寫入來解決此問題。服務將業務資料和 outbox 記錄在同一個資料庫交易中寫入。由於 outbox 資料表在同一個資料庫中,這次寫入是原子性的:業務資料列和 outbox 記錄要麼都提交,要麼都不提交。然後,一個獨立的行程——中繼器或發佈者——再從 outbox 資料表讀取並獨立發佈至訊息代理。中繼器可重試且具有崩潰安全性,因為 outbox 會持久保存直到事件被確認已發佈。
此模式提供至少一次交付(at-least-once delivery):中繼器可能在發佈事件後、將其標記為已發送前崩潰,導致下次中繼器運行時再次發佈。消費者必須設計為能處理重複訊息——這一要求同樣適用於所有基於訊息的系統(BEE-10007)。
設計思考
兩種中繼策略
從 outbox 讀取並發佈至代理的中繼行程可以用兩種方式實作:
輪詢中繼:背景執行緒或排程工作查詢 outbox 資料表以獲取未發佈的事件,發佈後將其標記為已發送。實作簡單,適用於任何資料庫。缺點:輪詢會增加資料庫的讀取負載,並引入等於輪詢間隔(通常為 1–5 秒)的延遲。在高事件率下,輪詢可能成為瓶頸。
基於 CDC 的中繼(高吞吐量推薦):CDC 系統(Debezium、AWS DMS)監聽資料庫的預寫日誌(WAL)以追蹤 outbox 資料表的插入操作,並在它們出現時立即發佈至代理。無輪詢迴圈;事件在提交後的毫秒內即被捕獲。Debezium 的 Outbox Event Router 是一個 Kafka Connect Single Message Transform(SMT),它提取事件載荷、路由至正確的主題,並刪除或標記 outbox 記錄——整個過程無需輪詢。代價是:CDC 引入了較高的維運複雜度(WAL 設定、連接器部署)。
| 輪詢中繼 | 基於 CDC 的中繼 | |
|---|---|---|
| 延遲 | 輪詢間隔(通常 1–5 秒) | 毫秒級 |
| 資料庫負載 | 定期讀取 | WAL 串流(低) |
| 複雜度 | 低 | 較高(連接器基礎設施) |
| 排序 | 在輪詢批次內 | 從 WAL 保留 |
| 資料庫需求 | 任何關聯式資料庫 | 需支援邏輯複製 |
Outbox 資料表設計
outbox 資料表是資料庫內部的訊息佇列。其 Schema 必須包含足夠讓中繼器正確發佈事件的資訊:
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id VARCHAR(255) NOT NULL, -- 例如訂單 ID,用於主題路由
event_type VARCHAR(255) NOT NULL, -- 例如 "OrderPlaced"
payload JSONB NOT NULL, -- 序列化的事件資料
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
published_at TIMESTAMPTZ -- NULL = 未發佈;由中繼器設定
);
-- 輪詢中繼的索引:只掃描未發佈的事件
CREATE INDEX ON outbox (created_at) WHERE published_at IS NULL;中繼器查詢 WHERE published_at IS NULL ORDER BY created_at 並依序發佈。在代理成功確認後,設定 published_at = now()。舊的已發佈記錄可由定期清理工作刪除。
排序保證
在單一 aggregate_id(例如訂單 ID)內,事件應按照插入 outbox 的順序發佈。輪詢中繼透過查詢中的 ORDER BY created_at 實現此目標。CDC 中繼自動保留 WAL 順序,因為資料列以提交順序串流。
跨不同 aggregate ID 的排序保證通常不提供也不需要。支援分區主題的訊息代理(Kafka、AWS Kinesis)在中繼器以 aggregate_id 為鍵路由事件時,自然地保留每個鍵的順序。
最佳實踐
必須(MUST)在與業務資料相同的資料庫交易中寫入 outbox 資料表。 這是此模式的核心不變條件。在交易邊界之外寫入 outbox 會重新引入雙重寫入問題。
不得(MUST NOT)單獨使用資料庫的 LISTEN/NOTIFY 作為中繼機制。 PostgreSQL 的 LISTEN/NOTIFY 通知在 NOTIFY 發出時若無監聽器連接,可能會被丟棄。outbox 資料表提供耐久性;LISTEN/NOTIFY 可作為中繼器的低延遲喚醒信號來補充,但中繼器必須退回輪詢資料表作為後備。
必須(MUST)確保中繼器在代理端具有冪等性。 中繼器可能在發佈事件後、將其標記為已發送前崩潰。重啟後,它將再次發佈該事件。代理和消費者必須處理重複訊息——透過代理層去重(Kafka 的冪等生產者)、消費者層冪等性(BEE-10007),或在載荷中包含唯一事件 ID 供消費者去重。
應該(SHOULD)在每個 outbox 記錄中包含唯一事件 ID,並將其作為訊息 ID 傳播至代理。 大多數代理允許設定訊息鍵或 ID。將其設為 outbox 記錄的 UUID,可啟用代理端去重,並使消費者端冪等性變得簡單:儲存已處理的 ID 並跳過重複項。
應該(SHOULD)在事件吞吐量超過每秒數百個事件,或需要次秒級延遲時,使用基於 CDC 的中繼(Debezium)。 以 1 秒間隔輪詢會引入系統性延遲。CDC 中繼從 WAL 串流讀取,並在提交後的毫秒內發佈,對資料庫的讀取負載極低。
應該(SHOULD)按排程清理舊的 outbox 記錄。 outbox 資料表是暫時性緩衝區,不是永久事件儲存。累積數百萬筆已發佈記錄會增加資料表大小、減慢索引掃描並浪費儲存空間。每日執行 DELETE FROM outbox WHERE published_at < NOW() - INTERVAL '7 days' 可保持資料表精簡。
應該(SHOULD)監控 outbox 資料表中超過閾值仍未發佈的事件。 中繼器中斷或代理故障將導致積壓增長。對 COUNT(*) WHERE published_at IS NULL AND created_at < NOW() - INTERVAL '5 minutes' 超過閾值發出告警。這能在大規模不一致發生之前及早發現中繼器靜默故障。
可以(MAY)為每種聚合類型使用獨立的 outbox 資料表(例如 order_outbox、payment_outbox),以簡化中繼路由和按領域清理。代價是要管理更多資料表;好處是每個聚合可有獨立的中繼設定。
視覺化
範例
業務寫入 + outbox 插入在單一交易中(Python / psycopg3):
import uuid
import json
from psycopg import Connection
def place_order(conn: Connection, order_data: dict) -> str:
order_id = str(uuid.uuid4())
event_id = str(uuid.uuid4())
with conn.transaction(): # 原子性:兩者都寫入,或都不寫入
conn.execute(
"""INSERT INTO orders (id, customer_id, amount, status)
VALUES (%s, %s, %s, 'pending')""",
(order_id, order_data["customer_id"], order_data["amount"]),
)
conn.execute(
"""INSERT INTO outbox (id, aggregate_id, event_type, payload)
VALUES (%s, %s, %s, %s)""",
(
event_id,
order_id,
"OrderPlaced",
json.dumps({
"event_id": event_id, # 傳播以供代理去重
"order_id": order_id,
"customer_id": order_data["customer_id"],
"amount": order_data["amount"],
}),
),
)
return order_id輪詢中繼(Python):
import time
POLL_INTERVAL_SECONDS = 1
BATCH_SIZE = 100
def run_relay(conn: Connection, broker) -> None:
while True:
with conn.transaction():
rows = conn.execute(
"""SELECT id, aggregate_id, event_type, payload
FROM outbox
WHERE published_at IS NULL
ORDER BY created_at
LIMIT %s
FOR UPDATE SKIP LOCKED""", # 防止並發中繼器重複處理
(BATCH_SIZE,),
).fetchall()
for row in rows:
# 以 outbox 記錄的 UUID 作為訊息 ID 發佈
broker.publish(
topic=row["event_type"],
key=row["aggregate_id"],
value=row["payload"],
message_id=str(row["id"]), # 啟用代理端去重
)
conn.execute(
"UPDATE outbox SET published_at = NOW() WHERE id = %s",
(row["id"],),
)
# 交易在此提交:published_at 以原子方式為批次設定
if not rows:
time.sleep(POLL_INTERVAL_SECONDS)Debezium Outbox Event Router 設定(Kafka Connect):
{
"name": "order-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.dbname": "orders_db",
"table.include.list": "public.outbox",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "event_type",
"transforms.outbox.route.topic.replacement": "outbox.${routedByValue}",
"transforms.outbox.table.expand.json.payload": "true"
}
}Outbox 記錄清理(透過 pg_cron 每日執行):
-- 刪除 7 天前已發佈的事件
DELETE FROM outbox
WHERE published_at IS NOT NULL
AND published_at < NOW() - INTERVAL '7 days';
-- 監控未發佈積壓(超過閾值時發出告警)
SELECT COUNT(*) AS unpublished_backlog
FROM outbox
WHERE published_at IS NULL
AND created_at < NOW() - INTERVAL '5 minutes';實作說明
PostgreSQL + Debezium:高吞吐量 outbox 中繼的推薦技術堆疊。啟用 PostgreSQL 邏輯複製(wal_level = logical)。Debezium 的 PostgresConnector 串流 WAL 變更;Outbox Event Router SMT 將 outbox 插入轉換為正確路由的 Kafka 訊息。連接器處理排序、重試和確認。刪除模式(transforms.outbox.delete.handling.mode = rewrite)將刪除操作重寫為墓碑訊息,而非轉發刪除事件。
MySQL + Debezium:使用 MySqlConnector 的等效設定。MySQL 的 binlog 等同於 PostgreSQL 的 WAL。相同的 Outbox Event Router SMT 可跨連接器使用。
MongoDB:MongoDB Change Streams 可作為 outbox collection 的 CDC 來源。Debezium MongoDB 連接器支援相同的 Outbox Event Router 模式,監聽指定的 outbox collection。
SQLAlchemy(Python):將 outbox 插入與業務寫入包裝在同一個 Session 中並一起提交。Session 的工作單元模式確保原子性。
Spring Data / Hibernate(Java):使用 @Transactional 跨越實體儲存和 OutboxEvent 實體儲存。Spring 的 ApplicationEventPublisher 搭配 @TransactionalEventListener(phase = AFTER_COMMIT) 是一種常見模式——但請注意,AFTER_COMMIT 監聽器在交易之外觸發,並未解決雙重寫入問題。正確做法是在交易內部寫入 outbox 記錄,而不是在 AFTER_COMMIT 監聽器中。
相關 BEE
- BEE-19018 -- 變更資料擷取(CDC):CDC(Debezium、基於 WAL)是高吞吐量下 Outbox 模式的推薦中繼機制;outbox 資料表是 CDC 來源
- BEE-10007 -- 冪等訊息處理:Outbox 模式提供至少一次交付;消費者必須實作冪等性以處理重複事件
- BEE-8004 -- Saga 模式:Outbox 模式是以原子方式發佈 Saga 步驟事件與對應資料庫狀態變更的標準實作機制
- BEE-19052 -- 分散式工作流程中的編排 vs 協調:在基於編排的 Saga 中,每個服務透過 Outbox 模式發佈事件,以保證事件永遠不會被靜默遺失