甲府方重信Blog

...Shigenobu Koufugatas Blog

  • Increase font size
  • Default font size
  • Decrease font size
Error
  • Unable to load Cache Storage: database
  • Unable to load Cache Storage: database
  • Unable to load Cache Storage: database
  • Unable to load Cache Storage: database
  • Unable to load Cache Storage: database
  • Unable to load Cache Storage: database
  • Unable to load Cache Storage: database
  • Unable to load Cache Storage: database
Home 業務日誌 ビッグデータ ハッシュタグのリアルタイム分析のためのラムダ・アーキテクチャー

ハッシュタグのリアルタイム分析のためのラムダ・アーキテクチャー

E-mail Print PDF

Trident、Hadoop、Splout SQLを用いたハッシュタグのリアリタイム分析のための「ラムダ・アーキテクチャー」の事例

この記事で私たちは、TridentHadoopSplout SQLを連携させて、簡単な「ラムダ・アーキテクチャー」の例をどのように構築したかを示すつもりです。 私たちはStormの上位における高レベルAPIであるTrident、Hadoopに対する高速リードオンリーSQLであるSplout SQLについて学んでいきます。この事例のアーキテクチャーは、このgithubプロジェクトでホストされています。私たちはツイートにおけるハッシュタグの出現数を、日付によってカウントする作業をシミュレートします。完全なゴールは、この単純な問題を完全にスケーラブルな方法で解き、問い合わせに対するリモートの低レイテンシー・サービスを提供することにより、ハッシュタグのカウントに進化をもたらすことです。この中には、二つのシステムの連結とそれに対するリアルタイム集計が含まれます。

そこで、すべてのハッシュタグに対して、私たちは次のようなデータ構造によって、一日あたりの出現数についてリモートのサービスに問い合わせることができるようにしたいのです。:

{
  "20091022":115,
  "20091023":115,
  "20091024":158,
  "20091025":19
}

ラムダ・アーキテクチャー

この「ラムダ・アーキテクチャー」は、Nathan Marz氏によって開発されたコンセプトです。私たちはこのアーキテクチャーが、スケールし、かつまた、長時間のバッチ処理の優位性と、数秒単位で更新されるデータを持つリアルタイム・システムの新鮮さの双方を兼ね備えているので、このアーキテクチャー・モデルが好きなのです。私たちはインターネット上にこのような事例を見つけることができなかったので、これを開発するのが役立つと考え、共有するのです。

Trident

TridentStormの上位に存在するAPIです。私たちはすでに以前の記事において、Stormについて説明し、簡単なサンプルを示しました。 TridentはStormの上位に、HadoopにCascadingがもたらしたのと似たような高レベルの構造を提供します (each()やgroupBy()などです)。TridentはまたTopologyにおいて、Stateの保存、取出しに対するラッパーとプリミティブを提供します。これらは、メモリにおかれたり、永続的なデータストアにおかれたりします。

Splout SQL

Splout SQLは私たちが開発したデータベースであり、Hadoopからデータを効率よく取り出すことができます。読者はSQLが使えるElephantDBとして、これを見なすことが出来ます。これは、パーティション化され、リードオンリーであり、Hadoopスケールのデータセットに対して高性能なSQLデータベースとなります。私たちは以前の記事において、 これについて説明しています。

全てのピースを糊付けして、ラムダを形作る

ツイートは、システム(1)へとフィードされます。例えば、キューを通じて私たちはツイートを取り出すことができます。(StormはKafkaJMSKestrelに対するコネクターを持っています。 しかし、読者は自身のものを開発するのもそう難しくありません)。Tridentのstream (2)は、Hadoopに保存します(HDFS)。日次によってカウントをメモリー上のStateに対し、リアルタイムで処理します。Hadoop(3)においては、私たちはすべての過去のデータを保持します。これにより、バッチ処理が、日付ごとのハッシュタグによるツイートを収集することが出来ます。そして、これから巨大なファイルを生成することが出来ます。これにより、私たちはSplout SQLのコマンドラインもしくは、ファイルのインデクシングのために、APIツールを使用することができます。そして、Splout SQLクラスター(4)に配置することができます。このクラスターは、非常に高速にすべての統計情報を提供することが出来るでしょう。次に、二番目のTrident Stream (DRPC)がタイムライン問い合わせ(5)を提供するために使用することが出来ます。そして、このStreamはバッチ・レイヤー(Splout SQLを通じて)と、リアルタイム・レイヤー(StreamのメモリーStateを通じて)の双方を問い合わせます。そして、結果を単一のタイムライン・レスポンスにミックスします。私たちはさらに詳しく、各項目について見ていきます。

バッチ・レイヤー

