日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

hadoop emr_在Amazon EMR上运行Hadoop MapReduce作业

發(fā)布時間:2023/12/3 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 hadoop emr_在Amazon EMR上运行Hadoop MapReduce作业 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

hadoop emr

不久前,我發(fā)布了如何使用CLI設(shè)置EMR群集的信息。 在本文中,我將展示如何使用適用于AWS的Java SDK來設(shè)置集群。 展示如何使用Java AWS開發(fā)工具包執(zhí)行此操作的最佳方法是展示我認(rèn)為完整的示例,因此讓我們開始吧。

    • 設(shè)置一個新的Maven項目

為此,我創(chuàng)建了一個新的默認(rèn)Maven項目。 您可以運行該項目中的主類來啟動EMR集群并執(zhí)行我在本文中創(chuàng)建的MapReduce作業(yè):

package net.pascalalma.aws.emr;import com.amazonaws.AmazonServiceException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.PropertiesCredentials; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.ec2.model.InstanceType; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient; import com.amazonaws.services.elasticmapreduce.model.*; import com.amazonaws.services.elasticmapreduce.util.StepFactory; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client;import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.UUID;/*** Created with IntelliJ IDEA.* User: pascal* Date: 22-07-13* Time: 20:45*/ public class MyClient {private static final String HADOOP_VERSION = "1.0.3";private static final int INSTANCE_COUNT = 1;private static final String INSTANCE_TYPE = InstanceType.M1Small.toString();private static final UUID RANDOM_UUID = UUID.randomUUID();private static final String FLOW_NAME = "dictionary-" + RANDOM_UUID.toString();private static final String BUCKET_NAME = "map-reduce-intro";private static final String S3N_HADOOP_JAR ="s3n://" + BUCKET_NAME + "/job/MapReduce-1.0-SNAPSHOT.jar";private static final String S3N_LOG_URI = "s3n://" + BUCKET_NAME + "/log/";private static final String[] JOB_ARGS =new String[]{"s3n://" + BUCKET_NAME + "/input/input.txt","s3n://" + BUCKET_NAME + "/result/" + FLOW_NAME};private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);private static final List<JobFlowExecutionState> DONE_STATES = Arrays.asList(new JobFlowExecutionState[]{JobFlowExecutionState.COMPLETED,JobFlowExecutionState.FAILED,JobFlowExecutionState.TERMINATED});static AmazonS3 s3;static AmazonElasticMapReduceClient emr;private static void init() throws Exception {AWSCredentials credentials = new PropertiesCredentials(MyClient.class.getClassLoader().getResourceAsStream("AwsCredentials.properties"));s3 = new AmazonS3Client(credentials);emr = new AmazonElasticMapReduceClient(credentials);emr.setRegion(Region.getRegion(Regions.EU_WEST_1));}private static JobFlowInstancesConfig configInstance() throws Exception {// Configure instances to useJobFlowInstancesConfig instance = new JobFlowInstancesConfig();instance.setHadoopVersion(HADOOP_VERSION);instance.setInstanceCount(INSTANCE_COUNT);instance.setMasterInstanceType(INSTANCE_TYPE);instance.setSlaveInstanceType(INSTANCE_TYPE);// instance.setKeepJobFlowAliveWhenNoSteps(true);// instance.setEc2KeyName("4synergy_palma");return instance;}private static void runCluster() throws Exception {// Configure the job flowRunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, configInstance());request.setLogUri(S3N_LOG_URI);// Configure the Hadoop jar to useHadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);jarConfig.setArgs(ARGS_AS_LIST);try {StepConfig enableDebugging = new StepConfig().withName("Enable debugging").withActionOnFailure("TERMINATE_JOB_FLOW").withHadoopJarStep(new StepFactory().newEnableDebuggingStep());StepConfig runJar =new StepConfig(S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.indexOf('/') + 1),jarConfig);request.setSteps(Arrays.asList(new StepConfig[]{enableDebugging, runJar}));//Run the job flowRunJobFlowResult result = emr.runJobFlow(request);//Check the status of the running jobString lastState = "";STATUS_LOOP:while (true) {DescribeJobFlowsRequest desc =new DescribeJobFlowsRequest(Arrays.asList(new String[]{result.getJobFlowId()}));DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);for (JobFlowDetail detail : descResult.getJobFlows()) {String state = detail.getExecutionStatusDetail().getState();if (isDone(state)) {System.out.println("Job " + state + ": " + detail.toString());break STATUS_LOOP;} else if (!lastState.equals(state)) {lastState = state;System.out.println("Job " + state + " at " + new Date().toString());}}Thread.sleep(10000);}} catch (AmazonServiceException ase) {System.out.println("Caught Exception: " + ase.getMessage());System.out.println("Reponse Status Code: " + ase.getStatusCode());System.out.println("Error Code: " + ase.getErrorCode());System.out.println("Request ID: " + ase.getRequestId());}}public static boolean isDone(String value) {JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);return DONE_STATES.contains(state);}public static void main(String[] args) {try {init();runCluster();} catch (Exception e) {e.printStackTrace(); }} }

在此類中,我首先聲明一些常量,我認(rèn)為這些常量是顯而易見的。 在init()方法中,我使用添加到項目中的憑據(jù)屬性文件。 我將此文件添加到了Maven項目的'/ main / resources'文件夾中。 它包含我的訪問密鑰和秘密密鑰。
我還將EMR客戶的區(qū)域設(shè)置為“ EU-WEST”。
下一個方法是“ configInstance()”。 在這種方法中,我通過設(shè)置Hadoop版本,實例數(shù),實例大小等來創(chuàng)建和配置JobFlowInstance。您還可以配置'keepAlive'設(shè)置,以在作業(yè)完成后使集群保持活動狀態(tài)。 在某些情況下這可能會有所幫助。 如果要使用此選項,則也可以設(shè)置要用于訪問集群的密鑰對,這可能會很有用,因為如果不設(shè)置此密鑰就無法訪問集群。 方法“ runCluster()”是集群實際運行的地方。 它創(chuàng)建啟動集群的請求。 在此請求中,添加了必須執(zhí)行的步驟。 在我們的例子中,其中一個步驟是運行在先前步驟中創(chuàng)建的JAR文件。 我還添加了一個調(diào)試步驟,以便在集群完成并終止后我們可以訪問調(diào)試日志記錄。 我們可以簡單地訪問我用常量'S3N_LOG_URI'設(shè)置的S3存儲桶中的日志文件。 創(chuàng)建此請求后,我們將基于此請求啟動集群。 然后,我們每隔10秒鐘拉動一次,以查看作業(yè)是否完成,并在控制臺上顯示一條消息,指示作業(yè)的當(dāng)前狀態(tài)。 要執(zhí)行第一次運行,我們必須準(zhǔn)備輸入。

    • 準(zhǔn)備輸入

作為作業(yè)的輸入(有關(guān)此示例作業(yè)的更多信息,請參見此),我們必須使字典內(nèi)容可用于EMR群集。 此外,我們必須使JAR文件可用,并確保輸出和日志目錄存在于我們的S3存儲桶中。 有幾種方法可以執(zhí)行此操作:您也可以通過使用SDK以編程方式執(zhí)行此操作,也可以使用S3cmd從命令行執(zhí)行此操作,也可以使用AWS管理控制臺進(jìn)行此操作 。 只要您得到類似的設(shè)置,就可以了:

  • s3:// map-reduce-intro
  • s3:// map-reduce-intro / input
  • s3://map-reduce-intro/input/input.txt
  • s3:// map-reduce-intro / job
  • s3://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
  • s3:// map-reduce-intro / log
  • s3:// map-reduce-intro / result

或在使用S3cmd時如下所示:

s3cmd-1.5.0-alpha1$ s3cmd ls --recursive s3://map-reduce-intro/ 2013-07-20 13:06 469941 s3://map-reduce-intro/input/input.txt 2013-07-20 14:12 5491 s3://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar 2013-08-06 14:30 0 s3://map-reduce-intro/log/ 2013-08-06 14:27 0 s3://map-reduce-intro/result/

在上面的示例中,我已經(jīng)在代碼中引入了S3客戶端。 您還可以使用它來準(zhǔn)備輸入或獲取輸出,作為客戶工作的一部分。

    • 運行集群

當(dāng)一切就緒后,我們就可以運行工作。 我只是在IntelliJ中運行'MyClient'的主要方法,并在控制臺中獲得以下輸出:

Job STARTING at Tue Aug 06 16:31:55 CEST 2013 Job RUNNING at Tue Aug 06 16:36:18 CEST 2013 Job SHUTTING_DOWN at Tue Aug 06 16:38:40 CEST 2013 Job COMPLETED: {JobFlowId: j-JDB14HVTRC1L,Name: dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43,LogUri: s3n://map-reduce-intro/log/,AmiVersion: 2.4.0,ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013,StartDateTime: Tue Aug 06 16:36:14 CEST 2013,ReadyDateTime: Tue Aug 06 16:36:14 CEST 2013,EndDateTime: Tue Aug 06 16:39:02 CEST 2013,LastStateChangeReason: Steps completed},Instances: {MasterInstanceType: m1.small,MasterPublicDnsName: ec2-54-216-104-11.eu-west-1.compute.amazonaws.com,MasterInstanceId: i-93268ddf,InstanceCount: 1,InstanceGroups: [{InstanceGroupId: ig-2LURHNAK5NVKZ,Name: master,Market: ON_DEMAND,InstanceRole: MASTER,InstanceType: m1.small,InstanceRequestCount: 1,InstanceRunningCount: 0,State: ENDED,LastStateChangeReason: Job flow terminated,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013,StartDateTime: Tue Aug 06 16:34:28 CEST 2013,ReadyDateTime: Tue Aug 06 16:36:10 CEST 2013,EndDateTime: Tue Aug 06 16:39:02 CEST 2013}],NormalizedInstanceHours: 1,Ec2KeyName: 4synergy_palma,Placement: {AvailabilityZone: eu-west-1a},KeepJobFlowAliveWhenNoSteps: false,TerminationProtected: false,HadoopVersion: 1.0.3},Steps: [{StepConfig: {Name: Enable debugging,ActionOnFailure: TERMINATE_JOB_FLOW,HadoopJarStep: {Properties: [],Jar: s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args: [s3://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch]}},ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013,StartDateTime: Tue Aug 06 16:36:12 CEST 2013,EndDateTime: Tue Aug 06 16:36:40 CEST 2013,}}, {StepConfig: {Name: /map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar,ActionOnFailure: TERMINATE_JOB_FLOW,HadoopJarStep: {Properties: [],Jar: s3n://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar,Args: [s3n://map-reduce-intro/input/input.txt, s3n://map-reduce-intro/result/dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43]}},ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013,StartDateTime: Tue Aug 06 16:36:40 CEST 2013,EndDateTime: Tue Aug 06 16:38:10 CEST 2013,}}],BootstrapActions: [],SupportedProducts: [],VisibleToAllUsers: false ,} Process finished with exit code 0

當(dāng)然,我們在S3存儲桶中配置的“結(jié)果”文件夾中有一個結(jié)果:

我將結(jié)果傳輸?shù)轿业谋镜赜嬎銠C上,并進(jìn)行了查看:

這樣就可以得出這個簡單的結(jié)論,但我認(rèn)為,這是創(chuàng)建Hadoop作業(yè)并在對它進(jìn)行單元測試之后在群集上運行它的完整示例,就像我們對所有軟件所做的那樣。

以該設(shè)置為基礎(chǔ),可以輕松地提出更復(fù)雜的業(yè)務(wù)案例,并對其進(jìn)行測試和配置以在AWS EMR上運行。

參考: The Pragmatic Integrator博客上的JCG合作伙伴 Pascal Alma在Amazon EMR上運行Hadoop MapReduce作業(yè) 。

翻譯自: https://www.javacodegeeks.com/2013/09/run-your-hadoop-mapreduce-job-on-amazon-emr.html

hadoop emr

總結(jié)

以上是生活随笔為你收集整理的hadoop emr_在Amazon EMR上运行Hadoop MapReduce作业的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。