
HBase in Practice: Inserting Data into HBase with the Java API


I. Approach

1. Use MapReduce to read the input data and pass each record along.
2. Write an HBase utility class that connects to HBase, creates the table, and inserts rows.
3. Call that utility class from the reducer.
4. Submit the job from the main class.

II. Code

1. Mapper

package com;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Parses each comma-separated input line into a User and passes it on to the reducer
public class mapper extends Mapper<LongWritable, Text, Text, User> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String data = value.toString();
        String[] s = data.split(",");
        System.out.println(data);
        // All records share the constant key "1", so the reducer receives them in a single group
        context.write(new Text("1"), new User(s[0], s[1], s[2], s[3], s[4]));
    }
}
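Both the mapper above and the utility and reducer classes below depend on a User class that the original article does not show. The sketch below is an assumption: the field names are inferred from the five-argument constructor call and from the getters used in HbaseUtils, and the Writable implementation is required because User is the job's map output value type.

package com;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Hypothetical sketch of the User value class (not shown in the original article)
public class User implements Writable {

    private String id;
    private String name;
    private String age;
    private String sex;
    private String part;

    // Hadoop serialization and BeanUtils both require a no-arg constructor
    public User() {
    }

    public User(String id, String name, String age, String sex, String part) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.sex = sex;
        this.part = part;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getAge() { return age; }
    public void setAge(String age) { this.age = age; }
    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }
    public String getPart() { return part; }
    public void setPart(String part) { this.part = part; }

    // Serialization used when the value is shuffled from map to reduce
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(name);
        out.writeUTF(age);
        out.writeUTF(sex);
        out.writeUTF(part);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        name = in.readUTF();
        age = in.readUTF();
        sex = in.readUTF();
        part = in.readUTF();
    }

    @Override
    public String toString() {
        return id + "," + name + "," + age + "," + sex + "," + part;
    }
}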

2. HBase utility class

package com;

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

public class HbaseUtils {

    // Column family used for all inserted columns
    public static final String c = "info";

    // Method called from the reducer: creates the table if it does not exist, then inserts the list
    public static void insertinfo(String ip, String port, String tableName, List<User> list) throws Exception {
        Connection con = getConnection(ip, port);
        Admin admin = con.getAdmin();
        if (!admin.tableExists(TableName.valueOf(tableName))) {
            createTable(admin, tableName);
        }
        Table table = con.getTable(TableName.valueOf(tableName));
        insertList(table, list);
        // Release client resources once the batch has been written
        table.close();
        admin.close();
        con.close();
    }

    // Insert one Put per User, using the user's id as the row key
    private static void insertList(Table table, List<User> list) throws Exception {
        for (User user : list) {
            Put put = new Put(user.getId().getBytes());
            put.addColumn(c.getBytes(), "name".getBytes(), user.getName().getBytes());
            put.addColumn(c.getBytes(), "Age".getBytes(), user.getAge().getBytes());
            put.addColumn(c.getBytes(), "Sex".getBytes(), user.getSex().getBytes());
            put.addColumn(c.getBytes(), "Part".getBytes(), user.getPart().getBytes());
            table.put(put);
        }
    }

    // Create the table with a single column family
    private static void createTable(Admin admin, String tableName) throws Exception {
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
        HColumnDescriptor descriptor2 = new HColumnDescriptor(c);
        descriptor.addFamily(descriptor2);
        admin.createTable(descriptor);
    }

    // Open a connection to HBase through its ZooKeeper quorum
    private static Connection getConnection(String ip, String port) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", ip);
        configuration.set("hbase.zookeeper.property.clientPort", port);
        return ConnectionFactory.createConnection(configuration);
    }
}
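The utility can also be exercised on its own before wiring it into the job. The class below is a hypothetical smoke test, not part of the original article; the ZooKeeper address, client port, table name, and sample field values mirror the ones used later in the reducer and are placeholders, and it assumes the User sketch shown earlier.

package com;

import java.util.Arrays;

// Hypothetical standalone smoke test for HbaseUtils
public class HbaseUtilsSmokeTest {

    public static void main(String[] args) throws Exception {
        // Sample record; all field values are placeholders
        User user = new User("1001", "Tom", "23", "M", "dev");
        // Same ZooKeeper quorum, client port, and table name as used in the reducer below
        HbaseUtils.insertinfo("192.168.184.131", "2181", "sw", Arrays.asList(user));
    }
}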

3. Reducer

package com;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reducer extends Reducer<Text, User, Text, Text> {

    @Override
    protected void reduce(Text keyin, Iterable<User> value, Context context)
            throws IOException, InterruptedException {
        List<User> list = new ArrayList<>();
        // Copy each value out of the iterator: Hadoop reuses the same User instance
        // on every iteration, so the values must be cloned before being collected
        for (User user : value) {
            User user1 = new User();
            System.out.println(user);
            try {
                BeanUtils.copyProperties(user1, user);
                list.add(user1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("list+++++++++++++++" + list);
        // Call the HBase utility to write the collected users
        try {
            HbaseUtils.insertinfo("192.168.184.131", "2181", "sw", list);
        } catch (Exception e) {
            e.printStackTrace();
        }
        context.write(new Text("status"), new Text(":success"));
    }
}

4. Main class

package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class main {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Run the job locally against the local file system
        conf.set("mapreduce.framework.name", "local");
        conf.set("fs.defaultFS", "file:///");
        Job wordCountJob = Job.getInstance(conf);

        // Important: specify the jar that contains this job
        wordCountJob.setJarByClass(main.class);

        // Mapper and reducer classes for this job
        wordCountJob.setMapperClass(mapper.class);
        wordCountJob.setReducerClass(reducer.class);

        // Key/value types emitted by the map phase
        wordCountJob.setMapOutputKeyClass(Text.class);
        wordCountJob.setMapOutputValueClass(User.class);

        // Key/value types of the final output
        wordCountJob.setOutputKeyClass(Text.class);
        wordCountJob.setOutputValueClass(Text.class);

        // Input file and output directory
        FileInputFormat.setInputPaths(wordCountJob, "C:\\test\\in6\\data.txt");
        FileOutputFormat.setOutputPath(wordCountJob, new Path("C:\\test\\out6"));

        // Submit the job to Hadoop and wait for completion
        wordCountJob.waitForCompletion(true);
    }
}
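The input file C:\test\in6\data.txt is not shown in the original article. Based on the mapper's comma split and the five-argument User constructor, each line would need five comma-separated fields in the order id, name, age, sex, part; a hypothetical example:

1001,Tom,23,M,dev
1002,Jerry,25,F,test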
