日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 前端技术 > javascript >内容正文

javascript

全网最详细SpringBatch批处理读取分区(Paratition)文件讲解

發(fā)布時(shí)間:2025/1/21 javascript 68 豆豆
生活随笔 收集整理的這篇文章主要介紹了 全网最详细SpringBatch批处理读取分区(Paratition)文件讲解 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

      • 一、分區(qū)Step
        • 1、數(shù)據(jù)分區(qū)
        • 2、分區(qū)處理
      • 二、實(shí)現(xiàn)分區(qū)關(guān)鍵接口
        • 1、Partitioner
        • 2、StepExecutionSplitter
        • 3、PartitionHandler
      • 三、基本配置和屬性說(shuō)明
        • 1、基本配置
        • 2、屬性說(shuō)明
      • 四、文件分區(qū)
        • 1、定義分區(qū)文件Partitioner
        • 2、定義文件讀
        • 3、定義分區(qū)job配置
        • 4、定義processor
        • 4、定義writer
        • 4、定義step監(jiān)聽(tīng)器
        • 6、運(yùn)行job

寫(xiě)在前面: 我是「境里婆娑」。我還是從前那個(gè)少年,沒(méi)有一絲絲改變,時(shí)間只不過(guò)是考驗(yàn),種在心中信念絲毫未減,眼前這個(gè)少年,還是最初那張臉,面前再多艱險(xiǎn)不退卻。
寫(xiě)博客的目的就是分享給大家一起學(xué)習(xí)交流,如果您對(duì) Java感興趣,可以關(guān)注我,我們一起學(xué)習(xí)。

前言:為什么要寫(xiě)這篇文章,在網(wǎng)上很難找到一篇關(guān)于SpringBatch批處理讀取分區(qū)文件基于JavaBean配置的文章,因此我決定寫(xiě)一篇關(guān)于SpringBatch讀取分區(qū)文件基于javaBean配置的文章,希望這篇文章可以幫助新手的你或者你有一定經(jīng)驗(yàn)的可以加深印象。

一、分區(qū)Step

何為分區(qū)Step:

通過(guò)將任務(wù)進(jìn)行分區(qū),不同的Step處理不同任務(wù)數(shù)據(jù)達(dá)到提高Job效率功能。

分區(qū)作業(yè)可以分區(qū)兩個(gè)處理階段,數(shù)據(jù)分區(qū)、分區(qū)處理;

1、數(shù)據(jù)分區(qū)

數(shù)據(jù)分區(qū):根據(jù)特殊的規(guī)則,將數(shù)據(jù)進(jìn)行合理分片,為不同的數(shù)據(jù)切片生成數(shù)據(jù)執(zhí)行上下文Execution Context、作業(yè)執(zhí)行器Step Execution。可以通過(guò)接口Partitioner生成自定義分區(qū)邏輯,SpringBatch批處理框架默認(rèn)對(duì)多文件實(shí)現(xiàn)MultiResourcePartititoner;也可以自行擴(kuò)展接口Partitioner實(shí)現(xiàn)自定義分區(qū)邏輯。

2、分區(qū)處理

分區(qū)處理:通過(guò)數(shù)據(jù)分區(qū)后,不同的數(shù)據(jù)已經(jīng)被分配到不同的作業(yè)執(zhí)行器中,接下來(lái)需要交給分區(qū)處理器進(jìn)行作業(yè),分區(qū)處理器可以在本地或遠(yuǎn)程執(zhí)行被劃分的作業(yè)。接口PartitionHandler定義了分區(qū)處理邏輯,SpringBatch批處理框架默認(rèn)實(shí)現(xiàn)了本地分區(qū)處理TaskExecutorPartitionHandler;也可以自行擴(kuò)展接口PartitionHandler來(lái)實(shí)現(xiàn)自定義分區(qū)邏輯。

分區(qū)作業(yè)邏輯結(jié)構(gòu)圖:

