適用於 Apache Kafka 的 Knative 來源¶
KafkaSource
讀取儲存在現有 Apache Kafka 主題中的訊息,並將這些訊息以 CloudEvents 的形式透過 HTTP 傳送到其設定的 sink
。KafkaSource
會保留儲存在主題分割區中的訊息順序。它會先等待來自 sink
的成功回應,才會傳遞相同分割區中的下一個訊息。
安裝 KafkaSource 控制器¶
-
輸入下列命令以安裝
KafkaSource
控制器kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
-
輸入下列命令以安裝 Kafka Source 資料平面
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-source.yaml
-
輸入下列命令以驗證
kafka-controller
和kafka-source-dispatcher
是否正在執行kubectl get deployments.apps,statefulsets.apps -n knative-eventing
範例輸出
NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/kafka-controller 1/1 1 1 3s NAME READY AGE statefulset.apps/kafka-source-dispatcher 1/1 3s
選用:建立 Kafka 主題¶
注意
建立 Kafka 主題的章節假設您使用 Strimzi 來操作 Apache Kafka,但是可以使用 Apache Kafka CLI 或任何其他工具複製對等操作。
如果您使用 Strimzi
-
建立
KafkaTopic
YAML 檔案apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: knative-demo-topic namespace: kafka labels: strimzi.io/cluster: my-cluster spec: partitions: 3 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824
-
執行下列命令以部署
KafkaTopic
YAML 檔案其中kubectl apply -f <filename>.yaml
<filename>
是您的KafkaTopic
YAML 檔案的名稱。範例輸出
kafkatopic.kafka.strimzi.io/knative-demo-topic created
-
執行下列命令以確保
KafkaTopic
正在執行kubectl -n kafka get kafkatopics.kafka.strimzi.io
範例輸出
NAME CLUSTER PARTITIONS REPLICATION FACTOR knative-demo-topic my-cluster 3 1
建立服務¶
-
將
event-display
服務建立為 YAML 檔案apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: default spec: template: spec: containers: - # This corresponds to # https://github.com/knative/eventing/tree/main/cmd/event_display/main.go image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
-
執行下列命令以套用 YAML 檔案
其中kubectl apply -f <filename>.yaml
<filename>
是您在上一個步驟中建立的檔案名稱。範例輸出
service.serving.knative.dev/event-display created
-
執行下列命令以確保服務 Pod 正在執行
kubectl get pods
Pod 名稱的前綴為
event-display
NAME READY STATUS RESTARTS AGE event-display-00001-deployment-5d5df6c7-gv2j4 2/2 Running 0 72s
Kafka 事件來源¶
-
根據 bootstrap 伺服器、主題等等修改
source/event-source.yaml
apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: kafka-source spec: consumerGroup: knative-group bootstrapServers: - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
-
部署事件來源
kubectl apply -f event-source.yaml
範例輸出
kafkasource.sources.knative.dev/kafka-source created
-
驗證 KafkaSource 是否已就緒
kubectl get kafkasource kafka-source
範例輸出
NAME TOPICS BOOTSTRAPSERVERS READY REASON AGE kafka-source ["knative-demo-topic"] ["my-cluster-kafka-bootstrap.kafka:9092"] True 26h
擴展¶
若要排程更多或更少的消費者,可以擴展 KafkaSource,並將它們配置給不同的分配器 Pod。kafkasource 狀態會在 status.placements 鍵下顯示此配置。
您可以使用下列符號,透過 kubectl 擴展 KafkaSource
kubectl scale kafkasource -n <ns> <kafkasource-name> --replicas=<number-of-replicas> # e.g. 12 replicas for a topic with 12 partitions
或者,如果您使用 GitOps 方法,您可以新增 consumers
鍵,如下列範例所示,並將其提交至您的儲存庫
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
consumers: 12 # Number of replicas
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
使用 KEDA 自動擴展¶
您可以使用 KEDA 自動擴展 KafkaSource。如需如何啟用和設定此功能的資訊,請閱讀 此處的指示。
驗證¶
-
將訊息 (
{"msg": "This is a test!"}
) 產生至 Apache Kafka 主題,如下列範例所示kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic
提示
如果您沒有看到命令提示字元,請嘗試按下 Enter。
-
驗證服務是否已從事件來源接收到訊息
kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
範例輸出
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.kafka.event source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic subject: partition:0#564 id: partition:0/offset:564 time: 2020-02-10T18:10:23.861866615Z datacontenttype: application/json Extensions, key: Data, { "msg": "This is a test!" }
處理傳遞失敗¶
KafkaSource
實作 Delivery
規格,可讓您為其設定事件傳遞參數,這些參數會在事件無法傳遞時套用
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
delivery:
deadLetterSink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: example-sink
backoffDelay: <duration>
backoffPolicy: <policy-type>
retry: <integer>
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
處理傳遞失敗章節會討論 delivery
API。
選用:指定鍵的反序列化器¶
當 KafkaSource
從 Kafka 接收到訊息時,它會將金鑰轉儲到稱為 Key
的事件擴充功能中,並將 Kafka 訊息標頭轉儲到以 kafkaheader
開頭的擴充功能中。
您可以在四種類型中指定鍵的反序列化器
string
(預設值) 用於 UTF-8 編碼的字串int
用於 32 位元和 64 位元帶正負號的整數float
用於 32 位元和 64 位元浮點數byte-array
用於 Base64 編碼的位元組陣列
若要指定鍵的反序列化器,請將標籤 kafkasources.sources.knative.dev/key-type
新增至 KafkaSource
定義,如下列範例所示
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
labels:
kafkasources.sources.knative.dev/key-type: int
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
選用:指定初始偏移量¶
預設情況下,KafkaSource
會從每個分割區中的最新偏移量開始取用。如果您想要從最早的偏移量開始取用,請將 initialOffset 欄位設定為 earliest
,例如
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
initialOffset: earliest
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
注意
initialOffset
的有效值為 earliest
和 latest
。任何其他值都會導致驗證錯誤。只有在該消費者群組沒有已提交的偏移量時,才會接受此欄位。
連線至啟用 TLS 的 Kafka Broker¶
KafkaSource 支援 TLS 和 SASL 驗證方法。若要啟用 TLS 驗證,您必須有下列檔案
- CA 憑證
- 用戶端憑證和金鑰
KafkaSource 預期這些檔案為 PEM 格式。如果它們是其他格式 (例如 JKS),請將它們轉換為 PEM。
-
執行下列命令,將憑證檔案建立為 KafkaSource 將設定於其中的命名空間中的秘密
kubectl create secret generic cacert --from-file=caroot.pem
kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
-
套用 KafkaSource。請根據情況修改
bootstrapServers
和topics
欄位。apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: kafka-source-with-tls spec: net: tls: enable: true cert: secretKeyRef: key: tls.crt name: kafka-secret key: secretKeyRef: key: tls.key name: kafka-secret caCert: secretKeyRef: key: caroot.pem name: cacert consumerGroup: knative-group bootstrapServers: - my-secure-kafka-bootstrap.kafka:443 topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
為 KafkaSources 啟用 SASL¶
Apache Kafka 使用簡易驗證和安全層 (SASL) 進行驗證。如果您在叢集上使用 SASL 驗證,使用者必須向 Knative 提供認證才能與 Kafka 叢集通訊,否則無法產生或取用事件。
先決條件¶
- 您可以存取具有簡易驗證和安全層 (SASL) 的 Kafka 叢集。
步驟¶
-
執行下列命令,建立使用 Kafka 叢集 SASL 資訊的秘密
STRIMZI_CRT=$(kubectl -n kafka get secret example-cluster-cluster-ca-cert --template='{{index.data "ca.crt"}}' | base64 --decode )
SASL_PASSWD=$(kubectl -n kafka get secret example-user --template='{{index.data "password"}}' | base64 --decode )
kubectl create secret -n default generic <secret_name> \ --from-literal=ca.crt="$STRIMZI_CRT" \ --from-literal=password="$SASL_PASSWD" \ --from-literal=saslType="SCRAM-SHA-512" \ --from-literal=user="example-user"
-
建立或修改 KafkaSource,使其包含下列規格選項
apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: example-source spec: ... net: sasl: enable: true user: secretKeyRef: name: <secret_name> key: user password: secretKeyRef: name: <secret_name> key: password type: secretKeyRef: name: <secret_name> key: saslType tls: enable: true caCert: secretKeyRef: name: <secret_name> key: ca.crt ...
其中
<secret_name>
是上一個步驟中產生的秘密名稱。
清除步驟¶
-
刪除 Kafka 事件來源
kubectl delete -f source/source.yaml kafkasource.sources.knative.dev
範例輸出
"kafka-source" deleted
-
刪除
event-display
服務kubectl delete -f source/event-display.yaml service.serving.knative.dev
範例輸出
"event-display" deleted
-
(可選)移除 Apache Kafka 主題
kubectl delete -f kafka-topic.yaml
範例輸出
kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted