適用於 Apache Kafka 的 Knative Broker¶
適用於 Apache Kafka 的 Knative Broker 是 Knative Broker API 的實作,其原生目標是 Apache Kafka,以減少網路躍點,並為 Broker 和 Trigger API 模型提供更好的 Apache Kafka 整合。
值得注意的功能包括:
- 控制平面高可用性
- 水平可擴充的資料平面
- 可廣泛設定
- 根據 CloudEvents 分割擴充功能排序事件傳遞
- 支援任何 Kafka 版本,請參閱相容性矩陣
- 支援 2 種資料平面模式:每個命名空間的資料平面隔離或共用資料平面
Knative Kafka Broker 使用 二進位內容模式將傳入的 CloudEvents 儲存為 Kafka 記錄,因為它對於傳輸或路由的優化更有效率,並避免 JSON 剖析。使用 二進位內容模式
表示所有 CloudEvent 屬性和擴充功能都會對應為 Kafka 記錄的標頭,而 CloudEvent 的 data
則對應到 Kafka 記錄的實際值。這是使用 二進位內容模式
而非 結構化內容模式
的另一個好處,因為它比較不具干擾性
,因此與不了解 CloudEvents 的系統相容。
先決條件¶
- 您已安裝 Knative Eventing。
- 您可以存取 Apache Kafka 叢集。
提示
如果您需要設定 Kafka 叢集,您可以按照Strimzi 快速入門頁面上的說明執行此操作。
安裝¶
-
輸入以下命令來安裝 Kafka 控制器
kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
-
輸入以下命令來安裝 Kafka Broker 資料平面
kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-broker.yaml
-
輸入以下命令來驗證
kafka-controller
、kafka-broker-receiver
和kafka-broker-dispatcher
是否正在執行kubectl get deployments.apps -n knative-eventing
範例輸出
NAME READY UP-TO-DATE AVAILABLE AGE eventing-controller 1/1 1 1 10s eventing-webhook 1/1 1 1 9s kafka-controller 1/1 1 1 3s kafka-broker-dispatcher 1/1 1 1 4s kafka-broker-receiver 1/1 1 1 5s
建立 Kafka Broker¶
Kafka Broker 物件如下所示
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: Kafka
# Optional annotation to point to an externally managed kafka topic:
# kafka.eventing.knative.dev/external.topic: <topic-name>
name: default
namespace: default
spec:
# Configuration specific to this broker.
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
# Optional dead letter sink, you can specify either:
# - deadLetterSink.ref, which is a reference to a Callable
# - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
delivery:
deadLetterSink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: dlq-service
設定 Kafka Broker¶
spec.config
應該參考任何namespace
中看起來像以下的任何 ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Number of topic partitions
default.topic.partitions: "10"
# Replication factor of topic messages.
default.topic.replication.factor: "3"
# A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
此 ConfigMap
安裝在叢集中的 Knative Eventing SYSTEM_NAMESPACE
中。您可以根據您的需求編輯全域設定。您也可以在每個 broker 基礎上覆寫這些設定,方法是在您的 Kafka Broker 的 spec.config
欄位中參考不同 namespace
上或具有不同 name
的不同 ConfigMap
。
注意
default.topic.replication.factor
值必須小於或等於叢集中 Kafka broker 執行個體的數量。例如,如果您只有一個 Kafka broker,則 default.topic.replication.factor
值不應大於 1
。
Knative 支援您的 Kafka 版本支援的完整主題設定選項集。若要設定任何這些選項,您需要在 configmap 中新增一個具有 default.topic.config.
前置詞的金鑰。例如,若要設定 retention.ms
值,您會修改 ConfigMap
,使其看起來如下所示
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Number of topic partitions
default.topic.partitions: "10"
# Replication factor of topic messages.
default.topic.replication.factor: "3"
# A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
# Here is our retention.ms config
default.topic.config.retention.ms: "3600"
設定為預設 broker 實作¶
若要將 Kafka broker 設定為 Knative 部署中所有 broker 的預設實作,您可以透過修改 knative-eventing
命名空間中的 config-br-defaults
ConfigMap 來套用全域設定。
這可讓您避免設定每個 broker 的個別或每個命名空間設定,例如 metadata.annotations.eventing.knative.dev/broker.class
或 spec.config
。
以下 YAML 是使用 Kafka broker 作為預設實作的 config-br-defaults
ConfigMap 範例。
apiVersion: v1
kind: ConfigMap
metadata:
name: config-br-defaults
namespace: knative-eventing
data:
default-br-config: |
clusterDefault:
brokerClass: Kafka
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
namespaceDefaults:
namespace1:
brokerClass: Kafka
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
namespace2:
brokerClass: Kafka
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
安全性¶
Apache Kafka 支援不同的安全性功能,Knative 支援以下功能
若要啟用安全性功能,在 broker.spec.config
參考的 ConfigMap
中,我們可以參考 Secret
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Other configurations
# ...
# Reference a Secret called my_secret
auth.secret.ref.name: my_secret
Secret
my_secret
必須存在於 broker.spec.config
參考的 ConfigMap
的相同命名空間中,在此情況下:knative-eventing
。
注意
憑證和金鑰必須採用 PEM
格式。
使用 SASL 進行身分驗證¶
Knative 支援以下 SASL 機制
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
若要使用特定 SASL 機制,請將 <sasl_mechanism>
取代為您選擇的機制。
使用 SASL 進行身分驗證而不加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_PLAINTEXT \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
使用 SASL 進行身分驗證並使用 SSL 進行加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_SSL \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-file=ca.crt=caroot.pem \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
使用 SSL 進行加密而不進行用戶端身分驗證¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-literal=user.skip=true
使用 SSL 進行身分驗證和加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-file=user.crt=<my_cert.pem_file_path> \
--from-file=user.key=<my_key.pem_file_path>
注意
可以省略 ca.crt
,以恢復使用系統的根 CA 集。
攜帶您自己的主題¶
預設情況下,Knative Kafka Broker 會建立自己的內部主題,但是,可以使用 kafka.eventing.knative.dev/external.topic
註釋指向外部管理的主題
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: Kafka
kafka.eventing.knative.dev/external.topic: <my-topic-name>
name: default
namespace: default
spec:
# other spec fields ...
注意
使用外部主題時,Knative Kafka Broker 不擁有該主題,也不負責管理該主題。這包括主題生命週期或其一般有效性。其他對主題一般存取的限制可能適用。請參閱有關使用存取控制清單 (ACL)的文件。
消費者偏移量認可間隔¶
Kafka 消費者會透過認可偏移量來追蹤上次成功傳送的事件。
Knative Kafka Broker 每 auto.commit.interval.ms
毫秒認可一次偏移量。
注意
為了防止對效能造成負面影響,不建議每次將事件成功傳送給訂閱者時都認可偏移量。
可以透過修改 knative-eventing
命名空間中的 config-kafka-broker-data-plane
ConfigMap
來變更間隔,方法是修改參數 auto.commit.interval.ms
,如下所示
apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka-broker-data-plane
namespace: knative-eventing
data:
# Some configurations omitted ...
config-kafka-broker-consumer.properties: |
# Some configurations omitted ...
# Commit the offset every 5000 millisecods (5 seconds)
auto.commit.interval.ms=5000
注意
Knative Kafka Broker 保證至少傳遞一次,這表示您的應用程式可能會收到重複的事件。較高的認可間隔表示收到重複事件的可能性較高,因為當消費者重新啟動時,它會從上次認可的偏移量重新啟動。
Kafka 生產者和消費者設定¶
Knative 會公開所有可用的 Kafka 生產者和消費者設定,可以修改這些設定以符合您的工作負載。
您可以修改 knative-eventing
命名空間中的 config-kafka-broker-data-plane
ConfigMap
來變更這些設定。
此 ConfigMap
中可用設定的文件可在 Apache Kafka 網站上找到,特別是 生產者設定 和 消費者設定。
啟用資料平面元件的偵錯日誌¶
以下 YAML 顯示資料平面元件的預設日誌設定,該設定是在安裝步驟期間建立的
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-config-logging
namespace: knative-eventing
data:
config.xml: |
<configuration>
<appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<root level="INFO">
<appender-ref ref="jsonConsoleAppender"/>
</root>
</configuration>
若要將日誌層級變更為 DEBUG
,您必須
-
套用以下
kafka-config-logging
ConfigMap
,或將ConfigMap
kafka-config-logging
中的level="INFO"
取代為level="DEBUG"
apiVersion: v1 kind: ConfigMap metadata: name: kafka-config-logging namespace: knative-eventing data: config.xml: | <configuration> <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="net.logstash.logback.encoder.LogstashEncoder"/> </appender> <root level="DEBUG"> <appender-ref ref="jsonConsoleAppender"/> </root> </configuration>
-
輸入以下指令,重新啟動
kafka-broker-receiver
和kafka-broker-dispatcher
kubectl rollout restart deployment -n knative-eventing kafka-broker-receiver kubectl rollout restart deployment -n knative-eventing kafka-broker-dispatcher
設定已傳遞事件的順序¶
在分派事件時,可以設定 Kafka Broker 以支援不同的傳遞順序保證。
您可以使用 Trigger
物件上的 kafka.eventing.knative.dev/delivery.order
註釋來設定事件的傳遞順序
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: my-service-trigger
annotations:
kafka.eventing.knative.dev/delivery.order: ordered
spec:
broker: my-kafka-broker
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: my-service
支援的消費者傳遞保證為
unordered
:無序消費者是非阻塞消費者,會以無序方式傳遞訊息,同時保留適當的偏移管理。當需要高度並行消費且不需要明確排序時很有用。一個例子可以是點擊分析的處理。ordered
:有序消費者是每個分割區的阻塞消費者,它會在傳遞分割區的下一則訊息之前,等待 CloudEvent 訂閱者的成功回應。當需要更嚴格的排序或事件之間存在關係或群組時很有用。一個例子可以是客戶訂單的處理。
unordered
傳遞是預設的排序保證。
資料平面隔離與共用資料平面¶
Knative Kafka Broker 實作有 2 個平面:控制平面和資料平面。控制平面由與 Kubernetes API 通訊、監看自訂物件並管理資料平面的控制器組成。
資料平面是接聽傳入事件、與 Apache Kafka 通訊以及將事件傳送到事件接收器的元件集合。這就是事件流動的地方。Knative Kafka Broker 資料平面由 kafka-broker-receiver
和 kafka-broker-dispatcher
部署組成。
當使用 Broker 類別 Kafka
時,Knative Kafka Broker 會使用共用資料平面。這表示 knative-eventing
命名空間中的 kafka-broker-receiver
和 kafka-broker-dispatcher
部署會用於叢集中的所有 Kafka Broker。
但是,當 KafkaNamespaced
設定為 Broker 類別時,Kafka broker 控制器會為每個存在 broker 的命名空間建立新的資料平面。此資料平面由該命名空間中的所有 KafkaNamespaced
broker 使用。
這提供了資料平面之間的隔離,這表示使用者命名空間中的 kafka-broker-receiver
和 kafka-broker-dispatcher
部署僅用於該命名空間中的 broker。
注意
由於有單獨的資料平面,此安全性功能會建立更多部署並使用更多資源。除非您有此類隔離要求,否則建議使用具有 Kafka
類別的「一般」Broker。
若要建立 KafkaNamespaced
broker,您必須將 eventing.knative.dev/broker.class
註釋設定為 KafkaNamespaced
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: KafkaNamespaced
name: default
namespace: my-namespace
spec:
config:
# the referenced `configmap` must be in the same namespace with the `Broker` object, in this case `my-namespace`
apiVersion: v1
kind: ConfigMap
name: my-config
# namespace: my-namespace # no need to define, defaults to Broker's namespace
注意
spec.config
中指定的 configmap
必須與 Broker
物件位於相同的命名空間中
apiVersion: v1
kind: ConfigMap
metadata:
name: my-config
namespace: my-namespace
data:
...
在建立第一個具有 KafkaNamespaced
類別的 Broker
後,kafka-broker-receiver
和 kafka-broker-dispatcher
部署會在命名空間中建立。之後,相同命名空間中所有具有 KafkaNamespaced
類別的 broker 都會使用相同的資料平面。當命名空間中沒有 KafkaNamespaced
類別的 broker 時,命名空間中的資料平面將會被刪除。
設定 KafkaNamespaced
broker¶
適用於 Kafka
Broker 類別的所有設定機制也適用於具有 KafkaNamespaced
類別的 broker,但有以下例外情況
- 此頁面說明如何透過修改
knative-eventing
命名空間中的config-kafka-broker-data-plane
configmap 來完成生產者和消費者設定。由於 Kafka Broker 控制器會將此 configmap 傳播到使用者命名空間中,因此目前無法設定每個命名空間的生產者和消費者設定。knative-eventing
命名空間中config-kafka-broker-data-plane
ConfigMap
中設定的任何值也會在使用者命名空間中使用。 - 由於相同的傳播,也無法設定每個命名空間的消費者偏移量提交間隔。
- 會傳播更多 configmap:
config-tracing
和kafka-config-logging
。這表示追蹤和記錄也無法針對每個命名空間設定。 - 同樣地,資料平面部署會從
knative-eventing
命名空間傳播到使用者命名空間。這表示資料平面部署無法針對每個命名空間設定,且將與knative-eventing
命名空間中的部署相同。
使用 KEDA 啟用和設定觸發程序的自動調整¶
若要使用 KEDA 啟用和設定參照 Kafka Broker 的觸發程序自動調整,請依照此處的指示。
其他資訊¶
- 若要回報錯誤或要求功能,請在 eventing-kafka-broker 儲存庫中開啟問題。