こんにちは id:dhigashi です。
以前、 Fn Project を簡単に触ってみましたので、今回は Fn Flow を触ってみたいと思います。
Fn Project の記事はこちらからご覧ください。
Fn Flow について
Fn Flow のREADME には Fn Flow について以下のように紹介されており、Fn 単体では不足していた実際のアプリケーション開発で必要になる機能をサポートしています。
Flow empowers you to build workflows as distributed programs that are as complex as you need them to be and supports a rich set of concurrency primitives including fork-join, chaining, delays and error handling.
今回は Fn Flow を用いた並列処理 (fork-join)、連鎖 (chaining)、遅延処理 (delays) を実装してみたいと思います。
Word Count with Fn Flow
今回作成する Fn Flow アプリケーションは指定された複数のテキストファイルの中の各単語の出現頻度を数え上げる、なんちゃっての MapReduce っぽいアプリケーションです。
入力にテキストファイルのURLと任意で単語を指定すると、その単語の出現頻度を出力します。
単語を指定しない場合はすべての単語の出現頻度を出力します。
例として、ハムレットを幕毎に分割し Object Storage に配置しダウンロード URL を指定し、単語に「hamlet」「love」に指定してみます。
"hamlet"は94回、"love"は62回出現しているようです。(単語の Split 処理が雑な為、正しい数になっていない気がします)
# Request { "urls": [ "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT1.txt", "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT2.txt", "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT3.txt", "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT4.txt", "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT5.txt" ], "words": [ "hamlet", "love" ] } # Result {"love":62,"hamlet":94}
アプリケーションのソースコードはこちらから参照して下さい。
事前準備
Fn サーバがインストール済みで関数のデプロイ等が行える事を前提とします。
準備がまだの場合は冒頭で紹介した記事をご覧ください。
本記事執筆時の環境は以下の通りです。
$ fn version Client version is latest version: 0.4.153 Server version: 0.3.545
尚、すべての作業は同じサーバインスタンス上で行うものとします。
Flow サーバの起動
Fn サーバが起動できたら Flow サーバを起動します。
Flow サーバは Fn サーバを呼び出せる必要があるため IP アドレスを取得し、Flow サーバの起動時に環境変数で与えます
$ FNSERVER_IP=$(docker inspect --type container -f '{{.NetworkSettings.IPAddress}}' fnserver) $ docker run --rm -d \ -p 8081:8081 \ -e API_URL="http://$FNSERVER_IP:8080/r" \ -e no_proxy=$FNSERVER_IP \ --name flowserver \ fnproject/flow:latest
Flow サーバが起動している事が確認できます。
$ docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES a9947d53982d fnproject/flow:latest "/fnproject/flow-ser…" About a minute ago Up About a minute 0.0.0.0:8081->8081/tcp flowserver f7ff0cd11b34 fnproject/fnserver "./fnserver" 6 minutes ago Up 6 minutes 2375/tcp, 0.0.0.0:8080->8080/tcp fnserver
Fn Flow アプリケーションの準備
まずデプロイするアプリケーションを取得しておきます。
$ git clone https://github.com/cloudii-oc/examples.git $ cd examples/fnproject/flow-word-count $ ls app.yaml counter mapper
このアプリケーションには counter
と mapper
の2つの関数が含まれています。
mapper
は独立した関数で、指定された URL からテキストファイルを読み込み各単語の出現頻度を数え上げます。
counter
がメインの関数で、リクエストを分割し mapper
関数を呼び出すなどの処理を行います。
mapper
関数が返した出現頻度のマージ処理、単語の絞り込みなども担当します。
Java での Fn Flow アプリケーションについて
Flow アプリケーションの開発の導入は チュートリアル と FnFlowsUserGuide を参考にしました。
Fn Flow アプリケーション開発に用いる Flow API は CompletableFuture API と共通の API が多くあり、CompletableFuture API を用いた並行プログラミングの経験者にはとっつき易くなっています。
以下では、どのように Fn Flow アプリケーションを開発しているか counter
関数を例に簡単にご紹介します。
FlowFuture を作成する
Flow#invokeFunction
で関数を呼び出し FlowFuture を作成します。
FlowFuture<Message.MapperResponse> future = flow.invokeFunction("./mapper", mapperRequest, Message.MapperResponse.class);
処理を並列で実行する
複数の関数を呼び出し、Flow#allOf
で全ての FlowFuture が完了したときに新しい FlowFuture を作成します。
List<FlowFuture<Message.MapperResponse>> futures = /* 複数関数の呼び出し */ flow.allOf(futures.toArray(new FlowFuture[0])) .thenApply(...
処理を連鎖して実行する
FlowFuture が完了したとき、その結果を Flow#thenApply
に与えた関数の引数に与え実行し、その結果を新たな FlowFuture として返します。
FlowFuture<List<Message.MapperResponse>> futureResults = flow.allOf(futures.toArray(new FlowFuture[0])) .thenApply(VOID -> futures.stream() .map(FlowFuture::get) .collect(Collectors.toList()));
処理の結果を取得する
Flow#get
で FlowFuture が完了するまで待ち、その結果を取得します。
FlowFuture<Map<String, Integer>> futureCount = /* 出現頻度を取得する */
futureCount.get();
Fn Flow アプリケーションのデプロイ
それでは、取得した Fn Flow アプリケーションを Fn サーバにデプロイします。
この時 fn deploy
コマンドの --all
オプションで全ての関数をデプロイします。
$ fn deploy --local --all Deploying counter to app: flow-word-count at path: /counter Bumped to version 0.0.4 Building image counter:0.0.4 ................................. Updating route /counter using image counter:0.0.4... Deploying mapper to app: flow-word-count at path: /mapper Bumped to version 0.0.4 Building image mapper:0.0.4 ..................... Updating route /mapper using image mapper:0.0.4...
デプロイ後ルーティングを確認すると /counter
と /mapper
の2つが存在する事が確認できます。
$ fn list routes flow-word-count PATH IMAGE ENDPOINT /counter counter:0.0.4 localhost:8080/r/flow-word-count/counter /mapper mapper:0.0.4 localhost:8080/r/flow-word-count/mapper
正しくデプロイできるか確かめる為、 mapper
関数を実行してみます。
$ echo -n '{"url":"https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT1.txt"}' | fn call flow-word-count /mapper {"count":{"(within)":1,"spoke":1,"hamlet":23,"your":49,"without":3,"youth":5,"lion's":1,"these":13,"would":14,"'that":1,"sovereignty":1,"prison":1 (略)
正常にデプロイができている事が確認できます。
同様に counter
関数を実行してみます。
$ echo -n '{"urls":["https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT1.txt"]}' | fn call flow-word-count /counter com.fnproject.fn.runtime.exception.PlatformCommunicationException: Failed to create flow at com.fnproject.fn.runtime.flow.RemoteFlowApiClient.createFlow(RemoteFlowApiClient.java:51) at com.fnproject.fn.runtime.flow.FlowContinuationInvoker$2.currentFlow(FlowContinuationInvoker.java:172) at com.fnproject.fn.api.flow.Flows.currentFlow(Flows.java:41) (略)
Flow が作成できないとエラーとなってしまいました。
FLow の設定
このままでは Flow アプリケーション (counter
関数) を実行できないので設定を行います。
Fn サーバが Flow サーバと通信できるようにするため、Flow サーバの IP アドレスを取得し設定を行います。
$ FLOWSERVER_IP=$(docker inspect --type container -f '{{.NetworkSettings.IPAddress}}' flowserver) $ fn config app flow101 COMPLETER_BASE_URL "http://$FLOWSERVER_IP:8081"
以上でアプリケーションを実行できるようになりました。
改めて counter
関数を実行してみます。
$ echo -n '{"urls":["https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT1.txt"]}' | fn call flow-word-count /counter {"(within)":1,"spoke":1,"hamlet":23,"your":49,"without":3,"youth":5,"lion's":1,"these":13,"would":14,"'that":1,"sovereignty":1,"prison":1,"sister":4,"whatsoever":1,"know't":1,"blasts":1,"everlasting":1,"audience":1,(略)
実行できる事が確認できました。
冒頭で例として載せた、ハムレットに「hamlet」「love」という単語がいくつ含まれるのかを調べてみます。
$ cat hamlet.json { "urls": [ "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT1.txt", "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT2.txt", "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT3.txt", "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT4.txt", "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT5.txt" ], "words": [ "hamlet", "love" ] } $ fn call flow-word-count /counter < hamlet.json {"love":62,"hamlet":94}
まとめ
Fn Flow を用いて外部の関数を非同期に呼び出す、並行処理を実装する、など単純な Fn 関数より複雑なアプリケーションを作成してみました。
今回はエラーハンドリングなど検証不足の部分も多いですが、興味を持たれた方はぜひ触ってみて下さい。