日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python partition by函数_python – 避免Spark窗口函数中单个分区模式的性能影响

發布時間:2023/12/15 python 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python partition by函数_python – 避免Spark窗口函数中单个分区模式的性能影响 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在實踐中,性能影響幾乎與您省略了partitionBy子句相同.所有記錄將被洗牌到一個分區,在本地排序并逐個順序迭代.

差異僅在于總共創建的分區數.讓我們舉例說明使用包含10個分區和1000個記錄的簡單數據集的示例:

df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42))

如果您定義沒有partition by子句的框架

w_unpart = Window.orderBy(f.col("index").asc())

并使用滯后

df_lag_unpart = df.withColumn(

"diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")

)

總共只有一個分區:

df_lag_unpart.rdd.glom().map(len).collect()

[1000]

與具有虛擬索引的幀定義相比(與您的代碼相比簡化了一點:

w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc())

將使用等于spark.sql.shuffle.partitions的分區數:

spark.conf.set("spark.sql.shuffle.partitions", 11)

df_lag_part = df.withColumn(

"diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1")

)

df_lag_part.rdd.glom().count()

11

只有一個非空分區:

df_lag_part.rdd.glom().filter(lambda x: x).count()

1

遺憾的是,沒有通用的解決方案可以用來解決PySpark中的這個問題.這只是實現的固有機制與分布式處理模型相結合.

由于索引列是順序的,因此您可以生成每個塊具有固定數量記錄的人工分區鍵:

rec_per_block = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions"))

df_with_block = df.withColumn(

"block", (f.col("index") / rec_per_block).cast("int")

)

并用它來定義框架規范:

w_with_block = Window.partitionBy("block").orderBy("index")

df_lag_with_block = df_with_block.withColumn(

"diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1")

)

這將使用預期的分區數:

df_lag_with_block.rdd.glom().count()

11

大致統一的數據分布(我們無法避免哈希沖突):

df_lag_with_block.rdd.glom().map(len).collect()

[0, 180, 0, 90, 90, 0, 90, 90, 100, 90, 270]

但是在塊邊界上有許多空白:

df_lag_with_block.where(f.col("diffs_col1").isNull()).count()

12

由于邊界易于計算:

from itertools import chain

boundary_idxs = sorted(chain.from_iterable(

# Here we depend on sequential identifiers

# This could be generalized to any monotonically increasing

# id by taking min and max per block

(idx - 1, idx) for idx in

df_lag_with_block.groupBy("block").min("index")

.drop("block").rdd.flatMap(lambda x: x)

.collect()))[2:] # The first boundary doesn't carry useful inf.

你總是可以選擇:

missing = df_with_block.where(f.col("index").isin(boundary_idxs))

并分別填寫:

# We use window without partitions here. Since number of records

# will be small this won't be a performance issue

# but will generate "Moving all data to a single partition" warning

missing_with_lag = missing.withColumn(

"diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")

).select("index", f.col("diffs_col1").alias("diffs_fill"))

并加入:

combined = (df_lag_with_block

.join(missing_with_lag, ["index"], "leftouter")

.withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill")))

獲得理想的結果:

mismatched = combined.join(df_lag_unpart, ["index"], "outer").where(

combined["diffs_col1"] != df_lag_unpart["diffs_col1"]

)

assert mismatched.count() == 0

總結

以上是生活随笔為你收集整理的python partition by函数_python – 避免Spark窗口函数中单个分区模式的性能影响的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。