跳至內容

適用於 Apache Kafka 的 Knative 來源

stage version

KafkaSource 讀取儲存在現有 Apache Kafka 主題中的訊息,並將這些訊息以 CloudEvents 的形式透過 HTTP 傳送到其設定的 sinkKafkaSource 會保留儲存在主題分割區中的訊息順序。它會先等待來自 sink 的成功回應,才會傳遞相同分割區中的下一個訊息。

安裝 KafkaSource 控制器

  1. 輸入下列命令以安裝 KafkaSource 控制器

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
    
  2. 輸入下列命令以安裝 Kafka Source 資料平面

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-source.yaml
    
  3. 輸入下列命令以驗證 kafka-controllerkafka-source-dispatcher 是否正在執行

    kubectl get deployments.apps,statefulsets.apps -n knative-eventing
    

    範例輸出

    NAME                                           READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/kafka-controller               1/1     1            1           3s
    
    NAME                                       READY   AGE
    statefulset.apps/kafka-source-dispatcher   1/1     3s
    

選用:建立 Kafka 主題

注意

建立 Kafka 主題的章節假設您使用 Strimzi 來操作 Apache Kafka,但是可以使用 Apache Kafka CLI 或任何其他工具複製對等操作。

如果您使用 Strimzi

  1. 建立 KafkaTopic YAML 檔案

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: knative-demo-topic
      namespace: kafka
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 3
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    
  2. 執行下列命令以部署 KafkaTopic YAML 檔案

    kubectl apply -f <filename>.yaml
    
    其中 <filename> 是您的 KafkaTopic YAML 檔案的名稱。

    範例輸出

    kafkatopic.kafka.strimzi.io/knative-demo-topic created
    

  3. 執行下列命令以確保 KafkaTopic 正在執行

    kubectl -n kafka get kafkatopics.kafka.strimzi.io
    

    範例輸出

    NAME                 CLUSTER      PARTITIONS   REPLICATION FACTOR
    knative-demo-topic   my-cluster   3            1
    

建立服務

  1. event-display 服務建立為 YAML 檔案

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: default
    spec:
      template:
        spec:
          containers:
            - # This corresponds to
              # https://github.com/knative/eventing/tree/main/cmd/event_display/main.go
              image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
    
  2. 執行下列命令以套用 YAML 檔案

    kubectl apply -f <filename>.yaml
    
    其中 <filename> 是您在上一個步驟中建立的檔案名稱。

    範例輸出

    service.serving.knative.dev/event-display created
    

  3. 執行下列命令以確保服務 Pod 正在執行

    kubectl get pods
    

    Pod 名稱的前綴為 event-display

    NAME                                            READY     STATUS    RESTARTS   AGE
    event-display-00001-deployment-5d5df6c7-gv2j4   2/2       Running   0          72s
    

Kafka 事件來源

  1. 根據 bootstrap 伺服器、主題等等修改 source/event-source.yaml

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    
  2. 部署事件來源

    kubectl apply -f event-source.yaml
    

    範例輸出

    kafkasource.sources.knative.dev/kafka-source created
    

  3. 驗證 KafkaSource 是否已就緒

    kubectl get kafkasource kafka-source
    

    範例輸出

    NAME           TOPICS                   BOOTSTRAPSERVERS                            READY   REASON   AGE
    kafka-source   ["knative-demo-topic"]   ["my-cluster-kafka-bootstrap.kafka:9092"]   True             26h
    

擴展

若要排程更多或更少的消費者,可以擴展 KafkaSource,並將它們配置給不同的分配器 Pod。kafkasource 狀態會在 status.placements 鍵下顯示此配置。

您可以使用下列符號,透過 kubectl 擴展 KafkaSource

kubectl scale kafkasource -n <ns> <kafkasource-name> --replicas=<number-of-replicas> # e.g. 12 replicas for a topic with 12 partitions

或者,如果您使用 GitOps 方法,您可以新增 consumers 鍵,如下列範例所示,並將其提交至您的儲存庫

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 
      consumers: 12    # Number of replicas
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

使用 KEDA 自動擴展

您可以使用 KEDA 自動擴展 KafkaSource。如需如何啟用和設定此功能的資訊,請閱讀 此處的指示

驗證

  1. 將訊息 ({"msg": "This is a test!"}) 產生至 Apache Kafka 主題,如下列範例所示

    kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic
    

    提示

    如果您沒有看到命令提示字元,請嘗試按下 Enter

  2. 驗證服務是否已從事件來源接收到訊息

    kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
    

    範例輸出

    ☁️ cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic
      subject: partition:0#564
      id: partition:0/offset:564
      time: 2020-02-10T18:10:23.861866615Z
      datacontenttype: application/json
    Extensions,
      key:
    Data,
        {
          "msg": "This is a test!"
        }
    

處理傳遞失敗

