AWS Advent Calendar2012の23日担当の片山です。
タイトルがスベってると酷評されておりますが、今回はタイトル通り、Amazon SWFにまつわる内容を書きました。
結構長いです。
はじめに
クリスマスイブを翌日に控えた12月23日、とあるサンタクロースは悩んでいました。
サンタ「し、しまった。気がついたらもう23日ではないか!!」
このサンタクロース、実は今年のITインフラ担当に任命されていて、全世界の子供のプレゼント希望を集め、配送までを手配する基盤を構築する必要があったのです。しかし今年のプレゼントの研究用に買ったKindle、うっかり漫画を大人買いしてしまい、読みふけっているうちに今日を迎えてしまったのです。
完全にクズですね。
サンタ「ぐぬぬ、仕方がない、昨年まで使っていたシステムを使って・・・」
XXX「いやー、そらあきませんわ。去年の年末でシステムのリース期間切れてますやん。」
この声の主は、トナカイさんです。元々IT系ベンチャーのCIOで、今は引退して悠々自適にプレゼント配りをしています。大阪府出身高砂部屋。
トナカイ「サンタクロースはん、まだシステムできてまへんの?もう明日イブでっせ。」
サンタ「。。。」
トナカイ「まあでも、AWS使ったらなんとかなるんとちゃいますか?」
サンタ「まじでか?」
トナカイ「教えてもええけど、高こうつきまっせ。グヘヘ。」
サンタ「ぐぬぬ。。」
その後トナカイの助言により、インフラとしてはAWS、そしてワークフロー作成のためにSWFが採用されることになりました。
かくしてプレゼント配送基盤のシステム構築が始まるのでした。
最初に返って来た結果を使用する
サンタ「プレゼントがレアものだった場合、何人かで手分けして探すだろ?そのうちで誰かが最初に見つけたらワークフローを続けるようにしたいんだけど、どうしたらいいの?」
トナカイ「その場合はOrPromiseやな」
OrPromiseは、複数のPromiseをラップした形のPromiseです。Promiseは値と状態(ready,unready)を持っており、SWFの非同期メソッドの引数として使用すると、Promiseがready状態になるまでメソッド呼び出しが行なわれない、という特性をもちます。
SWFはこの性質を使って非同期処理を実現していますが、OrPromiseはそれをさらにラップして、ラップしたうちの1つのPromiseがreadyになると、自分をreadyステータスに変更させます。
結果として、OrPromiseを引数に取るメソッドは、1つのPromiseがreadyになった時に呼び出されます。
以下のように使用します。
public Promise<String> findItem(String itemName) { Promise<String> branch1Result = findByPersonA(itemName); Promise<String> branch2Result = findByPersonB(itemName); OrPromise branchResult = new OrPromise(branch1Result, branch2Result); return processResults(branchResult); } @Asynchronous Promise<String> processResults(OrPromise branchResult) { Promise<?>[] resultArray = branchResult.getValues(); Promise<List<String>> output = null; for(Promise<?> result:resultArray){ if(result.isReady()==true){ output = (Promise<List<String>>)result; break; } } return output; }
処理フローは以下のような形になります。
トナカイ「processResultで受け取るOrPromiseの結果列を調べて、isReady()がtrueのPromiseを探すのがポイントやな。」
サンタ「なるほど。ちなみに終わってない処理をキャンセルしたい場合は?」
トナカイ「findByPerson内の処理をTryCatchで実装しておくと、cancelシグナルを送ることが出来るから、proseccResultsで処理が終わってないものにcancelを送ったらええよ。」
例外処理内で実行した結果を元に処理を続ける
サンタ「中国にいるサンタ、たまに通信エラーで連絡が取れなくなるじゃない?ああいう場合に日本のサンタ派遣するんだけど、そういう処理はどう書くの?」
トナカイ「さらっと怖いこと言うなー。その場合はTryCatchとSettableを使うと便利やな。」
TryCatchは、非同期コードで例外処理を行なうためのクラスです。AmazonSWFの処理は複数のシステムやプロセスをまたいで実行されるため、通常のJavaで使用されるtry-catch構文では、例外を捕捉できません。
このクラスにはdoTry()やdoCatch()というメソッドが定義されていて、doTryメソッド内で非同期処理を呼び出した際に発生する例外を、doCatchで捕捉することが出来ます。
TryCatchFinallyやTryFinallyクラスも用意されており、名前の通りの実装を行なう事ができます。
SettableはPromiseクラスの派生クラスで、機能はPromiseと同じです。ただしSettableは手動で値をセットして、状態をunreadyからreadyに変更することができます。つまりSettableの状態変化をトリガーにして、手動でワークフローを制御する事が出来ます。Promiseはフレームワーク、Settableはユーザーコードが利用するための入れ物ということになります。
TryCatchは非同期コードで実行されるため、TryCatchの実行終了後にSettableを使って呼び出し元のロジックを動かしてあげることで、ワークフロー内でうまくTryCatchを使うことができます 。
@Asynchronous public Promise<String> getSantaClaus() { //Settable型をインスタンス化。 final Settable<String> result = new Settable<String>(); TryCatch branch = new TryCatch() { @Override protected void doTry() throws Throwable { Promise<String> santaId = client.getSantaInChina(); setResult(result,santaId); } @Override protected void doCatch(Throwable e) throws Throwable { result.set(JAPANESE_SANATA_ID); } }; return result; } @Asynchronous public void setResult(@NoWait Settable<String>result, Promise<String> value){ result.set(value.get()); }
上記を図にすると、このようになります。
わかりにくいですが、PromiseやSettableを使用する事でシグナルを送って処理を繋げているのが分かるかと思います。
サンタ「なかなか難しいな。」
トナカイ「非同期で実行されるというのがミソやな。例えばclient.getSantaInChinaの結果をそのままresultに入れてしまいそうになるけど、getSantaInChina自体も別のプロセスやシステムで非同期に実行されるから、結果がreadyになるまで待ってやらんとあかんねん。」
サンタ「ところで日本のサンタ固定でもいいかな?」
トナカイ「北海道にSWFに詳しいサンタがおるから、ええんとちゃう?」
人手によるアクティビティの完了
サンタ「だいぶ出来てきたけど、最後に人手が入る処理が必要になったんだが。プレゼントを梱包し終わったら、ワークフローを進めるようにしたいんだけど。」
トナカイ「マニュアル実行のパターンやな。この場合は@ManualActivityCompletionを使うんや。」
@ManualActivityCompletionは、人手が介入するような処理を行なう時に使用するアノテーションで、非同期メソッドに付与します。
このアノテーションが付与されたメソッドでは、「タスクトークン」と呼ばれるIDを取得する事が出来ます。このIDを使用すると、何か人力処理を行なった後にSWFに対してこのIDを投げることで、ワークフローを再開することが出来るようになります。
プレゼントを用意した後に、正しく準備が出来たかどうかを返すようなメソッドの場合、以下のように実装を行ないます。
@ManualActivityCompletion public Boolean preparePresent() { ActivityExecutionContext executionContext = contextProvider.getActivityExecutionContext(); String taskToken = executionContext.getTaskToken(); //プレゼント準備を指示するメールを送信 sendNotifyEmail(taskToken); //戻り値はダミー return null; }
上記処理が実行されると、作業者にメールが送付され、SWFのワークフローは一旦待機状態になります。作業者に送られたメールは、例えば以下のような内容になります。
Yamanさん プレゼントの準備をお願いします。 品目:任天堂 Wii U 包装紙:赤 リボン:青 準備が出来たら、このリンクをクリックして下さい。 http://christmas.elasticbeanstalk.com/confirm.do?result=OK&taskToken=AAAAKgAAAAEAAAAAAAAAAfw1n....(344文字の文字列) 品物不足などで準備出来なかった場合は、このリンクをクリックして下さい。 http://christmas.elasticbeanstalk.com/confirm.do?result=NG&taskToken=AAAAKgAAAAEAAAAAAAAAAfw1n....(344文字の文字列)
作業終了後に、作業者はいずれかのリンクをクリックします。リンクには2つのパラメーターがついており、1つが作業結果を表すresultパラメータ、もう一つがタスクトークンです。
ワークフローとは別に、この作業ステータスを受け取るプロセスを起動させておき、リンクをクリックしたときにワークフローが継続するよう、SWFにタスクトークンを投げます。例えばBeanstalk上にServletを待機させていた場合、以下のようなコードとなります。
public class ConfirmServlet extends HttpServlet { protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { String token = req.getParameter("taskToken"); String result = req.getParameter("result"); AWSCredentials credentials = new PropertiesCredentials( ConfirmServlet.class.getResourceAsStream("/AwsCredentials.properties")); //SWFクライアントを作成 AmazonSimpleWorkflow swfClient = new AmazonSimpleWorkflowClient(credentials); swfClient.setEndpoint("http://swf.us-east-1.amazonaws.com"); //手動操作の完了を通知するためのクラス ManualActivityCompletionClientFactoryImpl factory = new ManualActivityCompletionClientFactoryImpl(swfClient); ManualActivityCompletionClient manualActivityCompletionClient = factory.getClient(token); if ("OK".equals(result)) { manualActivityCompletionClient.complete(Boolean.TRUE); } else { manualActivityCompletionClient.complete(Boolean.FALSE); } resp.setContentType("text/html"); resp.getWriter().write("<html><body>お疲れ様でした。</body></html>"); } }
上記処理を絵にすると、以下のようになります。
サンタ「タスクトークンを取っておいて、それを使ってManualActivityCompletionClientへ通知するのがキモなんだな。」
トナカイ「今回はServletやけど、普通のプログラムでも何でもええで。あと、manualActivityCompletionClient.complete()に対して値を渡してるけど、この値は@ManualActivityCompletionをつけたメソッド、つまりpreparePresentメソッドの戻り値と合わせる必要があるんや。逆に言えば、型さえあってればStringでもデータ型でも構わんから、手動処理時に付与した番号やIDなんかを、元のワークフローに簡単に返せるんや。」
サンタ「ところで非同期処理の間でJavaのオブジェクトを普通にやりとりしてるけど、データサイズに上限はないの?」
トナカイ「パラメータはJson形式にシリアライズされてやりとりされるんやけど、このJSONは32768文字が上限や。シリアライズ後でこのサイズやから、あんまり大きいデータは渡されへんな。画像とかはS3において、パスだけやりとりした方がええよ。」
そして完成
サンタ「なんとか出来上がったようだ。あとは靴下からのHTTPS通信を受け取って、ワークフローを開始する所を作ればOKだ。。」
トナカイ「これもBeanstalkに載せるべきやな。ちなみにワークフロー自体をBeanstalkに載せる手もあるで。」
サンタ「??」
トナカイ「Beanstalkは監視もついとるし、オートスケールも使える。アプリも自動で全部のサーバにデプロイしてくれるしな。デサイダーや複数のアクティビティを動かして、フィルターチェーンさせれば、ELBからのヘルスチェックで、ワークフローが生きてるかどうかもチェックできるで。」
public class WorkflowWorkerFilter implements Filter { protected WorkflowWorker workflowWorker; public void init(FilterConfig config) throws ServletException { try { AmazonSimpleWorkflow swfClient = AmazonSWFUtils.createSWFClient(); String domain = ConfigHelper.getInstance().getDomain(); workflowWorker = new WorkflowWorker(swfClient, domain, Constants.DECIDER_LIST); workflowWorker.addWorkflowImplementationType(DeciderImpl.class); workflowWorker.start(); } catch (Exception e) { throw new ServletException(e); } } public void doFilter(ServletRequest req, ServletResponse res, FilterChain filterChain) throws IOException, ServletException { if (workflowWorker.isRunning() == false) { throw new ServletException("WorkflowWorker is not running."); } //フィルターチェーンでDeepHealthCheck filterChain.doFilter(req, res); } public void destroy() { workflowWorker.shutdown(); } }
サンタ「ともかく出来上がった。あとは全世界にデプロイだ!」
トナカイ「SWFで作っとけば、処理が足らなくてもインスタンス追加するだけで処理能力あげられるからな。まあボトルネックは、高齢化の進むサンタクロースやと思うけどな。」
サンタ「。。。」
ということで、トナカイさんのお陰で無事にシステムは出来上がりました。明日は皆さん、靴下のご準備をお忘れなく。
最後に
今回ご紹介した処理は、AWSが公開しているSWFのレシピ集でも紹介されています。
こちらは英語で書かれているため、クリスマスプレセント代わりに日本語訳を作って提供しよう!と思っていたのですが、このレシピを読むために入れたKindleアプリ、うっかり漫画を大人買いしてしまい、読みふけっているうちに今日を迎えてしまったのです。
完全にクズですね。
とはいえ少しは翻訳しましたので、興味のある方はご利用下さい。レシピの簡易的な説明は翻訳してあるので、英語版読むときも多少は参考になるかと思います。
http://adsj-demo.s3.amazonaws.com/swf-recipes/swf-recipes-ja.pdf
こちらは年末年始で一通り翻訳したいと思います。
さてAWSのAdvent Calenter、明日はサーバーワークスのザビオさんが担当です。クリスマスイブにお届けされる超ロマンチックでおもろいブログにご期待下さい!!
ということで、メリーSWFマス!