MapReduceアプリケーションの開発から実行までの流れ

提供: omotenashi-mind
移動先: 案内検索


ここではMapReduceアプリケーションの開発からHadoop上での実行までの流れを簡単に解説致します。

前提

Hadoopが導入済みの環境があることが前提です。導入がお済みでない場合は「Apache Hadoop入門」をご覧ください。

データ準備

本記事では過去の気象データからその年の最高気温を求める仕組みをMapReduceで作成する手順をベースに説明を進めていきます。

はじめにMapReduceで処理するデータの準備を行います。
日本の過去の気象データは「気象庁のサイト」からダウンロードできるので、そちらから入手してください。
なお、ダウンロードしたファイルの先頭には処理に不要なヘッダー情報があるので除去してください。

(ヘッダー情報を除去した後のダウンロードファイルのイメージ)

1981/1/1 1:00:00,
1981/1/1 2:00:00,
1981/1/1 3:00:00,2.7
1981/1/1 4:00:00,
1981/1/1 5:00:00,
1981/1/1 6:00:00,1.6
1981/1/1 7:00:00,
1981/1/1 8:00:00,
1981/1/1 9:00:00,1.9
...

※データ量を多くするため、あえて1時間単位の温度を取得しています。


コーディング

データの準備ができたら、さっそくコーディングを進めます。
Hadoop上でのMapReduceアプリケーション実行に必要なものは次の3つです。

  1. Mapperクラス
  2. Reducerクラス
  3. Job実行用メインクラス


それぞれ順を追って説明します。

Mapperクラス

MapperクラスはMapReduceの最初のステップで呼び出される処理で、入力データ内からReduceに渡すべき値の選定とキーとなる情報を抽出する役割を持ちます。
年ごとの最高気温を計算したい場合、Mapperの役割としてはキーとなる「年」とその年に計測された気温がReducerに渡すべき値となります。

以下、サンプルコードを示します。

(Mapperサンプルコード)

 1 import java.io.IOException;
 2 
 3 import org.apache.hadoop.io.DoubleWritable;
 4 import org.apache.hadoop.io.LongWritable;
 5 import org.apache.hadoop.io.Text;
 6 import org.apache.hadoop.mapreduce.Mapper;
 7 
 8 
 9 public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
10 
11 	@Override
12 	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
13 		
14 		String[] data = value.toString().split(",");
15 		if(data.length != 2) {
16 			return;
17 		}
18 		
19 		String year = data[0].split("/")[0];
20 		double temperature = Double.parseDouble(data[1]);
21 		
22 		context.write(new Text(year), new DoubleWritable(temperature));
23 	}
24 }


Reducerクラス

次に用意するのがReducerです。Reducerの役割はMapperが選定した値から目的の値を算出することです。
今回の例では、年ごとの測定結果から最高気温を導出することが目的となります。

以下、サンプルコードです。

(Reducerサンプルコード)

 1 import java.io.IOException;
 2 import java.util.Comparator;
 3 import java.util.Optional;
 4 import java.util.stream.StreamSupport;
 5 import org.apache.hadoop.io.DoubleWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Reducer;
 8 
 9 public class MaxTemperatureReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
10 	
11 	@Override
12 	public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
13 		double maxValue = Double.MIN_VALUE;
14 		
15 		for(DoubleWritable value : values) {
16 			if(maxValue < value.get()) {
17 				maxValue = value.get();
18 			}
19 		}
20 		
21 		context.write(key, new DoubleWritable(maxValue));
22 	}
23 }


Mainクラス

最後に用意するのが、MapperとReducerを組み合わせてジョブを実行するためのMainクラスです。

以下、サンプルコードです。

(Mainサンプルコード)

 1 import java.io.IOException;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.io.DoubleWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapred.FileInputFormat;
 8 import org.apache.hadoop.mapred.FileOutputFormat;
 9 import org.apache.hadoop.mapred.JobConf;
10 import org.apache.hadoop.mapreduce.Job;
11 
12 public class MaxTemperature {
13 
14 	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
15 		
16 		JobConf jobConf = new JobConf(new Configuration());
17 		FileInputFormat.addInputPath(jobConf, new Path(args[0]));
18 		FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
19 
20 		Job job = Job.getInstance(jobConf);
21 		
22 		job.setJarByClass(MaxTemperature.class);
23 		job.setJobName("Max temperature");
24 		
25 		job.setMapperClass(MaxTemperatureMapper.class);
26 		job.setReducerClass(MaxTemperatureReducer.class);
27 		job.setOutputKeyClass(Text.class);
28 		job.setOutputValueClass(DoubleWritable.class);
29 		
30 		System.exit(job.waitForCompletion(true) ? 0 : 1);
31 	}
32 }


実行編

コーディングが完了したら、いよいよHadoop上での実行です。

jarファイル準備

先ほど作成したMapReduceアプリケーションを実行するためにはjarファイルを作成する必要があります。
Eclipseのエクスポート機能やmavenのタスクなどでjarファイルを作成してください。
ここでは作成したjarファイルの名前を「max-temperature-calc.jar」とします。

Hadoop起動&入力ファイル準備

Hadoopを起動し、入力ファイルの準備を行います。
ここでは擬似分散モードあるいは完全分散モードでHadoopが設定されていることを前提に説明を進めます。

まず必要な作業はHadoopの起動です。

sbin/start-dfs.sh


起動が完了したら、HDFS上にファイルの配置を行います。
input-temperatureフォルダ配下にあるデータをHDFS上のinput-temperatureフォルダに配置したい場合は次のようなコマンドとなります。

bin/hdfs dfs -put input-temperature/ input-temperature


MapReduceアプリケーションの実行

入力ファイルのHDFS上への配置が完了したら、いよいよ実行です。

bin/hadoop jar ./max-temperature-calc.jar input-temperature output-temperature


処理が正常に完了していれば、HDFS上のoutput-temperatureフォルダに処理結果が出力されているはずです。

以上.

まとめ

いかがだったでしょうか。Hadoop上でMapReduceアプリケーションの実行を行うためには少し手間が掛かりますが、手順を覚えてしまえばそれほど抵抗はなくなると思います。
とはいえ、開発中に何度も繰り返す作業は非効率的なので単体レベルでの動作確認と組み合わせ行うほうが良いでしょう。

MapReduceの単体テスト方法については「MRUnitを用いたMapReduceの単体テスト」でもまとめていますのでご覧ください。