KafkaSource 實作 Delivery 規格,可讓您為其設定事件傳遞參數,這些參數會在事件無法傳遞時套用

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
      - knative-demo-topic
      delivery:
        deadLetterSink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: example-sink
        backoffDelay: <duration>
        backoffPolicy: <policy-type>
        retry: <integer>
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

處理傳遞失敗章節會討論 delivery API。

選用:指定鍵的反序列化器

KafkaSource 從 Kafka 接收到訊息時,它會將金鑰轉儲到稱為 Key 的事件擴充功能中,並將 Kafka 訊息標頭轉儲到以 kafkaheader 開頭的擴充功能中。

您可以在四種類型中指定鍵的反序列化器

  • string (預設值) 用於 UTF-8 編碼的字串
  • int 用於 32 位元和 64 位元帶正負號的整數
  • float 用於 32 位元和 64 位元浮點數
  • byte-array 用於 Base64 編碼的位元組陣列

若要指定鍵的反序列化器,請將標籤 kafkasources.sources.knative.dev/key-type 新增至 KafkaSource 定義,如下列範例所示

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
labels:
  kafkasources.sources.knative.dev/key-type: int
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
  ref:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: event-display

選用:指定初始偏移量

預設情況下,KafkaSource 會從每個分割區中的最新偏移量開始取用。如果您想要從最早的偏移量開始取用,請將 initialOffset 欄位設定為 earliest,例如

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
consumerGroup: knative-group
initialOffset: earliest
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
  ref:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: event-display

注意

initialOffset 的有效值為 earliestlatest。任何其他值都會導致驗證錯誤。只有在該消費者群組沒有已提交的偏移量時,才會接受此欄位。

連線至啟用 TLS 的 Kafka Broker

KafkaSource 支援 TLS 和 SASL 驗證方法。若要啟用 TLS 驗證,您必須有下列檔案

  • CA 憑證
  • 用戶端憑證和金鑰

KafkaSource 預期這些檔案為 PEM 格式。如果它們是其他格式 (例如 JKS),請將它們轉換為 PEM。

  1. 執行下列命令,將憑證檔案建立為 KafkaSource 將設定於其中的命名空間中的秘密

    kubectl create secret generic cacert --from-file=caroot.pem
    
    kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
    
  2. 套用 KafkaSource。請根據情況修改 bootstrapServerstopics 欄位。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
     name: kafka-source-with-tls
    spec:
     net:
       tls:
         enable: true
         cert:
           secretKeyRef:
             key: tls.crt
             name: kafka-secret
         key:
           secretKeyRef:
             key: tls.key
             name: kafka-secret
         caCert:
           secretKeyRef:
             key: caroot.pem
             name: cacert
     consumerGroup: knative-group
     bootstrapServers:
     - my-secure-kafka-bootstrap.kafka:443
     topics:
     - knative-demo-topic
     sink:
       ref:
         apiVersion: serving.knative.dev/v1
         kind: Service
         name: event-display
    

為 KafkaSources 啟用 SASL

Apache Kafka 使用簡易驗證和安全層 (SASL) 進行驗證。如果您在叢集上使用 SASL 驗證,使用者必須向 Knative 提供認證才能與 Kafka 叢集通訊,否則無法產生或取用事件。

先決條件

  • 您可以存取具有簡易驗證和安全層 (SASL) 的 Kafka 叢集。

步驟

  1. 執行下列命令,建立使用 Kafka 叢集 SASL 資訊的秘密

    STRIMZI_CRT=$(kubectl -n kafka get secret example-cluster-cluster-ca-cert --template='{{index.data "ca.crt"}}' | base64 --decode )
    
    SASL_PASSWD=$(kubectl -n kafka get secret example-user --template='{{index.data "password"}}' | base64 --decode )
    
    kubectl create secret -n default generic <secret_name> \
        --from-literal=ca.crt="$STRIMZI_CRT" \
        --from-literal=password="$SASL_PASSWD" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="example-user"
    
  2. 建立或修改 KafkaSource,使其包含下列規格選項

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
    ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <secret_name>
              key: user
          password:
            secretKeyRef:
              name: <secret_name>
              key: password
          type:
            secretKeyRef:
              name: <secret_name>
              key: saslType
        tls:
          enable: true
          caCert:
            secretKeyRef:
              name: <secret_name>
              key: ca.crt
    ...
    

    其中 <secret_name> 是上一個步驟中產生的秘密名稱。

清除步驟

  1. 刪除 Kafka 事件來源

    kubectl delete -f source/source.yaml kafkasource.sources.knative.dev
    

    範例輸出

    "kafka-source" deleted
    

  2. 刪除 event-display 服務

    kubectl delete -f source/event-display.yaml service.serving.knative.dev
    

    範例輸出

    "event-display" deleted
    

  3. (可選)移除 Apache Kafka 主題

    kubectl delete -f kafka-topic.yaml
    

    範例輸出

    kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted
    

我們使用分析和 Cookie 來了解網站流量。您使用我們網站的資訊將與 Google 分享以達到此目的。了解更多。