
PySpark Summary Notes


20220402

Spark error: Total size of serialized results of 12189 tasks is bigger than spark.driver.maxResultSize
https://blog.csdn.net/qq_27600723/article/details/107023574
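A minimal sketch of the usual fix, assuming the job really does need to return that much to the driver: raise spark.driver.maxResultSize when building the session (the app name and "4g" are example values; "0" disables the check entirely).

from pyspark.sql import SparkSession

# spark.driver.maxResultSize caps the total size of results collected to the driver
spark = (SparkSession.Builder()
         .appName("bigger-max-result-size")
         .config("spark.driver.maxResultSize", "4g")
         .getOrCreate())
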
Reading and writing Iceberg with PySpark

# code: utf-8
import findspark
# Point findspark at the pyspark installation; on a server it is best to use the
# pyspark that ships with the Spark installation itself.
findspark.init(r"D:\Python37\Lib\site-packages\pyspark")

import os
java8_location = r'D:\Java\jdk1.8.0_301/'  # set your own path
os.environ['JAVA_HOME'] = java8_location

from pyspark.sql import SparkSession


def get_spark():
    # PySpark reads an Iceberg table through a Hive catalog
    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
    spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")
    # For a different target address or cluster, copy the corresponding two Hive
    # config files into the pyspark conf folder on the local client.
    return spark


if __name__ == '__main__':
    spark = get_spark()
    pdf = spark.sql("select shangpgg from iceberg.test.end_spec limit 10")
    spark.sql("insert into iceberg.test.end_spec values ('aa','bb')")
    pdf.show()
    print()
1. Create a conf folder under the pyspark installation and put the two Hive config files used by Iceberg into it:
   hdfs-site.xml
   hive-site.xml
2. Put iceberg-spark3-runtime-0.13.1.jar into pyspark's jars folder.
Failed to open input stream for file: hdfs://ns1/warehouse/test.db/end_spec/metadata/00025-73e8d58b-c4f1-4c81-b0a8-f1a8a12090b1.metadata.json
org.apache.iceberg.exceptions.RuntimeIOException: Failed to open input stream for file: hdfs://ns1/warehouse/test.db/end_spec/metadata/00025-73e8d58b-c4f1-4c81-b0a8-f1a8a12090b1.metadata.json
This means the two Hive config files were not found; specifying the pyspark path in findspark.init solves it:
# findspark.init(r"D:\Python37\Lib\site-packages\pyspark")
od_all = spark.createDataFrame(od)
od_all.createOrReplaceTempView('od_all')
od_duplicate = spark.sql("select distinct user_id,goods_id,category_second_id from od_all;")
od_duplicate.createOrReplaceTempView('od_duplicate')
od_goods_group = spark.sql(" select user_id,count(goods_id) goods_nums_total from od_duplicate group by user_id ;")

Every table referenced in a SQL statement must first be registered with createOrReplaceTempView.
Error when executing SQL: extraneous input ';' expecting EOF near '<EOF>'
https://blog.csdn.net/xieganyu3460/article/details/83055935
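Going by the linked post, the fix is simply to drop the trailing semicolon from the statement string passed to spark.sql. A minimal sketch using the query above:

query = "select distinct user_id, goods_id, category_second_id from od_all"  # no trailing ';'
od_duplicate = spark.sql(query)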

https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/types.html?highlight=type
PySpark data types

TypeError: field id: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.LongType'>
https://blog.csdn.net/weixin_40983094/article/details/115630358
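This typically happens when a pandas column holds mixed types and Spark cannot infer a single schema. A minimal sketch of one fix, assuming an existing SparkSession named spark and a hypothetical mixed-type column 'id':

import pandas as pd

pdf = pd.DataFrame({"id": [1, "2", 3]})   # mixed int/str values in one column
pdf["id"] = pdf["id"].astype(str)         # unify the type before handing it to Spark
sdf = spark.createDataFrame(pdf)          # alternatively, pass an explicit schema=
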
# code: utf-8
from pathlib import Path

import pandas as pd
from pyspark.ml.fpm import FPGrowth
import datetime
import platform
import os
import warnings
warnings.filterwarnings("ignore")
from utils_ import usetime, log_generate
from param_config import config

logger = log_generate(config.log["name"], config.log["date"])

sys = platform.system()
if sys == "Windows":
    PATH = os.path.abspath(str(Path("").absolute())) + "/"
else:
    PATH = "/home/guanlian_algo_confirm3/"

os.environ["JAVA_HOME"] = r"D:\Java\jdk1.8.0_301"
t1 = datetime.datetime.now()


@usetime
def calculate_fpgrowth(spark, data, total_nums):
    data = spark.createDataFrame(data)
    data.createOrReplaceTempView("all_data")
    part_data = spark.sql("select * from all_data ")
    all_record = part_data.select("goods_huizong")  # select the column(s) of interest
    all_record.show(5)

    def transform_to_list(col):
        per_row = col.split("|")  # applied to the column value of every row
        return per_row

    all_record = all_record.rdd.map(
        lambda row: (row["goods_huizong"], transform_to_list(row["goods_huizong"])))
    all_record = spark.createDataFrame(all_record, ["goods_huizong", "goods_huizong_list"])
    all_record.show(5)
    all_record = all_record.select("goods_huizong_list")
    all_record = all_record.withColumnRenamed("goods_huizong_list", "items")
    logger.debug("total number of records {}".format(total_nums))

    fp = FPGrowth(minSupport=0.0001, minConfidence=0.8)
    fpm = fp.fit(all_record)  # fit the model
    fpm.freqItemsets.show(5)  # print the first five frequent itemsets to the console
    fp_count = fpm.freqItemsets.count()
    if fp_count == 0:
        return pd.DataFrame()
    logger.debug("*" * 100)
    logger.debug("number of frequent itemsets {} ".format(fp_count))

    ass_rule = fpm.associationRules  # strong association rules
    ass_rule.show()
    rule_nums = ass_rule.count()
    if rule_nums == 0:
        return pd.DataFrame()
    logger.debug("number of rules {} ".format(rule_nums))

    ass_rule = ass_rule.select(["antecedent", "consequent", "confidence"])
    ass_rule.show(5)
    ass_rule_df = ass_rule.toPandas()
    ass_rule_df["antecedent_str"] = ass_rule_df["antecedent"].apply(lambda x: str(x))
    ass_rule_df.sort_values(["antecedent_str", "confidence"], ascending=[True, False], inplace=True)
    t2 = datetime.datetime.now()
    logger.debug("spent ts: {}".format(t2 - t1))
    return ass_rule_df

A simple example.

20220314

Parameters set in code take precedence over parameters passed on the command line; what is ultimately used are the values set in the code.

py4j.protocol.Py4JJavaError: An error occurred while calling o24.sql.
: org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'iceberg': org.apache.iceberg.spark.SparkCatalog
Download the iceberg-spark-runtime-3.2_2.12-0.13.1.jar package from the Iceberg website and put it under Spark's jars folder.
https://iceberg.apache.org/docs/latest/getting-started/

# code: utf-8
import findspark
import pandas as pd
findspark.init()
from datetime import datetime, date
import re
from pyspark.sql import SparkSession
# from out_udf import outer_udf
#  /home/spark-3.1.2-bin-hadoop3.2/bin/spark-submit \
#  --master local  --py-files /root/bin/python_job/pyspark/out_udf.py hello_spark.py
# from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df.createOrReplaceTempView("t1")

# UDF - anonymous function
spark.udf.register('xtrim', lambda x: re.sub('[ \n\r\t]', '', x), 'string')

# UDF - named function
def xtrim2(record):
    return re.sub('[ \n\r\t]', '', record)

# PySpark reads an Iceberg table
spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")

spark.udf.register('xtrim2', xtrim2, 'string')
# spark.udf.register('outer_udf', outer_udf)

if __name__ == '__main__':
    df.show()
    spark.sql("select * from t1").show()
    spark.sql("select xtrim2('测试 数据    你好') ").show()
    spark.sql("use iceberg").show()
    spark.sql("show databases").show()

Reading Iceberg with PySpark
from datetime import datetime, date
import re
from pyspark.sql import SparkSession
from out_udf import outer_udf
#  /home/spark-3.1.2-bin-hadoop3.2/bin/spark-submit \
#  --master local  --py-files /root/bin/python_job/pyspark/out_udf.py hello_spark.py
# from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df.createOrReplaceTempView("t1")

# UDF - anonymous function
spark.udf.register('xtrim', lambda x: re.sub('[ \n\r\t]', '', x), 'string')

# UDF - named function
def xtrim2(record):
    return re.sub('[ \n\r\t]', '', record)

# PySpark reads an Iceberg table
spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")

spark.udf.register('xtrim2', xtrim2, 'string')
spark.udf.register('outer_udf', outer_udf)

if __name__ == '__main__':
    df.show()
    spark.sql("select * from t1").show()
    spark.sql("select xtrim2('测试 数据    你好') ").show()
    spark.sql("select outer_udf('测试数据你好') ").show()
    spark.sql("use iceberg").show()
    spark.sql("show databases").show()

Operating on Iceberg (Hive) with PySpark

20220311

How to fix: AttributeError: 'NoneType' object has no attribute 'sc'
Move the construction of the spark object outside the loop, or create a temporary sc object?
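A minimal sketch of the first option, reusing the all_data temp view from the snippets above (the loop values are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()   # built once, outside the loop
for cus_type in [1, 2, 3]:                   # hypothetical loop values
    part_data = spark.sql("select * from all_data where user_type = " + str(cus_type))
    part_data.show(5)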

Spark is essentially just data-processing code written in another language; it is the same thing expressed a different way.

Parameter tuning
Reduce the number of executors and increase the other parameter values; this makes errors less likely.

Spark job throws java.lang.StackOverflowError
https://blog.csdn.net/u010936936/article/details/88363449
Spark: java.io.IOException: No space left on device
https://blog.csdn.net/dupihua/article/details/51133551
ass_rule = ass_rule.filter('antecedent_len == 1')
ass_rule = ass_rule.filter('consequent_len == 1')

DataFrame filtering
https://blog.csdn.net/qq_40006058/article/details/88931884
Various DataFrame operations

20220310

data = spark.createDataFrame(data)  # convert an ordinary (pandas) dataframe into a Spark DataFrame
data.createOrReplaceTempView("all_data")  # register it as a SQL table so it can be queried
part_data = spark.sql("select * from all_data where user_type= " + str(cus_type))  # run SQL against it

https://blog.csdn.net/zhurui_idea/article/details/73090951

ass_rule = ass_rule.rdd.map(lambda row: (row["antecedent"], row['consequent'], calculate_len(row['antecedent'])))
# after .rdd.map it becomes a PipelinedRDD
ass_rule = spark.createDataFrame(ass_rule)
# calling createDataFrame again turns it back into a DataFrame

Conversion between DataFrame and RDD


Automatically process each row of data while keeping the other original fields.
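A hedged alternative sketch for this, using withColumn plus a UDF instead of the RDD route (column names follow the snippets above); all other columns of the DataFrame are kept automatically:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

split_udf = udf(lambda s: s.split("|"), ArrayType(StringType()))
# withColumn adds the processed column while every other column of part_data stays as-is
part_data = part_data.withColumn("goods_huizong_list", split_udf("goods_huizong"))
part_data.show(5)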

java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema
Strangely, when splitting the string into a list there has to be another field in front of it, otherwise it errors.

part_data = spark.sql("select * from all_data where user_type= " + str(cus_type))
part_data.show()
all_record = part_data.select("user_type", 'goods_huizong')  # multiple fields can be selected
all_record = all_record.rdd.map(lambda row: (row['user_type'], transform_to_list(row['goods_huizong'])))

Multiple fields can also be selected afterwards.
  File "/usr/local/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 875, in subimport__import__(name)
ModuleNotFoundError: No module named 'utils_'與pyspark大數(shù)據(jù)相關(guān)的函數(shù)只能放在當(dāng)前模塊?通過其他模塊導(dǎo)入
會(huì)不能識(shí)別?
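One common workaround (a sketch, not verified against this exact job) is to ship the helper module to the executors, in the same spirit as the --py-files flag used in the spark-submit commands above; the path is a placeholder.

spark.sparkContext.addPyFile("/root/bin/python_job/pyspark/utils_.py")  # hypothetical path
# or at submit time:
#   spark-submit --py-files utils_.py my_job.py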

20211231

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
The resources were being used by someone else.

20211230

Spark 2.0.x dump a csv file from a dataframe containing one array of type string
https://stackoverflow.com/questions/40426106/spark-2-0-x-dump-a-csv-file-from-a-dataframe-containing-one-array-of-type-string

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def array_to_string(my_list):
    return '[' + ','.join([str(elem) for elem in my_list]) + ']'

array_to_string_udf = udf(array_to_string, StringType())

df = df.withColumn('column_as_str', array_to_string_udf(df["column_as_array"]))
df.drop("column_as_array").write.csv(...)

The approach above has a problem: the values written for the generated column come out as a generator expression.

import org.apache.spark.sql.functions._
val dumpCSV = df.withColumn("ArrayOfString", col("ArrayOfString").cast("string"))
  .write.csv(path = "/home/me/saveDF")

This approach works.
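A possible PySpark equivalent of the Scala cast approach above (a sketch, not verified here; ass_rule, the column name and the output path are placeholders taken from the surrounding notes):

from pyspark.sql.functions import col

out = ass_rule.withColumn("antecedent", col("antecedent").cast("string"))  # array column -> its string form
out.write.csv("/home/me/saveDF")  # placeholder path, matching the Scala snippet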

https://www.jianshu.com/p/3735b5e2c540
https://www.jianshu.com/p/80964332b3c4
Writing an RDD or Spark DataFrame to CSV; plain pandas cannot write to HDFS.
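A minimal sketch, assuming an existing SparkSession named spark, a pandas DataFrame pdf, and a hypothetical HDFS path:

sdf = spark.createDataFrame(pdf)                        # pandas DataFrame -> Spark DataFrame
sdf.write.mode("overwrite").csv("hdfs://ns1/tmp/out")   # hypothetical HDFS path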

import findspark
findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth
import datetime
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from tqdm import tqdm
import platform
import os

os.environ['JAVA_HOME'] = r'/usr/local/jdk1.8.0_212'
t1 = datetime.datetime.now()
appname = "FPgrowth"
# master = "local[6]"

spark = SparkSession.Builder().appName(appname)\
    .config('spark.num-executors', '50')\
    .config('spark.executor.memory', '4g')\
    .config('spark.executor.cores', '3')\
    .config('spark.driver.memory', '1g')\
    .config('spark.default.parallelism', '1000')\
    .config('spark.storage.memoryFraction', '0.5')\
    .config('spark.shuffle.memoryFraction', '0.3')\
    .config("spark.speculation", 'True')\
    .config("spark.speculation.interval", '100')\
    .config("spark.speculation.quantile", "0.75")\
    .config("spark.speculation.multiplier", '1.5')\
    .config("spark.scheduler.mode", 'FAIR')\
    .getOrCreate()

df = spark.read.format("csv") \
    .option("header", "true") \
    .load("/data/tb_order_user_sec_type_group.csv")
df.createOrReplaceTempView('all_data')
sec_type = spark.sql("select sec_type from all_data ")

https://hub.mybinder.turing.ac.uk/user/apache-spark-sjqwupmp/notebooks/python/docs/source/getting_started/quickstart_ps.ipynb
Quickstart: Pandas API on Spark (getting started with pandas on Spark)

part_data = spark.sql("select * from all_data where sec_type= " + cus_type)
part_data.count()  # count the number of elements (rows) in the RDD
lines.first()  # the first element of this RDD, i.e. the first line of README.md

http://spark.apache.org/docs/latest/api/python/getting_started/index.html
PySpark official documentation; refer to it for both Spark SQL and Spark DataFrames.


Quickly convert to pandas for further manipulation.

20210831

Windows 10: Spark warning: WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

https://blog.csdn.net/weixin_43748432/article/details/107378033

java.lang.OutOfMemoryError: GC overhead limit exceeded
https://blog.csdn.net/gaokao2011/article/details/51707163
Increase the parameters below.

Spark operators: basic RDD transformations (5): mapPartitions
http://lxw1234.com/archives/2015/07/348.htm
Maps partition by partition rather than mapping each element individually, which improves efficiency.
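A minimal sketch of mapPartitions, assuming an existing SparkSession named spark:

rdd = spark.sparkContext.parallelize(range(10), 2)

def add_one(rows):
    # rows is an iterator over all the elements of one partition, so any
    # per-partition setup (e.g. opening a connection) only happens once
    for x in rows:
        yield x + 1

print(rdd.mapPartitions(add_one).collect())  # [1, 2, ..., 10]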

spark = (SparkSession.Builder().appName(appname).master(master)
    .config('spark.some.config.option0', 'some_value')
    .config('spark.executor.memory', '2g')          # executor memory
    .config('spark.executor.cores', '2')            # CPU cores available to a single executor
    .config("spark.executor.instances", '10')       # total number of executors
    .config('spark.driver.memory', '1g')            # driver setting; keep it smaller than the executor's?
    .config('spark.default.parallelism', '1000')    # number of tasks
    .config('spark.sql.shuffle.partitions', '300')  # number of shuffle partitions
    .config("spark.driver.extraJavaOptions", "-Xss2048M")  # JVM-related setting
    .config("spark.speculation", 'True')            # avoid getting stuck on a stage
    .config("spark.speculation.interval", '100')    # avoid getting stuck on a stage
    .config("spark.speculation.quantile", "0.1")    # avoid getting stuck on a stage
    .config("spark.speculation.multiplier", '1')    # avoid getting stuck on a stage
    .config("spark.scheduler.mode", 'FAIR')         # scheduling mode
    .getOrCreate())

Parameter settings

spark = (SparkSession.Builder().appName(appname).master(master)
    .config('spark.some.config.option0', 'some_value')
    .config('spark.executor.memory', '2g')
    .config('spark.executor.cores', '2')
    .config("spark.executor.instances", '10')
    .config('spark.driver.memory', '3g')
    .config('spark.default.parallelism', '1000')    # this parameter matters a lot
    .config('spark.sql.shuffle.partitions', '300')  # this parameter matters a lot
    .config("spark.driver.extraJavaOptions", "-Xss3072M")
    .config("spark.speculation", 'True')            # this parameter matters a lot
    .config("spark.speculation.interval", '100')
    .config("spark.speculation.quantile", "0.1")
    .config("spark.speculation.multiplier", '1')
    .config("spark.scheduler.mode", 'FAIR')
    .getOrCreate())

With 32 GB of total memory, this configuration produces results quickly.

https://blog.csdn.net/lotusws/article/details/52423254
spark master local parameters

Then open http://192.168.1.116:4040 in the browser.
Spark UI
The address of the Spark dashboard.


Viewing the configured parameters


Active: stages currently running
Pending: stages that have not run yet
Completed: finished stages
"12/69 (13)" means 69 stages in total, 12 already finished, 13 currently running
On the dashboard, mainly look at the Stages and Executors pages.


Timeline, running from left to right.


Look under the Jobs page for the specific reason for a failure.

https://blog.csdn.net/weixin_42340179/article/details/82415085
https://blog.csdn.net/whgyxy/article/details/88779965
Stuck at a certain stage
Analysis of the case where Spark runs normally but a single stage gets stuck and stops making progress

https://blog.csdn.net/yf_bit/article/details/93610829
Key reference
https://www.cnblogs.com/candlia/p/11920289.html
https://www.cnblogs.com/xiao02fang/p/13197877.html
Factors that affect Spark performance

https://www.csdn.net/tags/OtDaUgysMTk3Mi1ibG9n.html
https://www.cnblogs.com/yangsy0915/p/6060532.html
Key reference
PySpark configuration parameters

https://www.javaroad.cn/questions/15705
Looping over rows
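A minimal sketch of the usual options, assuming an existing DataFrame df with a column 'a' as in the example DataFrame above; toLocalIterator avoids pulling everything to the driver at once.

# collect() pulls every row to the driver as a list of Row objects
for row in df.collect():
    print(row["a"])

# toLocalIterator() streams the rows back partition by partition instead
for row in df.toLocalIterator():
    print(row["a"])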

http://www.sofasofa.io/forum_main_post.php?postid=1005461
Getting the total number of rows and columns
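A sketch, assuming an existing DataFrame df:

n_rows = df.count()        # total number of rows (triggers a job)
n_cols = len(df.columns)   # total number of columns
print(n_rows, n_cols)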

https://blog.csdn.net/qq_40006058/article/details/88822268
PySpark study notes | 68 commonly used functions | explanations + Python code

https://blog.csdn.net/qq_29153321/article/details/88648948
RDD operations

https://www.jianshu.com/p/55efdcabd163
Some simple, commonly used PySpark functions and methods

http://sofasofa.io/forum_main_post.php?postid=1002482
Renaming DataFrame columns
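A sketch with hypothetical column names, based on the five-column example DataFrame earlier in these notes:

df = df.withColumnRenamed("a", "a_new")               # rename one column
df = df.toDF("col1", "col2", "col3", "col4", "col5")  # rename all columns at once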