二、實(shí)現(xiàn)分區(qū)關(guān)鍵接口

實(shí)現(xiàn)分區(qū)關(guān)鍵接口有如下:PartitionHandler、StepExecutionSplitter、Partitioner。

1、Partitioner

Partitoner接口定義了如何根據(jù)給定的分區(qū)規(guī)則進(jìn)行創(chuàng)建作業(yè)執(zhí)行分區(qū)的上下文。

Partitioner接口定義如下:

public interface Partitioner {Map<String, ExecutionContext> partition(int gridSize); }

gridSize含義:根據(jù)給定的gridSize大小進(jìn)行執(zhí)行上下文劃分。

2、StepExecutionSplitter

StepExecutionSplitter接口定義了如何根據(jù)給定的分區(qū)規(guī)則進(jìn)行創(chuàng)建作業(yè)執(zhí)行分區(qū)的執(zhí)行器。

StepExecutionSplitter接口定義如下

public interface StepExecutionSplitter {String getStepName();Set<StepExecution> split(StepExecution stepExecution, int gridSize) throws JobExecutionException; }

getStepName:獲取當(dāng)前定義的分區(qū)作業(yè)的名稱。
split:根據(jù)給定的分區(qū)規(guī)則為每個(gè)分區(qū)生成對(duì)應(yīng)的分區(qū)執(zhí)行器。

3、PartitionHandler

PartitionHandler接口定義了分區(qū)處理的邏輯,根據(jù)給定的StepExecutionSplitter進(jìn)行分區(qū)并執(zhí)行,最后將執(zhí)行的結(jié)果進(jìn)行收集,反饋給前端。

PartitionHandler接口定義如下

public interface PartitionHandler {Collection<StepExecution> handle(StepExecutionSplitter stepSplitter, StepExecution stepExecution) throws Exception; }

三、基本配置和屬性說(shuō)明

上面兩節(jié)基本知識(shí)已經(jīng)介紹完畢,下面我們將講一個(gè)例子來(lái)鞏固之前知識(shí)。

1、基本配置

一個(gè)典型分區(qū)Job配置

@Beanpublic Step partitionMasterMultiFileStep() {return stepBuilderFactory.get("partitionMasterMultiFileStep").partitioner(partitionSlaveMultiFileStep().getName(),multiResourcePartitioner()).partitionHandler(multiFilePartitionHandler()).build();}

2、屬性說(shuō)明

在配置分區(qū)Step之前,我們先看下分區(qū)Step的主要屬性定義和元素定義

屬性說(shuō)明
step用于指定分區(qū)step名稱
handler(屬性)屬性handler指定分區(qū)執(zhí)行器,需要實(shí)現(xiàn)接口PartitionHandler
handler(子元素)用于定義默認(rèn)實(shí)現(xiàn):TaskExecutorPartitionHandler
task-executor生命使用的線程池
grid-size聲明分區(qū)的HashMap的初始值大小

四、文件分區(qū)

SpringBatch框架提供了對(duì)文件分區(qū)的支持,實(shí)現(xiàn)類:MultiResourcePartitioner提供了對(duì)文件分區(qū)的默認(rèn)支持,根據(jù)文件名將不同文件處理進(jìn)行分區(qū),提升處理速度和效率。本文將按照此例子給出如何配置多文件分區(qū)實(shí)現(xiàn)。

讀取文件如下:

本節(jié)實(shí)例由于文件多,我們對(duì)文件進(jìn)行分區(qū),然后將文件的內(nèi)容寫(xiě)入DB,邏輯示意圖如下:

1、定義分區(qū)文件Partitioner

定義文件分區(qū),將不同的文件分配到不同的作業(yè)中,使用自定義MyMultiResourcePartitioner分區(qū)。

自定義分區(qū)MyMultiResourcePartitioner如下:

