從 CloudEvents 到 Apache Kafka 記錄,第一部分 ¶
發佈於:2023-03-08 , 修訂於:2024-01-17
從 CloudEvents 到 Apache Kafka 記錄,第一部分¶
作者:Daniele Zonca,Red Hat 資深首席軟體工程師,Matthias Weßendorf,Red Hat 首席軟體工程師
在本部落格文章中,您將學習如何使用 KafkaSink 元件輕鬆地將傳入的 CloudEvents 儲存到 Apache Kafka 主題中。
Apache Kafka 用於許多非常不同的用例,但採用 Kafka 協議的需求可能成為障礙,尤其是在第三方元件的擴展可能性有限時。
有些事件產生者不支援 Kafka 協議,而 HTTP 可能是一個更靈活的選擇。Strimzi 專案有一個 Bridge 元件,透過 HTTP 公開生產者/消費者 API,但它是 Kafka 專用的,所以本質上是相同的協議(具有消費者群組、偏移量等)。
您認為 CloudEvents 要求可能會是問題嗎?CloudEvents 也為 HTTP 格式定義了 繫結,尤其是在二進位模式下,大多數 HTTP 酬載可能已經是有效的 CloudEvents!
適用於 Apache Kafka 的 Knative Sink 是 CloudEvent 入口的 Kafka 原生實作,並將事件持久化為可設定主題上的 Apache Kafka 記錄。
設定 Apache Kafka 主題¶
為了使用 KafkaSink
元件,您需要有一個 Apache Kafka 的主題,並且具有適當的存取權。在這篇文章中,我們使用的是由 Strimzi 支援的本機 Apache Kafka 安裝,如 此處 所述。一旦 Apache Kafka 叢集在您的 Kubernetes 環境中執行,就可以建立主題了。為此,我們使用 Strimzi 的 KafkaTopic
CRD 以標準宣告式 Kubernetes 方式建立主題
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: my-topic
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 1
replicas: 1
這將建立一個簡單的主題,其 partitions
和 replicas
都設定為 1
,這在生產環境中是不建議的。
注意:有關 Knative Kafka Broker 的生產就緒配置,請參閱此部落格。
一旦將清單套用到 Kubernetes 叢集,就可以像這樣查詢
kubectl get kafkatopics.kafka.strimzi.io -n kafka
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
my-topic my-cluster 1 1 True
設定 KafkaSink 元件¶
適用於 Apache Kafka 的 Knative Sink 的安裝說明請參閱此處。
接下來,我們將建立 KafkaSink
的實例,並將其繫結到本機 Strimzi 型 Apache Kafka 叢集上的 my-topic
主題
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: my-kafka-sink
namespace: default
spec:
topic: my-topic
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
KafkaSink
是一個 Addressable
類型,可以透過 HTTP 接收傳入的 CloudEvents 到其 status.address.url
欄位中定義的位址
kubectl get kafkasinks.eventing.knative.dev
NAME URL AGE READY REASON
my-kafka-sink http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-sink 13s True
Kn 事件外掛程式¶
此時,我們可以使用 Kubernetes 叢集內安裝了 curl
程式的 Pod,並將事件傳送到 KafkaSink
的 URL
。
不過,我們改用具有 kn
client CLI 及其用於從命令列管理雲端事件的 事件外掛程式
kn event send \
--to KafkaSink:eventing.knative.dev/v1alpha1:my-kafka-sink \
--type=dev.knative.blog.post \
-f message="Hello"
透過上述命令,我們將 message
作為具有 dev.knative.blog.post
的 CloudEvents 傳送到我們的 my-kafka-sink
物件。kn event
外掛程式會從此調用產生有效的 CloudEvents,並將其直接傳送到參照 sink 的可定址 URL。
使用 kcat 進行事件處理¶
kcat 是以前稱為 kafkacat 的專案,提供用於從 Apache Kafka 生產和消費記錄的命令列模式。
這讓我們可以消費儲存在 Apache Kafka 叢集 my-topic
主題中的 Apache Kafka 記錄(作為 CloudEvent)
kubectl run kcat -ti --image=docker.io/edenhill/kcat:1.7.1 --rm=true --restart=Never -- -C -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092 -t my-topic -f '\nHeaders: %h\nMessage value: %s\n\n '
Headers: ce_specversion=1.0,ce_id=ce5026d0-234e-4997-975a-c005f515fedf,ce_source=kn-event/v1.9.0,ce_type=ype=dev.knative.blog.post,content-type=application/json,ce_time=2023-02-13T12:52:20.654526321Z
Message value: {"message":"Hello"}
% Reached end of topic my-topic [0] at offset 2
CloudEvents 二進位模式¶
請務必注意,KafkaSink
預設會使用二進位內容模式,將傳入的 CloudEvents 儲存為 Kafka 記錄,因為它在傳輸或路由方面有經過優化,效率更高,且能避免 JSON 解析。使用 二進位內容模式
表示所有 CloudEvents 的屬性和擴充功能都會被映射為 Kafka 記錄的標頭,而 CloudEvent 的 data
則對應到 Kafka 記錄的實際值。相較於 結構化內容模式
,這也是使用 二進位內容模式
的另一個好處,因為它較不具阻礙性,因此可以與不理解 CloudEvents 的系統相容。
展望¶
由 Knative KafkaSink
元件支援的 Kafka 主題中儲存的訊息,可以輕鬆地被 Apache Kafka 社群中更廣泛的任何消費者應用程式所使用。本文章的下一篇文章將展示如何使用 Apache Kafka 的 Knative Broker 實作來儲存傳入的事件,並利用 Knative Eventing 工具根據 CloudEvents 元資料進行路由,因為此過濾功能並非直接建置在 Apache Kafka 本身中。