日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

用java程序完成从kafka队列读取消息到sparkstreaming再从sparkstreaming里把数据导入mysql中

發布時間:2024/4/18 数据库 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 用java程序完成从kafka队列读取消息到sparkstreaming再从sparkstreaming里把数据导入mysql中 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

有一段時間沒好好寫博客了,因為一直在做一個比較小型的工程項目,也常常用在企業里,就是將流式數據處理收集,再將這些流式數據進行一些計算以后再保存在mysql上,這是一套比較完整的流程,并且可以從數據庫中的數據再導入到hadoop上,再在hadoop上進行離線較慢的mapreduce計算,這是我后面要進行的項目。

項目準備環境

(1)zookeeper:
(2)spark

(3)kafka

(4)mysql

(5)navicat

(6)三臺虛擬機

(7)jdk

(8)intellij IDEA

(9)虛擬機vmware

虛擬機分別配置

虛擬機安裝環境
node01kafka zookeeper jdk 192.168.19.110
node02kafka zookeeper jdk spark 192.168.19.111
node03kafka zookeeper jdk mysql 192.168.19.112

具體的虛擬機的細節配置就不多說了,肯定是要關閉防火墻的。

開始實行

(1)分別在三臺主機上開啟zookeeper(zookeeper的集群配置可以看我這篇博客zookeeper的安裝和使用)

(2)分別在三臺主機上開啟kafka

(3)開啟產生消息隊列命令(前提創建好topic:spark(我這里是spark話題))

(4)在node3上開啟mysql

在mysql地下創建bigdata數據庫,進入數據庫后新建wordcount表,創建相應字段即可

(5)將寫好的代碼打成jar包:
寫代碼時是要寫scala語言,所以要加載好相應的插件:

package com.gzq.sparkimport java.sql.DriverManager import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.common.serialization.StringDeserializer import org.apache.log4j.{Level,Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext}/*** @Auther: gzq* @Date: 2020/11/23 - 11 - 23 - 22:37 * @Description:*/ object Sparkstream_kafka202020 {Logger.getLogger("org").setLevel(Level.WARN)def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark_stream")val ssc : StreamingContext = new StreamingContext(conf,Seconds(3))val kafkaParams: Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092,node02:9092,node03:9092",ConsumerConfig.GROUP_ID_CONFIG -> "spark2020",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe(Set("spark"), kafkaParams))kafkaDStream.map(_.value()).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()kafkaDStream.foreachRDD(rdd => {rdd.foreachPartition(partitionOfRecords => {val connection = createConnection()partitionOfRecords.foreach(record => {System.out.println(record)// wordcount里的record.value()一定要加雙引號這樣才能是字符串類型val sql = "insert into wordcount(word, wordcount) values(" + '"'+ record.value() + '"' + "," + record.offset() +");"connection.createStatement().execute(sql)})connection.close()})})ssc.start()ssc.awaitTermination()}def createConnection() = {Class.forName("com.mysql.jdbc.Driver")DriverManager.getConnection("jdbc:mysql://192.168.19.112:3306/bigdata", "root", "123456")}}

maven依賴(可以根據自己的版本修改)

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.gzq.spark2020</groupId><artifactId>spark2020</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.1.1</version></dependency></dependencies></project>


點擊進去

選擇自己的main
接下來apply ok
再點擊
隨后點擊build即可:

輸出在out目錄下
將jar包上傳到node02(有spark,直接本地運行)


輸入上面的3條內容,可以看見node02上的輸出:


查看數據庫也輸出了:

ps:踩過的坑
(1):

這行sql語句一定要注意。
因為我的word列定義的是varchar類型,所以必須傳入的是字符串類型,lang.String,所以要在record.value()兩側加入雙引號。
(2):
為什么我打jar包時沒有用maven,是因為maven打出來jar包沒有我寫的主函數,所以在用spark執行時它會報錯說找不到main函數的入口,找不到類,后來發現需要在pom文件中做相關的配置:

<build><finalName>WordCount</finalName><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><archive><manifest><mainClass>com.gzq.spark._01.WordCount</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

(3):
在開啟kafka時我發現開一會它就自動關閉,查看日志文件后發現我的kafka-logs文件出了問題,所以我將三臺主機這個文件夾下的所有文件全部刪除重啟kafka成功
(4):
因為我的zookeeper是多集群模式,所以它的選舉機制是必須要開啟半數以上,所以開啟zookeeper時要都開啟,如果只開啟了其中一臺也會啟動不起來。

總結

以上是生活随笔為你收集整理的用java程序完成从kafka队列读取消息到sparkstreaming再从sparkstreaming里把数据导入mysql中的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。