サーバレスなバッチを管理するためのKudaを作成しました

概要

こちらの記事でも書いたのですが、小さなバッチを運用するにはCloud Runのようなコンテナベースのサーバレス環境はとても向いています。

バッチサーバとしてインスタンスを常駐しておく必要が無いのでコスト面でも有利ですし、横にスケールさせやすいと言うのもあります。

ただ、バッチとして考えた場合にはジョブ管理ツールというかワークフローエンジンが無い。1ジョブで完結するものは良いですが、典型的なジョブは以下のように後続処理を持っています。

f:id:pascal256:20190708234925p:plain
Job Flow

後続処理が並列で動いて集約の処理がそれを待つとか典型的な実装ですよね?

ただ、これをマイクロサービスでやるのはちょっと面倒。おそらくMSA的に正しく実装しようとすると全てをイベントドリブンで記述して「JOBaの後にJOBbが動く」と言ったことを連鎖的に書くのだと思います。

これは大規模な処理で並列度を高めるには良いと思うのですが、既存のマイグレーションや小規模な運用では少し面倒です。

なので、シンプルにそれを実現するためのツールとして「Kuda」を作りました。

github.com

Kudaを使うことで各マイクロサービスは後続のことを気にする事無く記述でき、ジョブのワークフローを外出しできます。

使い方

現状、アルファ版でとりあえず作ったレベルでバギー&GCPのみ対応ですが、一応動かす事は可能です。

まず、下記のようなjobs.yamlというYAMLファイルでワークフローを記述します。

name: exec-batch
tasks:
  - name: step-load
    url: https://xxx-app1.run.app
  - name: step-parallel1
    dependencies: [step-load]
    url: https://xxx-app2.run.app
  - name: step-parallel2
    dependencies: [step-load]
    url: https://xxx-app3.run.app
  - name: step-join
    dependencies: [step-parallel1, step-parallel2]
    url: https://xxx-app4.run.app

tasks配下に対象となるジョブの一覧を書いていきます。 nameはそのまま名前で、urlがジョブとして実行したいマイクロサービスのURLとなります。dependenciesには依存(先行ジョブ)を書きます。

同じ依存を書けばそれらは並列で実行されますし、カンマ区切りで複数個書けば全てのジョブの完了を待ってから処理に入ります。

このファイルをGCS上に置いてkuda自体もCloud Runなどにデプロイすれば準備完了です。

ジョブのステータスを確認

JOBの実行状態を以下のようにURLを叩く事で確認できます。「READY -> RUNNING -> DONE」という状態遷移をします。サンプルは未稼働状態なので全部READY。

$ curl http://localhost:8080/jobq 
[(step-load, READY), (step-parallel1, READY), (step-parallel2, READY), (step-join, READY)]

実行

以下のようにexecで実行を開始します。

$ curl http://localhost:8080/exec 
done

このURLは非同期なのでジョブの完了を待つ事なくすぐにレスポンスを返します。 ジョブの進捗を見る場合は先ほどのjobqを実行するか、kuda側の標準出力を見る事で確認できます。 標準出力の結果は下記の通り。

07:09:29 INFO  [cn.or.pa.ku.WorkflowManager]] (ForkJoinPool.commonPool-worker-1) START:JOB step-btc-collector
07:09:35 INFO  [cn.or.pa.ku.WorkflowManager]] (ForkJoinPool.commonPool-worker-1) END:JOB step-btc-collector
07:09:35 INFO  [cn.or.pa.ku.WorkflowManager]] (ForkJoinPool.commonPool-worker-3) START:JOB step-btc-predictor-dtree
07:09:35 INFO  [cn.or.pa.ku.WorkflowManager]] (ForkJoinPool.commonPool-worker-5) START:JOB step-btc-predictor-knn
07:09:39 INFO  [cn.or.pa.ku.WorkflowManager]] (ForkJoinPool.commonPool-worker-5) END:JOB step-btc-predictor-knn
07:09:39 INFO  [cn.or.pa.ku.WorkflowManager]] (ForkJoinPool.commonPool-worker-3) END:JOB step-btc-predictor-dtree
07:09:39 INFO  [cn.or.pa.ku.WorkflowManager]] (ForkJoinPool.commonPool-worker-1) START:JOB step-btc-scoring
07:09:45 INFO  [cn.or.pa.ku.WorkflowManager]] (ForkJoinPool.commonPool-worker-1) END:JOB step-btc-scoring
07:09:45 INFO  [cn.or.pa.ku.rs.WorkflowResource]] (executor-thread-3) END:WORKFLOW
07:09:45 INFO  [cn.or.pa.ku.rs.WorkflowResource]] (executor-thread-3) CLEAR:JOBQ

仕組み

基本的な挙動としては

  1. execが実行されるとjobs.yamlを解析してDAGを作成
  2. DAGを元に実行状態を表すキューを作ってファイルに保存
  3. キューを読み込んで依存ジョブが全てDONEになってるREADY のジョブがあれば全て実行
  4. 実行時にジョブのステータスをRUNNNINGに変更してファイルに保存
  5. 実行時に実行完了時に自分自身(kuda)を呼ぶようにするために非同期処理で対象ジョブをキック
  6. 全てのジョブがDONEになるまで3-5を繰り返し

となります。

ポイントとしては一つ一つのリクエストは非同期処理で終わらせてるのでjobs.yamlに書いたジョブ全てが終わるまでkudaのリクエストが継続する訳ではない、ということです。

なので、小粒なジョブをたくさん書いてトータル実行時間が伸びたから親のリクエストであるKudaがタイムアウト、とかにはなら無いはず。まだ、ちゃんとテストして無いから肝心の部分がバグってるかもですがw

Kuda自体もCloud Runで動くサーバレス環境で状態はGCSなど外部に保存する想定なので並列度が上がれば勝手にスケールするはず。ただ、並列度が上がれば誤って同時実行しそうな気がするので排他ロックをDB使うなりで実現が必要な気はしてます。

今後の想定

とりあえず今後の想定としては以下の通り

  • ジョブの実行状態とかが見えるWeb UIの作成
  • Cloud Scheduler とかと連携してスケジューラーもWeb UIに統合
  • quarkusで作ってあるのでnative-buildして高速化 # 現状実行時エラーになるのでjavaで稼働
  • 排他処理周りの検討
  • テストとリファクタリングをいっぱい

まとめ

hwrapと合わせて使うことで、Cloud Runなどを利用して簡単にサバーレスバッチを作るためのツールを作りました。

巨大なバッチならSparkとか使えばスケールアウト構成に出来るのですが、そうじゃない場合はk8sとかを使って上手くできないかなー、と思ってたのでようやく開発着手、というところですね。

Dockerというかk8sをベースにバッチ管理だとargoがありますが、あれよりも機能を減らしてでも手軽に運用出来るものに出来たら良いな、と思っています。HTTP対応してれば良いのでk8s/Cloud Runに本来的には縛られないし。

ちなみに名前の由来は「パイプライン」からの連想で「管」と、なんとなくリクエストを連鎖的に発生させて動かすイメージから「管狐」を連想したのが理由です。

それでは、Happy Hacking!