/*** @author shuliangzhao* @date 2020/12/4 23:14*/ public class MyMultiResourcePartitioner implements Partitioner {private static final String DEFAULT_KEY_NAME = "fileName";private static final String PARTITION_KEY = "partition";private Resource[] resources = new Resource[0];private String keyName = DEFAULT_KEY_NAME;public void setResources(Resource[] resources) {this.resources = resources;}public void setKeyName(String keyName) {this.keyName = keyName;}@Overridepublic Map<String, ExecutionContext> partition(int gridSize) {Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>(gridSize);int i = 0;for (Resource resource : resources) {ExecutionContext context = new ExecutionContext();Assert.state(resource.exists(), "Resource does not exist: "+resource);try {context.putString(keyName, resource.getURI().getPath());}catch (IOException e) {throw new IllegalArgumentException("File could not be located for: "+resource, e);}map.put(PARTITION_KEY + i, context);i++;}return map;} }

屬性keyName:用于指定作業(yè)上文中屬性名字,作用是在不同的作業(yè)上下文中可以獲取設(shè)置的對(duì)于屬性值。可以在讀寫(xiě)階段通過(guò)@Value("#{stepExecutionContext[fileName]}"方式獲取。

2、定義文件讀

配置好分區(qū)實(shí)現(xiàn),需要在每個(gè)分區(qū)作業(yè)中讀入不同文件,進(jìn)而提供文件處理效率。

PartitionMultiFileReader 實(shí)現(xiàn)

public class PartitionMultiFileReader extends FlatFileItemReader {public PartitionMultiFileReader(Class clz,String fileName) {setResource(new FileSystemResource(fileName.substring(1)));Field[] declaredFields = clz.getDeclaredFields();List<String> list = new ArrayList<>();for (Field field:declaredFields) {list.add(field.getName());}String[] names = new String[list.size()];DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();delimitedLineTokenizer.setDelimiter(",");delimitedLineTokenizer.setNames(list.toArray(names));DefaultLineMapper defaultLineMapper = new DefaultLineMapper();defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);CommonFieldSetMapper commonFieldSetMapper = new CommonFieldSetMapper();commonFieldSetMapper.setTargetType(clz);defaultLineMapper.setFieldSetMapper(commonFieldSetMapper);setLineMapper(defaultLineMapper);setName(clz.getSimpleName());} }

3、定義分區(qū)job配置

基于javabean方式實(shí)現(xiàn)job配置

package com.sl.config; //包導(dǎo)入省略/*** @author shuliangzhao* @Title: PartitionFileConfiguration* @ProjectName spring-boot-learn* @Description: TODO* @date 2020/12/4 21:09*/ @Configuration @EnableBatchProcessing public class PartitionMultiFileConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate PartitonMultiFileProcessor partitonMultiFileProcessor;@Autowiredprivate PartitionMultiFileWriter partitionMultiFileWriter;@Beanpublic Job partitionMultiFileJob() {return jobBuilderFactory.get("partitionMultiFileJob").start(partitionMasterMultiFileStep()).build();}@Beanpublic Step partitionMasterMultiFileStep() {return stepBuilderFactory.get("partitionMasterMultiFileStep").partitioner(partitionSlaveMultiFileStep().getName(),multiResourcePartitioner()).partitionHandler(multiFilePartitionHandler()).build();}@Beanpublic PartitionHandler multiFilePartitionHandler() {TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();handler.setGridSize(2);handler.setStep(partitionSlaveMultiFileStep());handler.setTaskExecutor(new SimpleAsyncTaskExecutor());return handler;}@Beanpublic Step partitionSlaveMultiFileStep() {return stepBuilderFactory.get("partitionSlaveMultiFileStep").<CreditBill,CreditBill>chunk(1).reader(partitionMultiFileReader(null)).processor(partitonMultiFileProcessor).writer(partitionMultiFileWriter).build();}@Bean@StepScopepublic PartitionMultiFileReader partitionMultiFileReader(@Value("#{stepExecutionContext[fileName]}")String fileName) {return new PartitionMultiFileReader(CreditBill.class,fileName);}@Beanpublic MyMultiResourcePartitioner multiResourcePartitioner() {MyMultiResourcePartitioner multiResourcePartitioner = new MyMultiResourcePartitioner();multiResourcePartitioner.setKeyName("fileName");multiResourcePartitioner.setResources(getResource());return multiResourcePartitioner;}private Resource[] getResource() {String filePath = "D:\\aplus\\bill\\";File file = new File(filePath);List<Resource> resourceList = new ArrayList<>();if (file.isDirectory()) {String[] list = file.list();if (list != null) {for (String str : list) {String resource = file.getPath() + "\\" + str;FileSystemResource fileSystemResource = new FileSystemResource(resource);resourceList.add(fileSystemResource);}}}Resource[] resources = new Resource[resourceList.size()];return resourceList.toArray(resources);}}

