Solving the problem of submitting a Hadoop job to the cluster

Published: 2023/12/10

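Problem description:

When you submit a job from Eclipse the way the book "Hadoop實戰" describes, the job actually runs in a local environment simulated inside Eclipse; it is not submitted to the real Hadoop cluster. No record of the job shows up on port 50030 (the JobTracker web UI) either. The code at this point looks like this: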

    package com.spork.hadoop.jobutil.test;

    import java.io.File;
    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    import com.spork.hadoop.jobutil.EJob;

    public class WordCountTest {

        public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }

        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

            private IntWritable result = new IntWritable();

            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] inAndOut = {"hdfs://localhost:9000/bin/in/1", "hdfs://localhost:9000/out"};
            Job job = new Job(conf, "word count");
            job.setJarByClass(WordCountTest.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(inAndOut[0]));
            FileOutputFormat.setOutputPath(job, new Path(inAndOut[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
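
A likely reason the listing above stays local: in Hadoop 1.x the property mapred.job.tracker defaults to "local", which makes the job run in-process instead of going to a JobTracker. If the client cannot see the cluster's mapred-site.xml on its classpath, that default applies. The sketch below is not from the original post. It uses only the JDK to read a Hadoop-style configuration file from the classpath and report which mode the value implies. The class name JobTrackerCheck and its method names are made up for illustration.

```java
import java.io.InputStream;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper, not part of Hadoop or of the original post.
class JobTrackerCheck {

    /** Returns the value of the named property in a Hadoop-style XML config, or null if absent. */
    static String readProperty(InputStream xml, String name) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(xml);
        NodeList props = doc.getElementsByTagName("property");
        for (int i = 0; i < props.getLength(); i++) {
            Element p = (Element) props.item(i);
            NodeList names = p.getElementsByTagName("name");
            NodeList values = p.getElementsByTagName("value");
            if (names.getLength() > 0 && values.getLength() > 0
                    && name.equals(names.item(0).getTextContent().trim())) {
                return values.item(0).getTextContent().trim();
            }
        }
        return null;
    }

    /** A missing value or "local" means the Hadoop 1.x client runs the job in-process. */
    static boolean runsLocally(String jobTracker) {
        return jobTracker == null || "local".equals(jobTracker);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = ClassLoader.getSystemClassLoader();
        }
        // Null if no mapred-site.xml is visible on the classpath.
        try (InputStream in = cl.getResourceAsStream("mapred-site.xml")) {
            String tracker = (in == null) ? null : readProperty(in, "mapred.job.tracker");
            System.out.println("mapred.job.tracker = " + tracker);
            System.out.println(runsLocally(tracker)
                    ? "job would run locally"
                    : "job would be submitted to " + tracker);
        }
    }
}
```

Running this from the same Eclipse project as the job gives a quick hint about whether the cluster configuration is visible to the client at all.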


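To really submit the job to the cluster, one option is to build the jar from the command line and then run it to submit the job. That approach is clumsy, and programmers are lazy people: why do by hand what the machine can do for you? For the details, see this blog post: http://weixiaolu.iteye.com/blog/1402919

This post describes a smarter way. The main idea is to do the jar packaging inside the Java code itself, with the help of a utility class, EJob (EJob.java).

The sample code is as follows: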

    package com.spork.hadoop.jobutil.test;

    import java.io.File;
    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    import com.spork.hadoop.jobutil.EJob;

    public class WordCountTest {

        public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }

        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

            private IntWritable result = new IntWritable();

            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

        public static void main(String[] args) throws Exception {
            // Add these statements.
            // Pack the compiled classes under "bin" into a temporary jar.
            // Note: createTempJar returns null if that directory does not exist.
            File jarFile = EJob.createTempJar("bin");
            // Make the cluster's configuration directory visible to the client.
            EJob.addClasspath("/usr/hadoop-1.2.1/conf");
            ClassLoader classLoader = EJob.getClassLoader();
            Thread.currentThread().setContextClassLoader(classLoader);

            Configuration conf = new Configuration();
            String[] inAndOut = {"hdfs://localhost:9000/bin/in/1", "hdfs://localhost:9000/out"};
            Job job = new Job(conf, "word count");

            // And add this statement.
            // Tell the job which jar to ship to the cluster.
            ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

            job.setJarByClass(WordCountTest.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(inAndOut[0]));
            FileOutputFormat.setOutputPath(job, new Path(inAndOut[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
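
The class-loader statements matter because Hadoop's Configuration looks up files such as core-site.xml and mapred-site.xml as classpath resources through the thread's context class loader. Below is a minimal JDK-only sketch of that mechanism. It does not touch Hadoop, and ContextClassLoaderDemo, withExtraDir, and demo-site.xml are hypothetical names chosen for the example.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative only: shows how adding a directory to a URLClassLoader
// makes the files in it resolvable as classpath resources.
class ContextClassLoaderDemo {

    /** Returns a loader that searches confDir in addition to the current context loader. */
    static ClassLoader withExtraDir(Path confDir) throws IOException {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        // File.toURI() appends a trailing slash for an existing directory,
        // which URLClassLoader needs in order to treat the URL as a directory.
        URL[] urls = { confDir.toFile().toURI().toURL() };
        return new URLClassLoader(urls, parent);
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a Hadoop conf directory.
        Path confDir = Files.createTempDirectory("demo-conf");
        Files.write(confDir.resolve("demo-site.xml"),
                "<configuration/>".getBytes(StandardCharsets.UTF_8));

        ClassLoader before = Thread.currentThread().getContextClassLoader();
        if (before != null) {
            // Expected to be null unless something else provides this resource.
            System.out.println("before: " + before.getResource("demo-site.xml"));
        }

        Thread.currentThread().setContextClassLoader(withExtraDir(confDir));
        ClassLoader after = Thread.currentThread().getContextClassLoader();
        System.out.println("after:  " + after.getResource("demo-site.xml"));
    }
}
```

EJob.addClasspath and EJob.getClassLoader follow the same pattern: collect URLs, then wrap them in a URLClassLoader whose parent is the existing context loader.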

The code of the EJob class is as follows:

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements. See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership. The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License. You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.spork.hadoop.jobutil;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.lang.reflect.Array;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.net.URL;
    import java.net.URLClassLoader;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Enumeration;
    import java.util.jar.JarEntry;
    import java.util.jar.JarFile;
    import java.util.jar.JarOutputStream;
    import java.util.jar.Manifest;

    public class EJob {

        private static ArrayList<URL> classPath = new ArrayList<URL>();

        /** Unpack a jar file into a directory. */
        public static void unJar(File jarFile, File toDir) throws IOException {
            JarFile jar = new JarFile(jarFile);
            try {
                Enumeration entries = jar.entries();
                while (entries.hasMoreElements()) {
                    JarEntry entry = (JarEntry) entries.nextElement();
                    if (!entry.isDirectory()) {
                        InputStream in = jar.getInputStream(entry);
                        try {
                            File file = new File(toDir, entry.getName());
                            if (!file.getParentFile().mkdirs()) {
                                if (!file.getParentFile().isDirectory()) {
                                    throw new IOException("Mkdirs failed to create "
                                            + file.getParentFile().toString());
                                }
                            }
                            OutputStream out = new FileOutputStream(file);
                            try {
                                byte[] buffer = new byte[8192];
                                int i;
                                while ((i = in.read(buffer)) != -1) {
                                    out.write(buffer, 0, i);
                                }
                            } finally {
                                out.close();
                            }
                        } finally {
                            in.close();
                        }
                    }
                }
            } finally {
                jar.close();
            }
        }

        /**
         * Run a Hadoop job jar. If the main class is not in the jar's manifest, then
         * it must be provided on the command line.
         */
        public static void runJar(String[] args) throws Throwable {
            String usage = "jarFile [mainClass] args...";

            if (args.length < 1) {
                System.err.println(usage);
                System.exit(-1);
            }

            int firstArg = 0;
            String fileName = args[firstArg++];
            File file = new File(fileName);
            String mainClassName = null;

            JarFile jarFile;
            try {
                jarFile = new JarFile(fileName);
            } catch (IOException io) {
                throw new IOException("Error opening job jar: " + fileName).initCause(io);
            }

            Manifest manifest = jarFile.getManifest();
            if (manifest != null) {
                mainClassName = manifest.getMainAttributes().getValue("Main-Class");
            }
            jarFile.close();

            if (mainClassName == null) {
                if (args.length < 2) {
                    System.err.println(usage);
                    System.exit(-1);
                }
                mainClassName = args[firstArg++];
            }
            mainClassName = mainClassName.replaceAll("/", ".");

            File tmpDir = new File(System.getProperty("java.io.tmpdir"));
            tmpDir.mkdirs();
            if (!tmpDir.isDirectory()) {
                System.err.println("Mkdirs failed to create " + tmpDir);
                System.exit(-1);
            }
            final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
            workDir.delete();
            workDir.mkdirs();
            if (!workDir.isDirectory()) {
                System.err.println("Mkdirs failed to create " + workDir);
                System.exit(-1);
            }

            Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    try {
                        fullyDelete(workDir);
                    } catch (IOException e) {
                    }
                }
            });

            unJar(file, workDir);

            classPath.add(new File(workDir + "/").toURL());
            classPath.add(file.toURL());
            classPath.add(new File(workDir, "classes/").toURL());
            File[] libs = new File(workDir, "lib").listFiles();
            if (libs != null) {
                for (int i = 0; i < libs.length; i++) {
                    classPath.add(libs[i].toURL());
                }
            }

            ClassLoader loader = new URLClassLoader(classPath.toArray(new URL[0]));

            Thread.currentThread().setContextClassLoader(loader);
            Class<?> mainClass = Class.forName(mainClassName, true, loader);
            Method main = mainClass.getMethod("main",
                    new Class[] { Array.newInstance(String.class, 0).getClass() });
            String[] newArgs = Arrays.asList(args).subList(firstArg, args.length)
                    .toArray(new String[0]);
            try {
                main.invoke(null, new Object[] { newArgs });
            } catch (InvocationTargetException e) {
                throw e.getTargetException();
            }
        }

        /**
         * Delete a directory and all its contents. If we return false, the directory
         * may be partially-deleted.
         */
        public static boolean fullyDelete(File dir) throws IOException {
            File contents[] = dir.listFiles();
            if (contents != null) {
                for (int i = 0; i < contents.length; i++) {
                    if (contents[i].isFile()) {
                        if (!contents[i].delete()) {
                            return false;
                        }
                    } else {
                        // try deleting the directory
                        // this might be a symlink
                        boolean b = false;
                        b = contents[i].delete();
                        if (b) {
                            // this was indeed a symlink or an empty directory
                            continue;
                        }
                        // if not an empty directory or symlink let
                        // fullydelete handle it.
                        if (!fullyDelete(contents[i])) {
                            return false;
                        }
                    }
                }
            }
            return dir.delete();
        }

        /**
         * Add a directory or file to classpath.
         *
         * @param component
         */
        public static void addClasspath(String component) {
            if ((component != null) && (component.length() > 0)) {
                try {
                    File f = new File(component);
                    if (f.exists()) {
                        URL key = f.getCanonicalFile().toURL();
                        if (!classPath.contains(key)) {
                            classPath.add(key);
                        }
                    }
                } catch (IOException e) {
                }
            }
        }

        /**
         * Add default classpath listed in bin/hadoop bash.
         *
         * @param hadoopHome
         */
        public static void addDefaultClasspath(String hadoopHome) {
            // Classpath initially contains conf dir.
            addClasspath(hadoopHome + "/conf");

            // For developers, add Hadoop classes to classpath.
            addClasspath(hadoopHome + "/build/classes");
            if (new File(hadoopHome + "/build/webapps").exists()) {
                addClasspath(hadoopHome + "/build");
            }
            addClasspath(hadoopHome + "/build/test/classes");
            addClasspath(hadoopHome + "/build/tools");

            // For releases, add core hadoop jar & webapps to classpath.
            if (new File(hadoopHome + "/webapps").exists()) {
                addClasspath(hadoopHome);
            }
            addJarsInDir(hadoopHome);
            addJarsInDir(hadoopHome + "/build");

            // Add libs to classpath.
            addJarsInDir(hadoopHome + "/lib");
            addJarsInDir(hadoopHome + "/lib/jsp-2.1");
            addJarsInDir(hadoopHome + "/build/ivy/lib/Hadoop/common");
        }

        /**
         * Add all jars in directory to classpath, sub-directory is excluded.
         *
         * @param dirPath
         */
        public static void addJarsInDir(String dirPath) {
            File dir = new File(dirPath);
            if (!dir.exists()) {
                return;
            }
            File[] files = dir.listFiles();
            if (files == null) {
                return;
            }
            for (int i = 0; i < files.length; i++) {
                if (files[i].isDirectory()) {
                    continue;
                } else {
                    addClasspath(files[i].getAbsolutePath());
                }
            }
        }

        /**
         * Create a temp jar file in "java.io.tmpdir".
         *
         * @param root
         * @return
         * @throws IOException
         */
        public static File createTempJar(String root) throws IOException {
            if (!new File(root).exists()) {
                return null;
            }
            Manifest manifest = new Manifest();
            manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
            final File jarFile = File.createTempFile("EJob-", ".jar",
                    new File(System.getProperty("java.io.tmpdir")));

            Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    jarFile.delete();
                }
            });

            JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile), manifest);
            createTempJarInner(out, new File(root), "");
            out.flush();
            out.close();
            return jarFile;
        }

        private static void createTempJarInner(JarOutputStream out, File f, String base)
                throws IOException {
            if (f.isDirectory()) {
                File[] fl = f.listFiles();
                if (base.length() > 0) {
                    base = base + "/";
                }
                for (int i = 0; i < fl.length; i++) {
                    createTempJarInner(out, fl[i], base + fl[i].getName());
                }
            } else {
                out.putNextEntry(new JarEntry(base));
                FileInputStream in = new FileInputStream(f);
                byte[] buffer = new byte[1024];
                int n = in.read(buffer);
                while (n != -1) {
                    out.write(buffer, 0, n);
                    n = in.read(buffer);
                }
                in.close();
            }
        }

        /**
         * Return a classloader based on user-specified classpath and parent
         * classloader.
         *
         * @return
         */
        public static ClassLoader getClassLoader() {
            ClassLoader parent = Thread.currentThread().getContextClassLoader();
            if (parent == null) {
                parent = EJob.class.getClassLoader();
            }
            if (parent == null) {
                parent = ClassLoader.getSystemClassLoader();
            }
            return new URLClassLoader(classPath.toArray(new URL[0]), parent);
        }
    }
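
The packaging step in createTempJar can also be sketched on its own. The version below is an illustrative rewrite, not the EJob code: it uses try-with-resources and java.nio.file so that streams are closed even when an exception is thrown, and it fails loudly instead of returning null when the directory is missing. The names JarPacker and packDirectory are invented for this example.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-alone variant of the jar-packing idea used by EJob.createTempJar.
class JarPacker {

    /** Packs every regular file under root into a temporary jar and returns that jar. */
    static File packDirectory(Path root) throws IOException {
        if (!Files.isDirectory(root)) {
            throw new IOException("Not a directory: " + root);
        }
        Manifest manifest = new Manifest();
        // Without a version attribute the manifest would be written empty.
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");

        File jarFile = File.createTempFile("JarPacker-", ".jar");
        jarFile.deleteOnExit();

        // Collect the files first so the directory stream is closed before writing.
        List<Path> files;
        try (Stream<Path> walk = Files.walk(root)) {
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }

        try (JarOutputStream out =
                new JarOutputStream(Files.newOutputStream(jarFile.toPath()), manifest)) {
            for (Path file : files) {
                // Jar entry names always use forward slashes.
                String name = root.relativize(file).toString().replace(File.separatorChar, '/');
                if (name.equals(JarFile.MANIFEST_NAME)) {
                    continue; // the manifest was already written by the constructor
                }
                out.putNextEntry(new JarEntry(name));
                Files.copy(file, out);
                out.closeEntry();
            }
        }
        return jarFile;
    }

    public static void main(String[] args) throws IOException {
        // Pass a classes directory such as "bin"; with no argument an empty temp directory is packed.
        Path root = args.length > 0 ? Paths.get(args[0]) : Files.createTempDirectory("classes");
        System.out.println("Wrote " + packDirectory(root));
    }
}
```

The returned file could be handed to setJar in the same way as the result of EJob.createTempJar, though that combination has not been tested here.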

After this change, the job's progress can be monitored on port 50030, and, judging by CPU usage, the load on each CPU goes up as well.

For a more detailed look at how EJob works, see the blog post below.

http://www.cnblogs.com/spork/archive/2010/04/21/1717592.html

Reposted from: https://www.cnblogs.com/lovecreatemylife/p/4370333.html