跳至內容

適用於 Apache Kafka 的 Knative Broker

適用於 Apache Kafka 的 Knative Broker 是 Knative Broker API 的實作,其原生目標是 Apache Kafka,以減少網路躍點,並為 Broker 和 Trigger API 模型提供更好的 Apache Kafka 整合。

值得注意的功能包括:

Knative Kafka Broker 使用 二進位內容模式將傳入的 CloudEvents 儲存為 Kafka 記錄,因為它對於傳輸或路由的優化更有效率,並避免 JSON 剖析。使用 二進位內容模式表示所有 CloudEvent 屬性和擴充功能都會對應為 Kafka 記錄的標頭,而 CloudEvent 的 data 則對應到 Kafka 記錄的實際值。這是使用 二進位內容模式 而非 結構化內容模式的另一個好處,因為它比較不具干擾性,因此與不了解 CloudEvents 的系統相容。

先決條件

  1. 您已安裝 Knative Eventing。
  2. 您可以存取 Apache Kafka 叢集。

提示

如果您需要設定 Kafka 叢集,您可以按照Strimzi 快速入門頁面上的說明執行此操作。

安裝

  1. 輸入以下命令來安裝 Kafka 控制器

    kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
    
  2. 輸入以下命令來安裝 Kafka Broker 資料平面

    kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-broker.yaml
    
  3. 輸入以下命令來驗證 kafka-controllerkafka-broker-receiverkafka-broker-dispatcher 是否正在執行

    kubectl get deployments.apps -n knative-eventing
    

    範例輸出

    NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
    eventing-controller            1/1     1            1           10s
    eventing-webhook               1/1     1            1           9s
    kafka-controller               1/1     1            1           3s
    kafka-broker-dispatcher        1/1     1            1           4s
    kafka-broker-receiver          1/1     1            1           5s
    

建立 Kafka Broker

Kafka Broker 物件如下所示

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: Kafka
    # Optional annotation to point to an externally managed kafka topic:
    # kafka.eventing.knative.dev/external.topic: <topic-name>
  name: default
  namespace: default
spec:
  # Configuration specific to this broker.
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
  # Optional dead letter sink, you can specify either:
  #  - deadLetterSink.ref, which is a reference to a Callable
  #  - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dlq-service

設定 Kafka Broker

spec.config 應該參考任何namespace中看起來像以下的任何 ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # Number of topic partitions
  default.topic.partitions: "10"
  # Replication factor of topic messages.
  default.topic.replication.factor: "3"
  # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

ConfigMap 安裝在叢集中的 Knative Eventing SYSTEM_NAMESPACE 中。您可以根據您的需求編輯全域設定。您也可以在每個 broker 基礎上覆寫這些設定,方法是在您的 Kafka Broker 的 spec.config 欄位中參考不同 namespace 上或具有不同 name 的不同 ConfigMap

注意

default.topic.replication.factor 值必須小於或等於叢集中 Kafka broker 執行個體的數量。例如,如果您只有一個 Kafka broker,則 default.topic.replication.factor 值不應大於 1

Knative 支援您的 Kafka 版本支援的完整主題設定選項集。若要設定任何這些選項,您需要在 configmap 中新增一個具有 default.topic.config. 前置詞的金鑰。例如,若要設定 retention.ms 值,您會修改 ConfigMap,使其看起來如下所示

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # Number of topic partitions
  default.topic.partitions: "10"
  # Replication factor of topic messages.
  default.topic.replication.factor: "3"
  # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
  # Here is our retention.ms config
  default.topic.config.retention.ms: "3600"

設定為預設 broker 實作

若要將 Kafka broker 設定為 Knative 部署中所有 broker 的預設實作,您可以透過修改 knative-eventing 命名空間中的 config-br-defaults ConfigMap 來套用全域設定。

這可讓您避免設定每個 broker 的個別或每個命名空間設定,例如 metadata.annotations.eventing.knative.dev/broker.classspec.config

以下 YAML 是使用 Kafka broker 作為預設實作的 config-br-defaults ConfigMap 範例。

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-br-defaults
  namespace: knative-eventing
data:
  default-br-config: |
    clusterDefault:
      brokerClass: Kafka
      apiVersion: v1
      kind: ConfigMap
      name: kafka-broker-config
      namespace: knative-eventing
    namespaceDefaults:
      namespace1:
        brokerClass: Kafka
        apiVersion: v1
        kind: ConfigMap
        name: kafka-broker-config
        namespace: knative-eventing
      namespace2:
        brokerClass: Kafka
        apiVersion: v1
        kind: ConfigMap
        name: kafka-broker-config
        namespace: knative-eventing

安全性

