Spark加载外部配置文件
--files path啟動加載配置文件
在spark-streaming程序中需要配置文件中的數(shù)據(jù)來完成某項統(tǒng)計時,需要把配置文件打到工程里,maven的配置如下:
?
?<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.txt</include>
<include>*.txt</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
</build>
?
這樣在local模式下運行時沒問題的,但是要放在yarn集群上就會出問題,需要用如下方式來調(diào)用:
?
?spark-submit --class com.kingsoft.server.KssNodeStreaming
--master yarn-cluster
--driver-memory 2G
--executor-memory 5G
--num-executors 10
--jars /home/hadoop/spark-streaming-flume_2.10-1.0.1.jar,/home/hadoop/avro-ipc-1.7.5-cdh5.1.0.jar,/home/hadoop/flume-ng-sdk-1.5.0.1.jar,/home/hadoop/fastjson-1.1.41.jar
--files /home/hadoop/idc_ip.txt,/home/hadoop/ipdata.txt
/home/hadoop/SparkStreaming-0.0.1-SNAPSHOT.jar
0.0.0.0
58006
?
Spark中addFile加載配置文件
????? 我們在使用Spark的時候有時候需要將一些數(shù)據(jù)分發(fā)到計算節(jié)點中。一種方法是將這些文件上傳到HDFS上,然后計算節(jié)點從HDFS上獲取這些數(shù)據(jù)。當然我們也可以使用addFile函數(shù)來分發(fā)這些文件。注意,如果是spark程序通過yarn集群上加載配置文件,path必須是集群hdfs的絕對路徑,如:viewfs://58-cluster//home/hdp_lbg_supin/resultdata/zhaopin/recommend/config/redis.properties。
addFile
??addFile方法可以接收本地文件(或者HDFS上的文件),甚至是文件夾(如果是文件夾,必須是HDFS路徑),然后Spark的Driver和Exector可以通過SparkFiles.get()方法來獲取文件的絕對路徑(Get the absolute path of a file added through SparkContext.addFile()),addFile的函數(shù)原型如下:
?
?def addFile(path: String): Unit
def addFile(path: String, recursive: Boolean): Unit
?
????? addFile把添加的本地文件傳送給所有的Worker,這樣能夠保證在每個Worker上正確訪問到文件。另外,Worker會把文件放在臨時目錄下。因此,比較適合用于文件比較小,計算比較復雜的場景。如果文件比較大,網(wǎng)絡傳送的消耗時間也會增長。
????? path:可以是local、hdfs(任何hadoop支持的文件系統(tǒng))、HTTP、HTTPS、FTP等。local方式時,在windows下使用絕對路徑時需要加個“/”,如“d:/iteblog.data”得寫成“/d:/iteblog.data”或“file:///d:/iteblog.data”。
? recursive:如果path是一個目錄,那么我們可以設置recursive為true,這樣Spark會遞歸地分發(fā)這個路徑下面的所有文件到計算節(jié)點的臨時目錄。
? 通過SparkFiles.get(path:String)獲取添加的文件路徑。
?
?var path = "/user/iteblog/ip.txt"
sc.addFile(path)
val rdd = sc.textFile(SparkFiles.get(path))
?
????? 上面的實例展示了如何在Driver中獲取分發(fā)出去的文件,我們還可以在Exector獲取到分發(fā)的文件:
?
?var path = "/user/iteblog/ip.txt"
sc.addFile(path)
val rdd = sc.parallelize((0 to 10))
rdd.foreach{ index =>
val path = SparkFiles.get(path)
......
}
????? 如果我們添加的是壓縮文件,比如.tar.gz、.tgz或者.tar,Spark會調(diào)用Linux的解壓縮命令tar去解壓縮這些文件。
?
addJar
??addJar添加在這個SparkContext實例運行的作業(yè)所依賴的jar。,其函數(shù)原型如下:
?
def addJar(path: String)????? path:可以是本地文件(local file)、HDFS文件(其他所有的Hadoop支持的文件系統(tǒng)也可以)、HTTP、 HTTPS 或者是FTP URI文件等等。
?
? 其實Spark內(nèi)部通過spark.jars參數(shù)以及spark.yarn.dist.jars函數(shù)傳進去的Jar都是通過這個函數(shù)分發(fā)到Task的。
?
將配置文件打到工程jar包里
?
????? 注意:IntelliJ IDEA創(chuàng)建Maven項目時,必須是Java項目,否則不能將配置文件打到工程jar包。
????? redis.properties配置文件示例如下:
?
?redis.host=xx.xx.xxx.x
redis.port=6380
redis.password=6f3d16c5119bb946
redis.maxActive=500
redis.maxWait=10000
redis.maxidle=500
redis.minidle=10
redis.maxtotal=500
????? RedisClient.scala加載配置文件示例如下:
?
?
?package com.bj58.adsp.dp.files.redis
import java.io.{InputStream, BufferedInputStream, FileInputStream}
import java.util.Properties
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool
import scala.collection.JavaConversions._
/**
* Created by Administrator on 2015/10/23.
*/
object RedisClient extends Serializable {
val properties: Properties = new Properties
val in: InputStream = getClass.getResourceAsStream("/redis.properties")
properties.load(new BufferedInputStream(in))
val redisHost = properties.getProperty("redis.host")
val redisPort = properties.getProperty("redis.port")
val redisPwd = properties.getProperty("redis.password")
val redisTimeout = 30000
val config = new GenericObjectPoolConfig()
config.setTestOnBorrow(true)
config.setMaxIdle(properties.getProperty("redis.maxidle").toInt)
config.setMinIdle(properties.getProperty("redis.minidle").toInt)
config.setMaxTotal(properties.getProperty("redis.maxtotal").toInt)
lazy val pool = new JedisPool(config, redisHost, redisPort.toInt, redisTimeout, redisPwd)
lazy val hook = new Thread {
override def run = {
println("Execute hook thread: " + this)
pool.destroy()
}
}
sys.addShutdownHook(hook.run)
}
?
????? 如果是讀取配置文件內(nèi)容,可以直接:
?val properties: Properties = new Properties
val in: InputStream = getClass.getResourceAsStream("/redis.properties")
properties.load(new BufferedInputStream(in))
????? 如果是只加載配置文件,可以直接:
?
?
?public static RedisClient init() {
RedisClient client = null;
try {
// 根據(jù)配置文件初始化Redis客戶端
client = RedisClient.getInstance(getClass.getResource("/redis.properties"));
} catch (Exception e) {
e.printStackTrace();
}
return client;
}
?
直接將配置文件內(nèi)容寫到程序中
示例如下:
?
?lines.foreachRDD(rdd => {
//embedded function
def func(records: Iterator[String]) {
var conn: Connection = null
var stmt: PreparedStatement = null
try {
val url = "jdbc:mysql://xx.xxx.xx.xxx:3307/supindb"
val user = "root"
val password = "root"
conn = DriverManager.getConnection(url, user, password)
records.flatMap(_.split(" ")).foreach(word => {
val sql = "insert into mytable(word) values(?)"
stmt = conn.prepareStatement(sql)
stmt.setString(1, word)
stmt.executeUpdate()
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (stmt != null) {
stmt.close()
}
if (conn != null) {
conn.close()
}
}
}
val repartitionedRDD = rdd.repartition(4)
repartitionedRDD.foreachPartition(func)
})
?
使用addFile("__app__.jar")方式
????? 如果程序中調(diào)用其他服務,而其他服務需要加載配置文件,則可以將程序打成jar包,并命名為__app__.jar,使用sc.addFile("__app__.jar")方式即可。
?
Refer:
http://blog.csdn.net/aaa1117a8w5s6d/article/details/43090017
http://www.iteblog.com/archives/1704
總結(jié)
以上是生活随笔為你收集整理的Spark加载外部配置文件的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark性能优化指南:基础篇
- 下一篇: Spark读取配置源码剖析