HDFS数据定时采集demo 简单
生活随笔
收集整理的這篇文章主要介紹了
HDFS数据定时采集demo 简单
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1、流程
啟動一個定時任務:
--定時監測日志源目錄
--獲取需要采集的文件
--移動這些文件到一個待上傳臨時目錄中
--遍歷待上傳目錄中各個文件,逐一傳輸到HDFS的目標路徑,同時將傳輸完成的文件移動到備份目錄中去
啟動一個定時任務
--探測備份目錄中的備份數據,檢查是否已經超出最長備份時長,如果超出,則刪除。
2、規劃各種路徑
日志路徑:d:/logs/accesslog/
待上傳路徑:d:/logs/toupload/
備份路徑:d:/logs/backup/日期/
?
HDFS存儲路徑 /logs/日期
hdfs中的文件前綴:access_log_
hdsf中的文件后綴:.log??
?
import java.util.Timer;public class DataCollectMain {public static void main(String[] args) {Timer timer = new Timer();//定時任務timer.schedule(new CollectTask(), 0, 60*60*1000L);timer.schedule(new BackupCleanTask(), 0, 60*60*1000L);}}package cn.edu360.hdfs.datacollect;import java.io.File; import java.io.FilenameFilter; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.Properties; import java.util.TimerTask; import java.util.UUID;import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger;public class CollectTask extends TimerTask {@Overridepublic void run() {/*** ——定時探測日志源目錄 ——獲取需要采集的文件 ——移動這些文件到一個待上傳臨時目錄* ——遍歷待上傳目錄中各文件,逐一傳輸到HDFS的目標路徑,同時將傳輸完成的文件移動到備份目錄* */try {// 獲取配置參數Properties props = PropertyHolderLazy.getProps();// 構造一個log4j日志對象Logger logger = Logger.getLogger("logRollingFile");// 獲取本次采集時的日期SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");String day = sdf.format(new Date());File srcDir = new File(props.getProperty(Constants.LOG_SOURCE_DIR));// 列出日志源目錄中需要采集的文件File[] listFiles = srcDir.listFiles(new FilenameFilter() {@Overridepublic boolean accept(File dir, String name) {if (name.startsWith(props.getProperty(Constants.LOG_LEGAL_PREFIX))) {return true;}return false;}});// 記錄日志logger.info("探測到如下文件需要采集:" + Arrays.toString(listFiles));// 將要采集的文件移動到待上傳臨時目錄File toUploadDir = new File(props.getProperty(Constants.LOG_TOUPLOAD_DIR));for (File file : listFiles) {FileUtils.moveFileToDirectory(file, toUploadDir, true);}// 記錄日志logger.info("上述文件移動到了待上傳目錄" + toUploadDir.getAbsolutePath());// 構造一個HDFS的客戶端對象FileSystem fs = FileSystem.get(new URI(props.getProperty(Constants.HDFS_URI)), new Configuration(), "root");File[] toUploadFiles = toUploadDir.listFiles();// 檢查HDFS中的日期目錄是否存在,如果不存在,則創建Path hdfsDestPath = new Path(props.getProperty(Constants.HDFS_DEST_BASE_DIR) + day);if (!fs.exists(hdfsDestPath)) {fs.mkdirs(hdfsDestPath);}// 檢查本地的備份目錄是否存在,如果不存在,則創建File backupDir = new File(props.getProperty(Constants.LOG_BACKUP_BASE_DIR) + day + "/");if (!backupDir.exists()) {backupDir.mkdirs();}for (File file : toUploadFiles) {// 傳輸文件到HDFS并改名access_log_Path destPath = new Path(hdfsDestPath + "/" + UUID.randomUUID() + props.getProperty(Constants.HDFS_FILE_SUFFIX));fs.copyFromLocalFile(new Path(file.getAbsolutePath()), destPath);// 記錄日志logger.info("文件傳輸到HDFS完成:" + file.getAbsolutePath() + "-->" + destPath);// 將傳輸完成的文件移動到備份目錄FileUtils.moveFileToDirectory(file, backupDir, true);// 記錄日志logger.info("文件備份完成:" + file.getAbsolutePath() + "-->" + backupDir);}} catch (Exception e) {e.printStackTrace();}}}
package cn.edu360.hdfs.datacollect;import java.io.File; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimerTask;import org.apache.commons.io.FileUtils;public class BackupCleanTask extends TimerTask {@Overridepublic void run() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");long now = new Date().getTime();try {// 探測本地備份目錄File backupBaseDir = new File("d:/logs/backup/");File[] dayBackDir = backupBaseDir.listFiles();// 判斷備份日期子目錄是否已超24小時for (File dir : dayBackDir) {long time = sdf.parse(dir.getName()).getTime();if(now-time>24*60*60*1000L){FileUtils.deleteDirectory(dir);}}} catch (Exception e) {e.printStackTrace();}}}
package cn.edu360.hdfs.datacollect;import java.util.Properties;/*** 單例模式:懶漢式——考慮了線程安全* @author ThinkPad**/public class PropertyHolderLazy {private static Properties prop = null;public static Properties getProps() throws Exception {if (prop == null) {synchronized (PropertyHolderLazy.class) {if (prop == null) {prop = new Properties();prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream("collect.properties"));}}}return prop;}}
package cn.edu360.hdfs.datacollect;public class Constants {/*** 日志源目錄參數key*/public static final String LOG_SOURCE_DIR = "LOG_SOURCE_DIR";/*** 日志待上傳目錄參數key*/public static final String LOG_TOUPLOAD_DIR = "LOG_TOUPLOAD_DIR";public static final String LOG_BACKUP_BASE_DIR = "LOG_BACKUP_BASE_DIR";public static final String LOG_BACKUP_TIMEOUT = "LOG_BACKUP_TIMEOUT";public static final String LOG_LEGAL_PREFIX = "LOG_LEGAL_PREFIX";public static final String HDFS_URI = "HDFS_URI";public static final String HDFS_DEST_BASE_DIR = "HDFS_DEST_BASE_DIR";public static final String HDFS_FILE_PREFIX = "HDFS_FILE_PREFIX";public static final String HDFS_FILE_SUFFIX = "HDFS_FILE_SUFFIX";}
LOG_SOURCE_DIR=d:/logs/accesslog/ LOG_TOUPLOAD_DIR=d:/logs/toupload/ LOG_BACKUP_BASE_DIR=d:/logs/backup/ LOG_BACKUP_TIMEOUT=24 LOG_LEGAL_PREFIX=access.log.HDFS_URI=hdfs://hdp-01:9000/ HDFS_DEST_BASE_DIR=/logs/ HDFS_FILE_PREFIX=access_log_ HDFS_FILE_SUFFIX=.log <configuration> <property> <name>dfs.replication</name> <value>4</value> </property><property> <name>dfs.blocksize</name> <value>16m</value> </property> </configuration> ### \u8BBE\u7F6E### #log4j.rootLogger=debug,stdout,genlog log4j.rootLogger=INFO,logRollingFile,stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n### log4j.logger.logRollingFile= ERROR,test1 log4j.appender.test1 = org.apache.log4j.RollingFileAppender log4j.appender.test1.layout = org.apache.log4j.PatternLayout log4j.appender.test1.layout.ConversionPattern =%d{yyyy-MMM-dd HH:mm:ss}-[TS] %p %t %c - %m%n log4j.appender.test1.Threshold = DEBUG log4j.appender.test1.ImmediateFlush = TRUE log4j.appender.test1.Append = TRUE log4j.appender.test1.File = d:/logs/collect/collect.log log4j.appender.test1.MaxFileSize = 102400KB log4j.appender.test1.MaxBackupIndex = 200 ### log4j.appender.test1.Encoding = UTF-8
? ?
轉載于:https://www.cnblogs.com/zlz-bigdata/p/9566336.html
總結
以上是生活随笔為你收集整理的HDFS数据定时采集demo 简单的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: centos查看yum包所有版本(查看包
- 下一篇: 2018.09.01 独立集(树形dp)