Apache Kafka 支援不同的安全性功能,Knative 支援以下功能

若要啟用安全性功能,在 broker.spec.config 參考的 ConfigMap 中,我們可以參考 Secret

apiVersion: v1
kind: ConfigMap
metadata:
   name: kafka-broker-config
   namespace: knative-eventing
data:
   # Other configurations
   # ...

   # Reference a Secret called my_secret
   auth.secret.ref.name: my_secret

Secret my_secret 必須存在於 broker.spec.config 參考的 ConfigMap 的相同命名空間中,在此情況下:knative-eventing

注意

憑證和金鑰必須採用 PEM 格式

使用 SASL 進行身分驗證

Knative 支援以下 SASL 機制

  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512

若要使用特定 SASL 機制,請將 <sasl_mechanism> 取代為您選擇的機制。

使用 SASL 進行身分驗證而不加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_PLAINTEXT \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

使用 SASL 進行身分驗證並使用 SSL 進行加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-file=ca.crt=caroot.pem \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

使用 SSL 進行加密而不進行用戶端身分驗證

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-literal=user.skip=true

使用 SSL 進行身分驗證和加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-file=user.crt=<my_cert.pem_file_path> \
  --from-file=user.key=<my_key.pem_file_path>

注意

可以省略 ca.crt,以恢復使用系統的根 CA 集。

攜帶您自己的主題

預設情況下,Knative Kafka Broker 會建立自己的內部主題,但是,可以使用 kafka.eventing.knative.dev/external.topic 註釋指向外部管理的主題

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: Kafka
    kafka.eventing.knative.dev/external.topic: <my-topic-name>
  name: default
  namespace: default
spec:
  # other spec fields ...

注意

使用外部主題時,Knative Kafka Broker 不擁有該主題,也不負責管理該主題。這包括主題生命週期或其一般有效性。其他對主題一般存取的限制可能適用。請參閱有關使用存取控制清單 (ACL)的文件。

消費者偏移量認可間隔

Kafka 消費者會透過認可偏移量來追蹤上次成功傳送的事件。

Knative Kafka Broker 每 auto.commit.interval.ms 毫秒認可一次偏移量。

注意

為了防止對效能造成負面影響,不建議每次將事件成功傳送給訂閱者時都認可偏移量。

可以透過修改 knative-eventing 命名空間中的 config-kafka-broker-data-plane ConfigMap 來變更間隔,方法是修改參數 auto.commit.interval.ms,如下所示

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-kafka-broker-data-plane
  namespace: knative-eventing
data:
  # Some configurations omitted ...
  config-kafka-broker-consumer.properties: |
    # Some configurations omitted ...

    # Commit the offset every 5000 millisecods (5 seconds)
    auto.commit.interval.ms=5000

注意

Knative Kafka Broker 保證至少傳遞一次,這表示您的應用程式可能會收到重複的事件。較高的認可間隔表示收到重複事件的可能性較高,因為當消費者重新啟動時,它會從上次認可的偏移量重新啟動。

Kafka 生產者和消費者設定

Knative 會公開所有可用的 Kafka 生產者和消費者設定,可以修改這些設定以符合您的工作負載。

您可以修改 knative-eventing 命名空間中的 config-kafka-broker-data-plane ConfigMap 來變更這些設定。

ConfigMap 中可用設定的文件可在 Apache Kafka 網站上找到,特別是 生產者設定消費者設定

啟用資料平面元件的偵錯日誌

以下 YAML 顯示資料平面元件的預設日誌設定,該設定是在安裝步驟期間建立的

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config-logging
  namespace: knative-eventing
data:
  config.xml: |
    <configuration>
      <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
      </appender>
      <root level="INFO">
        <appender-ref ref="jsonConsoleAppender"/>
      </root>
    </configuration>

若要將日誌層級變更為 DEBUG,您必須

  1. 套用以下 kafka-config-logging ConfigMap,或將 ConfigMap kafka-config-logging 中的 level="INFO" 取代為 level="DEBUG"

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-config-logging
      namespace: knative-eventing
    data:
      config.xml: |
        <configuration>
          <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
          </appender>
          <root level="DEBUG">
            <appender-ref ref="jsonConsoleAppender"/>
          </root>
        </configuration>
    
  2. 輸入以下指令,重新啟動 kafka-broker-receiverkafka-broker-dispatcher

    kubectl rollout restart deployment -n knative-eventing kafka-broker-receiver
    kubectl rollout restart deployment -n knative-eventing kafka-broker-dispatcher
    

設定已傳遞事件的順序

在分派事件時,可以設定 Kafka Broker 以支援不同的傳遞順序保證。

