Reading a CSV into an RDD with Spark (Python, Scala, and Java code roundup)
--------------------------------------------------------------------Basic Info----------------------------------------------------------
| Language | Run mode | sc.textFile default scheme |
| --- | --- | --- |
| Python | pyspark | hdfs:// |
| Scala | spark-shell | hdfs:// |
| Java | IntelliJ | file:// |
Here, "default scheme" means the following:
if you only write sc.textFile("/xxx"),
the path is resolved against either the local filesystem root or the HDFS root, depending on how you run.
To avoid the ambiguity, it is best to always write the scheme explicitly, e.g. sc.textFile("hdfs://xxx").
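For example, the two unambiguous forms look like this in PySpark (a minimal sketch; the /tmp path is hypothetical, and Desktop:9000 is the NameNode address used in the Java example further below):

# "file://" always resolves against the local filesystem,
# "hdfs://host:port" always against the given HDFS NameNode.
local_rdd = sc.textFile("file:///tmp/rdd1.csv")          # hypothetical local path
hdfs_rdd = sc.textFile("hdfs://Desktop:9000/rdd1.csv")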
------------------------------------------------------------------------------------Pyspark[1]--------------------------------------------------------------
>>> import pandas as pd
>>> from pyspark.sql import SparkSession
>>> from pyspark import SparkContext
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> lines = sc.textFile("/rdd1.csv")
>>> header = lines.first()
>>> lines = lines.filter(lambda row: row != header)
>>> lines
PythonRDD[13] at RDD at PythonRDD.scala:53
>>> lines.collect()
['002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '002,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello', '003,hello']
>>> exit()
Note: a bare path like this only works in local mode; in cluster mode the file must first be uploaded to HDFS.[2]
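After filtering out the header, each element is still a raw string such as '002,hello'. A minimal sketch of the usual next step, splitting every line into fields (the tuple layout below is just illustrative):

records = lines.map(lambda row: row.split(","))   # ['002', 'hello']
pairs = records.map(lambda f: (f[0], f[1]))       # ('002', 'hello')
print(pairs.take(3))
# [('002', 'hello'), ('002', 'hello'), ('002', 'hello')]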
------------------------------------------------------Scala----------------------------------------------------------------------------------------------
scala> val lines = sc.textFile("/rdd1.csv")
lines: org.apache.spark.rdd.RDD[String] = /rdd1.csv MapPartitionsRDD[1] at textFile at <console>:24
scala> var header = lines.first()
header: String = 001,hello
scala> val lines2=lines.filter(_!=header)
lines2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27
scala> lines2.collect()
res0: Array[String] = Array(002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 002,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,hello, 003,he...
scala> :q
------------------------------------------------------Java----------------------------------------------------------------------------------------
src/main/java/javaread.java is as follows:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class javaread {
    public static void main(String[] args) {
        // Connect to the standalone cluster and ship the packaged jar to the executors.
        SparkConf conf = new SparkConf()
                .setMaster("spark://Desktop:7077")
                .setJars(new String[]{"/home/appleyuchi/桌面/spark_success/Spark數據傾斜處理/Java/sampling_salting/target/sampling-salting-1.0-SNAPSHOT.jar"})
                .setAppName("TestSpark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");

        final JavaRDD<String> lines = sc.textFile("hdfs://Desktop:9000/rdd1.csv");
        final String header = lines.first();

        // Drop the header row, same as the Python and Scala versions above.
        JavaRDD<String> lines2 = lines.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(String v1) throws Exception {
                return !v1.equals(header);
            }
        });
        System.out.println(lines2.collect());
    }
}

pom.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>sampling-salting</groupId>
    <artifactId>sampling-salting</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>
The Java code above connects to a real cluster rather than running in local mode, so the run steps differ from local mode:
① mvn package
② Ctrl+Shift+F10 (IntelliJ's shortcut to run the main class)
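For comparison, the same cluster connection can be written in PySpark as well. This is a sketch under the assumption that the master URL and HDFS address from the Java code apply; pure-Python jobs need no setJars step:

from pyspark import SparkConf, SparkContext

# Connect to the standalone master instead of running in local mode,
# mirroring setMaster(...) in the Java code above.
conf = SparkConf().setMaster("spark://Desktop:7077").setAppName("TestSpark")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")

lines = sc.textFile("hdfs://Desktop:9000/rdd1.csv")
header = lines.first()
print(lines.filter(lambda row: row != header).count())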
--------------------------------------------------------------------------------------------------------------------------------------------------------
Reference:
[1] PySpark learning series (2): reading a CSV file into an RDD or DataFrame for data processing
[2] Fixing the error when textFile reads a local file in Spark cluster mode