使用 Knative 在 Google Cloud 上建立事件驅動的影像和 BigQuery 處理管線 ¶
發佈日期: 2020-07-14 , 修訂日期: 2024-01-17
使用 Knative 在 Google Cloud 上建立事件驅動的影像和 BigQuery 處理管線¶
作者:Mete Atamel,Google 軟體工程師兼開發人員宣傳者
在這篇部落格文章中,我將概述我最近使用 Knative Eventing 建構的兩個事件驅動的處理管線。在此過程中,我將解釋 Knative 提供的事件來源、自訂事件和其他元件,這些元件大大簡化了事件驅動架構的開發。
這兩個管線都可以在 GitHub 上找到,包括原始碼、組態和詳細說明,作為我的 Knative 教學的一部分。
使用的 Knative 元件¶
在建立這些範例管線時,我依賴了一些 Knative 元件,這些元件大大簡化了我的開發。更具體地說
- 事件來源 可讓您讀取叢集中的外部事件。Knative-GCP 來源 提供許多可隨時讀取來自各種 Google Cloud 來源的事件的事件來源。
- Broker 和觸發程序 提供事件傳遞,而無需生產者或消費者知道事件是如何路由的。
- 自訂事件和事件回覆:在 Knative 中,所有事件都是 CloudEvents,因此擁有事件的標準格式和各種 SDK 來讀取/寫入它們非常有用。Knative 支援自訂事件和事件回覆。任何服務都可以接收事件、進行一些處理、使用新資料建立自訂事件,並回覆到 Broker,以便其他服務可以讀取自訂事件。這在管線中很有用,其中每個服務都做一些工作,並將訊息轉發到下一個服務。
影像處理管線¶
在這個影像處理管線範例中,使用者將影像上傳到 Google Cloud 上的儲存貯體、使用許多不同的 Knative 服務處理影像,並將處理後的影像儲存到輸出貯體。
我為管線定義了兩個要求
- 上傳的影像在透過管線傳送之前會經過篩選。例如,不允許成人主題或暴力影像。
- 管線可以包含任意數量的處理服務,可以根據需要新增或移除。
架構¶
本節說明影像處理管線的架構。該管線部署到 Google Cloud 上的 Google Kubernetes Engine (GKE)。
- 影像會儲存到輸入 Cloud Storage 貯體。
- CloudStorageSource 將 Cloud Storage 更新事件讀取到 Knative 中。
- 篩選服務接收 Cloud Storage 事件。它使用 Vision API 來判斷影像是否安全或應該篩選。如果影像安全,篩選服務會建立類型為
dev.knative.samples.fileuploaded
的自訂 CloudEvent,並將其傳回 Broker。 - 調整大小服務接收
fileuploaded
事件,然後使用 ImageSharp 程式庫調整影像大小。然後,該服務將調整大小的影像儲存到輸出貯體,建立類型為dev.knative.samples.fileresized
的自訂 CloudEvent,並將事件傳回 Broker。 - 浮水印服務接收
fileresized
事件,使用 ImageSharp 程式庫將浮水印新增至影像,並將影像儲存到輸出貯體。 - 標籤器接收
fileuploaded
事件,使用 Vision API 從影像中擷取標籤,並將標籤儲存到輸出貯體。
測試管線¶
為了測試管線,我將一張來自我最喜歡的海灘的照片(里約熱內盧的伊帕內瑪)上傳到貯體
幾秒鐘後,我在輸出貯體中看到 3 個檔案
gsutil ls gs://knative-atamel-images-output
gs://knative-atamel-images-output/beach-400x400-watermark.jpeg
gs://knative-atamel-images-output/beach-400x400.png
gs://knative-atamel-images-output/beach-labels.txt
我們可以在文字檔案中看到標籤 Sky,Body of water,Sea,Nature,Coast,Water,Sunset,Horizon,Cloud,Shore
,以及調整大小和加水印的影像
BigQuery 處理管線¶
這個管線範例是一個排程驅動的管線,它查詢並找到英國和賽普勒斯的每日 COVID-19 病例數。我使用了 BigQuery 上的公用 COVID-19 資料集來取得資料、產生圖表,並每天為每個國家/地區傳送一封電子郵件給自己,其中包含這些圖表。
架構¶
以下是管線的架構。
- 我為兩個國家/地區(英國和賽普勒斯)設定兩個
CloudSchedulerSources
,以每天呼叫一次QueryRunner
服務。 - QueryRunner 服務接收英國和賽普勒斯的排程器事件,使用 BigQuery 的公用 COVID-19 資料集查詢每個國家/地區的 COVID-19 病例,並將結果儲存到單獨的 BigQuery 資料表中。完成此操作後,QueryRunner 服務會傳回類型為
dev.knative.samples.querycompleted
的自訂 CloudEvent。 - ChartCreator 服務接收
querycompleted
CloudEvent,使用Matplotlib
從 BigQuery 資料建立圖表,並將其儲存到 Cloud Storage 貯體。 - 通知器服務是另一個從貯體透過 CloudStorageSource 接收
com.google.cloud.storage.object.finalize
CloudEvent 的服務,並使用 SendGrid 向使用者傳送電子郵件通知。
測試管線¶
CloudSchedulerSource 建立 CloudScheduler 作業
gcloud scheduler jobs list
ID LOCATION SCHEDULE (TZ) TARGET_TYPE STATE
cre-scheduler-2bcb33d8-3165-4eca-9428-feb99bc320e2 europe-west1 0 16 * * * (UTC) Pub/Sub ENABLED
cre-scheduler-714c0b82-c441-42f4-8f99-0e2eac9a5869 europe-west1 0 17 * * * (UTC) Pub/Sub ENABLED
觸發作業
gcloud scheduler jobs run cre-scheduler-2bcb33d8-3165-4eca-9428-feb99bc320e2
您應該會在幾分鐘內收到一封包含類似於此圖表的電子郵件
這總結了我的文章。正如我已經提到的,如果您想要更詳細的說明,可以查看我的 image-processing-pipeline 和 bigquery-processing-pipeline,作為我的 Knative 教學的一部分
如果您有任何問題/意見,請隨時在 Twitter 上與我聯繫 @meteatamel)。
作者:Mete Atamel - Google Cloud 開發者推廣工程師