您可以使用 Trigger 物件上的 kafka.eventing.knative.dev/delivery.order 註釋來設定事件的傳遞順序

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-service-trigger
  annotations:
     kafka.eventing.knative.dev/delivery.order: ordered
spec:
  broker: my-kafka-broker
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: my-service

支援的消費者傳遞保證為

  • unordered:無序消費者是非阻塞消費者,會以無序方式傳遞訊息,同時保留適當的偏移管理。當需要高度並行消費且不需要明確排序時很有用。一個例子可以是點擊分析的處理。
  • ordered:有序消費者是每個分割區的阻塞消費者,它會在傳遞分割區的下一則訊息之前,等待 CloudEvent 訂閱者的成功回應。當需要更嚴格的排序或事件之間存在關係或群組時很有用。一個例子可以是客戶訂單的處理。

unordered 傳遞是預設的排序保證。

資料平面隔離與共用資料平面

Knative Kafka Broker 實作有 2 個平面:控制平面和資料平面。控制平面由與 Kubernetes API 通訊、監看自訂物件並管理資料平面的控制器組成。

資料平面是接聽傳入事件、與 Apache Kafka 通訊以及將事件傳送到事件接收器的元件集合。這就是事件流動的地方。Knative Kafka Broker 資料平面由 kafka-broker-receiverkafka-broker-dispatcher 部署組成。

當使用 Broker 類別 Kafka 時,Knative Kafka Broker 會使用共用資料平面。這表示 knative-eventing 命名空間中的 kafka-broker-receiverkafka-broker-dispatcher 部署會用於叢集中的所有 Kafka Broker。

但是,當 KafkaNamespaced 設定為 Broker 類別時,Kafka broker 控制器會為每個存在 broker 的命名空間建立新的資料平面。此資料平面由該命名空間中的所有 KafkaNamespaced broker 使用。

這提供了資料平面之間的隔離,這表示使用者命名空間中的 kafka-broker-receiverkafka-broker-dispatcher 部署僅用於該命名空間中的 broker。

注意

由於有單獨的資料平面,此安全性功能會建立更多部署並使用更多資源。除非您有此類隔離要求,否則建議使用具有 Kafka 類別的「一般」Broker。

若要建立 KafkaNamespaced broker,您必須將 eventing.knative.dev/broker.class 註釋設定為 KafkaNamespaced

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: KafkaNamespaced
  name: default
  namespace: my-namespace
spec:
  config:
     # the referenced `configmap` must be in the same namespace with the `Broker` object, in this case `my-namespace`
    apiVersion: v1
    kind: ConfigMap
    name: my-config
    # namespace: my-namespace # no need to define, defaults to Broker's namespace

注意

spec.config 中指定的 configmap 必須Broker 物件位於相同的命名空間中

apiVersion: v1
kind: ConfigMap
metadata:
  name: my-config
  namespace: my-namespace
data:
  ...

在建立第一個具有 KafkaNamespaced 類別的 Broker 後,kafka-broker-receiverkafka-broker-dispatcher 部署會在命名空間中建立。之後,相同命名空間中所有具有 KafkaNamespaced 類別的 broker 都會使用相同的資料平面。當命名空間中沒有 KafkaNamespaced 類別的 broker 時,命名空間中的資料平面將會被刪除。

設定 KafkaNamespaced broker

適用於 Kafka Broker 類別的所有設定機制也適用於具有 KafkaNamespaced 類別的 broker,但有以下例外情況

  • 此頁面說明如何透過修改 knative-eventing 命名空間中的 config-kafka-broker-data-plane configmap 來完成生產者和消費者設定。由於 Kafka Broker 控制器會將此 configmap 傳播到使用者命名空間中,因此目前無法設定每個命名空間的生產者和消費者設定。knative-eventing 命名空間中 config-kafka-broker-data-plane ConfigMap 中設定的任何值也會在使用者命名空間中使用。
  • 由於相同的傳播,也無法設定每個命名空間的消費者偏移量提交間隔。
  • 會傳播更多 configmap:config-tracingkafka-config-logging。這表示追蹤和記錄也無法針對每個命名空間設定。
  • 同樣地,資料平面部署會從 knative-eventing 命名空間傳播到使用者命名空間。這表示資料平面部署無法針對每個命名空間設定,且將與 knative-eventing 命名空間中的部署相同。

使用 KEDA 啟用和設定觸發程序的自動調整

若要使用 KEDA 啟用和設定參照 Kafka Broker 的觸發程序自動調整,請依照此處的指示

其他資訊

我們使用分析和 Cookie 來了解網站流量。關於您使用我們網站的資訊會與 Google 分享以達到該目的。了解更多資訊。