有一段時間沒好好寫博客了,因為一直在做一個比較小型的工程項目,也常常用在企業里,就是將流式數據處理收集,再將這些流式數據進行一些計算以后再保存在mysql上,這是一套比較完整的流程,并且可以從數據庫中的數據再導入到hadoop上,再在hadoop上進行離線較慢的mapreduce計算,這是我后面要進行的項目。
項目準備環境
(1)zookeeper:
(2)spark
(3)kafka
(4)mysql
(5)navicat
(6)三臺虛擬機
(7)jdk
(8)intellij IDEA
(9)虛擬機vmware
虛擬機分別配置
虛擬機安裝環境
| node01 | kafka zookeeper jdk 192.168.19.110 |
| node02 | kafka zookeeper jdk spark 192.168.19.111 |
| node03 | kafka zookeeper jdk mysql 192.168.19.112 |
具體的虛擬機的細節配置就不多說了,肯定是要關閉防火墻的。
開始實行
(1)分別在三臺主機上開啟zookeeper(zookeeper的集群配置可以看我這篇博客zookeeper的安裝和使用)
(2)分別在三臺主機上開啟kafka
(3)開啟產生消息隊列命令(前提創建好topic:spark(我這里是spark話題))
(4)在node3上開啟mysql
在mysql地下創建bigdata數據庫,進入數據庫后新建wordcount表,創建相應字段即可
(5)將寫好的代碼打成jar包:
寫代碼時是要寫scala語言,所以要加載好相應的插件:
package com
.gzq
.spark
import java
.sql
.DriverManager
import org
.apache
.kafka
.clients
.consumer
.{ConsumerConfig
, ConsumerRecord
}
import org
.apache
.kafka
.common
.serialization
.StringDeserializer
import org
.apache
.log4j
.{Level
,Logger
}
import org
.apache
.spark
.SparkConf
import org
.apache
.spark
.streaming
.dstream
.InputDStream
import org
.apache
.spark
.streaming
.kafka010
.{ConsumerStrategies
, KafkaUtils
, LocationStrategies
}
import org
.apache
.spark
.streaming
.{Seconds
, StreamingContext
}
object Sparkstream_kafka202020
{Logger
.getLogger("org").setLevel(Level
.WARN
)def
main(args
: Array
[String
]): Unit
= {val conf
: SparkConf
= new SparkConf().setMaster("local[*]").setAppName("Spark_stream")val ssc
: StreamingContext
= new StreamingContext(conf
,Seconds(3))val kafkaParams
: Map
[String
,Object
] = Map
[String
,Object
](ConsumerConfig
.BOOTSTRAP_SERVERS_CONFIG
-> "node01:9092,node02:9092,node03:9092",ConsumerConfig
.GROUP_ID_CONFIG
-> "spark2020",ConsumerConfig
.KEY_DESERIALIZER_CLASS_CONFIG
-> "org.apache.kafka.common.serialization.StringDeserializer",ConsumerConfig
.VALUE_DESERIALIZER_CLASS_CONFIG
-> classOf
[StringDeserializer
])val kafkaDStream
: InputDStream
[ConsumerRecord
[String
,String
]] = KafkaUtils
.createDirectStream(ssc
,LocationStrategies
.PreferConsistent
,ConsumerStrategies
.Subscribe(Set("spark"), kafkaParams
))kafkaDStream
.map(_
.value()).flatMap(_
.split(" ")).map((_
,1)).reduceByKey(_
+_
).print()kafkaDStream
.foreachRDD(rdd
=> {rdd
.foreachPartition(partitionOfRecords
=> {val connection
= createConnection()partitionOfRecords
.foreach(record
=> {System
.out
.println(record
)val sql
= "insert into wordcount(word, wordcount) values(" + '"'+ record
.value() + '"' + "," + record
.offset() +");"connection
.createStatement().execute(sql
)})connection
.close()})})ssc
.start()ssc
.awaitTermination()}def
createConnection() = {Class
.forName("com.mysql.jdbc.Driver")DriverManager
.getConnection("jdbc:mysql://192.168.19.112:3306/bigdata", "root", "123456")}}
maven依賴(可以根據自己的版本修改)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0
</modelVersion><groupId>com.gzq.spark2020
</groupId><artifactId>spark2020
</artifactId><version>1.0-SNAPSHOT
</version><dependencies><dependency><groupId>mysql
</groupId><artifactId>mysql-connector-java
</artifactId><version>5.1.1
</version></dependency><dependency><groupId>org.apache.spark
</groupId><artifactId>spark-core_2.11
</artifactId><version>2.1.1
</version></dependency><dependency><groupId>org.apache.spark
</groupId><artifactId>spark-streaming_2.11
</artifactId><version>2.1.1
</version></dependency><dependency><groupId>org.apache.spark
</groupId><artifactId>spark-streaming-kafka-0-10_2.11
</artifactId><version>2.1.1
</version></dependency></dependencies></project>
點擊進去
選擇自己的main
接下來apply ok
再點擊
隨后點擊build即可:
輸出在out目錄下
將jar包上傳到node02(有spark,直接本地運行)
輸入上面的3條內容,可以看見node02上的輸出:
查看數據庫也輸出了:
ps:踩過的坑
(1):
這行sql語句一定要注意。
因為我的word列定義的是varchar類型,所以必須傳入的是字符串類型,lang.String,所以要在record.value()兩側加入雙引號。
(2):
為什么我打jar包時沒有用maven,是因為maven打出來jar包沒有我寫的主函數,所以在用spark執行時它會報錯說找不到main函數的入口,找不到類,后來發現需要在pom文件中做相關的配置:
<build><finalName>WordCount
</finalName><plugins><plugin><groupId>net.alchim31.maven
</groupId><artifactId>scala-maven-plugin
</artifactId><version>3.4.6
</version><executions><execution><goals><goal>compile
</goal><goal>testCompile
</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins
</groupId><artifactId>maven-assembly-plugin
</artifactId><version>3.0.0
</version><configuration><archive><manifest><mainClass>com.gzq.spark._01.WordCount
</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies
</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly
</id><phase>package
</phase><goals><goal>single
</goal></goals></execution></executions></plugin></plugins></build>
(3):
在開啟kafka時我發現開一會它就自動關閉,查看日志文件后發現我的kafka-logs文件出了問題,所以我將三臺主機這個文件夾下的所有文件全部刪除重啟kafka成功
(4):
因為我的zookeeper是多集群模式,所以它的選舉機制是必須要開啟半數以上,所以開啟zookeeper時要都開啟,如果只開啟了其中一臺也會啟動不起來。
總結
以上是生活随笔為你收集整理的用java程序完成从kafka队列读取消息到sparkstreaming再从sparkstreaming里把数据导入mysql中的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。