日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

HDP聚合日志解析内容-ifile和tfile

發布時間:2024/3/26 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 HDP聚合日志解析内容-ifile和tfile 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
解析hdfs上的聚合日志, 共4個類, 打包后上傳到服務器, 將hdfs上的日志文件下載到本地, 使用命令java -jar 包名 日志路徑名

效果圖:

代碼:

package YarnLogFileReader;import org.apache.commons.lang3.SerializationUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream; import org.apache.hadoop.io.file.tfile.Compression; import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController;import java.io.*; import java.util.*;public class IndexedFormatLogReader implements LogReader {public void printContainerLogForFile(Path path, Configuration conf) throws Exception{Compression.Algorithm compressName = Compression.getCompressionAlgorithmByName("gz");Decompressor decompressor = compressName.getDecompressor();FileContext fileContext = FileContext.getFileContext(path.toUri(), conf);FSDataInputStream fsDataInputStream = fileContext.open(path);FSDataInputStream fsDataInputStream1 = fileContext.open(path);long fileLength = fileContext.getFileStatus(path).getLen();fsDataInputStream.seek(fileLength - 4L - 32L);int offset = fsDataInputStream.readInt();byte[] array = new byte[offset];fsDataInputStream.seek(fileLength - (long) offset - 4L - 32L);int actual = fsDataInputStream.read(array);LogAggregationIndexedFileController.IndexedLogsMeta logMeta = (LogAggregationIndexedFileController.IndexedLogsMeta) SerializationUtils.deserialize(array);Iterator iter = logMeta.getLogMetas().iterator();while(iter.hasNext()) {LogAggregationIndexedFileController.IndexedPerAggregationLogMeta perAggregationLogMeta = (LogAggregationIndexedFileController.IndexedPerAggregationLogMeta) iter.next();Iterator iter1 = new TreeMap(perAggregationLogMeta.getLogMetas()).entrySet().iterator();while(iter1.hasNext()) {Map.Entry<String, List<LogAggregationIndexedFileController.IndexedFileLogMeta>> log = (Map.Entry) iter1.next();Iterator iter2 = log.getValue().iterator();InputStream in = null;while(iter2.hasNext()) {LogAggregationIndexedFileController.IndexedFileLogMeta indexedFileLogMeta = (LogAggregationIndexedFileController.IndexedFileLogMeta) iter2.next();in = compressName.createDecompressionStream(new BoundedRangeFileInputStream(fsDataInputStream1, indexedFileLogMeta.getStartIndex(), indexedFileLogMeta.getFileCompressedSize()), decompressor, 262144);StringBuilder sb = new StringBuilder();String containerStr = String.format("Container: %s on %s", indexedFileLogMeta.getContainerId(), path.getName());sb.append(containerStr + "\n");sb.append("LogType: " + indexedFileLogMeta.getFileName() + "\n");sb.append("LogLastModifiedTime: " + new Date(indexedFileLogMeta.getLastModifiedTime()) + "\n");sb.append("LogLength: " + indexedFileLogMeta.getFileSize() + "\n");sb.append("LogContents:\n");BufferedReader br = new BufferedReader(new InputStreamReader(in));System.out.println(sb.toString());String line = null;while((line = br.readLine()) != null) {System.out.println(line);}System.out.printf("End of LogType: %s\n", indexedFileLogMeta.getFileName());System.out.printf("*****************************************************************************\n\n");}}}fsDataInputStream.close();fsDataInputStream1.close();} } package YarnLogFileReader;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path;import java.io.DataInputStream;public interface LogReader {public void printContainerLogForFile(Path path, Configuration conf) throws Exception;} package YarnLogFileReader;import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; import org.apache.hadoop.yarn.logaggregation.LogToolUtils; import org.apache.hadoop.yarn.util.Times;import java.io.DataInputStream; import java.io.EOFException; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset;public class TFileLogReader implements LogReader {@Overridepublic void printContainerLogForFile(Path path, Configuration conf) throws Exception {try {AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(conf, path);AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();FileContext context = FileContext.getFileContext(path.toUri(), conf);FileStatus status = context.getFileStatus(path);long size = context.getFileStatus(path).getLen();byte[] buf = new byte['\uffff'];DataInputStream valueStream = reader.next(key);while (true) {try {String fileType = valueStream.readUTF();String fileLengthStr = valueStream.readUTF();long fileLength = Long.parseLong(fileLengthStr);LogToolUtils.outputContainerLog(key.toString(), path.getName(), fileType, fileLength, size, Times.format(status.getModificationTime()), valueStream, (OutputStream) System.out, buf, ContainerLogAggregationType.AGGREGATED);byte[] b = this.aggregatedLogSuffix(fileType).getBytes(Charset.forName("UTF-8"));((OutputStream) System.out).write(b, 0, b.length);} catch (EOFException eofException) {break;}}}catch(IOException ioe) {if("Not a valid BCFile.".equals(ioe.getMessage())) {return;} elsethrow ioe;}}private String aggregatedLogSuffix(String fileName) {StringBuilder sb = new StringBuilder();String endOfFile = "End of LogType:" + fileName;sb.append("\n" + endOfFile + "\n");sb.append(StringUtils.repeat("*", endOfFile.length() + 50) + "\n\n");return sb.toString();}} package YarnLogFileReader;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.azure.AzureException; import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;import java.io.*; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*;import org.apache.hadoop.io.file.tfile.Compression.Algorithm; import org.apache.hadoop.io.file.tfile.Compression;import java.security.SecureRandom;public class YarnLogFileReader {private static List list = new ArrayList();private static Configuration conf = new YarnConfiguration();private static final SecureRandom RAN = new SecureRandom();static {conf.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb");conf.set("fs.AbstractFileSystem.wasbs.impl", "org.apache.hadoop.fs.azure.Wasbs");conf.set("fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");conf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");conf.set("fs.abfs.impl", "org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem");conf.set("fs.abfss.impl", "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem");conf.set("fs.AbstractFileSystem.abfs.impl", "org.apache.hadoop.fs.azurebfs.Abfs");conf.set("fs.AbstractFileSystem.abfss.impl", "org.apache.hadoop.fs.azurebfs.Abfss"); // conf.addResource("D:\\xxd\\code-20211031\\YarnLogFileReader-master\\src\\core-site.xml"); // conf.addResource("D:\\xxd\\code-20211031\\YarnLogFileReader-master\\src\\hdfs-site.xml"); // conf.addResource("D:\\xxd\\code-20211031\\YarnLogFileReader-master\\src\\yarn-site.xml");}public static void main( String[] args ) throws Exception{if(args.length != 1) {System.out.println("Usage: java -classpath '/etc/hadoop/conf:./target/YarnLogFileReader-1.0-SNAPSHOT-dependencies.jar:/usr/hdp/current/hadoop-hdfs-client/lib/adls2-oauth2-token-provider.jar' YarnLogFileReader.YarnLogFileReader <path of folder contains logs>" );System.out.println("Example: java -classpath '/etc/hadoop/conf:./target/YarnLogFileReader-1.0-SNAPSHOT-dependencies.jar:/usr/hdp/current/hadoop-hdfs-client/lib/adls2-oauth2-token-provider.jar' YarnLogFileReader.YarnLogFileReader wasb://lazhuhdi-2019-05-09t07-12-39-811z@lzlazhuhdi.blob.core.windows.net//app-logs/chenghao.guo/logs-ifile/application_1557457099458_0010");System.exit(1);}try {InetAddress headnodehost = InetAddress.getByName("headnodehost");} catch(UnknownHostException ex) {System.out.println("Not running on cluster");conf.set("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem");conf.set("fs.adls.impl", "org.apache.hadoop.fs.adl.AdlFileSystem");conf.set("fs.AbstractFileSystem.adl.impl", "org.apache.hadoop.fs.adl.Adl");conf.set("fs.AbstractFileSystem.adls.impl", "org.apache.hadoop.fs.adl.Adl");YarnLogFileReader app = new YarnLogFileReader(false, args[0]);app.printAllContainerLog(args[0]);System.exit(0);}YarnLogFileReader app = new YarnLogFileReader(true, "");app.printAllContainerLog(args[0]);}public YarnLogFileReader(boolean isCluster, String path) throws IOException {if (!isCluster) {Console console = System.console();BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));int schemeIndex;if ((schemeIndex = path.indexOf("://")) != -1) {String scheme = path.substring(0, schemeIndex);scheme = "rgetebhethnnr";switch (scheme) {case "wasb":case "wasbs":case "abfs":case "abfss":if("wasb".equals(scheme) || "wasbs".equals("scheme"))System.out.println("Scheme is blob storage");elseSystem.out.println("Scheme is adls gen2");String accountName = path.substring(path.indexOf("@")+1, path.indexOf("/", schemeIndex+3));System.out.printf("Storage key (%s):", accountName);char[] storageKeyChars = console.readPassword();String storageKey = new String(storageKeyChars);conf.set("fs.azure.account.key."+accountName, storageKey);conf.set("fs.defaultFS", path.substring(0, path.indexOf("/", schemeIndex+3)));break;case "adl":System.out.println("Scheme is adls gen1");String adlsAccountName = path.substring(schemeIndex+3, path.indexOf("/", schemeIndex+3));System.out.printf("Client ID (%s): ", adlsAccountName);String clientId = reader.readLine();System.out.printf("Client Secret (%s): ", adlsAccountName);char[] clientSecretChars = console.readPassword();String clientSecret = new String(clientSecretChars);System.out.printf("Tenant ID (%s): ", adlsAccountName);String tenantId = reader.readLine();conf.set("dfs.adls.oauth.access.token.provider.type", "ClientCredential");conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/"+tenantId+"/oauth2/token");conf.set("dfs.adls.oauth2.client.id", clientId);conf.set("dfs.adls.oauth2.credential", clientSecret);conf.set("fs.defaultFS", path.substring(0, path.indexOf("/", schemeIndex+3)));break;default:conf.set("fs.defaultFS", "file:///");conf.set("fs.AbstractFileSystem.file.impl", "org.apache.hadoop.fs.local.LocalFs");System.out.println("Try local file system");}} else {System.out.print("Type scheme (wasb, wasbs, abfs, abfss, adl):");//String scheme = reader.readLine();String scheme = "rgetebhethnnr"; // 跳過不管switch (scheme) {case "wasb":case "wasbs":case "abfs":case "abfss":if("wasb".equals(scheme) || "wasbs".equals(scheme))System.out.println("Scheme is blob storage");elseSystem.out.println("Scheme is adls gen2");System.out.print("Storage Account Name:");String accountName = reader.readLine();accountName = resolveAccountName(accountName, scheme);System.out.printf("Container Name (%s): ", accountName);String containerName = reader.readLine();System.out.printf("Storage key (%s): ", accountName);char[] storageKeyChars = console.readPassword();String storageKey = new String(storageKeyChars);if("wasb".equals(scheme) || "wasbs".equals(scheme)) {conf.set("fs.defaultFS", scheme + "://" + containerName + "@" + accountName);conf.set("fs.azure.account.key." + accountName, storageKey);} else {conf.set("fs.defaultFS", scheme + "://" + containerName + "@" + accountName);conf.set("fs.azure.account.key." + accountName, storageKey);}break;case "adl":case "adls":System.out.println("Scheme is adls gen1");System.out.print("Data Lake Account Name:");String adlsAccountName = reader.readLine();adlsAccountName = resolveAccountName(adlsAccountName, scheme);System.out.printf("Client ID (%s): ", adlsAccountName);String clientId = reader.readLine();System.out.printf("Client Secret (%s): ", adlsAccountName);char[] clientSecretChars = console.readPassword();String clientSecret = new String(clientSecretChars);System.out.printf("Tenant ID (%s): ", adlsAccountName);String tenantId = reader.readLine();conf.set("fs.defaultFS", scheme + "://" + adlsAccountName);conf.set("dfs.adls.oauth.access.token.provider.type", "ClientCredential");conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/"+tenantId+"/oauth2/token");conf.set("dfs.adls.oauth2.client.id", clientId);conf.set("dfs.adls.oauth2.credential", clientSecret);break;default:conf.set("fs.defaultFS", "file:///");}}}}private String resolveAccountName(String accountName, String scheme) {if(accountName.indexOf(".") != -1)accountName = accountName.substring(0, accountName.indexOf("."));switch(scheme) {case "wasb":case "wasbs":accountName += ".blob.core.windows.net";break;case "abfs":case "abfss":accountName += ".dfs.core.windows.net";break;case "adl":accountName += ".azuredatalakestore.net";break;}return accountName;}private void printAllContainerLog(String file) throws Exception {List result = getAllFiles(new Path(file));if(result.size() == 0) {System.out.println("No file found");System.exit(0);}for(int i = 0; i < result.size(); i++) {printContainerLogForFile((Path) result.get(i));}}private void printContainerLogForFile(Path path) throws Exception {Algorithm compressName = Compression.getCompressionAlgorithmByName("gz");Decompressor decompressor = compressName.getDecompressor();try {LogReader logReader = probeFileFormat(path);logReader.printContainerLogForFile(path, conf);}catch(Exception ex){return;}}private LogReader probeFileFormat(Path path) throws Exception {FileContext fileContext = FileContext.getFileContext(path.toUri(), conf);FSDataInputStream fsDataInputStream = fileContext.open(path);long fileLength = fileContext.getFileStatus(path).getLen();try {fsDataInputStream.seek(fileLength - 4L - 32L);int offset = fsDataInputStream.readInt();if(offset >= 10485760)throw new Exception();byte[] array = new byte[offset];fsDataInputStream.seek(fileLength - (long) offset - 4L - 32L);fsDataInputStream.close();return new IndexedFormatLogReader();} catch (Exception eofex) {try {AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(conf, path);return new TFileLogReader();} catch(Exception ex) {System.out.printf("The file %s is not an indexed formatted log file\n", path.toString());throw ex;}}}private List getAllFiles(Path path) throws Exception {try {FileSystem fs = FileSystem.newInstance(conf);if (!fs.getFileStatus(path).isDirectory())list.add(path);else {FileStatus[] files = fs.listStatus(path);for (int i = 0; i < files.length; i++) {if (files[i].isDirectory())getAllFiles(files[i].getPath());elselist.add(files[i].getPath());}}return list;} catch (AzureException ex) {System.out.println("Unable to initialize the filesystem or unable to list file status, please check input parameters");throw ex;}}} <?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.microsoft.css.apacbigdata</groupId><artifactId>YarnLogFileReader</artifactId><version>1.0-SNAPSHOT</version><name>YarnLogFileReader</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.7</maven.compiler.source><maven.compiler.target>1.7</maven.compiler.target></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.2.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-common</artifactId><version>3.2.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-azure</artifactId><version>3.2.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-azure-datalake</artifactId><version>3.2.1</version></dependency><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.4.1</version><configuration><!-- get all project dependencies --><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><!-- MainClass in mainfest make a executable jar --><archive><manifest><mainClass>YarnLogFileReader.YarnLogFileReader</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><!-- bind to the packaging phase --><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build><!-- <build>--> <!-- <pluginManagement>&lt;!&ndash; lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) &ndash;&gt;--> <!-- <plugins>--> <!-- <plugin>--> <!-- <artifactId>maven-assembly-plugin</artifactId>--> <!-- <version>2.4</version>--> <!-- <configuration>--> <!-- <archive>--> <!-- <manifest>--> <!-- <mainClass>YarnLogFileReader.YarnLogFileReader</mainClass>--> <!-- </manifest>--> <!-- </archive>--> <!-- <descriptor>--> <!-- src/assembly/dep.xml--> <!-- </descriptor>--> <!-- </configuration>--> <!-- <executions>--> <!-- <execution>--> <!-- <id>make-assembly</id>--> <!-- <phase>package</phase>--> <!-- <goals>--> <!-- <goal>single</goal>--> <!-- </goals>--> <!-- </execution>--> <!-- </executions>--> <!-- </plugin>--> <!-- </plugins>--> <!-- </pluginManagement>--> <!-- </build>--> </project>

總結

以上是生活随笔為你收集整理的HDP聚合日志解析内容-ifile和tfile的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。