跳至內容

使用 Knative 和 Restate 建立具狀態的應用程式

發佈於:2024-09-02 ,  修改於:2024-09-04

使用 Knative 和 Restate 建立具狀態的應用程式

Building Stateful applications with Knative and Restate

作者:Francesco Guardiani,Restate 資深軟體工程師,Giselle van Dongen,Restate 開發人員倡導者

Knative 徹底改變了在 Kubernetes 上開發和運作無伺服器應用程式的方式,但要在其之上建立具狀態的應用程式仍然相當具有挑戰性。

例如,假設您想要建立一個需要持久化某些狀態的應用程式。為了做到這一點,您可能需要將您的服務連接到資料庫,而這樣做時,您需要處理重試、重複事件、雙寫以及各種其他分散式系統問題。

再舉一個例子,假設您想要建立一個需要呼叫不同服務的服務協調器,並在其中一個服務失敗時最終補償某些操作。理想情況下,您只想編寫一些按順序執行一個接一個操作的程式碼,如果其中一個操作失敗,則執行回滾。但實際上,這並不容易,因為您需要解決諸如呼叫下游服務時的重試、協調器服務的失敗,甚至在呼叫下游服務時更棘手的長時間等待等問題。

如果您可以將應用程式狀態嵌入其中,並在 Knative 服務中執行複雜的服務協調,而無需處理上述任何問題,那會怎麼樣?

介紹 Restate

Restate 是一個開源的持久執行引擎,用於建立具狀態的無伺服器應用程式。換句話說,您建立的程式碼看起來像一般的 RPC 服務,並且程式碼是持久執行的,也就是說引擎會儲存執行進度。在崩潰之後,引擎會透明地將應用程式還原到先前的狀態,並從它停止的位置恢復執行。

記錄執行進度的另一個方面是,如果等待時間很長,例如由於服務回應速度慢,引擎會自動暫停執行,以避免浪費運算資源。實際上,這意味著在「等待時間」期間,應用程式可以縮減至零!

透過一起使用 Restate 和 Knative,您可以開發具狀態的實體、協調微服務、實作 Saga 模式、重複資料刪除事件,同時在不需要工作時能夠縮減至零。Restate 將負責處理諸如狀態一致性、跨服務通訊、故障恢復等困難的分散式系統問題。

使用 Restate,您可以使用可用的 Restate SDK 之一來建立應用程式,然後將其部署為無伺服器/無狀態 HTTP 伺服器,例如使用 Knative 服務。目前,Restate 支援 Golang、Java、Kotlin、Typescript、Rust 和 Python。要呼叫您的服務,您需要將請求傳送至 Restate 而不是直接傳送至您的服務,這樣 Restate 就像您的用戶端和您的服務之間的「代理」。

要部署 Restate 引擎,有不同的策略:您可以將其作為 k8s 叢集上的具狀態部署進行部署,類似於您部署資料庫的方式,或者您可以使用 Restate Cloud 受管服務。如需更多資訊,請參閱 如何部署 Restate

註冊流程範例

為了讓您了解它的運作方式,我將向您展示一個如何一起使用 Knative 和 Restate 建立註冊流程的範例。範例應用程式組成如下

  • 一個使用者服務,我們在其中儲存使用者資訊。
  • 一個註冊服務,它封裝了註冊新使用者、傳送確認電子郵件以及之後啟用的流程。

使用者服務

讓我們先從使用者服務開始。

要建立它,我們將建立一個 Restate 虛擬物件,這是一個抽象概念,用於封裝一組 RPC 處理常式,並與其關聯的 K/V 儲存。虛擬物件可透過金鑰定址,您在呼叫其處理常式之一時提供該金鑰。此外,虛擬物件每個金鑰都有一個內建鎖定,這意味著 Restate 將確保對於給定的金鑰,最多只能執行一個請求,任何其他請求將在每個金鑰佇列中排隊。

讓我們先定義取得使用者資料的處理常式

// Struct to encapsulate the user service logic
type userObject struct{}

// User struct definition, ser/deserializeable with json
type User struct {
    Name     string `json:"name"`
    Surname  string `json:"surname"`
    Password string `json:"password"`
}

func (t *userObject) Get(ctx restate.ObjectSharedContext) (User, error) {
    return restate.Get[User](ctx, "user")
}

每個 Restate 處理常式都會使用 Context 呼叫,這是一個封裝 Restate 向開發人員公開的各種功能的介面。此內容會因處理常式的類型而異。

在這種情況下,我們使用 restate.Get,它從 Restate 的虛擬物件 K/V 儲存讀取值。

然後,我們可以定義處理常式來 Initialize 使用者

// Initialize will initialize the user object
func (t *userObject) Initialize(ctx restate.ObjectContext, user User) error {
    // Check if the user doesn't exist first
    usr, err := restate.Get[*User](ctx, "user")
    if err != nil {
        return err
    }
    if usr != nil {
        return restate.TerminalError(fmt.Errorf("the user was already initialized"))
    }

    // Store the user
    restate.Set(ctx, "user", user)

    // Store the unactivated status
    restate.Set(ctx, "activated", false)

    return nil
}

restate.Get 類似,使用 restate.Set,我們可以寫入虛擬物件 K/V 儲存。

最後,在使用者初始化後 Activate 使用者的處理常式

