使用 Knative Eventing、Functions 和 Apache Camel K 取用 AWS S3 事件 ¶
發布於:2024-06-14
使用 Knative Eventing、Functions 和 Apache Camel K 取用 AWS S3 事件¶
作者:Matthias Weßendorf,Red Hat 高級首席軟體工程師
在這篇部落格文章中,您將學習如何使用 Knative Eventing 和 Functions,輕鬆地從 Knative Function 中的 AWS S3 儲存貯體取用事件。這篇部落格文章是基於 Apache Camel K 和 Knative 系列的第一篇文章
無伺服器函數的典型使用案例之一是回應由外部事件來源傳遞的事件。一個常見的例子是在無伺服器函數中接收來自 AWS S3 儲存貯體的通知。但是,您如何在內部環境中接收這些事件,其中函數不是在 AWS 上執行,而是在自訂 Kubernetes 設定上執行?
安裝¶
Apache Camel K 的安裝提供了一些選擇,例如 CLI、Kustomize、OLM 或 Helm。例如,Helm 安裝是
$ helm repo add camel-k https://apache.github.io/camel-k/charts/
$ helm install my-camel-k camel-k/camel-k
除了 Camel K 之外,我們還需要安裝 Knative Eventing,如文件中所述。
建立 Knative Broker 執行個體¶
我們使用 Knative Broker 作為我們系統的核心,充當事件產生者和事件取用者的事件網格
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
namespace: default
name: my-broker
現在事件產生者可以將事件傳送到它,而事件取用者可以接收事件。
使用 Kamelets 作為 AWS S3 的事件來源¶
為了將 Kamelet *繫結*到 Knative 元件(例如我們上面建立的 Broker),我們使用 Pipe
API。Pipe 允許宣告式地將資料從 Kamelet 描述的系統 *移至* Knative 資源,*或* 從 Knative 資源移至 Kamelet 描述的另一個(外部)系統。
下面是一個 Pipe
,它使用現成的 Kamelet
,即 aws-s3-source
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: aws-s3-source-pipe
annotations:
trait.camel.apache.org/mount.config: "secret:aws-s3-credentials"
spec:
integration:
dependencies:
- "camel:cloudevents"
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: aws-s3-source
properties:
bucketNameOrArn: "${aws.s3.bucketNameOrArn}"
accessKey: "${aws.s3.accessKey}"
secretKey: "${aws.s3.secretKey}"
region: "${aws.s3.region}"
dataTypes:
out:
scheme: aws2-s3
format: application-cloudevents
sink:
dataTypes:
in:
scheme: http
format: application-cloudevents
ref:
kind: Broker
apiVersion: eventing.knative.dev/v1
name: my-broker
aws-s3-source
Kamelet 被引用為 Pipe
的 source
,並在引用的儲存貯體中有活動時,將 CNCF CloudEvents 發送到出站 sink
。在這裡,我們使用 Knative Broker,它接受 CloudEvents。稍後,我們將 Knative Function 連接到 Broker 以接收來自它的事件。
注意
AWS S3 屬性儲存在一個機密中,該機密透過 trait.camel.apache.org/mount.config
註釋掛載到 Pipe
中。
建立 Knative Function 作為取用者¶
為了使用 Knative Function 從 Knative Broker 取用訊息,我們需要建立一個簡單的 Golang 函數。由於有效負載作為 CloudEvents 發送到函數,我們使用內建的 cloudevents
範本,方法是執行以下命令
$ func create -l go -t cloudevents s3-logger
這會在您目前目錄的 s3-logger
資料夾中為您建立一個新專案,其中包含 Knative Function 的 Golang 檔案
package function
import (
"context"
"fmt"
"github.com/cloudevents/sdk-go/v2/event"
)
// Handle an event.
func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
/*
* YOUR CODE HERE
*
* Try running `go test`. Add more test as you code in `handle_test.go`.
*/
fmt.Println("Received event")
fmt.Println(ce) // echo to local output
return &e, nil // echo to caller
}
現在我們可以修改 Handle
函數以列印出 CloudEvent 的一些屬性
func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
fmt.Println("Received S3 event notification")
fmt.Println("CloudEvent Subject attribute: " + ce.Subject())
fmt.Println("CloudEvent Source attribute: " + ce.Source())
// Some processing of the payload of the CloudEvent...
return nil, nil
上面的 Handle
函數只會列印檔案名稱(由 subject
屬性表示)和儲存貯體的完整名稱(儲存為接收到的 CloudEvent 的 source
屬性)。
注意
目前,AWS-S3-Source 會傳送整個資料,因此也可以透過 data
屬性存取整個檔案。
訂閱 Knative Function 以接收 AWS S3 事件¶
為了能夠接收 S3 事件通知,我們需要將函數訂閱到提供事件的 Broker。但是,我們如何知道哪些事件可用?為此,我們檢查系統中可用的 Knative EventTypes
$ kubectl get eventtypes.eventing.knative.dev
NAME TYPE SOURCE SCHEMA REFERENCE NAME REFERENCE KIND DESCRIPTION READY REASON
et-my-broker-53bfa9803446c35c5a612c5a44a1c263 org.apache.camel.event.aws.s3.getObject aws.s3.bucket.<bucketname> my-broker Broker Event Type auto-created by controller True
現在我們知道在我們的命名空間中,my-broker
Broker 上有 org.apache.camel.event.aws.s3.getObject
事件可用。
現在我們可以執行函數從給定的 Broker 訂閱該事件類型
$ func subscribe --filter type=org.apache.camel.event.aws.s3.getObject --source my-broker
這會更新專案的 func.yaml
中繼資料,並且在部署函數時,CLI 將建置並部署取用者,並為其建立一個具有相符篩選引數的 Knative Trigger
$ func deploy
部署後,對於*每個* S3 事件,在 Knative Function 的終端機上會顯示以下輸出
Received S3 event notification
CloudEvent Subject attribute: my-file.txt
CloudEvent Source attribute: aws.s3.bucket.<bucketname>
結論¶
使用 Knative Eventing、Functions 和 Apache Camel K,可以將來自第三方雲端服務(例如 AWS S3)的通知觸發到在您自己的內部 Kubernetes 叢集上執行的函數。