日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > php >内容正文

php

php 聚合和组合,reduce端连接-分区分组聚合(示例代码)

發布時間:2025/3/17 php 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 php 聚合和组合,reduce端连接-分区分组聚合(示例代码) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.1.1???????? reduce端連接-分區分組聚合

reduce端連接則是利用了reduce的分區功能將stationid相同的分到同一個分區,在利用reduce的分組聚合功能,將同一個stationid的氣象站數據和溫度記錄數據分為一組,reduce函數讀取分組后的第一個記錄(就是氣象站的名稱)與其他記錄組合后輸出,實現連接。例如連接下面氣象站數據集和溫度記錄數據集。先用幾條數據做分析說明,實際肯定不只這點數據。

氣象站數據集,氣象站id和名稱數據表

StationId StationName

1~hangzhou

2~shanghai

3~beijing

溫度記錄數據集

StationId? TimeStamp Temperature

3~20200216~6

3~20200215~2

3~20200217~8

1~20200211~9

1~20200210~8

2~20200214~3

2~20200215~4

目標:是將上面兩個數據集進行連接,將氣象站名稱按照氣象站id加入氣象站溫度記錄中最輸出結果:

1~hangzhou ~20200211~9

1~hangzhou ~20200210~8

2~shanghai ~20200214~3

2~shanghai ~20200215~4

3~beijing ~20200216~6

3~beijing ~20200215~2

3~beijing ~20200217~8

詳細步驟如下

(1)兩個maper讀取兩個數據集的數據輸出到同一個文件

因為是不同的數據格式,所以需要創建兩個不同maper分別讀取,輸出到同一個文件中,所以要用MultipleInputs設置兩個文件路徑,設置兩個mapper。

(2)創建一個組合鍵用于map輸出結果排序。

組合鍵使得map輸出按照stationid升序排列,stationid相同的按照第二字段升序排列。mark只有兩個值,氣象站中讀取的數據,mark為0,溫度記錄數據集中讀取的數據mark為1。這樣就能保證stationid相同的記錄中第一條就是氣象站名稱,其余的是溫度記錄數據。組合鍵TextPair定義如下

package Temperature;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

public class TextPair implements WritableComparable {

private Text first;

private Text second;

public TextPair(Text first, Text second) {

this.first = first;

this.second = second;

}

public int compareTo(TextPair o) {

int cmp=first.compareTo(o.getFirst());

if (cmp!=0)//第一字段不同按第一字段升序排列

{

return cmp;

}

///第一字段相同,按照第二字段升序排列

return second.compareTo(o.getSecond());

}

public void write(DataOutput dataOutput) throws IOException {

first.write(dataOutput);

second.write(dataOutput);

}

public void readFields(DataInput dataInput) throws IOException {

first.readFields(dataInput);

second.readFields(dataInput);

}

public Text getFirst() {

return first;

}

public void setFirst(Text first) {

this.first = first;

}

public Text getSecond() {

return second;

}

public void setSecond(Text second) {

this.second = second;

}

}

定義maper輸出的結果如下,前面是組合鍵,后面是值。

<1,0>??? hangzhou

<1,1>??? 20200211~9

<1,1>??? 20200210~8

<2,0>??? shanghai

<2,1>??? 20200214~3

<2,1>??? 20200215~4

<3,0>??? beijing

<3,1>??? 20200216~6

<3,1>??? 20200215~2

<3,1>??? 20200217~8

(3)map結果傳入reduce按stationid分區再分組聚合

map輸出結果會按照組合鍵第一個字段stationid升序排列,相同stationid的記錄按照第二個字段升序排列,氣象站數據和記錄數據混合再一起,shulfe過程中,map將數據傳給reduce,會經過partition分區,相同stationid的數據會被分到同一個reduce,一個reduce中stationid相同的數據會被分為一組。假設采用兩個reduce任務,分區按照stationid%2,則分區后的結果為

分區1

<1,0>??? hangzhou

<1,1>??? 20200211~9

<1,1>??? 20200210~8

<3,0>??? beijing

<3,1>??? 20200216~6

<3,1>??? 20200215~2

<3,1>??? 20200217~8

分區2

<2,0>??? shanghai

<2,1>??? 20200214~3

<2,1>??? 20200215~4

(4)分區之后再將每個分區的數據按照stationid分組聚合

分區1

分組1

<1,0>???

分組2

<3,0>???

分區2

<2,0>

(5)將分組聚合后的數據傳入reduce函數,將車站加入到后面的溫度記錄輸出。

