使用 Knative 和 Restate 建立具狀態的應用程式 ¶
發佈於:2024-09-02 , 修改於:2024-09-04
使用 Knative 和 Restate 建立具狀態的應用程式¶
作者:Francesco Guardiani,Restate 資深軟體工程師,Giselle van Dongen,Restate 開發人員倡導者
Knative 徹底改變了在 Kubernetes 上開發和運作無伺服器應用程式的方式,但要在其之上建立具狀態的應用程式仍然相當具有挑戰性。
例如,假設您想要建立一個需要持久化某些狀態的應用程式。為了做到這一點,您可能需要將您的服務連接到資料庫,而這樣做時,您需要處理重試、重複事件、雙寫以及各種其他分散式系統問題。
再舉一個例子,假設您想要建立一個需要呼叫不同服務的服務協調器,並在其中一個服務失敗時最終補償某些操作。理想情況下,您只想編寫一些按順序執行一個接一個操作的程式碼,如果其中一個操作失敗,則執行回滾。但實際上,這並不容易,因為您需要解決諸如呼叫下游服務時的重試、協調器服務的失敗,甚至在呼叫下游服務時更棘手的長時間等待等問題。
如果您可以將應用程式狀態嵌入其中,並在 Knative 服務中執行複雜的服務協調,而無需處理上述任何問題,那會怎麼樣?
介紹 Restate¶
Restate 是一個開源的持久執行引擎,用於建立具狀態的無伺服器應用程式。換句話說,您建立的程式碼看起來像一般的 RPC 服務,並且程式碼是持久執行的,也就是說引擎會儲存執行進度。在崩潰之後,引擎會透明地將應用程式還原到先前的狀態,並從它停止的位置恢復執行。
記錄執行進度的另一個方面是,如果等待時間很長,例如由於服務回應速度慢,引擎會自動暫停執行,以避免浪費運算資源。實際上,這意味著在「等待時間」期間,應用程式可以縮減至零!
透過一起使用 Restate 和 Knative,您可以開發具狀態的實體、協調微服務、實作 Saga 模式、重複資料刪除事件,同時在不需要工作時能夠縮減至零。Restate 將負責處理諸如狀態一致性、跨服務通訊、故障恢復等困難的分散式系統問題。
使用 Restate,您可以使用可用的 Restate SDK 之一來建立應用程式,然後將其部署為無伺服器/無狀態 HTTP 伺服器,例如使用 Knative 服務。目前,Restate 支援 Golang、Java、Kotlin、Typescript、Rust 和 Python。要呼叫您的服務,您需要將請求傳送至 Restate 而不是直接傳送至您的服務,這樣 Restate 就像您的用戶端和您的服務之間的「代理」。
要部署 Restate 引擎,有不同的策略:您可以將其作為 k8s 叢集上的具狀態部署進行部署,類似於您部署資料庫的方式,或者您可以使用 Restate Cloud 受管服務。如需更多資訊,請參閱 如何部署 Restate。
註冊流程範例¶
為了讓您了解它的運作方式,我將向您展示一個如何一起使用 Knative 和 Restate 建立註冊流程的範例。範例應用程式組成如下
- 一個使用者服務,我們在其中儲存使用者資訊。
- 一個註冊服務,它封裝了註冊新使用者、傳送確認電子郵件以及之後啟用的流程。
使用者服務¶
讓我們先從使用者服務開始。
要建立它,我們將建立一個 Restate 虛擬物件,這是一個抽象概念,用於封裝一組 RPC 處理常式,並與其關聯的 K/V 儲存。虛擬物件可透過金鑰定址,您在呼叫其處理常式之一時提供該金鑰。此外,虛擬物件每個金鑰都有一個內建鎖定,這意味著 Restate 將確保對於給定的金鑰,最多只能執行一個請求,任何其他請求將在每個金鑰佇列中排隊。
讓我們先定義取得使用者資料的處理常式
// Struct to encapsulate the user service logic
type userObject struct{}
// User struct definition, ser/deserializeable with json
type User struct {
Name string `json:"name"`
Surname string `json:"surname"`
Password string `json:"password"`
}
func (t *userObject) Get(ctx restate.ObjectSharedContext) (User, error) {
return restate.Get[User](ctx, "user")
}
每個 Restate 處理常式都會使用 Context
呼叫,這是一個封裝 Restate 向開發人員公開的各種功能的介面。此內容會因處理常式的類型而異。
在這種情況下,我們使用 restate.Get
,它從 Restate 的虛擬物件 K/V 儲存讀取值。
然後,我們可以定義處理常式來 Initialize
使用者
// Initialize will initialize the user object
func (t *userObject) Initialize(ctx restate.ObjectContext, user User) error {
// Check if the user doesn't exist first
usr, err := restate.Get[*User](ctx, "user")
if err != nil {
return err
}
if usr != nil {
return restate.TerminalError(fmt.Errorf("the user was already initialized"))
}
// Store the user
restate.Set(ctx, "user", user)
// Store the unactivated status
restate.Set(ctx, "activated", false)
return nil
}
與 restate.Get
類似,使用 restate.Set
,我們可以寫入虛擬物件 K/V 儲存。
最後,在使用者初始化後 Activate
使用者的處理常式
// Activate will signal the user is activated
func (t *userObject) Activate(ctx restate.ObjectContext) error {
// Check if the user exists first
usr, err := restate.Get[*User](ctx, "user")
if err != nil {
return err
}
if usr == nil {
return restate.TerminalError(fmt.Errorf("the user doesn't exist"))
}
// Store the activated status
restate.Set(ctx, "activated", false)
return nil
}
我們現在準備好實作註冊服務。
註冊服務¶
註冊服務有一個單一處理程序來協調註冊流程
func (t *signupService) Signup(ctx restate.Context, newUser NewUser) (string, error) {
// Initialize the newUser first
user := User{
Name: newUser.Name,
Surname: newUser.Surname,
Password: newUser.Password,
}
_, err := restate.Object[restate.Void](ctx, "User", newUser.Username, "Initialize").Request(user)
if err != nil {
return "", err
}
// Prepare an awakeable to await the email activation
awakeable := restate.Awakeable[restate.Void](ctx)
// Send the activation email
_, err = restate.Run[restate.Void](ctx, func(ctx restate.RunContext) (restate.Void, error) {
return restate.Void{}, sendEmail(newUser.Username, awakeable.Id())
})
if err != nil {
return "", err
}
// Await the activation
_, err = awakeable.Result()
if err != nil {
return "", err
}
// Activate the user
_, err = restate.Object[restate.Void](ctx, "User", newUser.Username, "Activate").Request(user)
if err != nil {
return "", err
}
return fmt.Sprintf("The new user %s is signed up and activated", newUser.Username), nil
}
透過 restate.Call
,我們可以調用其他 Restate 服務。這些請求保證只會執行一次。
使用 restate.Awakeable
,我們可以等待任意事件發生。您可以簡單地透過向 Restate 發送 HTTP 請求並提供 Awakeable id 來完成請求。在我們的範例中,電子郵件將嵌入一個包含 Awakeable id 的連結,一旦使用者點擊驗證按鈕,該連結就會被完成。
透過 restate.Run
,我們可以執行任何任意程式碼並將結果記憶化,這樣一來,如果發生崩潰,Restate 不會重新執行該程式碼區塊,而是會載入儲存的結果並將其用於後續操作。
啟動 HTTP 服務並使用 Knative 部署它¶
使用 HTTP 公開服務
func main() {
// Read PORT env injected by Knative Serving
port := os.Getenv("PORT")
if port == "" {
port = "9080"
}
bindAddress := fmt.Sprintf(":%s", port)
// Bind services to the Restate HTTP/2 server
srv := server.NewRestate().
Bind(restate.Reflect(&userObject{})).
Bind(restate.Reflect(&signupService{}))
// Start HTTP/2 server
if err := srv.Start(context.Background(), bindAddress); err != nil {
slog.Error("application exited unexpectedly", "err", err.Error())
os.Exit(1)
}
}
您現在可以使用您的工具(例如 ko
)建立容器映像
$ ko build main.go -B
並使用 kn
部署它
$ kn service create signup \
--image $MY_IMAGE_REGISTRY/main.go \
--port h2c:8080
在發送請求之前,您需要告知 Restate 您新的服務部署
$ restate deployments register http://signup.default.svc
就是這樣!您現在可以開始發送請求了
$ curl https://#:8080/Signup/Signup --json '{"username": "slinkydeveloper", "name": "Francesco", "surname": "Guardiani", "password": "Pizza-without-pineapple"}'
請注意:為了簡潔起見,程式碼範例的某些部分已省略,請查看完整範例以取得更多詳細資訊,以及如何使用 kind
在本機執行此範例。
我們為您撐腰¶
讓我們假設 Signup
流程中的 sendEmail
函數在我們第一次嘗試註冊時失敗,會發生什麼事?
如果沒有 Restate,您需要在迴圈中重試執行 sendEmail
幾次。但是,如果當重試執行 sendEmail
時,**註冊服務崩潰或消失**呢?在這種情況下,您將失去對註冊進度的追蹤,並且下次使用者按下 F5 時,您需要一些邏輯來重建先前的註冊狀態和/或將其捨棄。
使用 Restate,如果 sendEmail
失敗,它將自動重試,而所有先前已執行的操作(在本例中是對 User/Initialize
處理程序的呼叫)將不會再次執行,而是會簡單地還原其結果值。
這要歸功於 Restate 的持久執行引擎,它會記錄應用程式的進度,並且在發生崩潰時,它會從上次中斷的地方重新啟動。更重要的是,Restate 能夠在無法取得進展時暫停執行,例如在長時間休眠的情況下,或是在等待來自另一個服務的回應時,所有這些都不會將您的業務邏輯分割成一系列不同的處理程序。沒錯,您理解正確,**在等待期間,您的 Knative 服務可以縮減到零!**
下一步是什麼¶
在這篇文章中,我們研究了如何使用 Restate 建立一個有狀態的實體和一個簡單的協調流程,並將其部署到 Knative 上。
透過將 Restate 和 Knative 結合在一起,您可以獲得兩全其美的優勢,因為您可以輕鬆地開發有狀態的應用程式來建立無伺服器應用程式。
將 Restate 和 Knative 結合在一起,您可以建立更多內容:工作流程、saga、有狀態事件處理(也結合 Knative Eventing!),僅舉幾個例子。查看 Restate 範例以了解可以建構哪些內容:https://github.com/restatedev/examples