跳至內容

使用 Knative Eventing、Functions 和 Apache Camel K 取用 AWS S3 事件

發布於:2024-06-14

使用 Knative Eventing、Functions 和 Apache Camel K 取用 AWS S3 事件

作者:Matthias Weßendorf,Red Hat 高級首席軟體工程師

在這篇部落格文章中,您將學習如何使用 Knative Eventing 和 Functions,輕鬆地從 Knative Function 中的 AWS S3 儲存貯體取用事件。這篇部落格文章是基於 Apache Camel K 和 Knative 系列的第一篇文章

無伺服器函數的典型使用案例之一是回應由外部事件來源傳遞的事件。一個常見的例子是在無伺服器函數中接收來自 AWS S3 儲存貯體的通知。但是,您如何在內部環境中接收這些事件,其中函數不是在 AWS 上執行,而是在自訂 Kubernetes 設定上執行?

安裝

Apache Camel K 的安裝提供了一些選擇,例如 CLI、Kustomize、OLM 或 Helm。例如,Helm 安裝是

$ helm repo add camel-k https://apache.github.io/camel-k/charts/
$ helm install my-camel-k camel-k/camel-k

除了 Camel K 之外,我們還需要安裝 Knative Eventing,如文件中所述。

建立 Knative Broker 執行個體

我們使用 Knative Broker 作為我們系統的核心,充當事件產生者和事件取用者的事件網格

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  namespace: default
  name: my-broker

現在事件產生者可以將事件傳送到它,而事件取用者可以接收事件。

使用 Kamelets 作為 AWS S3 的事件來源

為了將 Kamelet *繫結*到 Knative 元件(例如我們上面建立的 Broker),我們使用 Pipe API。Pipe 允許宣告式地將資料從 Kamelet 描述的系統 *移至* Knative 資源,*或* 從 Knative 資源移至 Kamelet 描述的另一個(外部)系統。

下面是一個 Pipe,它使用現成的 Kamelet,即 aws-s3-source

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: aws-s3-source-pipe
  annotations:
    trait.camel.apache.org/mount.config: "secret:aws-s3-credentials"
spec:
  integration:
    dependencies:
    - "camel:cloudevents"
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: aws-s3-source
    properties:
      bucketNameOrArn: "${aws.s3.bucketNameOrArn}"
      accessKey: "${aws.s3.accessKey}"
      secretKey: "${aws.s3.secretKey}"
      region: "${aws.s3.region}"
    dataTypes:
      out:
        scheme: aws2-s3
        format: application-cloudevents
  sink:
    dataTypes:
      in:
        scheme: http
        format: application-cloudevents
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: my-broker

aws-s3-source Kamelet 被引用為 Pipesource,並在引用的儲存貯體中有活動時,將 CNCF CloudEvents 發送到出站 sink。在這裡,我們使用 Knative Broker,它接受 CloudEvents。稍後,我們將 Knative Function 連接到 Broker 以接收來自它的事件。

注意

AWS S3 屬性儲存在一個機密中,該機密透過 trait.camel.apache.org/mount.config 註釋掛載到 Pipe 中。

建立 Knative Function 作為取用者

為了使用 Knative Function 從 Knative Broker 取用訊息,我們需要建立一個簡單的 Golang 函數。由於有效負載作為 CloudEvents 發送到函數,我們使用內建的 cloudevents 範本,方法是執行以下命令

$ func create -l go -t cloudevents s3-logger

這會在您目前目錄的 s3-logger 資料夾中為您建立一個新專案,其中包含 Knative Function 的 Golang 檔案

package function

import (
    "context"
    "fmt"

    "github.com/cloudevents/sdk-go/v2/event"
)

// Handle an event.
func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
    /*
     * YOUR CODE HERE
     *
     * Try running `go test`.  Add more test as you code in `handle_test.go`.
     */

    fmt.Println("Received event")
    fmt.Println(ce) // echo to local output
    return &e, nil // echo to caller
}

現在我們可以修改 Handle 函數以列印出 CloudEvent 的一些屬性

func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
    fmt.Println("Received S3 event notification")
    fmt.Println("CloudEvent Subject attribute: " + ce.Subject())
    fmt.Println("CloudEvent Source attribute:  " + ce.Source())

  // Some processing of the payload of the CloudEvent...

    return nil, nil

上面的 Handle 函數只會列印檔案名稱(由 subject 屬性表示)和儲存貯體的完整名稱(儲存為接收到的 CloudEvent 的 source 屬性)。

注意

目前,AWS-S3-Source 會傳送整個資料,因此也可以透過 data 屬性存取整個檔案。

訂閱 Knative Function 以接收 AWS S3 事件

為了能夠接收 S3 事件通知,我們需要將函數訂閱到提供事件的 Broker。但是,我們如何知道哪些事件可用?為此,我們檢查系統中可用的 Knative EventTypes

$ kubectl get eventtypes.eventing.knative.dev 
NAME                                            TYPE                                      SOURCE                       SCHEMA   REFERENCE NAME   REFERENCE KIND   DESCRIPTION                             READY   REASON
et-my-broker-53bfa9803446c35c5a612c5a44a1c263   org.apache.camel.event.aws.s3.getObject   aws.s3.bucket.<bucketname>            my-broker        Broker           Event Type auto-created by controller   True

現在我們知道在我們的命名空間中,my-broker Broker 上有 org.apache.camel.event.aws.s3.getObject 事件可用。

現在我們可以執行函數從給定的 Broker 訂閱該事件類型

$ func subscribe --filter type=org.apache.camel.event.aws.s3.getObject --source my-broker

這會更新專案的 func.yaml 中繼資料,並且在部署函數時,CLI 將建置並部署取用者,並為其建立一個具有相符篩選引數的 Knative Trigger

$ func deploy

部署後,對於*每個* S3 事件,在 Knative Function 的終端機上會顯示以下輸出

Received S3 event notification

CloudEvent Subject attribute: my-file.txt
CloudEvent Source attribute:  aws.s3.bucket.<bucketname>

結論

使用 Knative Eventing、Functions 和 Apache Camel K,可以將來自第三方雲端服務(例如 AWS S3)的通知觸發到在您自己的內部 Kubernetes 叢集上執行的函數。

我們使用分析和 Cookie 來瞭解網站流量。關於您使用我們網站的資訊會與 Google 共用以達到此目的。瞭解詳情。