Gora Official Documentation, Part 2: Gora's Map-Reduce Support
Reference: the official tutorial at http://gora.apache.org/current/tutorial.html
Project code: https://code.csdn.net/jediael_lu/mygorademo
Environment setup: http://blog.csdn.net/jediael_lu/article/details/43272521
Assume the data from the previous example has already been stored in HBase; it looks like this:
\x00\x00\x00\x00\x00\x00\x00D column=common:ip, timestamp=1422529645469, value=85.100.75.104
\x00\x00\x00\x00\x00\x00\x00D column=common:timestamp, timestamp=1422529645469, value=\x00\x00\x01\x1F\xF1\xB5\x88\xA0
\x00\x00\x00\x00\x00\x00\x00D column=common:url, timestamp=1422529645469, value=/index.php?i=2&a=1__z_nccylulyu&k=238241
\x00\x00\x00\x00\x00\x00\x00D column=http:httpMethod, timestamp=1422529645469, value=GET
\x00\x00\x00\x00\x00\x00\x00D column=http:httpStatusCode, timestamp=1422529645469, value=\x00\x00\x00\xC8
\x00\x00\x00\x00\x00\x00\x00D column=http:responseSize, timestamp=1422529645469, value=\x00\x00\x00+
\x00\x00\x00\x00\x00\x00\x00D column=misc:referrer, timestamp=1422529645469, value=http://www.buldinle.com/index.php?i=2&a=1__Z_nccYlULyU&k=238241
\x00\x00\x00\x00\x00\x00\x00D column=misc:userAgent, timestamp=1422529645469, value=Mozilla/5.0 (Windows; U; Windows NT 5.1; tr; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7
\x00\x00\x00\x00\x00\x00\x00E column=common:ip, timestamp=1422529645469, value=85.100.75.104
\x00\x00\x00\x00\x00\x00\x00E column=common:timestamp, timestamp=1422529645469, value=\x00\x00\x01\x1F\xF1\xB5\xBFP
\x00\x00\x00\x00\x00\x00\x00E column=common:url, timestamp=1422529645469, value=/index.php?i=7&a=1__yxs0vome9p8&k=4924961
\x00\x00\x00\x00\x00\x00\x00E column=http:httpMethod, timestamp=1422529645469, value=GET
\x00\x00\x00\x00\x00\x00\x00E column=http:httpStatusCode, timestamp=1422529645469, value=\x00\x00\x00\xC8
\x00\x00\x00\x00\x00\x00\x00E column=http:responseSize, timestamp=1422529645469, value=\x00\x00\x00+
\x00\x00\x00\x00\x00\x00\x00E column=misc:referrer, timestamp=1422529645469, value=http://www.buldinle.com/index.php?i=7&a=1__YxS0VoME9P8&k=4924961
\x00\x00\x00\x00\x00\x00\x00E column=misc:userAgent, timestamp=1422529645469, value=Mozilla/5.0 (Windows; U; Windows NT 5.1; tr; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7
This example uses MapReduce to read that data from HBase and analyze it, counting how many visits each URL receives per day. The results are written back to HBase: the row key is a String in "url + day" form, and the value carries three columns: the url, the day timestamp, and the visit count.
0. Create the Java project and a gora.properties file with the following content:

##gora.datastore.default is the default datastore implementation to use
##if it is not passed to the DataStoreFactory#createDataStore() method.
gora.datastore.default=org.apache.gora.hbase.store.HBaseStore

##whether to create schema automatically if not exists.
gora.datastore.autocreateschema=true
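The two keys above are ordinary Java properties; as the comment says, DataStoreFactory falls back to `gora.datastore.default` when no datastore class is passed explicitly. As a quick sanity check that the file is well-formed (illustrative only, this is not Gora's actual loading code), it parses with plain `java.util.Properties`:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class GoraPropsSketch {
    // Parses a gora.properties-style snippet and returns the default datastore class name.
    static String defaultDataStore(String props) throws IOException {
        Properties p = new Properties();
        p.load(new StringReader(props));
        return p.getProperty("gora.datastore.default");
    }

    public static void main(String[] args) throws IOException {
        String props = "gora.datastore.default=org.apache.gora.hbase.store.HBaseStore\n"
                     + "gora.datastore.autocreateschema=true\n";
        // prints "org.apache.gora.hbase.store.HBaseStore"
        System.out.println(defaultDataStore(props));
    }
}
```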
1. Create the JSON (Avro schema) file describing the input records, and generate the corresponding class.
This was already done in the previous example; see pageview.json and Pageview.java:

{
  "type": "record",
  "name": "Pageview",
  "default": null,
  "namespace": "org.apache.gora.tutorial.log.generated",
  "fields": [
    {"name": "url", "type": ["null","string"], "default": null},
    {"name": "timestamp", "type": "long", "default": 0},
    {"name": "ip", "type": ["null","string"], "default": null},
    {"name": "httpMethod", "type": ["null","string"], "default": null},
    {"name": "httpStatusCode", "type": "int", "default": 0},
    {"name": "responseSize", "type": "int", "default": 0},
    {"name": "referrer", "type": ["null","string"], "default": null},
    {"name": "userAgent", "type": ["null","string"], "default": null}
  ]
}
2. Create the mapping file between the input class and the HBase table:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Gora Mapping file for HBase Backend -->
<gora-otd>
  <table name="Pageview"> <!-- optional descriptors for tables -->
    <family name="common"/> <!-- This can also have params like compression, bloom filters -->
    <family name="http"/>
    <family name="misc"/>
  </table>
  <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" table="AccessLog">
    <field name="url" family="common" qualifier="url"/>
    <field name="timestamp" family="common" qualifier="timestamp"/>
    <field name="ip" family="common" qualifier="ip"/>
    <field name="httpMethod" family="http" qualifier="httpMethod"/>
    <field name="httpStatusCode" family="http" qualifier="httpStatusCode"/>
    <field name="responseSize" family="http" qualifier="responseSize"/>
    <field name="referrer" family="misc" qualifier="referrer"/>
    <field name="userAgent" family="misc" qualifier="userAgent"/>
  </class>
</gora-otd>
3. Create the JSON file for the output records and generate the corresponding class:

{
  "type": "record",
  "name": "MetricDatum",
  "namespace": "org.apache.gora.tutorial.log.generated",
  "fields": [
    {"name": "metricDimension", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "metric", "type": "long"}
  ]
}
liaoliuqingdeMacBook-Air:MyGoraDemo liaoliuqing$ gora goracompiler avro/metricdatum.json src/
Compiling: /Users/liaoliuqing/99_Project/git/MyGoraDemo/avro/metricdatum.json
Compiled into: /Users/liaoliuqing/99_Project/git/MyGoraDemo/src
Compiler executed SUCCESSFULL.
4. Create the table mapping for the output class, and add it to the mapping file created in step 2:
<class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" table="Metrics">
  <field name="metricDimension" family="common" qualifier="metricDimension"/>
  <field name="timestamp" family="common" qualifier="ts"/>
  <field name="metric" family="common" qualifier="metric"/>
</class>
5. Write the main class.
The key steps of the program are:
(1) Obtain the input and output DataStores:
if (args.length > 0) {
    String dataStoreClass = args[0];
    inStore = DataStoreFactory.getDataStore(dataStoreClass, Long.class, Pageview.class, conf);
    if (args.length > 1) {
        dataStoreClass = args[1];
    }
    outStore = DataStoreFactory.getDataStore(dataStoreClass, String.class, MetricDatum.class, conf);
} else {
    inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
    outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
}

(2) Set basic job properties:
Job job = new Job(getConf());
job.setJobName("Log Analytics");
log.info("Creating Hadoop Job: " + job.getJobName());
job.setNumReduceTasks(numReducer);
job.setJarByClass(getClass());
(3) Register the Map class with the job, along with the map input/output types:

GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class, LogAnalyticsMapper.class, true);
(4) Register the Reduce class with the job, along with the reduce input/output types:
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);

(5) Define the map class:

public static class LogAnalyticsMapper extends GoraMapper<Long, Pageview, TextLong, LongWritable> {
    private LongWritable one = new LongWritable(1L);
    private TextLong tuple;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        tuple = new TextLong();
        tuple.setKey(new Text());
        tuple.setValue(new LongWritable());
    }

    @Override
    protected void map(Long key, Pageview pageview, Context context)
            throws IOException, InterruptedException {
        CharSequence url = pageview.getUrl();
        long day = getDay(pageview.getTimestamp());
        tuple.getKey().set(url.toString());
        tuple.getValue().set(day);
        context.write(tuple, one);
    }

    /** Rolls up the given timestamp to the day cardinality, so that
     * data can be aggregated daily */
    private long getDay(long timeStamp) {
        return (timeStamp / DAY_MILIS) * DAY_MILIS;
    }
}
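The `getDay` rollup is plain integer arithmetic: dividing by the number of milliseconds in a day truncates the remainder, so every timestamp within one day maps to that day's midnight. A self-contained sketch of the helper, outside of any Hadoop context (the sample timestamps are hypothetical):

```java
public class DayRollupSketch {
    /** Number of milliseconds in a day, as in LogAnalytics. */
    static final long DAY_MILIS = 1000L * 60 * 60 * 24;

    /** Rolls a millisecond timestamp down to the start of its day. */
    static long getDay(long timeStamp) {
        return (timeStamp / DAY_MILIS) * DAY_MILIS;
    }

    public static void main(String[] args) {
        long dayStart = 1234483200000L;              // an exact day boundary
        long morning = dayStart + 3600000L;          // one hour into that day
        long evening = dayStart + 82800000L;         // 23 hours into the same day
        // both roll up to the same day boundary
        System.out.println(getDay(morning) == getDay(evening)); // prints "true"
        System.out.println(getDay(morning));                    // prints "1234483200000"
    }
}
```

This is why the mapper can use the rolled-up value itself as the "day" half of the <url, day> key: all pageviews of one URL on one day collide on the same key and are summed in the reducer.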
(6) Define the reduce class:

public static class LogAnalyticsReducer extends GoraReducer<TextLong, LongWritable, String, MetricDatum> {
    private MetricDatum metricDatum = new MetricDatum();

    @Override
    protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0L; // sum up the values
        for (LongWritable value : values) {
            sum += value.get();
        }
        String dimension = tuple.getKey().toString();
        long timestamp = tuple.getValue().get();
        metricDatum.setMetricDimension(new Utf8(dimension));
        metricDatum.setTimestamp(timestamp);
        String key = metricDatum.getMetricDimension().toString();
        key += "_" + Long.toString(timestamp);
        metricDatum.setMetric(sum);
        context.write(key, metricDatum);
    }
}
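The reducer's output key is just the dimension (the URL) and the rolled-up day joined by an underscore, and the metric is a plain sum of the 1's the mapper emitted. A minimal stand-alone sketch of that logic (no Hadoop types; the URL and timestamp are hypothetical sample values):

```java
import java.util.Arrays;
import java.util.List;

public class ReduceSketch {
    /** Sums the 1-values emitted by the mapper for one <url, day> tuple. */
    static long sum(List<Long> values) {
        long s = 0L;
        for (long v : values) s += v;
        return s;
    }

    /** Builds the Metrics row key the reducer writes: "<url>_<dayTimestamp>". */
    static String makeKey(String dimension, long dayTimestamp) {
        return dimension + "_" + Long.toString(dayTimestamp);
    }

    public static void main(String[] args) {
        // three pageviews of the same URL on the same day
        List<Long> ones = Arrays.asList(1L, 1L, 1L);
        System.out.println(makeKey("/index.php?i=2", 1234483200000L) + " -> " + sum(ones));
        // prints "/index.php?i=2_1234483200000 -> 3"
    }
}
```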
(7) Create a job from the input/output DataStores and run it:

Job job = createJob(inStore, outStore, 3);
boolean success = job.waitForCompletion(true);
The main differences between a Gora job and an ordinary MR program are:
(1) Mappers and reducers extend GoraMapper/GoraReducer instead of Mapper/Reducer.
(2) Input and output types are wired up with GoraMapper.initMapperJob() and GoraReducer.initReducerJob(), so a DataStore object can stand in for the input/output key-value specification.
In this example, passing inStore to initMapperJob implicitly fixes the mapper's input KV types to Long and Pageview, and passing outStore to initReducerJob fixes the reducer's output types to String and MetricDatum.
Compare this with the basic properties needed to run a job, as described in http://blog.csdn.net/jediael_lu/article/details/43416751:

GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class, LogAnalyticsMapper.class, true);
GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);

These two calls cover steps 2 through 5 of that checklist at once:
- step 2, the Map/Reduce classes: LogAnalyticsMapper.class and LogAnalyticsReducer.class;
- steps 3 and 4, the input format and content, together with step 5's reduce output type: both input and output are DataStores, with contents taken from inStore and outStore;
- step 5, the map output types (which are also the reduce input types): TextLong and LongWritable.
Appendix: full source code.
TextLong.java:

package org.apache.gora.tutorial.log;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

/**
 * A {@link KeyValueWritable} of {@link Text} keys and
 * {@link LongWritable} values.
 */
public class TextLong extends KeyValueWritable<Text, LongWritable> {
    public TextLong() {
        key = new Text();
        value = new LongWritable();
    }
}
LogAnalytics.java:

package org.apache.gora.tutorial.log;

import java.io.IOException;

import org.apache.avro.util.Utf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.mapreduce.GoraReducer;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.MetricDatum;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * LogAnalytics is the tutorial class to illustrate Gora MapReduce API.
 * The analytics mapreduce job reads the web access data stored earlier by the
 * {@link LogManager}, and calculates the aggregate daily pageviews. The
 * output of the job is stored in a Gora compatible data store.
 *
 * <p>See the tutorial.html file in docs or go to the
 * <a href="http://incubator.apache.org/gora/docs/current/tutorial.html">
 * web site</a> for more information.</p>
 */
public class LogAnalytics extends Configured implements Tool {
    private static final Logger log = LoggerFactory.getLogger(LogAnalytics.class);

    /** The number of miliseconds in a day */
    private static final long DAY_MILIS = 1000 * 60 * 60 * 24;

    /**
     * The Mapper takes Long keys and Pageview objects, and emits
     * tuples of <url, day> as keys and 1 as values. Input values are
     * read from the input data store.
     * Note that all Hadoop serializable classes can be used as map output key and value.
     */
    // Define the map class
    public static class LogAnalyticsMapper extends GoraMapper<Long, Pageview, TextLong, LongWritable> {
        private LongWritable one = new LongWritable(1L);
        private TextLong tuple;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            tuple = new TextLong();
            tuple.setKey(new Text());
            tuple.setValue(new LongWritable());
        }

        @Override
        protected void map(Long key, Pageview pageview, Context context)
                throws IOException, InterruptedException {
            CharSequence url = pageview.getUrl();
            long day = getDay(pageview.getTimestamp());
            tuple.getKey().set(url.toString());
            tuple.getValue().set(day);
            context.write(tuple, one);
        }

        /** Rolls up the given timestamp to the day cardinality, so that
         * data can be aggregated daily */
        private long getDay(long timeStamp) {
            return (timeStamp / DAY_MILIS) * DAY_MILIS;
        }
    }

    /**
     * The Reducer receives tuples of <url, day> as keys and a list of
     * values corresponding to the keys, and emits a combined keys and
     * {@link MetricDatum} objects. The metric datum objects are stored
     * as job outputs in the output data store.
     */
    // Define the reduce class
    public static class LogAnalyticsReducer extends GoraReducer<TextLong, LongWritable, String, MetricDatum> {
        private MetricDatum metricDatum = new MetricDatum();

        @Override
        protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0L; // sum up the values
            for (LongWritable value : values) {
                sum += value.get();
            }
            String dimension = tuple.getKey().toString();
            long timestamp = tuple.getValue().get();
            metricDatum.setMetricDimension(new Utf8(dimension));
            metricDatum.setTimestamp(timestamp);
            String key = metricDatum.getMetricDimension().toString();
            key += "_" + Long.toString(timestamp);
            metricDatum.setMetric(sum);
            context.write(key, metricDatum);
        }
    }

    /**
     * Creates and returns the {@link Job} for submitting to Hadoop mapreduce.
     * @param inStore
     * @param outStore
     * @param numReducer
     * @return
     * @throws IOException
     */
    public Job createJob(DataStore<Long, Pageview> inStore,
            DataStore<String, MetricDatum> outStore, int numReducer) throws IOException {
        // Set basic job properties
        Job job = new Job(getConf());
        job.setJobName("Log Analytics");
        log.info("Creating Hadoop Job: " + job.getJobName());
        job.setNumReduceTasks(numReducer);
        job.setJarByClass(getClass());

        /* Mappers are initialized with GoraMapper.initMapper() or
         * GoraInputFormat.setInput() */
        // Register the Map class and the map input/output types
        GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class,
                LogAnalyticsMapper.class, true);

        /* Reducers are initialized with GoraReducer#initReducer().
         * If the output is not to be persisted via Gora, any reducer
         * can be used instead. */
        // Register the Reduce class and the reduce input/output types
        GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);
        return job;
    }

    @Override
    public int run(String[] args) throws Exception {
        DataStore<Long, Pageview> inStore;
        DataStore<String, MetricDatum> outStore;
        Configuration conf = new Configuration();
        // Obtain the input and output DataStores
        if (args.length > 0) {
            String dataStoreClass = args[0];
            inStore = DataStoreFactory.getDataStore(dataStoreClass, Long.class, Pageview.class, conf);
            if (args.length > 1) {
                dataStoreClass = args[1];
            }
            outStore = DataStoreFactory.getDataStore(dataStoreClass, String.class, MetricDatum.class, conf);
        } else {
            inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
            outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
        }

        // Create a job from the input/output DataStores and run it
        Job job = createJob(inStore, outStore, 3);
        boolean success = job.waitForCompletion(true);

        inStore.close();
        outStore.close();

        log.info("Log completed with " + (success ? "success" : "failure"));
        return success ? 0 : 1;
    }

    private static final String USAGE = "LogAnalytics <input_data_store> <output_data_store>";

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println(USAGE);
            System.exit(1);
        }
        // run as any other MR job
        int ret = ToolRunner.run(new LogAnalytics(), args);
        System.exit(ret);
    }
}
6. Run the program
(1) Export the project as a runnable jar file and upload it to the server.
(2) Run it:
$ java -jar MyGoraDemo.jar org.apache.gora.hbase.store.HBaseStore org.apache.gora.hbase.store.HBaseStore
(3) Check the results in HBase:
hbase(main):001:0> list
TABLE
AccessLog
Jan2814_webpage
Jan2819_webpage
Jan2910_webpage
Jan2920_webpage
Metrics
Passwd
member
8 row(s) in 2.6450 seconds
hbase(main):002:0> scan 'Metrics'
Summary: with Gora, the only MR-specific changes are extending GoraMapper/GoraReducer and initializing the job through initMapperJob()/initReducerJob() with the input and output DataStores; everything else runs as a standard Hadoop job.