JdbcPagingItemReader多线程的Step
2019獨角獸企業重金招聘Python工程師標準>>>
JdbcPagingItemReader多線程的Step
我們最經常使用的就是?JdbcCursorItemReader,使用游標的方式逐條數據的讀取。但是從spring 官方文檔我們知道 ,他不是線程安全的。在這里,我們使用?JdbcPagingItemReader從數據庫讀取數據,并且是分頁的讀,而且這個類是線程安全的,那么我們就可以使用多線程的Step,從而提高JOB的執行效率。
下面是主要的配置文件:
<beans?xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"?xmlns:batch="http://www.springframework.org/schema/batch"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/batch?http://www.springframework.org/schema/batch/spring-batch.xsdhttp://www.springframework.org/schema/context?http://www.springframework.org/schema/context/spring-context.xsd"><!--?包的掃描?--><context:component-scan?base-package="com.lyx.batch"?/><bean?id="exceptionHandler"?class="com.lyx.batch.ExceptionListener"?/><batch:step?id="abstractStep"?abstract="true"><batch:listeners><batch:listener?ref="exceptionHandler"?/></batch:listeners></batch:step><bean?id="abstractJdbcPagingItemReader"?abstract="true"class="org.springframework.batch.item.database.JdbcPagingItemReader"><property?name="dataSource"?ref="dataSource"?/></bean><!--?add?people?desc?job?begin?--><batch:job?id="addPeopleDescJob"><batch:step?id="addDescStep"?parent="abstractStep"><batch:tasklet><batch:chunk?reader="peopleAddDescReader"?processor="addDescProcessor"writer="addDescPeopleWriter"?commit-interval="2"?/></batch:tasklet></batch:step></batch:job><!--?add?people?desc?job?end?--><!--?使用分頁的reader?begin?--><bean?id="peopleAddDescReader"?parent="abstractJdbcPagingItemReader"scope="step"><property?name="queryProvider"><beanclass="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean"><property?name="dataSource"?ref="dataSource"?/><property?name="selectClause"?value="select?person_id,?first_name,?last_name"?/><property?name="fromClause"?value="from?people"?/><property?name="whereClause"value="where?(?first_name?like?:first_name?or?last_name?like?:last_name?)?"?/><property?name="sortKey"?value="person_id"?/></bean></property><property?name="parameterValues"><map><entry?key="first_name"?value="#{jobParameters['first_name']}"?/><entry?key="last_name"?value="#{jobParameters['last_name']}"?/></map></property><!--?配置limit的大小?--><property?name="pageSize"?value="2"?/><property?name="rowMapper"?ref="peopleRowMapper"?/></bean><!--?使用分頁的reader?end?--><bean?id="peopleRowMapper"?class="com.lyx.batch.PeopleRowMapper"?/><bean?id="addDescProcessor"?class="com.lyx.batch.AddPeopleDescProcessor"?/><bean?id="addDescPeopleWriter"?class="com.lyx.batch.AddDescPeopleWriter"?/><!--tomcat?jdbc?pool數據源配置?--><bean?id="dataSource"?class="org.apache.tomcat.jdbc.pool.DataSource"destroy-method="close"><property?name="poolProperties"><bean?class="org.apache.tomcat.jdbc.pool.PoolProperties"><property?name="driverClassName"?value="com.mysql.jdbc.Driver"?/><property?name="url"?value="jdbc:mysql://localhost:3306/test"?/><property?name="username"?value="root"?/><property?name="password"?value="034039"?/></bean></property></bean><!--?spring?batch?配置jobRepository?--><batch:job-repository?id="jobRepository"data-source="dataSource"?transaction-manager="transactionManager"isolation-level-for-create="REPEATABLE_READ"?table-prefix="BATCH_"max-varchar-length="1000"?/><!--?spring的事務管理器?--><bean?id="transactionManager"class="org.springframework.jdbc.datasource.DataSourceTransactionManager"><property?name="dataSource"?ref="dataSource"?/></bean><!--?batch?luncher?--><bean?id="jobLauncher"class="org.springframework.batch.core.launch.support.SimpleJobLauncher"><property?name="jobRepository"?ref="jobRepository"?/></bean> </beans>主要的配置就是:
<!--?使用分頁的reader?begin?--> <bean?id="peopleAddDescReader"?parent="abstractJdbcPagingItemReader"scope="step"><property?name="queryProvider"><beanclass="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean"><property?name="dataSource"?ref="dataSource"?/><property?name="selectClause"?value="select?person_id,?first_name,?last_name"?/><property?name="fromClause"?value="from?people"?/><property?name="whereClause"value="where?(?first_name?like?:first_name?or?last_name?like?:last_name?)?"?/><property?name="sortKey"?value="person_id"?/></bean></property><property?name="parameterValues"><map><entry?key="first_name"?value="#{jobParameters['first_name']}"?/><entry?key="last_name"?value="#{jobParameters['last_name']}"?/></map></property><!--?配置limit的大小?--><property?name="pageSize"?value="2"?/><property?name="rowMapper"?ref="peopleRowMapper"?/> </bean> <!--?使用分頁的reader?end?-->其他類的從前面的文章找出處,下面是我為了調試與前面不同的類
PeopleRowMapper.java
package?com.lyx.batch;import?java.sql.ResultSet; import?java.sql.SQLException;import?org.springframework.jdbc.core.RowMapper;public?class?PeopleRowMapper?implements?RowMapper<People>?{public?People?mapRow(ResultSet?rs,?int?rowNum)?throws?SQLException?{People?p?=?new?People();System.out.println("-----------------------person_id-"+?rs.getInt("person_id"));p.setId(rs.getInt("person_id"));p.setFirstName(rs.getString("first_name"));p.setLastName(rs.getString("last_name"));return?p;}}?
運行程序AppMain14.java
package?com.lyx.batch;import?org.springframework.batch.core.ExitStatus; import?org.springframework.batch.core.Job; import?org.springframework.batch.core.JobExecution; import?org.springframework.batch.core.JobParametersBuilder; import?org.springframework.batch.core.JobParametersInvalidException; import?org.springframework.batch.core.launch.JobLauncher; import?org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import?org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import?org.springframework.batch.core.repository.JobRestartException; import?org.springframework.context.ApplicationContext; import?org.springframework.context.support.ClassPathXmlApplicationContext;/***?測試?使用分頁的?reader*?*?@author?Lenovo**/ public?class?AppMain14?{public?static?void?main(String[]?args)throws?JobExecutionAlreadyRunningException,?JobRestartException,JobInstanceAlreadyCompleteException,?JobParametersInvalidException?{long?startTime?=?System.currentTimeMillis();?//?獲取開始時間@SuppressWarnings("resource")ApplicationContext?context?=?new?ClassPathXmlApplicationContext(new?String[]?{?"classpath:spring-batch-paging.xml"?});JobParametersBuilder?jobParametersBuilder?=?new?JobParametersBuilder();jobParametersBuilder.addString("first_name",?"%JOHN%");jobParametersBuilder.addString("last_name",?"%DOE%");Job?job?=?(Job)?context.getBean("addPeopleDescJob");JobLauncher?launcher?=?(JobLauncher)?context.getBean("jobLauncher");JobExecution?result?=?launcher.run(job,jobParametersBuilder.toJobParameters());ExitStatus?es?=?result.getExitStatus();if?(es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode()))?{System.out.println("任務正常完成");}?else?{System.out.println("任務失敗,exitCode="?+?es.getExitCode());}long?endTime?=?System.currentTimeMillis();?//?獲取結束時間System.out.println("程序運行時間:?"?+?(endTime?-?startTime)?+?"ms");}}運行結果:
-----------------------person_id-157
-----------------------person_id-158
process people desc
process people desc
write people desc
write people desc
任務正常完成
程序運行時間: 11929ms
十一月 22, 2014 12:29:50 上午 org.springframework.batch.core.launch.support.SimpleJobLauncher run
信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{first_name=%JOHN%, last_name=%DOE%}] and the following status: [COMPLETED]
最后是成功了。其實更重要的是JdbcPagingItemReader 的實現方式和源碼。為什么他是線程安全的,為什么他能分頁讀,這才是我們最終關心的。
這里我們使用的還是單線程的方式運行的該job ,下面我們來配置多線程的step。配置很簡單。就是配置一個異步的spring task executor,使用該異步 task executor 來運行我們的job。
?
看如下的配置:
<bean?id="taskExecutor"?class="org.springframework.core.task.SimpleAsyncTaskExecutor"?/> <!--?add?people?desc?job?begin?--> <batch:job?id="addPeopleDescJob"><batch:step?id="addDescStep"?parent="abstractStep"><batch:tasklet?task-executor="taskExecutor"><batch:chunk?reader="peopleAddDescReader"?processor="addDescProcessor"writer="addDescPeopleWriter"?commit-interval="2"?/></batch:tasklet></batch:step> </batch:job> <!--?add?people?desc?job?end?-->這里的?taskExecutor 就是一個異步的 task executor。
下面運行一下多線程的step。運行結果:
-----------------------person_id-157
-----------------------person_id-158
process people desc
process people desc
write people desc
write people desc
十一月 22, 2014 1:01:35 上午 org.springframework.batch.core.launch.support.SimpleJobLauncher run
信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{first_name=%JOHN%, last_name=%DOE%}] and the following status: [COMPLETED]
任務正常完成
程序運行時間: 8577ms
成功,好的,你是否注意到和上面的單線程的step 比,是不是程序運行的時間要少了。
關于?JdbcPagingItemReader 的實現方式和其線程安全性,如何分頁,JdbcPagingItemReader的分頁策略我們在下面文章道來。
==============END==============
轉載于:https://my.oschina.net/xinxingegeya/blog/347379
總結
以上是生活随笔為你收集整理的JdbcPagingItemReader多线程的Step的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .NET Socket服务编程之-高效连
- 下一篇: 【转】Docker —— 从入门到实践