Spark: How to replace sc.parallelize(List(item1,item2)).collect().foreach(row={}) with parallel execution?
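Code scenario:
1) Several data scenarios are defined. The job iterates over all of them: for each scenario in turn, it computes statistics over the data that satisfies that scenario's conditions and stores the result in Hive;
2) The existing code is as follows: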
case class IndoorOTTCalibrateBuildingVecotrLegend(oid: Int, minHeight: Int, maxHeight: Int, minGridIDCount: Int, maxGridIDCount: Int, heightType: Int) extends Serializable

// Instantiate the building ranges: scenarios are divided by grid count (area) and building height (shopping malls and similar scenes)
val buildingHeightLegends = List(
  IndoorOTTCalibrateBuildingVecotrLegend(1, 1, 30, 1, 21, BuildingCalibrateHeightType.HeightType1.toString.toInt),
  IndoorOTTCalibrateBuildingVecotrLegend(2, 1, 30, 21, 45, BuildingCalibrateHeightType.HeightType2.toString.toInt),
  IndoorOTTCalibrateBuildingVecotrLegend(3, 1, 30, 45, 100, BuildingCalibrateHeightType.HeightType3.toString.toInt),
  IndoorOTTCalibrateBuildingVecotrLegend(4, 30, 50, 1, 21, BuildingCalibrateHeightType.HeightType4.toString.toInt),
  IndoorOTTCalibrateBuildingVecotrLegend(5, 30, 50, 21, 45, BuildingCalibrateHeightType.HeightType5.toString.toInt),
  IndoorOTTCalibrateBuildingVecotrLegend(6, 30, 50, 45, 100, BuildingCalibrateHeightType.HeightType6.toString.toInt),
  IndoorOTTCalibrateBuildingVecotrLegend(7, 50, 5000, 1, 100, BuildingCalibrateHeightType.HeightType7.toString.toInt)
)

spark.sparkContext.parallelize(buildingHeightLegends).collect().foreach(buildingHeightLegend => {
  generateSampleBySenceType(spark, p_city, p_hour_start, p_hour_end, p_fpb_day, p_day_sample, linkLossCalibrateParameter, buildingHeightLegend)
})

Note:
The generateSampleBySenceType() function internally contains:
spark.sql(
  s"""|xxx
      |where t10.height>=${buildingHeightLegend.minHeight} and t10.height<${buildingHeightLegend.maxHeight}
      |and t10.gridcount>=${buildingHeightLegend.minGridIDCount} and t10.gridcount<${buildingHeightLegend.maxGridIDCount}
      |""".stripMargin)
If the code is changed to:
val buildingHeightLegends_df = spark.sqlContext.createDataFrame(buildingHeightLegends)
buildingHeightLegends_df.createOrReplaceTempView("temp_buildingheightlegends")

sql(s"""|select * from temp_buildingheightlegends""".stripMargin)
  .repartition(buildingHeightLegends.length)
  .foreachPartition(rows => {
    for (row <- rows) {
      val buildingHeightLegend = new IndoorOTTCalibrateBuildingVecotrLegend(
        row.getAs[Int]("oid"),
        row.getAs[Int]("minheight"),
        row.getAs[Int]("maxheight"),
        row.getAs[Int]("mingrididcount"),
        row.getAs[Int]("maxgrididcount"),
        row.getAs[Int]("heighttype"))
      generateSampleBySenceType(spark, p_city, p_hour_start, p_hour_end, p_fpb_day, p_day_sample, linkLossCalibrateParameter, buildingHeightLegend)
    }
  })

then the job fails: the SQL code inside generateSampleBySenceType() throws an exception reporting that the SparkSession is null. The closure passed to foreachPartition runs on the executors, where the driver's SparkSession is not available, so spark.sql cannot be called from inside it.
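One alternative that the original article does not use is to keep every spark.sql call on the driver and submit the scenarios from several driver threads, since Spark can run jobs submitted from different threads of one application concurrently. The following is only a minimal sketch of that idea with Scala Futures. The names DriverSideParallelSketch, Legend and runAll are hypothetical, and the job body is a placeholder for a call such as generateSampleBySenceType(spark, ..., legend).

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object DriverSideParallelSketch {
  // Hypothetical stand-in for the article's scenario case class.
  case class Legend(oid: Int, minHeight: Int, maxHeight: Int,
                    minGridIDCount: Int, maxGridIDCount: Int)

  // Runs `job` once per legend on a fixed-size thread pool that lives on the
  // driver, waits for all of them, and returns the results in input order.
  def runAll[A](legends: Seq[Legend], parallelism: Int)(job: Legend => A): Seq[A] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = legends.map(legend => Future(job(legend)))
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val legends = Seq(Legend(1, 1, 30, 1, 21), Legend(2, 1, 30, 21, 45))
    // Placeholder job: in the real program this would call
    // generateSampleBySenceType(spark, ..., legend) on the driver.
    val ids = runAll(legends, parallelism = 2)(legend => legend.oid)
    println(ids)
  }
}
```

With this approach each scenario still scans the source data once, so it only overlaps the seven jobs; it does not reduce the total amount of work.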
Solution:
Register buildingHeightLegends as the temporary table temp_buildingHeightLegends, remove the outer foreach, and then, inside generateSampleBySenceType(), cross join temp_buildingHeightLegends with the other result sets:
The test code is as follows:
-- Scenario table
CREATE TABLE [dbo].[test_senceitems]([sencetype] [int] NULL,[minheight] [int] NULL,[maxheight] [int] NULL,[mingridcount] [int] NULL,[maxgridcount] [int] NULL )
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (1, 1, 30, 1, 21)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (2, 1, 30, 21, 45)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (3, 1, 30, 45, 100)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (4, 30, 50, 1, 21)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (5, 30, 50, 21, 45)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (6, 30, 50, 45, 100)
INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (7, 50, 5000, 1, 100)

-- Grid table to be filtered and aggregated
CREATE TABLE [dbo].[test_grid]([gridid] [nvarchar](50) NULL,[height] [int] NULL,[gridcount] [int] NULL )
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g1', 8, 23)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g2', 3, 87)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g3', 4, 34)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g4', 30, 54)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g5', 32, 32)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g6', 32, 20)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g7', 120, 34)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g8', 89, 54)
INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g9', 9, 16)

Replace the sql(s"""|""".stripMargin) code inside generateSampleBySenceType() with something like the following:
select t10.*,t11.*
from test_grid t10
cross join test_senceitems t11
where t10.height>=t11.minheight and t10.height<t11.maxheight
  and t10.gridcount>=t11.mingridcount and t10.gridcount<t11.maxgridcount
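The same query can be tried directly in Spark. The sketch below assumes a local SparkSession and loads the test rows above into two temporary views; the object CrossJoinSketch, the case classes SenceItem and Grid, and the method joinGridsWithScenarios are hypothetical names introduced only for this illustration, and the real generateSampleBySenceType() would join against its own result sets instead.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object CrossJoinSketch {
  // Hypothetical case classes mirroring the two test tables above.
  case class SenceItem(sencetype: Int, minheight: Int, maxheight: Int,
                       mingridcount: Int, maxgridcount: Int)
  case class Grid(gridid: String, height: Int, gridcount: Int)

  // Registers both test tables as temporary views and tags every grid
  // with the scenario whose height and grid-count ranges contain it.
  def joinGridsWithScenarios(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val senceItems = Seq(
      SenceItem(1, 1, 30, 1, 21), SenceItem(2, 1, 30, 21, 45),
      SenceItem(3, 1, 30, 45, 100), SenceItem(4, 30, 50, 1, 21),
      SenceItem(5, 30, 50, 21, 45), SenceItem(6, 30, 50, 45, 100),
      SenceItem(7, 50, 5000, 1, 100))
    val grids = Seq(
      Grid("g1", 8, 23), Grid("g2", 3, 87), Grid("g3", 4, 34),
      Grid("g4", 30, 54), Grid("g5", 32, 32), Grid("g6", 32, 20),
      Grid("g7", 120, 34), Grid("g8", 89, 54), Grid("g9", 9, 16))

    senceItems.toDF().createOrReplaceTempView("test_senceitems")
    grids.toDF().createOrReplaceTempView("test_grid")

    // One job covers all scenarios; the range conditions do the filtering.
    spark.sql(
      """|select t10.gridid, t10.height, t10.gridcount, t11.sencetype
         |from test_grid t10
         |cross join test_senceitems t11
         |where t10.height >= t11.minheight and t10.height < t11.maxheight
         |  and t10.gridcount >= t11.mingridcount and t10.gridcount < t11.maxgridcount
         |""".stripMargin)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cross-join-sketch")
      .master("local[*]")
      .getOrCreate()
    try joinGridsWithScenarios(spark).orderBy("gridid").show()
    finally spark.stop()
  }
}
```

Because the seven scenarios use half-open ranges that do not overlap, each of the nine test grids matches exactly one scenario, so the result has nine rows. Grouping by sencetype afterwards yields the per-scenario statistics in a single job instead of seven sequential ones.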
Reposted from: https://www.cnblogs.com/yy3b2007com/p/8505152.html