バッチ・レイヤーは、もっとも単純なものです。私たちは、すべてのツイートをHDFSに追記する必要があり、定期的に、日時によってそれらを収集するいくつかの単純な処理を実行させます。私たちはこのパートを開発はしておらず、Infochimpのより大きなものから、小さなサンプルデータセットを使用します。 このデータセットは次のような形をしており、このリンクから辿ることが出来ます。:

hashtag	2009081313	1	calirfonia
hashtag	2009101713	1	caliroadtrip
hashtag	2009101815	2	caliroadtrip
hashtag	2009080813	1	caliroots
hashtag	2009092807	1	caliroots

このような単純なタスクをどのように実行するか、についてはインターネット上にたくさんの作品や事例があります(低レベルなものから、高レベルなものまで)。: PangoolCascadingPigHiveなどです。このアイディアは、バッチ・レイヤーの出力は上記のようになるべきであるというものです。: タブ区切りのテキストファイルに、ハッシュタグとカウントがリストされます。読者は、すべてのツイートをHDFS上に保存しているので、多くのほかの事柄を計算するバッチ処理を実行することが出来ます。そして、それは毎時間、ゼロからすべてを再計算することが出来ます。読者はここで完全な自由とフォルト・トレランスを手に入れるのです。

サービス・レイヤー

このアーキテクチャーの挑戦の一つが、ギガバイトもしくはテラバイトさえもの価値あるデータでありうるデータセットに対し、高スループット下での低レイテンシーな問い合わせを提供するというものです(複数の同時のユーザーに対して)。 読者はHadoopを使って、ほぼ全部の統計情報を計算することが出来るでしょう。そのため、読者はなんらかの橋渡しをHadoopと高速データベースの間に必要とするでしょう。読者は直接Hadoopから生成されたファイルを問い合わせることはできません。ImpalaもしくはDrillのようなものならば、読者に対話的にこのデータセットを分析することができるかもしれません。しかし、読者はWebサーバーの前にそれを置くことはできないのです。読者はインデックスもしくは、バイナリーツリーのような実際に高速に検索できるものが必要なのです。

読者はVoldemortElephantDBのようなキー・バリュー・ストアに、ハッシュタグをキーに統計情報をダンプするかもしれません。 しかし、もし読者が日次、週次、月次のタイムラインを提供するように考えたとしたらどうでしょう。最も活発な時間帯を示すために、時間毎にグループ化したいと考えたとしたらどうでしょう。Hadoopに対してキー・バリュー・ストアを使用するのは、常に読者が見ようとしている全てを事前に計算しておく必要があることを意味します。それは、常に満足できるとは限りません。Splout SQLは、通常、より便利であり、事前に収集されたデータに対しリッチでかつ柔軟な問い合わせを許します。

In the the example’s github you will find instructions on how to quickly download Splout SQL and load the example dataset in it. Splout SQL’s command-line tools for indexing and deploying look like this:

hadoop jar splout-hadoop-*-hadoop.jar simple-generate -i sample-hashtags -o out-hashtags -pby hashtag -p 2 -s "label:string,date:string,count:int,hashtag:string" --index "hashtag,date" -t hashtags -tb hashtags
hadoop jar splout-hadoop-*-hadoop.jar deploy -q http://localhost:4412 -root out-hashtags -ts hashtags

The first line generates the indexed data structures (SQL files) needed for serving queries fast and the second line launches a deploy process that moves the files from the Hadoop cluster to the Splout SQL serving cluster.

Because Splout is a partitioned SQL, you need to specify partitioning schema and fields, as well as number of partitions to generate. You can also specify things like “fields to be indexed” if you know what kind of queries you will do. In this case we created a compound index on “hashtag” and “date” which would allow us to extend the application, being able to query the data between arbitrary time periods.

But for Splout SQL to be used by Trident’s DRPC we need to build a connector for it (SploutState). This connector extends ReadOnlyState and provides the necessary StateFactory required by Trident:

1 public class SploutState extends ReadOnlyState {
2 // The Splout Java client
3 private SploutClient sploutClient;
4 ...
5 public static class Factory implements StateFactory {
6 ...
7 }
8 }

It then implements a method that can receive multiple queries and execute them using Splout’s Java client:

1 public List<Object> querySplout(String tablespace, List<String> sql, List<String> keys) {
2 ...
3 }

(Note how each query is associated with a partition key. This is necessary since Splout only queries one partition for each query.)
(Why multiple queries? Because Trident may group several Tuples into mini-batches that may be then executed more efficiently rather than doing them one-by-one.)

Another thing we need to do is implement a BaseQueryFunction (HashTagsSploutQuery) that will be used in conjunction with SploutState to define the DRPC stream. This function will contain the business logic involved in querying Splout for returning the data we want it to return. So, SploutState can be used together with any BaseQueryFunction. In this case, our function will look like:

