從 CloudEvents 到 Apache Kafka 紀錄,第二部分 ¶
發佈於:2023-04-03 , 修訂於: 2024-01-17
從 CloudEvents 到 Apache Kafka 紀錄,第二部分¶
作者:Daniele Zonca,Red Hat 高級首席軟體工程師,Matthias Weßendorf,Red Hat 高級首席軟體工程師
在本部落格文章中,您將學習如何輕鬆將傳入的 CloudEvents 儲存到 Apache Kafka 主題中,並使用 Knative Broker 和 Trigger API 進行基於內容的事件路由。
這篇文章的第一部分解釋了 Knative 如何幫助將 CloudEvents 擷取到 Apache Kafka 主題以進行進一步處理。該文章展示了如何使用 Apache Kafka 生態系統中的標準工具(例如 kcat
CLI)處理 CloudEvents Kafka 記錄。此外,這篇文章還解釋了 Knative 預設使用的 CloudEvents 二進位內容模式的優點。現在,在這篇文章中,我們展示了一種不同的方法來處理擷取的 CloudEvents,即利用 Knative Broker 和 Trigger API 進行事件路由。
設定 Apache Kafka 和 Knative Broker¶
為了使用 Apache Kafka 的 Knative Broker,您需要先安裝 Apache Kafka。在這篇文章中,我們使用由 Strimzi 提供支援的本機 Apache Kafka 安裝,如這裡所述。該文章還討論了如何為本機開發環境安裝 Apache Kafka 的 Knative Broker。
注意
有關適用於生產環境的 Apache Kafka Knative Broker 組態,請參閱此部落格。
設定 Knative Broker 元件¶
上述文章將所有 Knative Broker
配置為 Kafka
類別的形式,因此為 Apache Kafka 建立新的 Broker 非常簡單
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: my-demo-kafka-broker
annotations:
eventing.knative.dev/broker.class: Kafka
spec: {}
Broker
是一種 Addressable
類型,可以透過 HTTP 將傳入的 CloudEvents 接收到其 status.address.url
欄位中定義的位址
kubectl get brokers.eventing.knative.dev
NAME URL AGE READY REASON
my-demo-kafka-broker http://kafka-broker-ingress.knative-eventing.svc.cluster.local/default/my-demo-kafka-broker 7s True
但是我們看不到有關 Apache Kafka 主題的任何資訊。原因是 Broker 實作使用的主題被視為實作細節。讓我們看一下實際的 Broker 物件
kubectl get brokers.eventing.knative.dev my-demo-kafka-broker -o yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
eventing.knative.dev/broker.class: Kafka
name: my-demo-kafka-broker
namespace: default
spec:
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
status:
address:
url: http://kafka-broker-ingress.knative-eventing.svc.cluster.local/default/my-demo-kafka-broker
annotations:
bootstrap.servers: my-cluster-kafka-bootstrap.kafka:9092
default.topic: knative-broker-default-my-demo-kafka-broker
default.topic.partitions: "10"
default.topic.replication.factor: "1"
以上提供了 YAML 表示法的簡化版本,但請注意 spec.config
:它指向叢集中所有啟用 Kafka 的 Knative Broker 的預設組態。kafka-broker-config
ConfigMap 透過定義 partition
或 replication factor
之類的旋鈕,來配置底層主題的概念。但是,在 Broker 的 status
中,您會看到主題的名稱:knative-broker-default-my-demo-kafka-broker
。該名稱遵循 knative-broker-<命名空間>-<Broker 名稱>
的慣例。
注意
預設情況下,Knative Kafka Broker 會建立自己的內部主題,但是此操作在某些環境中可能會受到限制。對於這種情況和任何其他類似的使用案例,可以自帶主題。
設定取用者應用程式¶
現在我們有了 Broker
,它將作為擷取 CloudEvents 的 HTTP 端點,現在是時候定義一個接收和處理 CloudEvents 的應用程式了
apiVersion: v1
kind: Pod
metadata:
name: log-receiver
labels:
app: log-receiver
spec:
containers:
- name: log-receiver
image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
imagePullPolicy: Always
ports:
- containerPort: 8080
protocol: TCP
name: log-receiver
---
apiVersion: v1
kind: Service
metadata:
name: log-receiver
spec:
selector:
app: log-receiver
ports:
- port: 80
protocol: TCP
targetPort: log-receiver
name: http
在這裡,我們定義一個簡單的 Pod
及其 Service
,它指向接收 CloudEvents 的 HTTP 伺服器。如您所見,這不是特定於 Kafka 的取用者,可以使用任何語言的任何 HTTP Web 伺服器來處理來自 Apache Kafka 主題的 CloudEvents。
取用者應用程式的開發人員不需要知道任何有關如何編寫 Kafka 取用者應用程式的詳細資訊。Knative 及其針對 Apache Kafka 的 Broker 實作會透過充當取用者應用程式的 HTTP Proxy 來抽象化這一點。這大大簡化了這些重點明確且自包含的取用者應用程式的工程工作。
使用 Apache Kafka 定義訊息路由規則¶
Apache Kafka 中的主題通常用於包含可能指向相同有界上下文的不同類型的事件(如果您應用領域驅動設計原則)。這表示每個取用者都將接收所有事件,只是為了篩選和處理它們的子集。
這是 Apache Kafka 協定的缺點之一:沒有用於路由記錄的直接篩選 API。為了處理或篩選事件並將其路由到不同的目的地或其他 Kafka 主題,需要實作完整的 Kafka 取用者用戶端。或需要使用其他程式庫,如 Kafka Streams。
您可以想像,這是一個相當常見的模式,Knative Eventing 使其成為 API 的一部分。Trigger
API 定義了一組強大的篩選器,用於根據 CloudEvents 的中繼資料路由它們
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: log-trigger
spec:
broker: my-demo-kafka-broker
filter:
attributes:
type: <cloud-event-type>
<ce-extension>: <ce-extension-value>
subscriber:
ref:
apiVersion: v1
kind: Service
name: log-receiver
我們看到一個觸發器 (Trigger) 定義了一組 filter
規則,如果符合這些規則,來自 Kafka 主題的 CloudEvent 將會透過 HTTP 路由到我們引用的網路伺服器應用程式。Knative 中還有一個實驗性功能,可以使用 Trigger
API 中的 filters
欄位來啟用新的類似 SQL 的篩選功能,該功能實作了 CloudEvents Subscriptions API。
注意
強烈建議在 Trigger
上針對 CloudEvents 元數據屬性和擴展應用篩選屬性。如果沒有提供篩選器,所有發生的 CloudEvents 都會路由到引用的訂閱者,這是一個糟糕的應用程式設計,除非您明確地想為 Broker 中的所有事件建立記錄器。
對於由 Apache Kafka 的 Knative Broker 執行的 Trigger
,也可以使用 Trigger
上的 kafka.eventing.knative.dev/delivery.order
註解來配置已傳遞事件的順序。
Kn 事件外掛程式¶
為了發送事件,我們也不需要使用 Apache Kafka Producer API,因為我們是使用 HTTP 將 CloudEvents 擷取到 Broker。其中一種選擇是在 Kubernetes 叢集中使用安裝了 curl
程式的 Pod
,並將事件發送到 Broker
的 URL
。但是,我們改為使用kn
client CLI 及其用於從命令列管理雲事件的事件外掛程式。
kn event send \
--to Broker:eventing.knative.dev/v1:my-demo-kafka-broker \
--type=dev.knative.blog.post \
-f message="Hello"
透過上述命令,我們將一則 message
作為類型為 dev.knative.blog.post
的 CloudEvents 發送到我們的 my-demo-kafka-broker
物件。 kn event
外掛程式會從此調用產生有效的 CloudEvent,並將其直接發送到引用元件的可定址 URL,在此範例中為我們的 Broker
。
結論¶
該範例展示了從發送事件到接收事件的簡單流程。訊息會持久保存在 Knative Broker 後面的 Kafka 主題上。從那裡,任何標準的 Apache Kafka API 也可以使用它。然而,Knative 提供的抽象簡化了事件驅動應用程式的開發過程。無需太多額外的配置,也可以根據事件的元數據篩選和路由事件。
此外,採用 Trigger
/Filter
不僅僅是避免在所有消費者中重新實作相同模式的方法,而且還使整個訊息處理更加高效,因為只有在必要時才會調用消費者,如果它是 Knative 服務,甚至可以擴展到零!