日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)

發(fā)布時間:2024/1/17 java 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

?一:準備數(shù)據(jù)源

?

?

? ? ? 在項目下新建一個student.txt文件,里面的內容為:

? ? ? ?

[plain] view plain copy

print?

  • <code?class="language-java">1,zhangsan,20??
  • 2,lisi,21??
  • 3,wanger,19??
  • 4,fangliu,18</code>??
  • ?
  • 1,zhangsan,20

  • 2,lisi,21

  • 3,wanger,19

  • 4,fangliu,18

  • ? ? ? 二:實現(xiàn)

    ?

    ? ? ?Java版:

    ? ? 1.首先新建一個student的Bean對象,實現(xiàn)序列化和toString()方法,具體代碼如下:

    ? ??

    ?
  • package com.cxd.sql;

  • ?
  • import java.io.Serializable;

  • ?
  • @SuppressWarnings("serial")

  • public class Student implements Serializable {

  • ?
  • ?? ?String sid;

  • ?? ?String sname;

  • ?? ?int sage;

  • ?? ?public String getSid() {

  • ?? ??? ?return sid;

  • ?? ?}

  • ?? ?public void setSid(String sid) {

  • ?? ??? ?this.sid = sid;

  • ?? ?}

  • ?? ?public String getSname() {

  • ?? ??? ?return sname;

  • ?? ?}

  • ?? ?public void setSname(String sname) {

  • ?? ??? ?this.sname = sname;

  • ?? ?}

  • ?? ?public int getSage() {

  • ?? ??? ?return sage;

  • ?? ?}

  • ?? ?public void setSage(int sage) {

  • ?? ??? ?this.sage = sage;

  • ?? ?}

  • ?? ?@Override

  • ?? ?public String toString() {

  • ?? ??? ?return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";

  • ?? ?}

  • ?? ?

  • }

  • ?
  • ?
  • ?
  • ? ? ? ? ?2.轉換,具體代碼如下

    ?

    ?

    ?
  • package com.cxd.sql;

  • ?
  • import java.util.ArrayList;

  • ?
  • import org.apache.spark.SparkConf;

  • import org.apache.spark.api.java.JavaRDD;

  • import org.apache.spark.sql.Dataset;

  • import org.apache.spark.sql.Row;

  • import org.apache.spark.sql.RowFactory;

  • import org.apache.spark.sql.SaveMode;

  • import org.apache.spark.sql.SparkSession;

  • import org.apache.spark.sql.types.DataTypes;

  • import org.apache.spark.sql.types.StructField;

  • import org.apache.spark.sql.types.StructType;

  • ?
  • public class TxtToParquetDemo {

  • ?
  • ?? ?public static void main(String[] args) {

  • ?? ??? ?

  • ?? ??? ?SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");

  • ?? ??? ?SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

  • ?
  • ?? ??? ?reflectTransform(spark);//Java反射

  • ?? ??? ?dynamicTransform(spark);//動態(tài)轉換

  • ?? ?}

  • ?? ?

  • ?? ?/**

  • ?? ? * 通過Java反射轉換

  • ?? ? * @param spark

  • ?? ? */

  • ?? ?private static void reflectTransform(SparkSession spark)

  • ?? ?{

  • ?? ??? ?JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

  • ?? ??? ?

  • ?? ??? ?JavaRDD<Student> rowRDD = source.map(line -> {

  • ?? ??? ??? ?String parts[] = line.split(",");

  • ?
  • ?? ??? ??? ?Student stu = new Student();

  • ?? ??? ??? ?stu.setSid(parts[0]);

  • ?? ??? ??? ?stu.setSname(parts[1]);

  • ?? ??? ??? ?stu.setSage(Integer.valueOf(parts[2]));

  • ?? ??? ??? ?return stu;

  • ?? ??? ?});

  • ?? ??? ?

  • ?? ??? ?Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);

  • ?? ??? ?df.select("sid", "sname", "sage").

  • ?? ??? ?coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");

  • ?? ?}

  • ?? ?/**

  • ?? ? * 動態(tài)轉換

  • ?? ? * @param spark

  • ?? ? */

  • ?? ?private static void dynamicTransform(SparkSession spark)

  • ?? ?{

  • ?? ??? ?JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

  • ?? ??? ?

  • ?? ??? ?JavaRDD<Row> rowRDD = source.map( line -> {

  • ?? ??? ??? ?String[] parts = line.split(",");

  • ?? ??? ??? ?String sid = parts[0];

  • ?? ??? ??? ?String sname = parts[1];

  • ?? ??? ??? ?int sage = Integer.parseInt(parts[2]);

  • ?? ??? ??? ?

  • ?? ??? ??? ?return RowFactory.create(

  • ?? ??? ??? ??? ??? ?sid,

  • ?? ??? ??? ??? ??? ?sname,

  • ?? ??? ??? ??? ??? ?sage

  • ?? ??? ??? ??? ??? ?);

  • ?? ??? ?});

  • ?? ??? ?

  • ?? ??? ?ArrayList<StructField> fields = new ArrayList<StructField>();

  • ?? ??? ?StructField field = null;

  • ?? ??? ?field = DataTypes.createStructField("sid", DataTypes.StringType, true);

  • ?? ??? ?fields.add(field);

  • ?? ??? ?field = DataTypes.createStructField("sname", DataTypes.StringType, true);

  • ?? ??? ?fields.add(field);

  • ?? ??? ?field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);

  • ?? ??? ?fields.add(field);

  • ?? ??? ?

  • ?? ??? ?StructType schema = DataTypes.createStructType(fields);

  • ?? ??? ?

  • ?? ??? ?Dataset<Row> df = spark.createDataFrame(rowRDD, schema);

  • ?? ??? ?df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");

  • ?? ??? ?

  • ?? ??? ?

  • ?? ?}

  • ?? ?

  • }


  • ? ? ?scala版本:

    ?

    ? ??

    ?
  • import org.apache.spark.sql.SparkSession

  • import org.apache.spark.sql.types.StringType

  • import org.apache.spark.sql.types.StructField

  • import org.apache.spark.sql.types.StructType

  • import org.apache.spark.sql.Row

  • import org.apache.spark.sql.types.IntegerType

  • ?
  • object RDD2Dataset {

  • ?
  • case class Student(id:Int,name:String,age:Int)

  • def main(args:Array[String])

  • {

  • ?
  • val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()

  • import spark.implicits._

  • reflectCreate(spark)

  • dynamicCreate(spark)

  • }

  • ?
  • /**

  • * 通過Java反射轉換

  • * @param spark

  • */

  • private def reflectCreate(spark:SparkSession):Unit={

  • import spark.implicits._

  • val stuRDD=spark.sparkContext.textFile("student2.txt")

  • //toDF()為隱式轉換

  • val stuDf=stuRDD.map(_.split(",")).map(parts?Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()

  • //stuDf.select("id","name","age").write.text("result") //對寫入文件指定列名

  • stuDf.printSchema()

  • stuDf.createOrReplaceTempView("student")

  • val nameDf=spark.sql("select name from student where age<20")

  • //nameDf.write.text("result") //將查詢結果寫入一個文件

  • nameDf.show()

  • }

  • ?
  • /**

  • * 動態(tài)轉換

  • * @param spark

  • */

  • private def dynamicCreate(spark:SparkSession):Unit={

  • val stuRDD=spark.sparkContext.textFile("student.txt")

  • import spark.implicits._

  • val schemaString="id,name,age"

  • val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))

  • val schema=StructType(fields)

  • val rowRDD=stuRDD.map(_.split(",")).map(parts?Row(parts(0),parts(1),parts(2)))

  • val stuDf=spark.createDataFrame(rowRDD, schema)

  • stuDf.printSchema()

  • val tmpView=stuDf.createOrReplaceTempView("student")

  • val nameDf=spark.sql("select name from student where age<20")

  • //nameDf.write.text("result") //將查詢結果寫入一個文件

  • nameDf.show()

  • }

  • }

  • ? ? ?注:1.上面代碼全都已經測試通過,測試的環(huán)境為spark2.1.0,jdk1.8。

    ?

    ? ? ? ? ? ? ?2.此代碼不適用于spark2.0以前的版本。

    --------------------- 本文來自 黑白調92 的CSDN 博客 ,全文地址請點擊:https://blog.csdn.net/u010592112/article/details/73730796?utm_source=copy

    總結

    以上是生活随笔為你收集整理的Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。