因為數據是經過mark升序排列的,所以每組中第一個數據就是氣象站的名稱數據,剩下的是改氣象的溫度記錄數據,mark字段的作用就是為了保證氣象站數據在第一條。所以讀取每組中第一個value,既是氣象站名稱。與其他value組合輸出,即實現了數據集的連接。

1~hangzhou ~20200211~9

1~hangzhou ~20200210~8

2~shanghai ~20200214~3

2~shanghai ~20200215~4

3~beijing ~20200216~6

3~beijing ~20200215~2

3~beijing ~20200217~8

(6)詳細的代碼實例

package Temperature;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

import java.util.Iterator;

public class JoinRecordWithStationId extends Configured implements Tool {

//氣象站名稱數據集map處理類

public static class StationMapper extends Mapper{

protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {

//1~hangzhou

String[] values=value.toString().split("~");

if (values.length!=2)

{

return;

}

//組合鍵第一字段為stationid,第二字段為默認0,表示車站名字數據

context.write(new TextPair(new Text(values[0]),new Text("0")),new Text(values[1]));

}

}

//溫度記錄數據集處理mapper類

public static class TemperatureRecordMapper extends Mapper{

protected void map(TextPair key, Text value, Context context) throws IOException, InterruptedException {

String[] values=value.toString().split("~");

if (values.length!=3)

{

return;

}

//組合鍵第一字段為stationid,第二字段為默認1,表示溫度記錄數據

//3~20200216~6

String outputValue=values[1]+"~"+values[2];

context.write(new TextPair(new Text(values[0]),new Text("1")),new Text(outputValue));

}

}

//按照statitionid分區的partioner類

public static class FirstPartitioner extends Partitioner{

public int getPartition(TextPair textPair, Text text, int i) {

//按照第一字段stationid取余reduce任務數,得到分區id

return Integer.parseInt(textPair.getFirst().toString())%i;

}

}

//分組比較類

public static class GroupingComparator extends WritableComparator

{

public int compare(WritableComparable a, WritableComparable b) {

TextPair pairA=(TextPair)a;

TextPair pairB=(TextPair)b;

//stationid相同,返回值為0的分為一組

return pairA.getFirst().compareTo(pairB.getFirst());

}

}

//reudce將按鍵分組的后數據,去values中第一個數據(氣象站名稱),聚合values后面的溫度記錄輸出到文件

public static class JoinReducer extends Reducer

{

@Override

protected void reduce(TextPair key, Iterable values, Context context) throws IOException, InterruptedException {

Iterator it =values.iterator();

String stationName=it.next().toString();

while (it.hasNext())

{

String outputValue="~"+stationName+"~"+it.toString();

context.write(key.getFirst(),new Text(outputValue));

}

}

}

public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

if (args.length!=3)

{

return -1;

}

Job job=new Job(getConf(),"joinStationTemperatueRecord");

if (job==null)

{

return -1;

}

job.setJarByClass(this.getClass());

//設置兩個輸入路徑,一個輸出路徑

Path StationPath=new Path(args[0]);

Path TemperatureRecordPath= new Path(args[1]);

Path outputPath=new Path(args[2]);

MultipleInputs.addInputPath(job,StationPath, TextInputFormat.class,StationMapper.class);

MultipleInputs.addInputPath(job,TemperatureRecordPath,TextInputFormat.class,TemperatureRecordMapper.class);

FileOutputFormat.setOutputPath(job,outputPath);

//設置分區類、分組類、reduce類

job.setPartitionerClass(FirstPartitioner.class);

job.setGroupingComparatorClass(GroupingComparator.class);

job.setReducerClass(JoinReducer.class);

//設置輸出類型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

job.setMapOutputKeyClass(TextPair.class);

job.setMapOutputValueClass(Text.class);

return job.waitForCompletion(true)? 0:1;

}

public static void main(String[] args) throws Exception

{

//三個參數,參數1:氣象站數據集路徑,參數2:溫度記錄數據集路徑,參數3:輸出路徑

int exitCode= ToolRunner.run(new JoinRecordWithStationId(),args);

System.exit(exitCode);

}

}

執行任務命令

% hadoop jar temperature-example.jar JoinRecordWithStationId input/station/all input/ncdc/all output

自己開發了一個股票智能分析軟件,功能很強大,需要的點擊下面的鏈接獲取:

總結

以上是生活随笔為你收集整理的php 聚合和组合,reduce端连接-分区分组聚合(示例代码)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。