跳至內容

使用 Knative 在 Google Cloud 上建立事件驅動的影像和 BigQuery 處理管線

發佈日期: 2020-07-14 ,  修訂日期: 2024-01-17

使用 Knative 在 Google Cloud 上建立事件驅動的影像和 BigQuery 處理管線

作者:Mete Atamel,Google 軟體工程師兼開發人員宣傳者

在這篇部落格文章中,我將概述我最近使用 Knative Eventing 建構的兩個事件驅動的處理管線。在此過程中,我將解釋 Knative 提供的事件來源、自訂事件和其他元件,這些元件大大簡化了事件驅動架構的開發。

這兩個管線都可以在 GitHub 上找到,包括原始碼、組態和詳細說明,作為我的 Knative 教學的一部分。

使用的 Knative 元件

在建立這些範例管線時,我依賴了一些 Knative 元件,這些元件大大簡化了我的開發。更具體地說

  1. 事件來源 可讓您讀取叢集中的外部事件。Knative-GCP 來源 提供許多可隨時讀取來自各種 Google Cloud 來源的事件的事件來源。
  2. Broker 和觸發程序 提供事件傳遞,而無需生產者或消費者知道事件是如何路由的。
  3. 自訂事件和事件回覆:在 Knative 中,所有事件都是 CloudEvents,因此擁有事件的標準格式和各種 SDK 來讀取/寫入它們非常有用。Knative 支援自訂事件和事件回覆。任何服務都可以接收事件、進行一些處理、使用新資料建立自訂事件,並回覆到 Broker,以便其他服務可以讀取自訂事件。這在管線中很有用,其中每個服務都做一些工作,並將訊息轉發到下一個服務。

影像處理管線

在這個影像處理管線範例中,使用者將影像上傳到 Google Cloud 上的儲存貯體、使用許多不同的 Knative 服務處理影像,並將處理後的影像儲存到輸出貯體。

我為管線定義了兩個要求

  1. 上傳的影像在透過管線傳送之前會經過篩選。例如,不允許成人主題或暴力影像。
  2. 管線可以包含任意數量的處理服務,可以根據需要新增或移除。

架構

本節說明影像處理管線的架構。該管線部署到 Google Cloud 上的 Google Kubernetes Engine (GKE)。

Image processing pipeline architecture

  1. 影像會儲存到輸入 Cloud Storage 貯體。
  2. CloudStorageSource 將 Cloud Storage 更新事件讀取到 Knative 中。
  3. 篩選服務接收 Cloud Storage 事件。它使用 Vision API 來判斷影像是否安全或應該篩選。如果影像安全,篩選服務會建立類型為 dev.knative.samples.fileuploaded 的自訂 CloudEvent,並將其傳回 Broker。
  4. 調整大小服務接收 fileuploaded 事件,然後使用 ImageSharp 程式庫調整影像大小。然後,該服務將調整大小的影像儲存到輸出貯體,建立類型為 dev.knative.samples.fileresized 的自訂 CloudEvent,並將事件傳回 Broker。
  5. 浮水印服務接收 fileresized 事件,使用 ImageSharp 程式庫將浮水印新增至影像,並將影像儲存到輸出貯體。
  6. 標籤器接收 fileuploaded 事件,使用 Vision API 從影像中擷取標籤,並將標籤儲存到輸出貯體。

測試管線

為了測試管線,我將一張來自我最喜歡的海灘的照片(里約熱內盧的伊帕內瑪)上傳到貯體

Beach with sunset

幾秒鐘後,我在輸出貯體中看到 3 個檔案

gsutil ls gs://knative-atamel-images-output

gs://knative-atamel-images-output/beach-400x400-watermark.jpeg
gs://knative-atamel-images-output/beach-400x400.png
gs://knative-atamel-images-output/beach-labels.txt

我們可以在文字檔案中看到標籤 Sky,Body of water,Sea,Nature,Coast,Water,Sunset,Horizon,Cloud,Shore,以及調整大小和加水印的影像

Beach with sunset

BigQuery 處理管線

這個管線範例是一個排程驅動的管線,它查詢並找到英國和賽普勒斯的每日 COVID-19 病例數。我使用了 BigQuery 上的公用 COVID-19 資料集來取得資料、產生圖表,並每天為每個國家/地區傳送一封電子郵件給自己,其中包含這些圖表。

架構

以下是管線的架構。

BigQuery processing pipeline architecture

  1. 我為兩個國家/地區(英國和賽普勒斯)設定兩個 CloudSchedulerSources,以每天呼叫一次 QueryRunner 服務。
  2. QueryRunner 服務接收英國和賽普勒斯的排程器事件,使用 BigQuery 的公用 COVID-19 資料集查詢每個國家/地區的 COVID-19 病例,並將結果儲存到單獨的 BigQuery 資料表中。完成此操作後,QueryRunner 服務會傳回類型為 dev.knative.samples.querycompleted 的自訂 CloudEvent。
  3. ChartCreator 服務接收 querycompleted CloudEvent,使用 Matplotlib 從 BigQuery 資料建立圖表,並將其儲存到 Cloud Storage 貯體。
  4. 通知器服務是另一個從貯體透過 CloudStorageSource 接收 com.google.cloud.storage.object.finalize CloudEvent 的服務,並使用 SendGrid 向使用者傳送電子郵件通知。

測試管線

CloudSchedulerSource 建立 CloudScheduler 作業

gcloud scheduler jobs list

ID                                                  LOCATION      SCHEDULE (TZ)          TARGET_TYPE  STATE
cre-scheduler-2bcb33d8-3165-4eca-9428-feb99bc320e2  europe-west1  0 16 * * * (UTC)       Pub/Sub      ENABLED
cre-scheduler-714c0b82-c441-42f4-8f99-0e2eac9a5869  europe-west1  0 17 * * * (UTC)       Pub/Sub      ENABLED

觸發作業

gcloud scheduler jobs run cre-scheduler-2bcb33d8-3165-4eca-9428-feb99bc320e2

您應該會在幾分鐘內收到一封包含類似於此圖表的電子郵件

Chart - United Kingdom

這總結了我的文章。正如我已經提到的,如果您想要更詳細的說明,可以查看我的 image-processing-pipelinebigquery-processing-pipeline,作為我的 Knative 教學的一部分

如果您有任何問題/意見,請隨時在 Twitter 上與我聯繫 @meteatamel)。


作者:Mete Atamel - Google Cloud 開發者推廣工程師

我們使用分析和 Cookie 來了解網站流量。為此目的,您使用我們網站的資訊會與 Google 分享。了解更多。