Two Ways to Convert an RDD to a DataFrame in Spark (Implemented in Java and Scala)
Part One: Prepare the Data Source

Create a file named student.txt under the project directory with the following content:
1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18
Part Two: Implementation

Java version:

1. First, create a Student bean class that implements Serializable and overrides toString(). The code is as follows:
package com.cxd.sql;

import java.io.Serializable;

@SuppressWarnings("serial")
public class Student implements Serializable {
    String sid;
    String sname;
    int sage;

    public String getSid() {
        return sid;
    }
    public void setSid(String sid) {
        this.sid = sid;
    }
    public String getSname() {
        return sname;
    }
    public void setSname(String sname) {
        this.sname = sname;
    }
    public int getSage() {
        return sage;
    }
    public void setSage(int sage) {
        this.sage = sage;
    }
    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }
}
2. Perform the conversion. The code is as follows:
package com.cxd.sql;

import java.util.ArrayList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TxtToParquetDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        reflectTransform(spark);   // conversion via Java reflection
        dynamicTransform(spark);   // conversion via a dynamically built schema
    }

    /**
     * Conversion via Java reflection (bean class).
     * @param spark
     */
    private static void reflectTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

        // Map each text line to a Student bean
        JavaRDD<Student> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.valueOf(parts[2]));
            return stu;
        });

        // Infer the schema from the Student bean class
        Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
        df.select("sid", "sname", "sage")
          .coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
    }

    /**
     * Conversion via a dynamically built schema.
     * @param spark
     */
    private static void dynamicTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

        // Map each text line to a generic Row
        JavaRDD<Row> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);

            return RowFactory.create(sid, sname, sage);
        });

        // Build the schema programmatically
        ArrayList<StructField> fields = new ArrayList<StructField>();
        StructField field = null;
        field = DataTypes.createStructField("sid", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sname", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
        fields.add(field);

        StructType schema = DataTypes.createStructType(fields);

        // Apply the schema to the RDD of Rows
        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
    }
}
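If you want to confirm that the Parquet files were actually written, you can read them back with the same SparkSession. The following is only a minimal sketch, not part of the original program: a hypothetical verifyOutput method that could be added to TxtToParquetDemo and called from main; it assumes the output paths parquet.res and parquet.res1 used above.

    /**
     * Verify the Parquet output written above (sketch only; the paths must
     * match the ones used by reflectTransform and dynamicTransform).
     */
    private static void verifyOutput(SparkSession spark) {
        // Read back the result of the reflection-based conversion
        Dataset<Row> reflected = spark.read().parquet("parquet.res");
        reflected.printSchema();
        reflected.show();

        // Read back the result of the dynamic-schema conversion
        Dataset<Row> dynamic = spark.read().parquet("parquet.res1");
        dynamic.show();
    }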
Scala version:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType

object RDD2Dataset {

  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]) {
    val spark = SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    import spark.implicits._
    reflectCreate(spark)
    dynamicCreate(spark)
  }

  /**
   * Conversion via reflection (case class).
   * @param spark
   */
  private def reflectCreate(spark: SparkSession): Unit = {
    import spark.implicits._
    val stuRDD = spark.sparkContext.textFile("student2.txt")
    // toDF() is an implicit conversion provided by spark.implicits._
    val stuDf = stuRDD.map(_.split(",")).map(parts => Student(parts(0).trim.toInt, parts(1), parts(2).trim.toInt)).toDF()
    //stuDf.select("id", "name", "age").write.text("result") // write the named columns to a file
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age<20")
    //nameDf.write.text("result") // write the query result to a file
    nameDf.show()
  }

  /**
   * Conversion via a dynamically built schema.
   * @param spark
   */
  private def dynamicCreate(spark: SparkSession): Unit = {
    val stuRDD = spark.sparkContext.textFile("student.txt")
    import spark.implicits._
    val schemaString = "id,name,age"
    val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2)))
    val stuDf = spark.createDataFrame(rowRDD, schema)
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age<20")
    //nameDf.write.text("result") // write the query result to a file
    nameDf.show()
  }
}
Notes:

1. All of the code above has been tested and works; the test environment is Spark 2.1.0 with JDK 1.8.

2. This code does not work on Spark versions earlier than 2.0.
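The reason for note 2 is that SparkSession was only introduced in Spark 2.0; on Spark 1.x the entry point for DataFrames is SQLContext. The snippet below is only a rough, untested sketch of that older style for comparison, reusing the Student bean from above; the class name TxtToDataFrameSpark1x is made up for illustration.

package com.cxd.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class TxtToDataFrameSpark1x {
    public static void main(String[] args) {
        // Spark 1.x style (sketch only): SQLContext instead of SparkSession
        SparkConf conf = new SparkConf().setAppName("TxtToDataFrameSpark1x").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Map each text line to a Student bean, as in reflectTransform above
        JavaRDD<Student> stuRDD = sc.textFile("stuInfo.txt").map(line -> {
            String[] parts = line.split(",");
            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.parseInt(parts[2]));
            return stu;
        });

        // In Spark 1.x, createDataFrame returns a DataFrame rather than Dataset<Row>
        DataFrame df = sqlContext.createDataFrame(stuRDD, Student.class);
        df.show();

        sc.stop();
    }
}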
This article comes from the CSDN blog of 黑白調92; the original post is at https://blog.csdn.net/u010592112/article/details/73730796
總結
以上是生活随笔為你收集整理的Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: maven环境下使用java、scala
- 下一篇: Java架构经验总结