日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

HDFS数据定时采集demo 简单

發布時間:2025/3/15 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 HDFS数据定时采集demo 简单 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1、流程

啟動一個定時任務:

  --定時監測日志源目錄

  --獲取需要采集的文件

  --移動這些文件到一個待上傳臨時目錄中

  --遍歷待上傳目錄中各個文件,逐一傳輸到HDFS的目標路徑,同時將傳輸完成的文件移動到備份目錄中去

啟動一個定時任務

  --探測備份目錄中的備份數據,檢查是否已經超出最長備份時長,如果超出,則刪除。

2、規劃各種路徑

日志路徑:d:/logs/accesslog/

待上傳路徑:d:/logs/toupload/

備份路徑:d:/logs/backup/日期/

?

HDFS存儲路徑 /logs/日期

hdfs中的文件前綴:access_log_

hdsf中的文件后綴:.log??

?

import java.util.Timer;public class DataCollectMain {public static void main(String[] args) {Timer timer = new Timer();//定時任務timer.schedule(new CollectTask(), 0, 60*60*1000L);timer.schedule(new BackupCleanTask(), 0, 60*60*1000L);}}

  

package cn.edu360.hdfs.datacollect;import java.io.File; import java.io.FilenameFilter; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.Properties; import java.util.TimerTask; import java.util.UUID;import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger;public class CollectTask extends TimerTask {@Overridepublic void run() {/*** ——定時探測日志源目錄 ——獲取需要采集的文件 ——移動這些文件到一個待上傳臨時目錄* ——遍歷待上傳目錄中各文件,逐一傳輸到HDFS的目標路徑,同時將傳輸完成的文件移動到備份目錄* */try {// 獲取配置參數Properties props = PropertyHolderLazy.getProps();// 構造一個log4j日志對象Logger logger = Logger.getLogger("logRollingFile");// 獲取本次采集時的日期SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");String day = sdf.format(new Date());File srcDir = new File(props.getProperty(Constants.LOG_SOURCE_DIR));// 列出日志源目錄中需要采集的文件File[] listFiles = srcDir.listFiles(new FilenameFilter() {@Overridepublic boolean accept(File dir, String name) {if (name.startsWith(props.getProperty(Constants.LOG_LEGAL_PREFIX))) {return true;}return false;}});// 記錄日志logger.info("探測到如下文件需要采集:" + Arrays.toString(listFiles));// 將要采集的文件移動到待上傳臨時目錄File toUploadDir = new File(props.getProperty(Constants.LOG_TOUPLOAD_DIR));for (File file : listFiles) {FileUtils.moveFileToDirectory(file, toUploadDir, true);}// 記錄日志logger.info("上述文件移動到了待上傳目錄" + toUploadDir.getAbsolutePath());// 構造一個HDFS的客戶端對象FileSystem fs = FileSystem.get(new URI(props.getProperty(Constants.HDFS_URI)), new Configuration(), "root");File[] toUploadFiles = toUploadDir.listFiles();// 檢查HDFS中的日期目錄是否存在,如果不存在,則創建Path hdfsDestPath = new Path(props.getProperty(Constants.HDFS_DEST_BASE_DIR) + day);if (!fs.exists(hdfsDestPath)) {fs.mkdirs(hdfsDestPath);}// 檢查本地的備份目錄是否存在,如果不存在,則創建File backupDir = new File(props.getProperty(Constants.LOG_BACKUP_BASE_DIR) + day + "/");if (!backupDir.exists()) {backupDir.mkdirs();}for (File file : toUploadFiles) {// 傳輸文件到HDFS并改名access_log_Path destPath = new Path(hdfsDestPath + "/" + UUID.randomUUID() + props.getProperty(Constants.HDFS_FILE_SUFFIX));fs.copyFromLocalFile(new Path(file.getAbsolutePath()), destPath);// 記錄日志logger.info("文件傳輸到HDFS完成:" + file.getAbsolutePath() + "-->" + destPath);// 將傳輸完成的文件移動到備份目錄FileUtils.moveFileToDirectory(file, backupDir, true);// 記錄日志logger.info("文件備份完成:" + file.getAbsolutePath() + "-->" + backupDir);}} catch (Exception e) {e.printStackTrace();}}}

  

package cn.edu360.hdfs.datacollect;import java.io.File; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimerTask;import org.apache.commons.io.FileUtils;public class BackupCleanTask extends TimerTask {@Overridepublic void run() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");long now = new Date().getTime();try {// 探測本地備份目錄File backupBaseDir = new File("d:/logs/backup/");File[] dayBackDir = backupBaseDir.listFiles();// 判斷備份日期子目錄是否已超24小時for (File dir : dayBackDir) {long time = sdf.parse(dir.getName()).getTime();if(now-time>24*60*60*1000L){FileUtils.deleteDirectory(dir);}}} catch (Exception e) {e.printStackTrace();}}}

  

package cn.edu360.hdfs.datacollect;import java.util.Properties;/*** 單例模式:懶漢式——考慮了線程安全* @author ThinkPad**/public class PropertyHolderLazy {private static Properties prop = null;public static Properties getProps() throws Exception {if (prop == null) {synchronized (PropertyHolderLazy.class) {if (prop == null) {prop = new Properties();prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream("collect.properties"));}}}return prop;}}

  

package cn.edu360.hdfs.datacollect;public class Constants {/*** 日志源目錄參數key*/public static final String LOG_SOURCE_DIR = "LOG_SOURCE_DIR";/*** 日志待上傳目錄參數key*/public static final String LOG_TOUPLOAD_DIR = "LOG_TOUPLOAD_DIR";public static final String LOG_BACKUP_BASE_DIR = "LOG_BACKUP_BASE_DIR";public static final String LOG_BACKUP_TIMEOUT = "LOG_BACKUP_TIMEOUT";public static final String LOG_LEGAL_PREFIX = "LOG_LEGAL_PREFIX";public static final String HDFS_URI = "HDFS_URI";public static final String HDFS_DEST_BASE_DIR = "HDFS_DEST_BASE_DIR";public static final String HDFS_FILE_PREFIX = "HDFS_FILE_PREFIX";public static final String HDFS_FILE_SUFFIX = "HDFS_FILE_SUFFIX";}

  

LOG_SOURCE_DIR=d:/logs/accesslog/ LOG_TOUPLOAD_DIR=d:/logs/toupload/ LOG_BACKUP_BASE_DIR=d:/logs/backup/ LOG_BACKUP_TIMEOUT=24 LOG_LEGAL_PREFIX=access.log.HDFS_URI=hdfs://hdp-01:9000/ HDFS_DEST_BASE_DIR=/logs/ HDFS_FILE_PREFIX=access_log_ HDFS_FILE_SUFFIX=.log <configuration> <property> <name>dfs.replication</name> <value>4</value> </property><property> <name>dfs.blocksize</name> <value>16m</value> </property> </configuration> ### \u8BBE\u7F6E### #log4j.rootLogger=debug,stdout,genlog log4j.rootLogger=INFO,logRollingFile,stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n### log4j.logger.logRollingFile= ERROR,test1 log4j.appender.test1 = org.apache.log4j.RollingFileAppender log4j.appender.test1.layout = org.apache.log4j.PatternLayout log4j.appender.test1.layout.ConversionPattern =%d{yyyy-MMM-dd HH:mm:ss}-[TS] %p %t %c - %m%n log4j.appender.test1.Threshold = DEBUG log4j.appender.test1.ImmediateFlush = TRUE log4j.appender.test1.Append = TRUE log4j.appender.test1.File = d:/logs/collect/collect.log log4j.appender.test1.MaxFileSize = 102400KB log4j.appender.test1.MaxBackupIndex = 200 ### log4j.appender.test1.Encoding = UTF-8

  

  

? ?

轉載于:https://www.cnblogs.com/zlz-bigdata/p/9566336.html

總結

以上是生活随笔為你收集整理的HDFS数据定时采集demo 简单的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。