4、定義processor

定義processor

/*** @author shuliangzhao* @date 2020/12/4 22:11*/ @Component @StepScope public class PartitonMultiFileProcessor implements ItemProcessor<CreditBill,CreditBill> {@Overridepublic CreditBill process(CreditBill item) throws Exception {CreditBill creditBill = new CreditBill();creditBill.setAcctid(item.getAcctid());creditBill.setAddress(item.getAddress());creditBill.setAmout(item.getAmout());creditBill.setDate(item.getDate());creditBill.setName(item.getName());return creditBill;} }

4、定義writer

/*** @author shuliangzhao* @date 2020/12/4 22:29*/ @Component @StepScope public class PartitionMultiFileWriter implements ItemWriter<CreditBill> {@Autowiredprivate CreditBillMapper creditBillMapper;@Overridepublic void write(List<? extends CreditBill> items) throws Exception {if (items != null && items.size() > 0) {items.stream().forEach(item -> {creditBillMapper.insert(item);});}} }

4、定義step監(jiān)聽(tīng)器

定義step監(jiān)聽(tīng)器目的是在處理作業(yè)之前打印線程名字和讀取文件名字

@Component public class PartitionStepListener implements StepExecutionListener {private static final Logger logger = LoggerFactory.getLogger(PartitionStepListener.class);@Overridepublic void beforeStep(StepExecution stepExecution) {logger.info("ThreadName={},steName={},FileName={}",Thread.currentThread().getName(),stepExecution.getStepName(),stepExecution.getExecutionContext().getString("fileName"));}@Overridepublic ExitStatus afterStep(StepExecution stepExecution) {return null;} }

6、運(yùn)行job

執(zhí)行job查看結(jié)果,可以看出不同的文件有不同的線程來(lái)處理,并且被分配到不同的分區(qū)作業(yè)步中執(zhí)行

2020-12-05 15:58:34.100 INFO 13208 --- [cTaskExecutor-1] com.sl.listener.PartitionStepListener : ThreadName=SimpleAsyncTaskExecutor-1,steName=partitionSlaveMultiFileStep:partition1,FileName=/D:/aplus/bill/bill2.csv 2020-12-05 15:58:34.114 INFO 13208 --- [cTaskExecutor-3] com.sl.listener.PartitionStepListener : ThreadName=SimpleAsyncTaskExecutor-3,steName=partitionSlaveMultiFileStep:partition0,FileName=/D:/aplus/bill/bill1.csv 2020-12-05 15:58:34.122 INFO 13208 --- [cTaskExecutor-2] com.sl.listener.PartitionStepListener : ThreadName=SimpleAsyncTaskExecutor-2,steName=partitionSlaveMultiFileStep:partition2,FileName=/D:/aplus/bill/bill3.csv

至此,我們完成了對(duì)文件分區(qū)的處理。
如果想更詳細(xì)查看以上所有代碼請(qǐng)移步到github:文件分區(qū)詳細(xì)代碼

總結(jié)

以上是生活随笔為你收集整理的全网最详细SpringBatch批处理读取分区(Paratition)文件讲解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。