跳至內容

從 CloudEvents 到 Apache Kafka 記錄,第一部分

發佈於:2023-03-08 ,  修訂於:2024-01-17

從 CloudEvents 到 Apache Kafka 記錄,第一部分

作者:Daniele Zonca,Red Hat 資深首席軟體工程師,Matthias Weßendorf,Red Hat 首席軟體工程師

在本部落格文章中,您將學習如何使用 KafkaSink 元件輕鬆地將傳入的 CloudEvents 儲存到 Apache Kafka 主題中。

Apache Kafka 用於許多非常不同的用例,但採用 Kafka 協議的需求可能成為障礙,尤其是在第三方元件的擴展可能性有限時。

有些事件產生者不支援 Kafka 協議,而 HTTP 可能是一個更靈活的選擇。Strimzi 專案有一個 Bridge 元件,透過 HTTP 公開生產者/消費者 API,但它是 Kafka 專用的,所以本質上是相同的協議(具有消費者群組、偏移量等)。

您認為 CloudEvents 要求可能會是問題嗎?CloudEvents 也為 HTTP 格式定義了 繫結,尤其是在二進位模式下,大多數 HTTP 酬載可能已經是有效的 CloudEvents!

適用於 Apache Kafka 的 Knative Sink 是 CloudEvent 入口的 Kafka 原生實作,並將事件持久化為可設定主題上的 Apache Kafka 記錄。

設定 Apache Kafka 主題

為了使用 KafkaSink 元件,您需要有一個 Apache Kafka 的主題,並且具有適當的存取權。在這篇文章中,我們使用的是由 Strimzi 支援的本機 Apache Kafka 安裝,如 此處 所述。一旦 Apache Kafka 叢集在您的 Kubernetes 環境中執行,就可以建立主題了。為此,我們使用 Strimzi 的 KafkaTopic CRD 以標準宣告式 Kubernetes 方式建立主題

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1

這將建立一個簡單的主題,其 partitionsreplicas 都設定為 1,這在生產環境中是建議的。

注意:有關 Knative Kafka Broker 的生產就緒配置,請參閱此部落格

一旦將清單套用到 Kubernetes 叢集,就可以像這樣查詢

kubectl get kafkatopics.kafka.strimzi.io -n kafka
NAME       CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
my-topic   my-cluster   1            1                    True

設定 KafkaSink 元件

適用於 Apache Kafka 的 Knative Sink 的安裝說明請參閱此處

接下來,我們將建立 KafkaSink 的實例,並將其繫結到本機 Strimzi 型 Apache Kafka 叢集上的 my-topic 主題

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: my-topic
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092

KafkaSink 是一個 Addressable 類型,可以透過 HTTP 接收傳入的 CloudEvents 到其 status.address.url 欄位中定義的位址

kubectl get kafkasinks.eventing.knative.dev
NAME            URL                                                                                  AGE   READY   REASON
my-kafka-sink   http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-sink   13s   True

Kn 事件外掛程式

此時,我們可以使用 Kubernetes 叢集內安裝了 curl 程式的 Pod,並將事件傳送到 KafkaSinkURL

不過,我們改用具有 kn client CLI 及其用於從命令列管理雲端事件的 事件外掛程式

kn event send \
  --to KafkaSink:eventing.knative.dev/v1alpha1:my-kafka-sink \
  --type=dev.knative.blog.post \
  -f message="Hello"

透過上述命令,我們將 message 作為具有 dev.knative.blog.post 的 CloudEvents 傳送到我們的 my-kafka-sink 物件。kn event 外掛程式會從此調用產生有效的 CloudEvents,並將其直接傳送到參照 sink 的可定址 URL。

使用 kcat 進行事件處理

kcat 是以前稱為 kafkacat 的專案,提供用於從 Apache Kafka 生產和消費記錄的命令列模式。

這讓我們可以消費儲存在 Apache Kafka 叢集 my-topic 主題中的 Apache Kafka 記錄(作為 CloudEvent)

kubectl run kcat -ti --image=docker.io/edenhill/kcat:1.7.1 --rm=true --restart=Never -- -C -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092 -t my-topic -f '\nHeaders: %h\nMessage value: %s\n\n '
上述命令會印出 Kafka 記錄的所有標頭及其值,如下所示

Headers: ce_specversion=1.0,ce_id=ce5026d0-234e-4997-975a-c005f515fedf,ce_source=kn-event/v1.9.0,ce_type=ype=dev.knative.blog.post,content-type=application/json,ce_time=2023-02-13T12:52:20.654526321Z
Message value: {"message":"Hello"}

% Reached end of topic my-topic [0] at offset 2

CloudEvents 二進位模式

請務必注意,KafkaSink 預設會使用二進位內容模式,將傳入的 CloudEvents 儲存為 Kafka 記錄,因為它在傳輸或路由方面有經過優化,效率更高,且能避免 JSON 解析。使用 二進位內容模式 表示所有 CloudEvents 的屬性和擴充功能都會被映射為 Kafka 記錄的標頭,而 CloudEvent 的 data 則對應到 Kafka 記錄的實際值。相較於 結構化內容模式,這也是使用 二進位內容模式 的另一個好處,因為它較不具阻礙性,因此可以與不理解 CloudEvents 的系統相容。

展望

由 Knative KafkaSink 元件支援的 Kafka 主題中儲存的訊息,可以輕鬆地被 Apache Kafka 社群中更廣泛的任何消費者應用程式所使用。本文章的下一篇文章將展示如何使用 Apache Kafka 的 Knative Broker 實作來儲存傳入的事件,並利用 Knative Eventing 工具根據 CloudEvents 元資料進行路由,因為此過濾功能並非直接建置在 Apache Kafka 本身中。

我們使用分析和 cookies 來了解網站流量。您使用我們網站的相關資訊會為了此目的與 Google 分享。了解更多。