前回は、
Java SDKを使う準備をする
実際に使用するにはSDKをダウンロードしておく必要があります。下記サイトからダウンロードしておきましょう。
- AWS SDK for Java
- URL:http://
aws. amazon. com/ sdkforjava/ - Java Library for Amazon Elastic MapReduce
- URL:http://
aws. amazon. com/ code/ Elastic-MapReduce/ 2305
AWS SDK for Javaは、
また、
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.1.1</version>
</dependency>
Amazon Elastic MapReduce Clientのコンストラクタにプロパティファイルを渡す
では、
private static final String AWS_CREDENTIALS_PROPERTIES = "/example/emr/AWSCredentials.properties";
private AmazonElasticMapReduce emr;
public EMRManager() throws IOException {
emr = new AmazonElasticMapReduceClient(
new PropertiesCredentials(
EMRManager.class.getResourceAsStream(AWS_CREDENTIALS_PROPERTIES)));
}
SDKでEMRを操作するにはAmazonElasticMapReduceClientを使います。AmazonElasticMapReduceClientのコンストラクタに渡している
accessKey=YYYYYYYYYYYYYYYYY
secretKey=ZZZZZZZZZZZZZZZZZZ
これらは前回も出てきたアクセスキー、
public EMRManager() throws IOException {
emr = new AmazonElasticMapReduceClient(new BasicAWSCredentials("accessKey", "secretKey"));
}
としてBasicAWSCredentialsを使って各キーを指定することもできます。
ステップを定義する
EMRを実行するにあたっては、
StepConfig stepConfig =
new StepConfig()
.withName("job")
.withActionOnFailure(ACTION_ON_FAILURE)
.withHadoopJarStep(new HadoopJarStepConfig()
.withJar(MAPREDUCE_JAR)
.withArgs(Arrays.asList(jobName, inputPath, outputPath)));
リスト4の3行目以降の意味は以下のとおりです。
- StepConfig#withName
- →Jobの名前を指定します。
- StepConfig#withActionOnFailure
- →Jobの実行に失敗した時の動作を指定します。CANCEL_
AND_ WAITはJobに失敗した場合、 Job自体をキャンセルし、 EMRを待機状態にします。 - StepConfig#withHadoopJarStep
- →実行するJarを定義します。
- HadoopJarStepConfig#withJar
- →Jarのパスを指定します。
- HadoopJarStepConfig#withArgs
- →Jarに渡すパラメータを指定します。
デバッグモードで起動するには
前回でも紹介しましたが、
private static final String EMR_DEBUGGIN_NAME = "Debugging";
StepConfig enableDebugging =
new StepConfig()
.withName(EMR_DEBUGGIN_NAME)
.withActionOnFailure(ACTION_ON_TERMINATE)
.withHadoopJarStep(new StepFactory().newEnableDebuggingStep());
Jobと違うのは以下の2点です。
- withActionOnFailureにTERMINATE_
JOB_ FLOWを指定している (実行に失敗した場合、 EMR自体を終了させます) - withHadoopJarStepにデバッグ用のステップを指定している
Jobを実行させるリクエストを定義する
最後に、
private static final String INSTANCE = "m1.large";
private static final String AVAILABILITY_ZONE = "us-east-1a";
private static final String HADOOP_VERSION = "0.20";
private static final String KEY_PAIR_NAME = "cluster-key";
private static final int INSTANCE_NUMBER = 10;
private static final String MAPREDUCE_JAR = "s3://example/mr.jar";
private static final String EMR_LOG_URI = "s3://emr-log/";
RunJobFlowRequest runJobFlowRequest =
new RunJobFlowRequest()
.withName(MAP_REDUCE_NAME)
.withSteps(enableDebugging, stepConfig)
.withLogUri(EMR_LOG_URI)
.withInstances(new JobFlowInstancesConfig()
.withEc2KeyName(KEY_PAIR_NAME)
.withHadoopVersion(HADOOP_VERSION)
.withInstanceCount(INSTANCE_NUMBER)
.withKeepJobFlowAliveWhenNoSteps(true)
.withMasterInstanceType(INSTANCE)
.withSlaveInstanceType(INSTANCE)
.withPlacement(new PlacementType()
.withAvailabilityZone(AVAILABILITY_ZONE)));
リスト6の3行目以降の意味を、
- runJobFlowRequest#withName
- →このJob全体の名前を定義します。
- runJobFlowRequest#withSteps
- →先ほど定義した実際のJobとデバッグ用のステップを定義します。
- runJobFlowRequest#withLogUri
- →今回はデバッグの指定をしているので出力先を定義します。
- runJobFlowRequest#withInstances
- →JobFlowInstancesConfigで細かいインスタンスの情報を定義します。
続いて、
- JobFlowInstancesConfig#withEc2KeyName
- →EC2で定義してあるSSHのキーを指定します。
- JobFlowInstancesConfig#withHadoopVersion
- →Hadoopのバージョンを定義します。現在は0.
19と0. 20を指定することができます。 - JobFlowInstancesConfig#withInstanceCount
- →インスタンスの起動数を定義します。このインスタンス数は全体でのインスタンス数となります。今回は10インスタンスとしました。そのため、
マスタノード1台、 スレーブノード9台という構成になります。 - JobFlowInstancesConfig#withKeepJobFlowAliveWhenNoSteps
- →ステップが終了した時の動作です。trueとしてあるので1つのJobが終了してもEMR自体は終了せずに待機状態になります。
- JobFlowInstancesConfig#withMasterInstanceType
- →マスタのインスタンスタイプを指定します。
- JobFlowInstancesConfig#withSlaveInstanceType
- →スレーブのインスタンスタイプを指定します。
- JobFlowInstancesConfig#withPlacement
- →PlacementTypeを定義し、
インスタンスをどのEC2のゾーンに起動するかを定義します。
なお、
Jobを実行させる
それでは最後にEMRを起動し、
RunJobFlowResult result = emr.runJobFlow(runJobFlowRequest);
String jobFlowId = result.getJobFlowId();
まずはAmazonElasticMapReduce#runJobFlowを渡します。実行するとRunJobFlowResultに結果が返ってきますが、
また、
AddJobFlowStepsRequest addJobFlowStepsRequest =
new AddJobFlowStepsRequest().withJobFlowId(jobFlowId)
.withSteps(stepConfig);
emr.addJobFlowSteps(addJobFlowStepsRequest);
それぞれの行の意味は以下のとおりです。
- AddJobFlowStepsRequestに追加のJobリクエストを定義
- AddJobFlowStepsRequest#withJobFlowIdにRunJobFlowResultで取得したJobフローIDを指定
- AddJobFlowStepsRequest#withStepsで新しいステップを定義
- AmazonElasticMapReduce#addJobFlowStepsに追加のリクエストを渡して完了
Jobが完了したか確認する
ここまでで起動させる方法を見てきましたが、
private static final String STEP_DETAIL_STATUS_PENDING = "PENDING";
private static final String STEP_DETAIL_STATUS_RUNNING = "RUNNING";
private static final String STEP_DETAIL_STATUS_COMPLETED = "COMPLETED";
String stepStatus = STEP_DETAIL_STATUS_PENDING;
while (stepStatus.equals(STEP_DETAIL_STATUS_PENDING) ||
stepStatus.equals(STEP_DETAIL_STATUS_RUNNING)) {
try {
Thread.sleep(1 * (1000 * 60));
} catch (Exception e) {
}
DescribeJobFlowsRequest describeJobFlowsRequest =
new DescribeJobFlowsRequest().withJobFlowIds(jobFlowId);
DescribeJobFlowsResult describeJobFlowsResult =
emr.describeJobFlows(describeJobFlowsRequest);
boolean found = false;
for (JobFlowDetail jobFlowDetail : describeJobFlowsResult.getJobFlows()) {
for (StepDetail stepDetail : jobFlowDetail.getSteps()) {
if (stepDetail.getStepConfig().getName().equals("job1")) {
stepStatus = stepDetail.getExecutionStatusDetail().getState();
found = true;
break;
}
}
if (found) {
break;
}
}
}
ここではステップの実行が待ち、
まずは、
DescribeJobFlowsResultからステップのステータスを取得し、
以上で、
EMRを終了させる
最後に忘れてはいけないのはEMR自体を終了させることです。これを忘れてしまうといつまでもEMR
終了のコードはリスト10のようになります。
TerminateJobFlowsRequest request = new TerminateJobFlowsRequest();
request.setJobFlowIds(Arrays.asList(jobFlowId));
emr.terminateJobFlows(request);
以下の3つを行うのがポイントとなります。
- TerminateJobFlowsRequestを定義する
- 終了させるJobフローIDを指定する
- AmazonElasticMapReduce#terminateJobFlowsを送信して完了
今回は、
次回は、
- 今回の記事内で紹介したサンプルコードはこちらからダウンロードしてください:samples.
zip