java任务_Java 任务处理
最近梳理其他同事以前寫的 job 后有點想法,記錄下。
一、業務場景
在大多數的系統都有類似這樣的邏輯,比如下單了給用戶贈送積分,用戶在論壇上發表了帖子,給用戶增加積分等等。
下單贈送積分,那么一個訂單肯定不能重復贈送積分,所以需要一些狀態來比較來哪些是已贈送的,哪些是沒有贈送的。或許可以在訂單表里加個字段來標記是否贈送了積分。
有時候,業務人員出于營銷的需要,可能要搞個某某時間段內下單返券的活動。難道又在訂單表里加個字段?肯定不能,誰知道還要搞多少活動呢。
二、實現
為了使核心的業務流程盡可能簡單高效,贈送積分、返券(后面簡稱為task)之類的邏輯應該通過異步的job來處理。
因為 task 的處理狀態不能放在核心的業務表里,所以,可以另外建一個表示異步任務的 async_task 表,結構如下:
-- 業務job處理 任務
create table async_task (
id number(11) primary key,
key_work varchar2(32), -- 不同業務邏輯的task用不同的keyword
biz_id char(32), -- 業務數據 ID,比如訂單號
biz_data varchar2(256), -- 核心的業務數據,用于避免關聯業務表;具體結構取決于keyword
status number, -- 任務的處理狀態; -2:未處理, -1:處理中, 0:已處理, 大于 0 的數字表示失敗次數
create_tm date, -- 任務的創建時間
modify_tm date -- 任務的修改時間
);
處于性能考慮,可以在 key_work 字段上建立分區,在 biz_id 上建立索引。
當業務表有需要處理的數據時,就往 async_task 插入一條相應的記錄(可以異步插入),異步 job 再從 async_task 表里取數據來處理。
注意:處理 task 時,要保證數據的一致性。所在的項目組曾出現過,下單返券的活動里,送券與更新狀態的操作沒有放在同一個事務里,出現券送了,狀態沒更新,重復送券的問題。一定要注意事務的正確處理。
三、單線程、多線程處理 task
不管是用單線程還是多線程,都要考慮有大量 task 的情況,所以不能一次把所有符合條件的 task 都讀取到內存里,一定要分頁。
單機單線程
不用考慮數據被其他線程重復處理的情況,順序處理即可:取一批數據處理,處理完了再取下一批,直到所有的都處理完了。
單機多線程
數據量大了,就不能用單線程慢慢地處理了。可以采用一個線程去讀取未處理的 task,然后提交到線程池去處理,等這批 task 處理完后再去讀取下一批,主流程如下:
// 直接使用 ThreadPoolExecutor 是為了使線程池的線程有特定的名字,任務隊列有邊界。
ExecutorService executorService = new ThreadPoolExecutor(0, 10, 5,
TimeUnit.SECONDS, new ArrayBlockingQueue(1000), // 有界隊列
new ThreadFactory() { // 使用定制的
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("task-handler-"
+ counter.incrementAndGet());
return null;
}
}, new CallerRunsPolicy());
do {
List tasks = getUnhandleTask();
if (tasks.isEmpty()) {
break;
}
List> callables = convert2callables(tasks);
executorService.invokeAll(callables);
} while (true);
executorService.shutdown();
線程池采用 CallerRunsPolicy 策略是為了在線程池處理不完任務,線程池的任務隊列滿的時候,讀取 task 的線程可以直接處理 task,這樣既減緩了 task 的讀取速度,又可以加快 task 的處理速度。
多機處理
數據量實在太多,一臺機器處理不完,可以用多臺。
在多機處理的時候,上面的代碼就有問題了,task 可能在不同的機器上被重復處理。
任務被 getUnhandleTask() 方法讀取處理后、處理完成前,另一臺機器上的線程也讀取到了這個任務,發現是未處理的,它也會進行處理,這樣就出現重復處理了。正確的主流程如下:
public class AsyncTask {
private long id;
private int status;
public static enum STATUS {
UNHANDLE, HANDLING
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
}
public class TestTaskHandle {
private Callable convert(final AsyncTask task) {
return new Callable() {
@Override
public Object call() throws Exception {
return doWithTask(task);
}
};
}
private Object doWithTask(AsyncTask task) {
return null;
}
private List getUnhandleTask() {
return null;
}
public void multiMachine() {
ExecutorService executorService = new ThreadPoolExecutor(0, 10, 5,
TimeUnit.SECONDS, new ArrayBlockingQueue(1000),
new ThreadFactory() {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("task-handler-"
+ counter.incrementAndGet());
return null;
}
}, new CallerRunsPolicy());
do {
List tasks = getUnhandleTask();
if (tasks.isEmpty()) {
break;
}
for (AsyncTask asyncTask : tasks) {
// 把 RDBMS 的 update 操作當作一個 CAS 命令
boolean isSuccess = updateStatus(asyncTask.getId(),
AsyncTask.STATUS.UNHANDLE, AsyncTask.STATUS.HANDLING);
if (isSuccess) {
// 把 task 更新為處理中,成功表示搶占到了這個任務,可以繼續處理
executorService.submit(convert(asyncTask));
} // else 被其他線程處理了
}
} while (true);
executorService.shutdown();
}
public boolean updateStatus(long id, AsyncTask.STATUS oldStatus,
AsyncTask.STATUS newStatus) {
return true;
}
}
在上面的實現中,每一條記錄都需要通過一個數據庫的 update 操作來判斷是否可以繼續處理,開銷不小。一個改進的做法是:在 async_task 表增加一個 owner 字段,每個線程使用一個唯一的標識 tid(比如 UUID)。當 task 讀取線程要讀取任務時,先對 async_task 表里的未處理 task 執行 update,把狀態更新為處理中, owner 更新為自己的 tid。如果這個 update 的影響行數大于 0,表示搶占到了任務,然后根據 tid 去讀取任務,再分發給線程池去處理。
在存在并發競爭的情景下,很重要的就是借助數據庫事務的ACID來達到一種 CAS 的效果;正確處理并發問題總是需要基礎的 CAS 操作或鎖。
歡迎關注我的微信公眾號: coderbee筆記,可以更及時回復你的討論。
總結
以上是生活随笔為你收集整理的java任务_Java 任务处理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java 无法找到ant_无法找到与AN
- 下一篇: php面试基础项目,PHP面试经典题