[Storm] ストリーム処理エンジンStorm[Storm] How to start “Storm”

奈良で迎える初めての冬が想像以上に寒くて、原付での移動に辛さを感じ始めました.学生の方の松浦です.

最近はStormというストリーム処理エンジンを触っています.HadoopやS4などの既存CEPエンジンがフォーカスしていなかった領域をカバーするものとして注目されているものです.
StormはGithub上でプロジェクトが公開されており、ライセンスの範囲であれば誰でも利用することが可能です.

本記事ではStormを始めてみるにあたっての手順を簡単にご紹介したいと思います.

そもそもStormって?

StormはTwitter社のNathan Marz氏が開発したJVM上の分散処理フレームワークです.Stormの大きな特徴として

  • 高速なストリームデータ処理
  • 耐障害性

の2点がよく挙げられます(このような機能を実現している手法については、長くなるので参考URLの2件目などを参照していただければと思いま す).twitterを利用されている方には馴染み深い「トレンド」という同サービスの機能ですが、そのトレンドを求めるためにStormが利用されてい ます.トレンドとは、絶えずpostされるtweetのストリームデータから特定のアルゴリズムを用いて人気のワードを抽出するものですが、このような サービスでStormが非常に強力な威力を発揮します.

Stormは「Spout」と「Bolt」と呼ばれる2種類のノードから構成されます.このSpoutとBoltの構成をTopologyと呼びま す.システムの大まかな動作の流れは「Spoutが外部から受け付けたデータをTopologyに従ってBoltに流し、処理ノードであるBoltがデー タを処理していく」といったものとなります.
また、Stormの実行形態には、SpoutとBoltをマルチスレッド化して単一ノードで動作させるLocal modeと、実際のクラスタ上にSpoutとBoltをそれぞれ実装するRemote modeの2種類があります.前者はクラスタへの実装前に、構築したTopologyや各ノードの処理が正常に動作しているかといったことを確認するため などに利用されます.

本記事では同氏が公開しているサンプルプロジェクト(storm-starter)を実行可能なLocal modeの環境を構築し、実際にサンプルプログラムを実行するまでの流れを紹介していきます.
(本記事でご紹介する内容はi686機上にCentOS6.3を最小構成でインストールした場合の動作を前提としているため、各自の環境に合わせて必要に応じて適宜読み替えていただければ幸いです.)

事前準備

storm-starter導入にあたって、必要なパッケージをyumで取得する必要があります.幾つかのパッケージはepelリポジトリに収録されているため、epelの導入も行います.
このセクションのコマンドはrootユーザで実行してください.

storm-starterの導入

さて、それでは実際に手元の環境でstorm-starterを動かしてみましょう.
といっても、storm-starterを動かすだけなら数手順で終わります.このstorm-starterというプロジェクトの中には、Local modeとして動作するために必要な全ライブラリが収録されているためzip形式で配布されているStorm本体のパッケージをインストールする必要すら ありません.公開されているプロジェクトをgit cloneするだけです.
storm-starterを実行するにあたり、まずはClojureプロジェクトのビルドツールであるLeiningenを導入します.storm-starterではLeiningenを使って依存ライブラリの解決などを行います.
今回は/usr/local/binにleinコマンドをインストールすることにします.インストール後には実行権限も忘れずに付与しておきましょう.

/usr/local/binへのPATHが環境変数に設定されていない場合は/etc/profileや~/.profileなどに追加して設定を反映させます.

それでは、storm-starterをインストールします.ユーザのホームディレクトリ直下にstorm_projectというディレクトリを作成し、このディレクトリ以下のプロジェクトはgitで管理するものとします.

次にstorm-starterプロジェクトのトップディレクトリに移動し、先程インストールしたLeiningenを用いて依存ライブラリの解決とプロジェクトのコンパイルを行います.

これでstorm-starterの導入は完了です.非常に手軽であることを実感していただけたかと思います.

storm-starterの実行

では早速サンプルプログラムを実行してみましょう.プロジェクトのトップディレクトリに移動して実行するのですが、そのままでは処理のログが全て標 準出力に吐かれてしまうので、ここでは一時的にログを保存するため~/stom_project/log/storm_starterというディレクトリ を作成して、そこにログを保存してみることにします.

これでサンプルの実行も完了です.lessなどでログを眺めてみましょう.
ExclamationTopologyは入力された文字列に対して”!”を付加して出力するプログラム、word-countは入力された文字列を半角 スペースで分割して単語の出現回数を計上するプログラムです.(ちなみに余談ですが、プロジェクトの中身を見るとわかるように、後者のサンプルの文字列分 割処理部はPythonで書かれたものを呼び出しています)

ソースの解説

最後にExclamationTopologyを例にとって、簡単なソースコードの解説を行います(繰り返しになりますがstorm-starterはLocal modeでの動作となります).
適当なテキストエディタでstorm-starterのExclamationTpology.javaを開いてみましょう.
それでは早速mainの中身を眺めていきます.

これは新たなTopologyを生成するための一文です.ここで作成したTopologyをStormに登録することで処理を実現します.

ここでは先に作成したTopologyを実際にどのような構成にするかについて記述しています.setSpout()とsetBolt()でそれぞれの ノードを作成します.第一引数で処理系の識別子、第二引数で処理の内容、第三引数で起動するスレッドの数をそれぞれ指定しています.第二引数で指定した処 理内容についてはmainの上のExclamationTopologyクラスで指定されているため、ここの内容を変更することで異なる処理結果を得るこ とができます.
では少し飛ばして、if分岐が真の場合の部分を眺めてみます.

上の一文で幾つのWorker(=プロセス)を起動するかを指定しています.先のsetSpoutとsetBoltの部分を見ると、合 計15スレッドが起動することがわかります.それに対してWorker数が3であるため1Worker当たり5つのスレッドを持つことになります.このと きWorkerへのスレッドの振り方はStormが自動的に決定します.
最後にsubmitTopology()で作成したTopologyを登録して処理を行います.
簡単ではありますが、大まかな流れについては以上です.

実際にサンプルソースを修正して何か作る場合はプロジェクトをEclipseに取り込むこともできますし、開発に当たってはMavenでのプロジェクト管理が推奨されています.
また、今回はLocal modeのみの紹介となりましたが、Remote modeでの利用がStorm本来の姿であり、そこで耐障害性などが実現されます.
もし機会があれば、今後そういった情報についても書いていくかもしれません.
今回の記事は以上です.

参考

Storm本家(Github)
https://github.com/nathanmarz/storm
Stormの解説記事
http://www.ibm.com/developerworks/jp/opensource/library/os-twitterstorm/?cmp=dw&cpb=dwope&ct=dwrss&cr=dwrss&ccy=jp&csr=120712

MATSUURA Masanao