vb.net datagridview数据批量导入sql_导入:Java实现大批量数据导入导出(100W以上)
閱讀文本大概需要3分鐘。
來源:https://www.cnblogs.com/barrywxx/p/10700221.html
最近業務方有一個需求,需要一次導入超過100萬數據到系統數據庫。可能大家首先會想,這么大的數據,干嘛通過程序去實現導入,為什么不直接通過SQL導入到數據庫。
一、為什么一定要在代碼實現
說說為什么不能通過SQL直接導入到數據庫,而是通過程序實現:
1. 首先,這個導入功能開始提供頁面導入,只是開始業務方保證的一次只有<3W的數據導入;
2. 其次,業務方導入的內容需要做校驗,比如門店號,商品號等是否系統存在,需要程序校驗;
3. 最后,業務方導入的都是編碼,數據庫中還要存入對應名稱,方便后期查詢,SQL導入也是無法實現的。
基于以上上三點,就無法直接通過SQL語句導入數據庫。那就只能老老實實的想辦法通過程序實現。
二、程序實現有以下技術難點
1. 一次讀取這么大的數據量,肯定會導致服務器內存溢出;
2. 調用接口保存一次傳輸數據量太大,網絡傳輸壓力會很大;
3. 最終通過SQL一次批量插入,對數據庫壓力也比較大,如果業務同時操作這個表數據,很容易造成死鎖。
三、解決思路
根據列舉的技術難點我的解決思路是:
1. 既然一次讀取整個導入文件,那就先將文件流上傳到服務器磁盤,然后分批從磁盤讀取(支持多線程讀取),這樣就防止內存溢出;
2. 調用插入數據庫接口也是根據分批讀取的內容進行調用;
3. 分批插入數據到數據庫。
四、具體實現代碼
1. 流式上傳文件到服務器磁盤
略,一般Java上傳就可以實現,這里就不貼出。
2. 多線程分批從磁盤讀取
批量讀取文件:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* 類功能描述:批量讀取文件
*
* @author WangXueXing create at 19-3-14 下午6:47
* @version 1.0.0
*/
public class BatchReadFile {
private final Logger LOGGER = LoggerFactory.getLogger(BatchReadFile.class);
/**
* 字符集UTF-8
*/
public static final String CHARSET_UTF8 = "UTF-8";
/**
* 字符集GBK
*/
public static final String CHARSET_GBK = "GBK";
/**
* 字符集gb2312
*/
public static final String CHARSET_GB2312 = "gb2312";
/**
* 文件內容分割符-逗號
*/
public static final String SEPARATOR_COMMA = ",";
private int bufSize = 1024;
// 換行符
private byte key = "\n".getBytes()[0];
// 當前行數
private long lineNum = 0;
// 文件編碼,默認為gb2312
private String encode = CHARSET_GB2312;
// 具體業務邏輯監聽器
private ReaderFileListener readerListener;
public void setEncode(String encode) {
this.encode = encode;
}
public void setReaderListener(ReaderFileListener readerListener) {
this.readerListener = readerListener;
}
/**
* 獲取準確開始位置
* @param file
* @param position
* @return
* @throws Exception
*/
public long getStartNum(File file, long position) throws Exception {
long startNum = position;
FileChannel fcin = new RandomAccessFile(file, "r").getChannel();
fcin.position(position);
try {
int cache = 1024;
ByteBuffer rBuffer = ByteBuffer.allocate(cache);
// 每次讀取的內容
byte[] bs = new byte[cache];
// 緩存
byte[] tempBs = new byte[0];
while (fcin.read(rBuffer) != -1) {
int rSize = rBuffer.position();
rBuffer.rewind();
rBuffer.get(bs);
rBuffer.clear();
byte[] newStrByte = bs;
// 如果發現有上次未讀完的緩存,則將它加到當前讀取的內容前面
if (null != tempBs) {
int tL = tempBs.length;
newStrByte = new byte[rSize + tL];
System.arraycopy(tempBs, 0, newStrByte, 0, tL);
System.arraycopy(bs, 0, newStrByte, tL, rSize);
}
// 獲取開始位置之后的第一個換行符
int endIndex = indexOf(newStrByte, 0);
if (endIndex != -1) {
return startNum + endIndex;
}
tempBs = substring(newStrByte, 0, newStrByte.length);
startNum += 1024;
}
} finally {
fcin.close();
}
return position;
}
/**
* 從設置的開始位置讀取文件,一直到結束為止。如果 end設置為負數,剛讀取到文件末尾
* @param fullPath
* @param start
* @param end
* @throws Exception
*/
public void readFileByLine(String fullPath, long start, long end) throws Exception {
File fin = new File(fullPath);
if (!fin.exists()) {
throw new FileNotFoundException("沒有找到文件:" + fullPath);
}
FileChannel fileChannel = new RandomAccessFile(fin, "r").getChannel();
fileChannel.position(start);
try {
ByteBuffer rBuffer = ByteBuffer.allocate(bufSize);
// 每次讀取的內容
byte[] bs = new byte[bufSize];
// 緩存
byte[] tempBs = new byte[0];
String line;
// 當前讀取文件位置
long nowCur = start;
while (fileChannel.read(rBuffer) != -1) {
int rSize = rBuffer.position();
rBuffer.rewind();
rBuffer.get(bs);
rBuffer.clear();
byte[] newStrByte;
//去掉表頭
if(nowCur == start){
int firstLineIndex = indexOf(bs, 0);
int newByteLenth = bs.length-firstLineIndex-1;
newStrByte = new byte[newByteLenth];
System.arraycopy(bs, firstLineIndex+1, newStrByte, 0, newByteLenth);
} else {
newStrByte = bs;
}
// 如果發現有上次未讀完的緩存,則將它加到當前讀取的內容前面
if (null != tempBs && tempBs.length != 0) {
int tL = tempBs.length;
newStrByte = new byte[rSize + tL];
System.arraycopy(tempBs, 0, newStrByte, 0, tL);
System.arraycopy(bs, 0, newStrByte, tL, rSize);
}
// 是否已經讀到最后一位
boolean isEnd = false;
nowCur += bufSize;
// 如果當前讀取的位數已經比設置的結束位置大的時候,將讀取的內容截取到設置的結束位置
if (end > 0 && nowCur > end) {
// 緩存長度 - 當前已經讀取位數 - 最后位數
int l = newStrByte.length - (int) (nowCur - end);
newStrByte = substring(newStrByte, 0, l);
isEnd = true;
}
int fromIndex = 0;
int endIndex = 0;
// 每次讀一行內容,以 key(默認為\n) 作為結束符
while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) {
byte[] bLine = substring(newStrByte, fromIndex, endIndex);
line = new String(bLine, 0, bLine.length, encode);
lineNum++;
// 輸出一行內容,處理方式由調用方提供
readerListener.outLine(line.trim(), lineNum, false);
fromIndex = endIndex + 1;
}
// 將未讀取完成的內容放到緩存中
tempBs = substring(newStrByte, fromIndex, newStrByte.length);
if (isEnd) {
break;
}
}
// 將剩下的最后內容作為一行,輸出,并指明這是最后一行
String lineStr = new String(tempBs, 0, tempBs.length, encode);
readerListener.outLine(lineStr.trim(), lineNum, true);
} finally {
fileChannel.close();
fin.deleteOnExit();
}
}
/**
* 查找一個byte[]從指定位置之后的一個換行符位置
*
* @param src
* @param fromIndex
* @return
* @throws Exception
*/
private int indexOf(byte[] src, int fromIndex) throws Exception {
for (int i = fromIndex; i < src.length; i++) {
if (src[i] == key) {
return i;
}
}
return -1;
}
/**
* 從指定開始位置讀取一個byte[]直到指定結束位置為止生成一個全新的byte[]
*
* @param src
* @param fromIndex
* @param endIndex
* @return
* @throws Exception
*/
private byte[] substring(byte[] src, int fromIndex, int endIndex) throws Exception {
int size = endIndex - fromIndex;
byte[] ret = new byte[size];
System.arraycopy(src, fromIndex, ret, 0, size);
return ret;
}
}
以上是關鍵代碼:利用FileChannel與ByteBuffer從磁盤中分批讀取數據
多線程調用批量讀取:?
/**
* 類功能描述: 線程讀取文件
*
* @author WangXueXing create at 19-3-14 下午6:51
* @version 1.0.0
*/
public class ReadFileThread extends Thread {
private ReaderFileListener processDataListeners;
private String filePath;
private long start;
private long end;
private Thread preThread;
public ReadFileThread(ReaderFileListener processDataListeners,
long start,long end,
String file) {
this(processDataListeners, start, end, file, null);
}
public ReadFileThread(ReaderFileListener processDataListeners,
long start,long end,
String file,
Thread preThread) {
this.setName(this.getName()+"-ReadFileThread");
this.start = start;
this.end = end;
this.filePath = file;
this.processDataListeners = processDataListeners;
this.preThread = preThread;
}
@Override
public void run() {
BatchReadFile readFile = new BatchReadFile();
readFile.setReaderListener(processDataListeners);
readFile.setEncode(processDataListeners.getEncode());
try {
readFile.readFileByLine(filePath, start, end + 1);
if(this.preThread != null){
this.preThread.join();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
監聽讀取:
import java.util.ArrayList;
import java.util.List;
/**
* 類功能描述:讀文件監聽父類
*
* @author WangXueXing create at 19-3-14 下午6:52
* @version 1.0.0
*/
public abstract class ReaderFileListener<T> {
// 一次讀取行數,默認為1000
private int readColNum = 1000;
/**
* 文件編碼
*/
private String encode;
/**
* 分批讀取行列表
*/
private List<String> rowList = new ArrayList<>();
/**
*其他參數
*/
private T otherParams;
/**
* 每讀取到一行數據,添加到緩存中
* @param lineStr 讀取到的數據
* @param lineNum 行號
* @param over 是否讀取完成
* @throws Exception
*/
public void outLine(String lineStr, long lineNum, boolean over) throws Exception {
if(null != lineStr && !lineStr.trim().equals("")){
rowList.add(lineStr);
}
if (!over && (lineNum % readColNum == 0)) {
output(rowList);
rowList = new ArrayList<>();
} else if (over) {
output(rowList);
rowList = new ArrayList<>();
}
}
/**
* 批量輸出
*
* @param stringList
* @throws Exception
*/
public abstract void output(List<String> stringList) throws Exception;
/**
* 設置一次讀取行數
* @param readColNum
*/
protected void setReadColNum(int readColNum) {
this.readColNum = readColNum;
}
public String getEncode() {
return encode;
}
public void setEncode(String encode) {
this.encode = encode;
}
public T getOtherParams() {
return otherParams;
}
public void setOtherParams(T otherParams) {
this.otherParams = otherParams;
}
public List<String> getRowList() {
return rowList;
}
public void setRowList(List<String> rowList) {
this.rowList = rowList;
}
}
實現監聽讀取并分批調用插入數據接口:
import com.today.api.finance.ImportServiceClient;
import com.today.api.finance.request.ImportRequest;
import com.today.api.finance.response.ImportResponse;
import com.today.api.finance.service.ImportService;
import com.today.common.Constants;
import com.today.domain.StaffSimpInfo;
import com.today.util.EmailUtil;
import com.today.util.UserSessionHelper;
import com.today.util.readfile.ReadFile;
import com.today.util.readfile.ReadFileThread;
import com.today.util.readfile.ReaderFileListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.stream.Collectors;
/**
* 類功能描述:報表導入服務實現
*
* @author WangXueXing create at 19-3-19 下午1:43
* @version 1.0.0
*/
@Service
public class ImportReportServiceImpl extends ReaderFileListener<ImportRequest> {
private final Logger LOGGER = LoggerFactory.getLogger(ImportReportServiceImpl.class);
@Value("${READ_COL_NUM_ONCE}")
private String readColNum;
@Value("${REPORT_IMPORT_RECEIVER}")
private String reportImportReceiver;
/**
* 財務報表導入接口
*/
private ImportService service = new ImportServiceClient();
/**
* 讀取文件內容
* @param file
*/
public void readTxt(File file, ImportRequest importRequest) throws Exception {
this.setOtherParams(importRequest);
ReadFile readFile = new ReadFile();
try(FileInputStream fis = new FileInputStream(file)){
int available = fis.available();
long maxThreadNum = 3L;
// 線程粗略開始位置
long i = available / maxThreadNum;
this.setRowList(new ArrayList<>());
StaffSimpInfo staffSimpInfo = ((StaffSimpInfo)UserSessionHelper.getCurrentUserInfo().getData());
String finalReportReceiver = getEmail(staffSimpInfo.getEmail(), reportImportReceiver);
this.setReadColNum(Integer.parseInt(readColNum));
this.setEncode(ReadFile.CHARSET_GB2312);
//這里單獨使用一個線程是為了當maxThreadNum大于1的時候,統一管理這些線程
new Thread(()->{
Thread preThread = null;
FutureTask futureTask = null ;
try {
for (long j = 0; j < maxThreadNum; j++) {
//計算精確開始位置
long startNum = j == 0 ? 0 : readFile.getStartNum(file, i * j);
long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, i * (j + 1)) : -2L;
//具體監聽實現
preThread = new ReadFileThread(this, startNum, endNum, file.getPath(), preThread);
futureTask = new FutureTask(preThread, new Object());
futureTask.run();
}
if(futureTask.get() != null) {
EmailUtil.sendEmail(EmailUtil.REPORT_IMPORT_EMAIL_PREFIX, finalReportReceiver, "導入報表成功", "導入報表成功" ); //todo 等文案
}
} catch (Exception e){
futureTask.cancel(true);
try {
EmailUtil.sendEmail(EmailUtil.REPORT_IMPORT_EMAIL_PREFIX, finalReportReceiver, "導入報表失敗", e.getMessage());
} catch (Exception e1){
//ignore
LOGGER.error("發送郵件失敗", e1);
}
LOGGER.error("導入報表類型:"+importRequest.getReportType()+"失敗", e);
} finally {
futureTask.cancel(true);
}
}).start();
}
}
private String getEmail(String infoEmail, String reportImportReceiver){
if(StringUtils.isEmpty(infoEmail)){
return reportImportReceiver;
}
return infoEmail;
}
/**
* 每批次調用導入接口
* @param stringList
* @throws Exception
*/
@Override
public void output(List<String> stringList) throws Exception {
ImportRequest importRequest = this.getOtherParams();
List<List<String>> dataList = stringList.stream()
.map(x->Arrays.asList(x.split(ReadFile.SEPARATOR_COMMA)).stream().map(String::trim).collect(Collectors.toList()))
.collect(Collectors.toList());
LOGGER.info("上傳數據:{}", dataList);
importRequest.setDataList(dataList);
// LOGGER.info("request對象:{}",importRequest, "request增加請求字段:{}", importRequest.data);
ImportResponse importResponse = service.batchImport(importRequest);
LOGGER.info("===========SUCESS_CODE======="+importResponse.getCode());
//導入錯誤,輸出錯誤信息
if(!Constants.SUCESS_CODE.equals(importResponse.getCode())){
LOGGER.error("導入報表類型:"+importRequest.getReportType()+"失敗","返回碼為:", importResponse.getCode() ,"返回信息:",importResponse.getMessage());
throw new RuntimeException("導入報表類型:"+importRequest.getReportType()+"失敗"+"返回碼為:"+ importResponse.getCode() +"返回信息:"+importResponse.getMessage());
}
// if(importResponse.data != null && importResponse.data.get().get("batchImportFlag")!=null) {
// LOGGER.info("eywa-service請求batchImportFlag不為空");
// }
importRequest.setData(importResponse.data);
}
}
就是設置分批讀取磁盤文件的線程數,我設置為3,大家不要設置太大,不然多個線程讀取到內存,也會造成服務器內存溢出。
以上所有批次的批量讀取并調用插入接口都成功發送郵件通知給導入人,任何一個批次失敗直接發送失敗郵件。?
數據庫分批插入數據:
/**
* 批量插入非聯機第三方導入賬單
* @param dataList
*/
def insertNonOnlinePayment(dataList: List[NonOnlineSourceData]) : Unit = {
if (dataList.nonEmpty) {
CheckAccountDataSource.mysqlData.withConnection { conn =>
val sql =
s""" INSERT INTO t_pay_source_data
(store_code,
store_name,
source_date,
order_type,
trade_type,
third_party_payment_no,
business_type,
business_amount,
trade_time,
created_at,
updated_at)
VALUES (?,?,?,?,?,?,?,?,?,NOW(),NOW())"""
conn.setAutoCommit(false)
var stmt = conn.prepareStatement(sql)
var i = 0
dataList.foreach { x =>
stmt.setString(1, x.storeCode)
stmt.setString(2, x.storeName)
stmt.setString(3, x.sourceDate)
stmt.setInt(4, x.orderType)
stmt.setInt(5, x.tradeType)
stmt.setString(6, x.tradeNo)
stmt.setInt(7, x.businessType)
stmt.setBigDecimal(8, x.businessAmount.underlying())
stmt.setString(9, x.tradeTime.getOrElse(null))
stmt.addBatch()
if ((i % 5000 == 0) && (i != 0)) { //分批提交
stmt.executeBatch
conn.commit
conn.setAutoCommit(false)
stmt = conn.prepareStatement(sql)
}
i += 1
}
stmt.executeBatch()
conn.commit()
}
}
}
以上代碼實現每5000 行提交一次批量插入,防止一次提較數據庫的壓力。
☆
往期精彩
☆
01?漫談發版哪些事,好課程推薦
02?Linux的常用最危險的命令
03?精講Spring Boot—入門+進階+實例
04?優秀的Java程序員必須了解的GC哪些
05?互聯網支付系統整體架構詳解
關注我
每天進步一點點
喜歡!在看? 創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的vb.net datagridview数据批量导入sql_导入:Java实现大批量数据导入导出(100W以上)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 空调防直吹是什么功能(如何选择空调)
- 下一篇: java 获取枚举对象_Java:获取与