日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark加载外部配置文件

發(fā)布時間:2024/1/17 编程问答 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark加载外部配置文件 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

--files path啟動加載配置文件

在spark-streaming程序中需要配置文件中的數(shù)據(jù)來完成某項統(tǒng)計時,需要把配置文件打到工程里,maven的配置如下:

?

?
  • <build>

  • <plugins>

  • <plugin>

  • <groupId>org.apache.maven.plugins</groupId>

  • <artifactId>maven-surefire-plugin</artifactId>

  • <configuration>

  • <skip>true</skip>

  • </configuration>

  • </plugin>

  • </plugins>

  • <resources>

  • <resource>

  • <directory>src/main/resources</directory>

  • <includes>

  • <include>**/*.txt</include>

  • <include>*.txt</include>

  • </includes>

  • <filtering>true</filtering>

  • </resource>

  • </resources>

  • </build>

  • ?

    這樣在local模式下運行時沒問題的,但是要放在yarn集群上就會出問題,需要用如下方式來調(diào)用:

    ?

    ?
  • spark-submit --class com.kingsoft.server.KssNodeStreaming

  • --master yarn-cluster

  • --driver-memory 2G

  • --executor-memory 5G

  • --num-executors 10

  • --jars /home/hadoop/spark-streaming-flume_2.10-1.0.1.jar,/home/hadoop/avro-ipc-1.7.5-cdh5.1.0.jar,/home/hadoop/flume-ng-sdk-1.5.0.1.jar,/home/hadoop/fastjson-1.1.41.jar

  • --files /home/hadoop/idc_ip.txt,/home/hadoop/ipdata.txt

  • /home/hadoop/SparkStreaming-0.0.1-SNAPSHOT.jar

  • 0.0.0.0

  • 58006

  • ?

    Spark中addFile加載配置文件

    ????? 我們在使用Spark的時候有時候需要將一些數(shù)據(jù)分發(fā)到計算節(jié)點中。一種方法是將這些文件上傳到HDFS上,然后計算節(jié)點從HDFS上獲取這些數(shù)據(jù)。當然我們也可以使用addFile函數(shù)來分發(fā)這些文件。注意,如果是spark程序通過yarn集群上加載配置文件,path必須是集群hdfs的絕對路徑,如:viewfs://58-cluster//home/hdp_lbg_supin/resultdata/zhaopin/recommend/config/redis.properties。

    addFile

    ??addFile方法可以接收本地文件(或者HDFS上的文件),甚至是文件夾(如果是文件夾,必須是HDFS路徑),然后Spark的Driver和Exector可以通過SparkFiles.get()方法來獲取文件的絕對路徑(Get the absolute path of a file added through SparkContext.addFile()),addFile的函數(shù)原型如下:

    ?

    ?
  • def addFile(path: String): Unit

  • def addFile(path: String, recursive: Boolean): Unit

  • ?

    ????? addFile把添加的本地文件傳送給所有的Worker,這樣能夠保證在每個Worker上正確訪問到文件。另外,Worker會把文件放在臨時目錄下。因此,比較適合用于文件比較小,計算比較復雜的場景。如果文件比較大,網(wǎng)絡傳送的消耗時間也會增長。

    ????? path:可以是local、hdfs(任何hadoop支持的文件系統(tǒng))、HTTP、HTTPS、FTP等。local方式時,在windows下使用絕對路徑時需要加個“/”,如“d:/iteblog.data”得寫成“/d:/iteblog.data”或“file:///d:/iteblog.data”。

     ? recursive:如果path是一個目錄,那么我們可以設置recursive為true,這樣Spark會遞歸地分發(fā)這個路徑下面的所有文件到計算節(jié)點的臨時目錄。

     ? 通過SparkFiles.get(path:String)獲取添加的文件路徑。

    ?

    ?
  • var path = "/user/iteblog/ip.txt"

  • sc.addFile(path)

  • val rdd = sc.textFile(SparkFiles.get(path))

  • ?

    ????? 上面的實例展示了如何在Driver中獲取分發(fā)出去的文件,我們還可以在Exector獲取到分發(fā)的文件:

    ?

    ?
  • var path = "/user/iteblog/ip.txt"

  • sc.addFile(path)

  • val rdd = sc.parallelize((0 to 10))

  • rdd.foreach{ index =>

  • val path = SparkFiles.get(path)

  • ......

  • }

  • ????? 如果我們添加的是壓縮文件,比如.tar.gz、.tgz或者.tar,Spark會調(diào)用Linux的解壓縮命令tar去解壓縮這些文件。

    ?

    addJar

    ??addJar添加在這個SparkContext實例運行的作業(yè)所依賴的jar。,其函數(shù)原型如下:

    ?

    def addJar(path: String)

    ????? path:可以是本地文件(local file)、HDFS文件(其他所有的Hadoop支持的文件系統(tǒng)也可以)、HTTP、 HTTPS 或者是FTP URI文件等等。

    ?

     ? 其實Spark內(nèi)部通過spark.jars參數(shù)以及spark.yarn.dist.jars函數(shù)傳進去的Jar都是通過這個函數(shù)分發(fā)到Task的。

    ?

    將配置文件打到工程jar包里

    ?

    ????? 注意:IntelliJ IDEA創(chuàng)建Maven項目時,必須是Java項目,否則不能將配置文件打到工程jar包。

    ????? redis.properties配置文件示例如下:

    ?

    ?
  • redis.host=xx.xx.xxx.x

  • redis.port=6380

  • redis.password=6f3d16c5119bb946

  • redis.maxActive=500

  • redis.maxWait=10000

  • redis.maxidle=500

  • redis.minidle=10

  • redis.maxtotal=500

  • ????? RedisClient.scala加載配置文件示例如下:

    ?

    ?

    ?
  • package com.bj58.adsp.dp.files.redis

  • ?
  • import java.io.{InputStream, BufferedInputStream, FileInputStream}

  • import java.util.Properties

  • ?
  • import org.apache.commons.pool2.impl.GenericObjectPoolConfig

  • import redis.clients.jedis.JedisPool

  • import scala.collection.JavaConversions._

  • ?
  • /**

  • * Created by Administrator on 2015/10/23.

  • */

  • object RedisClient extends Serializable {

  • ?
  • val properties: Properties = new Properties

  • val in: InputStream = getClass.getResourceAsStream("/redis.properties")

  • properties.load(new BufferedInputStream(in))

  • val redisHost = properties.getProperty("redis.host")

  • val redisPort = properties.getProperty("redis.port")

  • val redisPwd = properties.getProperty("redis.password")

  • val redisTimeout = 30000

  • val config = new GenericObjectPoolConfig()

  • config.setTestOnBorrow(true)

  • config.setMaxIdle(properties.getProperty("redis.maxidle").toInt)

  • config.setMinIdle(properties.getProperty("redis.minidle").toInt)

  • config.setMaxTotal(properties.getProperty("redis.maxtotal").toInt)

  • ?
  • lazy val pool = new JedisPool(config, redisHost, redisPort.toInt, redisTimeout, redisPwd)

  • ?
  • lazy val hook = new Thread {

  • override def run = {

  • println("Execute hook thread: " + this)

  • pool.destroy()

  • }

  • }

  • sys.addShutdownHook(hook.run)

  • }

  • ?

    ????? 如果是讀取配置文件內(nèi)容,可以直接:

    ?
  • val properties: Properties = new Properties

  • val in: InputStream = getClass.getResourceAsStream("/redis.properties")

  • properties.load(new BufferedInputStream(in))

  • ????? 如果是只加載配置文件,可以直接:

    ?

    ?

    ?
  • public static RedisClient init() {

  • RedisClient client = null;

  • try {

  • // 根據(jù)配置文件初始化Redis客戶端

  • client = RedisClient.getInstance(getClass.getResource("/redis.properties"));

  • } catch (Exception e) {

  • e.printStackTrace();

  • }

  • return client;

  • }

  • ?

    直接將配置文件內(nèi)容寫到程序中

    示例如下:

    ?

    ?
  • lines.foreachRDD(rdd => {

  • //embedded function

  • def func(records: Iterator[String]) {

  • var conn: Connection = null

  • var stmt: PreparedStatement = null

  • try {

  • val url = "jdbc:mysql://xx.xxx.xx.xxx:3307/supindb"

  • val user = "root"

  • val password = "root"

  • conn = DriverManager.getConnection(url, user, password)

  • records.flatMap(_.split(" ")).foreach(word => {

  • val sql = "insert into mytable(word) values(?)"

  • stmt = conn.prepareStatement(sql)

  • stmt.setString(1, word)

  • stmt.executeUpdate()

  • })

  • } catch {

  • case e: Exception => e.printStackTrace()

  • } finally {

  • if (stmt != null) {

  • stmt.close()

  • }

  • if (conn != null) {

  • conn.close()

  • }

  • }

  • }

  • ?
  • val repartitionedRDD = rdd.repartition(4)

  • repartitionedRDD.foreachPartition(func)

  • })

  • ?

    使用addFile("__app__.jar")方式

    ????? 如果程序中調(diào)用其他服務,而其他服務需要加載配置文件,則可以將程序打成jar包,并命名為__app__.jar,使用sc.addFile("__app__.jar")方式即可。

    ?

    Refer:

    http://blog.csdn.net/aaa1117a8w5s6d/article/details/43090017

    http://www.iteblog.com/archives/1704

    總結(jié)

    以上是生活随笔為你收集整理的Spark加载外部配置文件的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。