1 public class HashTagsSploutQuery extends BaseQueryFunction<SploutState, Object> {
2
3 public List<Object> batchRetrieve(SploutState state, List<TridentTuple> args) {
4 List<String> sqls = new ArrayList<String>();
5 List<String> partitionKeys = new ArrayList<String>();
6 // fill the data
7 for(TridentTuple arg: args) {
8 String hashTag = arg.getString(0);
9 sqls.add("SELECT SUM(count), substr(date, 0, 9) as day FROM hashtags WHERE hashtag = '" + hashTag + "' GROUP BY day;");
10 partitionKeys.add(hashTag);
11 }
12 return state.querySplout(TABLESPACE, sqls, partitionKeys);
13 }
14
15 public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
16 collector.emit(new Values(result));
17 }
18 }

The first method, batchRetrieve(), is called by Trident with a batch of Tuples. We then call the underlying SploutState for resolving the query using a custom SQL that groups by day and queries by hashtag. The second method, execute() is called to obtain the data that will be appended to the Tuples in the stream. In this case we will append one more field to the tuple that will contain the result of each query.

We will see later how this part is connected to the rest of the system in more detail.

The real-time layer

The real-time layer is implemented using a Trident stream that saves state into a memory map. The code can be found in the topology class (LambdaHashTagsTopology) and it looks like this:

1 TridentState hashTagCounts = topology
2 .newStream("spout1", spout)
3 // note how we carry the date around
4 .each(new Fields("tweet", "date"), new Split(), new Fields("word"))
5 .each(new Fields("word", "date"), new HashTagFilter(), new Fields("hashtag"))
6 .groupBy(new Fields("hashtag"))
7 .persistentAggregate(mapState, new Fields("hashtag", "date"), new CountByDate(), new Fields("datecount"));

When developing Trident streams you have to keep in mind two things. One is the way Tuples are mutated around the stream: each() functions process a set of input Fields and emit a Tuple with these input fields together with the output Fields that the function emits. On the other hand, aggregate() functions only emit the fields that are derived from the function. If you want to emit a subset of the Fields of an each() function you can use project(). The other thing is that, because of this, you can’t emit output Fields that are named like an input Field (they would collide in the result Tuple).

