[Spark] PySpark Beginner Tutorial --- Example: RDD and DataFrame
I. Example Overview
Implement the following with both the Spark RDD API and the DataFrame API:
1. Merge the main features with the single feature
2. Filter the labels
3. Join the labels with the features
4. Write out the final data in the required format
II. Data Description
Three input files are used:
- Label file `driver.txt`, one record per line in the form id|label|condition1|condition2: `1001|1|1|10`, `1002|1|0|5`, `1003|1|0|10`, `1004|1|0|10`
- Main feature file `inst.txt`, one record per line in the form id|feature string (index:value pairs): `1001|0:1 1:1 2:1 3:1`, `1002|0:1 1:1 2:2`, `1003|0:1 1:1 2:3`
- Single feature file `feature.txt`, one record per line in the form id|value: `1001|10`
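The target output, produced at the end of both implementations below, is one line per record that survives the label filter, in the form `label<TAB>feature string #id`.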
III. RDD Implementation
1. Create the SparkSession and get the SparkContext:
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
import sys
import logging

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext
```
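If you are trying the example on a local machine rather than on a cluster with Hive configured, a plain local session works as well; this is an assumption about your environment, not part of the original example:

```python
from pyspark.sql import SparkSession

# Minimal local session for testing; no Hive support is needed for this example
spark = (SparkSession.builder
         .master("local[*]")
         .appName("pyspark_rdd_vs_dataframe")
         .getOrCreate())
sc = spark.sparkContext
```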
2. Define the input file paths:

```python
org_inst_file = "./inst.txt"
label_input_file = "./driver.txt"
subscore_file = "./feature.txt"
```
3. Read the main feature file into an RDD of (id, feature string) pairs:

```python
def read_inst(line):
    # Each line looks like "1001|0:1 1:1 2:1 3:1"
    cmid, inst_str = line.strip().split("|")
    return (cmid, inst_str)

org_inst = sc.textFile(org_inst_file).map(read_inst)  # (id, inst_str)
print(org_inst.collect())
```
4. Read the label file, keeping only the rows whose last field is 5 and second-to-last field is 0:

```python
def read_label(line):
    contents = line.strip().split("|")
    cmid = contents[0]
    label = contents[1]
    return (cmid, label)

def filter_label(line):
    # Keep rows such as "1002|1|0|5": last field 5, second-to-last field 0
    contents = line.strip().split("|")
    condition1 = contents[-1]
    condition2 = contents[-2]
    return condition1 == "5" and condition2 == "0"

label = sc.textFile(label_input_file).filter(filter_label).map(lambda line: read_label(line))  # (cmid, label)
print(label.collect())
```
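As a quick sanity check of the filter logic defined above (plain Python, no Spark needed; the rows are copied from the data description), only 1002 should survive:

```python
# Sample rows from driver.txt as shown in the data description
sample_rows = ["1001|1|1|10", "1002|1|0|5", "1003|1|0|10", "1004|1|0|10"]
print([r for r in sample_rows if filter_label(r)])  # expected: ['1002|1|0|5']
```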
5. Read the single feature file into an RDD of (id, score) pairs:
```python
def read_subscore(line):
    cmid, score = line.strip().split("|")
    return (cmid, score)

subscore = sc.textFile(subscore_file).map(read_subscore)  # (id, subscore)
print(subscore.collect())
```
6. Left-join the single feature onto the main features and append it as an extra index:value term:

```python
subscore_index = "4"

def merge_subscore(values):
    # values is (inst_str, subscore); subscore is None when the id has no single feature
    inst_str = values[0]
    subscore = values[1]
    if subscore is None:
        return inst_str
    else:
        return " ".join([inst_str, "{}:{}".format(subscore_index, subscore)])

new_inst = org_inst.leftOuterJoin(subscore).mapValues(merge_subscore)  # (cmid, merged inst_str)
print(new_inst.collect())
```
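With the sample data, only 1001 has an entry in feature.txt, so after the left outer join `new_inst.collect()` should look roughly like (ordering is not guaranteed): `[('1001', '0:1 1:1 2:1 3:1 4:10'), ('1002', '0:1 1:1 2:2'), ('1003', '0:1 1:1 2:3')]`.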
7. Join the merged features with the filtered labels and format each line as the label, a tab, the feature string, and `#id`:
```python
def merge_inst_label(data):
    # data is (cmid, (inst_str, label_str)) after the join
    cmid = data[0]
    inst_str = data[1][0]
    label_str = data[1][1]
    out = label_str + "\t" + inst_str + " #{}".format(cmid)
    return out

inst_with_label = new_inst.join(label).map(merge_inst_label)
print(inst_with_label.collect())
```
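Since only id 1002 passes the label filter, and it has no single feature, the collected result with the sample data should be a single line roughly like `1<TAB>0:1 1:1 2:2 #1002`.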
8. Save the result as a text file:
```python
inst_with_label.saveAsTextFile("./output_rdd")
```

IV. DataFrame Implementation
1. Create the SparkSession:
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
import sys
import logging

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
```
2. Define the input file paths:

```python
org_inst_file = "./inst.txt"
label_input_file = "./driver.txt"
subscore_file = "./feature.txt"
```
3. Load the main feature file into a DataFrame:

```python
df_inst = spark.read.format('csv') \
    .option('delimiter', '|') \
    .load(org_inst_file) \
    .toDF('id', 'index_with_feature')
df_inst.show()
df_inst.printSchema()
```
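Without an explicit schema, every column is loaded as a string. If you prefer to declare the schema yourself, an optional variation not in the original could look like this:

```python
from pyspark.sql.types import StructType, StructField, StringType

# Explicit schema with the same column names used above; both columns stay strings
inst_schema = StructType([
    StructField("id", StringType(), True),
    StructField("index_with_feature", StringType(), True),
])
df_inst = spark.read.format('csv') \
    .option('delimiter', '|') \
    .schema(inst_schema) \
    .load(org_inst_file)
```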
4. Load the single feature file into a DataFrame:

```python
df_subscore = spark.read.format('csv') \
    .option('delimiter', '|') \
    .load(subscore_file) \
    .toDF('id', 'feature')
df_subscore.show()
df_subscore.printSchema()
```
5. Left-join the single feature onto the main features:

```python
df_merge_feature = df_inst.join(df_subscore, on="id", how="left")
df_merge_feature.show()
```
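After the left join, the `feature` column is null for ids that have no entry in feature.txt (1002 and 1003 in the sample data); this matters when the feature string is assembled in step 8 below.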
6. Load the label file and keep only the rows where condition1 is 0 and condition2 is 5:
```python
df_label = spark.read.format('csv') \
    .option('delimiter', '|') \
    .load(label_input_file) \
    .toDF('id', 'label', 'condition1', 'condition2')
df_label.show()

df_label = df_label.filter((df_label['condition1'] == 0) & (df_label['condition2'] == 5))
df_label.show()
```
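The CSV columns are loaded as strings, so the comparison against the integer literals 0 and 5 relies on Spark's implicit casting. An equivalent filter written against string literals, which matches how the RDD version treats the fields, would be:

```python
# Same filter, comparing against string values since the CSV columns are strings
df_label = df_label.filter((df_label['condition1'] == '0') & (df_label['condition2'] == '5'))
```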
7. Inner-join the merged features with the filtered labels:

```python
df_merge = df_merge_feature.join(df_label, on="id", how="inner")
df_merge.show()
```
8. Append the single feature to the feature string with a UDF and select the output columns:

```python
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

subscore_index = "4"

def fc2(a, b):
    # b is null for ids with no single feature; keep the original string in that case
    if b is None:
        return a
    return "{} {}:{}".format(a, subscore_index, b)

fc2 = udf(fc2, StringType())
df_merge = df_merge.withColumn('inst_feature', fc2("index_with_feature", 'feature'))
df_merge.show()

df_merge2 = df_merge[["id", "inst_feature", "label"]]
df_merge2.show()
```
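Python UDFs add serialization overhead, so the same column can also be built with Spark's built-in column functions. A sketch equivalent to `fc2` above, which leaves the string unchanged when the feature is missing:

```python
from pyspark.sql import functions as F

# Build "index_with_feature 4:feature", or keep index_with_feature when feature is null
df_merge = df_merge.withColumn(
    'inst_feature',
    F.when(F.col('feature').isNull(), F.col('index_with_feature'))
     .otherwise(F.concat_ws(' ',
                            F.col('index_with_feature'),
                            F.concat(F.lit(subscore_index + ':'), F.col('feature'))))
)
```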
9. Write the result to disk:
```python
# Write as CSV
file = "./output_dataframe"
df_merge2.write.csv(path=file, header=False, sep="\t", mode='overwrite')

# Write as plain text in the same "label<TAB>features #id" format as the RDD version
df_merge2.rdd.map(lambda x: str(x[2]) + "\t" + x[1] + " #" + x[0]).saveAsTextFile('./output_dataframe2')
```
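Both calls write a directory of part files, one per partition. If a single output file is wanted, an optional tweak not in the original is to collapse the DataFrame to one partition first:

```python
# Collapse to a single partition so the output directory contains one part file
df_merge2.coalesce(1).write.csv(path=file, header=False, sep="\t", mode='overwrite')
```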
Summary

The same feature-merging task was implemented twice: once with the low-level RDD API (textFile, map, filter, leftOuterJoin, join, saveAsTextFile) and once with the DataFrame API (read as CSV, join, filter, withColumn, write as CSV or text).