// Activate will signal the user is activated
func (t *userObject) Activate(ctx restate.ObjectContext) error {
    // Check if the user exists first
    usr, err := restate.Get[*User](ctx, "user")
    if err != nil {
        return err
    }
    if usr == nil {
        return restate.TerminalError(fmt.Errorf("the user doesn't exist"))
    }

    // Store the activated status
    restate.Set(ctx, "activated", false)

    return nil
}

我們現在準備好實作註冊服務。

註冊服務

註冊服務有一個單一處理程序來協調註冊流程

func (t *signupService) Signup(ctx restate.Context, newUser NewUser) (string, error) {
    // Initialize the newUser first
    user := User{
        Name:     newUser.Name,
        Surname:  newUser.Surname,
        Password: newUser.Password,
    }
    _, err := restate.Object[restate.Void](ctx, "User", newUser.Username, "Initialize").Request(user)
    if err != nil {
        return "", err
    }

    // Prepare an awakeable to await the email activation
    awakeable := restate.Awakeable[restate.Void](ctx)

    // Send the activation email
    _, err = restate.Run[restate.Void](ctx, func(ctx restate.RunContext) (restate.Void, error) {
        return restate.Void{}, sendEmail(newUser.Username, awakeable.Id())
    })
    if err != nil {
        return "", err
    }

    // Await the activation
    _, err = awakeable.Result()
    if err != nil {
        return "", err
    }

    // Activate the user
    _, err = restate.Object[restate.Void](ctx, "User", newUser.Username, "Activate").Request(user)
    if err != nil {
        return "", err
    }

    return fmt.Sprintf("The new user %s is signed up and activated", newUser.Username), nil
}

透過 restate.Call,我們可以調用其他 Restate 服務。這些請求保證只會執行一次。

使用 restate.Awakeable,我們可以等待任意事件發生。您可以簡單地透過向 Restate 發送 HTTP 請求並提供 Awakeable id 來完成請求。在我們的範例中,電子郵件將嵌入一個包含 Awakeable id 的連結,一旦使用者點擊驗證按鈕,該連結就會被完成。

透過 restate.Run,我們可以執行任何任意程式碼並將結果記憶化,這樣一來,如果發生崩潰,Restate 不會重新執行該程式碼區塊,而是會載入儲存的結果並將其用於後續操作。

啟動 HTTP 服務並使用 Knative 部署它

使用 HTTP 公開服務

func main() {
    // Read PORT env injected by Knative Serving
    port := os.Getenv("PORT")
    if port == "" {
        port = "9080"
    }
    bindAddress := fmt.Sprintf(":%s", port)

    // Bind services to the Restate HTTP/2 server
    srv := server.NewRestate().
        Bind(restate.Reflect(&userObject{})).
        Bind(restate.Reflect(&signupService{}))

    // Start HTTP/2 server
    if err := srv.Start(context.Background(), bindAddress); err != nil {
        slog.Error("application exited unexpectedly", "err", err.Error())
        os.Exit(1)
    }
}

您現在可以使用您的工具(例如 ko)建立容器映像

$ ko build main.go -B

並使用 kn 部署它

$ kn service create signup \
  --image $MY_IMAGE_REGISTRY/main.go \
  --port h2c:8080

在發送請求之前,您需要告知 Restate 您新的服務部署

$ restate deployments register http://signup.default.svc

就是這樣!您現在可以開始發送請求了

$ curl https://#:8080/Signup/Signup --json '{"username": "slinkydeveloper", "name": "Francesco", "surname": "Guardiani", "password": "Pizza-without-pineapple"}'

請注意:為了簡潔起見,程式碼範例的某些部分已省略,請查看完整範例以取得更多詳細資訊,以及如何使用 kind 在本機執行此範例。

我們為您撐腰

讓我們假設 Signup 流程中的 sendEmail 函數在我們第一次嘗試註冊時失敗,會發生什麼事?

如果沒有 Restate,您需要在迴圈中重試執行 sendEmail 幾次。但是,如果當重試執行 sendEmail 時,**註冊服務崩潰或消失**呢?在這種情況下,您將失去對註冊進度的追蹤,並且下次使用者按下 F5 時,您需要一些邏輯來重建先前的註冊狀態和/或將其捨棄。

使用 Restate,如果 sendEmail 失敗,它將自動重試,而所有先前已執行的操作(在本例中是對 User/Initialize 處理程序的呼叫)將不會再次執行,而是會簡單地還原其結果值。

這要歸功於 Restate 的持久執行引擎,它會記錄應用程式的進度,並且在發生崩潰時,它會從上次中斷的地方重新啟動。更重要的是,Restate 能夠在無法取得進展時暫停執行,例如在長時間休眠的情況下,或是在等待來自另一個服務的回應時,所有這些都不會將您的業務邏輯分割成一系列不同的處理程序。沒錯,您理解正確,**在等待期間,您的 Knative 服務可以縮減到零!**

下一步是什麼

在這篇文章中,我們研究了如何使用 Restate 建立一個有狀態的實體和一個簡單的協調流程,並將其部署到 Knative 上。

透過將 Restate 和 Knative 結合在一起,您可以獲得兩全其美的優勢,因為您可以輕鬆地開發有狀態的應用程式來建立無伺服器應用程式。

將 Restate 和 Knative 結合在一起,您可以建立更多內容:工作流程saga有狀態事件處理(也結合 Knative Eventing!),僅舉幾個例子。查看 Restate 範例以了解可以建構哪些內容:https://github.com/restatedev/examples

我們使用分析和 Cookie 來了解網站流量。您使用我們網站的資訊會與 Google 分享以達到此目的。了解更多。