Developing Trident streams follows the same programming pattern than that of Cascading. You need to create custom Functions or Filters that execute certain business logic you are interested in, and then inject an instance of them into the flow. For this case, we created “HashTagFilter” that takes an input word and only emits it if it’s a hashtag (#), in which case it emits all characters but the first one:

1 public static class HashTagFilter extends BaseFunction {
2 public void execute(TridentTuple tuple, TridentCollector collector) {
3 String word = tuple.getString(0);
4 if(word.startsWith("#")) {
5 collector.emit(new Values(word.substring(1, word.length())));
6 }
7 }
8 }

We also created an aggregator called “CountByDate” which is executed before persisting all the tuples for a certain hashtag. Because we want to show certain timelines, we need to group the real-time counts by some date. For simplicity we just grouped by day, but we could have extended this further. Keyed by hashtag, we will save a Map of date -> count. Because all this will be executed in parallel, we can provide a Combiner function for efficiency (like in Hadoop). The combiner will just merge two maps that may potentially have different keys.

1 public static class CountByDate implements CombinerAggregator<Map<String, Long>> {
2
3 public Map<String, Long> init(TridentTuple tuple) {
4 Map<String, Long> map = zero();
5 map.put(tuple.getString(1), 1L);
6 return map;
7 }
8
9 public Map<String, Long> combine(Map<String, Long> val1, Map<String, Long> val2) {
10 for(Map.Entry<String, Long> entry : val2.entrySet()) {
11 val2.put(entry.getKey(), MapUtils.getLong(val1, entry.getKey(), 0L) + entry.getValue());
12 }
13 for(Map.Entry<String, Long> entry : val1.entrySet()) {
14 if(val2.containsKey(entry.getKey())) {
15 continue;
16 }
17 val2.put(entry.getKey(), entry.getValue());
18 }
19 return val2;
20 }
21
22 public Map<String, Long> zero() {
23 return new HashMap<String, Long>();
24 }
25 }

We will see now how both the serving-layer and the real-time layed are connected through the DRPC service.

The DRPC service

One of Storm’s goodness is its ability to execute distributed RPC calls, and parallelize them. We can use this service for populating a website that will show timelines for hashtags. Now that we have a batch layer that computes all historical counts and feeds them into Splout SQL, and a real-time layer that can update hashtag counts happening right now in seconds time, how do we put all this together?

The DRPC service we add to the topology looks like this:

1 topology
2 .newDRPCStream("hashtags", drpc)
3 .each(new Fields("args"), new Split(), new Fields("hashtag"))
4 .groupBy(new Fields("hashtag"))
5 .stateQuery(hashTagCounts, new Fields("hashtag"), new MapGet(), new Fields("resultrt"))
6 .stateQuery(sploutState, new Fields("hashtag", "resultrt"), new HashTagsSploutQuery(), new Fields("resultbatch"))
7 .each(new Fields("hashtag", "resultrt", "resultbatch"), new LambdaMerge(), new Fields("result"))
8 .project(new Fields("result"));

Queries are parallelized by “hashtag”. Two queries are executed in sequence, one to the real-time layer (hashTagCounts) and the other one to the batch serving-layer (sploutState). Note how we use Trident’s built-in MapGet() for querying the real-time layer and HashTagsSploutQuery() for querying Splout SQL. Note how the Tuple evolves to have three fields: the hashtag, and the result of each layer. Finally, we use a function called LambdaMerge() that merges the result into a new Tuple field called “result” and project the result to a one-field Tuple that will be returned to the DRPC user.

The business logic of merging the results from both layers is straight-forward. We assume that the batch layer is always right: therefore, if a value comes from it, it overrides any possible value from the real-time layer. We also return the Map sorted by key for convenience:

1 public static class LambdaMerge extends BaseFunction {
2
3 public void execute(TridentTuple tuple, TridentCollector collector) {
4 Map<String, Long> resultRealTime = (Map<String, Long>) tuple.get(1);
5 QueryStatus resultBatch = (QueryStatus) tuple.get(2);
6 TreeMap<String, Long> consolidatedResult;
7
8 if(resultRealTime != null) {
9 consolidatedResult = new TreeMap<String, Long>(resultRealTime);
10 } else {
11 consolidatedResult = new TreeMap<String, Long>();
12 }
13 if(resultBatch != null) {
14 if(resultBatch.getResult() != null) {
15 for(Object rowBatch : resultBatch.getResult()) {
16 Map<String, Object> mapRow = (Map<String, Object>) rowBatch;
17 String day = (String) mapRow.get("day");
18 Long count = Long.parseLong(mapRow.get("SUM(count)").toString());
19 consolidatedResult.put(day, count);
20 }
21 }
22 }
23 collector.emit(new Values(consolidatedResult));
24 }
25 }

Trying it

You can follow the instructions on github for trying this example. The example will use two fake tweets for the real-time layer (#california is cool, I like #california) and query Splout where you would have loaded the example dataset – which also contains counts for california. So after running everything you should see something like this:

...
Result for hashtag 'california' -> [[{"20091022":115,"20091023":115,"20091024":158,"20091025":19}]]
Result for hashtag 'california' -> [[{"20091022":115,"20091023":115,"20091024":158,"20091025":19,"20130123":76}]]
Result for hashtag 'california' -> [[{"20091022":115,"20091023":115,"20091024":158,"20091025":19,"20130123":136}]]
Result for hashtag 'california' -> [[{"20091022":115,"20091023":115,"20091024":158,"20091025":19,"20130123":192}]]
Result for hashtag 'california' -> [[{"20091022":115,"20091023":115,"20091024":158,"20091025":19,"20130123":232}]]
Result for hashtag 'california' -> [[{"20091022":115,"20091023":115,"20091024":158,"20091025":19,"20130123":286}]]
...

As you see, the last date (which should match today’s date) is increased in real-time while the other historical dates are appended – they come from Splout.

Conclusions and more

Through this example we have shown Trident, an interesting and useful higher-level API on top of Storm, and Splout SQL, a fast, partitioned, read-only SQL for Hadoop. We have also shown a real example of a fully scalable “lambda architecture”, omitting certain parts that were not so relevant for the example.

But even though for this example to be complete we need to mention the role of a “Master coordinator”. This coordinator should trigger the batch layer periodically and it should assure that the batch layer always processes complete data. If we group by hour, no partial hour should be processed by the batch layer. By meeting this condition we can safely overwrite data from the real-time layer. On the other hand, for the real-time layer to be efficient, we would need to implement an expiration mechanism so that only a rolling time frame is kept in it. Care has to be taken to expire data soon for minimizing the memory footprint but not too soon so that the batch layer wouldn’t have enough time to complete.

As mentioned before, the full code with comments is on github and you can follow the README for executing it locally.

出典: http://www.datasalt.com/2013/01/an-example-lambda-architecture-using-trident-hadoop-and-splout-sql/

Last Updated on Tuesday, 26 March 2013 19:38  

ニュース速報

さっきの件だが、資料の候補をあげて、増田さんに依頼メールを送った。

うまく、話がつながるとよいのだが。さて、どうなることやら。