日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java任务_Java 任务处理

發布時間:2025/5/22 java 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java任务_Java 任务处理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

最近梳理其他同事以前寫的 job 后有點想法,記錄下。

一、業務場景

在大多數的系統都有類似這樣的邏輯,比如下單了給用戶贈送積分,用戶在論壇上發表了帖子,給用戶增加積分等等。

下單贈送積分,那么一個訂單肯定不能重復贈送積分,所以需要一些狀態來比較來哪些是已贈送的,哪些是沒有贈送的。或許可以在訂單表里加個字段來標記是否贈送了積分。

有時候,業務人員出于營銷的需要,可能要搞個某某時間段內下單返券的活動。難道又在訂單表里加個字段?肯定不能,誰知道還要搞多少活動呢。

二、實現

為了使核心的業務流程盡可能簡單高效,贈送積分、返券(后面簡稱為task)之類的邏輯應該通過異步的job來處理。

因為 task 的處理狀態不能放在核心的業務表里,所以,可以另外建一個表示異步任務的 async_task 表,結構如下:

-- 業務job處理 任務

create table async_task (

id number(11) primary key,

key_work varchar2(32), -- 不同業務邏輯的task用不同的keyword

biz_id char(32), -- 業務數據 ID,比如訂單號

biz_data varchar2(256), -- 核心的業務數據,用于避免關聯業務表;具體結構取決于keyword

status number, -- 任務的處理狀態; -2:未處理, -1:處理中, 0:已處理, 大于 0 的數字表示失敗次數

create_tm date, -- 任務的創建時間

modify_tm date -- 任務的修改時間

);

處于性能考慮,可以在 key_work 字段上建立分區,在 biz_id 上建立索引。

當業務表有需要處理的數據時,就往 async_task 插入一條相應的記錄(可以異步插入),異步 job 再從 async_task 表里取數據來處理。

注意:處理 task 時,要保證數據的一致性。所在的項目組曾出現過,下單返券的活動里,送券與更新狀態的操作沒有放在同一個事務里,出現券送了,狀態沒更新,重復送券的問題。一定要注意事務的正確處理。

三、單線程、多線程處理 task

不管是用單線程還是多線程,都要考慮有大量 task 的情況,所以不能一次把所有符合條件的 task 都讀取到內存里,一定要分頁。

單機單線程

不用考慮數據被其他線程重復處理的情況,順序處理即可:取一批數據處理,處理完了再取下一批,直到所有的都處理完了。

單機多線程

數據量大了,就不能用單線程慢慢地處理了。可以采用一個線程去讀取未處理的 task,然后提交到線程池去處理,等這批 task 處理完后再去讀取下一批,主流程如下:

// 直接使用 ThreadPoolExecutor 是為了使線程池的線程有特定的名字,任務隊列有邊界。

ExecutorService executorService = new ThreadPoolExecutor(0, 10, 5,

TimeUnit.SECONDS, new ArrayBlockingQueue(1000), // 有界隊列

new ThreadFactory() { // 使用定制的

private AtomicInteger counter = new AtomicInteger(0);

@Override

public Thread newThread(Runnable r) {

Thread thread = new Thread(r);

thread.setName("task-handler-"

+ counter.incrementAndGet());

return null;

}

}, new CallerRunsPolicy());

do {

List tasks = getUnhandleTask();

if (tasks.isEmpty()) {

break;

}

List> callables = convert2callables(tasks);

executorService.invokeAll(callables);

} while (true);

executorService.shutdown();

線程池采用 CallerRunsPolicy 策略是為了在線程池處理不完任務,線程池的任務隊列滿的時候,讀取 task 的線程可以直接處理 task,這樣既減緩了 task 的讀取速度,又可以加快 task 的處理速度。

多機處理

數據量實在太多,一臺機器處理不完,可以用多臺。

在多機處理的時候,上面的代碼就有問題了,task 可能在不同的機器上被重復處理。

任務被 getUnhandleTask() 方法讀取處理后、處理完成前,另一臺機器上的線程也讀取到了這個任務,發現是未處理的,它也會進行處理,這樣就出現重復處理了。正確的主流程如下:

public class AsyncTask {

private long id;

private int status;

public static enum STATUS {

UNHANDLE, HANDLING

}

public long getId() {

return id;

}

public void setId(long id) {

this.id = id;

}

public int getStatus() {

return status;

}

public void setStatus(int status) {

this.status = status;

}

}

public class TestTaskHandle {

private Callable convert(final AsyncTask task) {

return new Callable() {

@Override

public Object call() throws Exception {

return doWithTask(task);

}

};

}

private Object doWithTask(AsyncTask task) {

return null;

}

private List getUnhandleTask() {

return null;

}

public void multiMachine() {

ExecutorService executorService = new ThreadPoolExecutor(0, 10, 5,

TimeUnit.SECONDS, new ArrayBlockingQueue(1000),

new ThreadFactory() {

private AtomicInteger counter = new AtomicInteger(0);

@Override

public Thread newThread(Runnable r) {

Thread thread = new Thread(r);

thread.setName("task-handler-"

+ counter.incrementAndGet());

return null;

}

}, new CallerRunsPolicy());

do {

List tasks = getUnhandleTask();

if (tasks.isEmpty()) {

break;

}

for (AsyncTask asyncTask : tasks) {

// 把 RDBMS 的 update 操作當作一個 CAS 命令

boolean isSuccess = updateStatus(asyncTask.getId(),

AsyncTask.STATUS.UNHANDLE, AsyncTask.STATUS.HANDLING);

if (isSuccess) {

// 把 task 更新為處理中,成功表示搶占到了這個任務,可以繼續處理

executorService.submit(convert(asyncTask));

} // else 被其他線程處理了

}

} while (true);

executorService.shutdown();

}

public boolean updateStatus(long id, AsyncTask.STATUS oldStatus,

AsyncTask.STATUS newStatus) {

return true;

}

}

在上面的實現中,每一條記錄都需要通過一個數據庫的 update 操作來判斷是否可以繼續處理,開銷不小。一個改進的做法是:在 async_task 表增加一個 owner 字段,每個線程使用一個唯一的標識 tid(比如 UUID)。當 task 讀取線程要讀取任務時,先對 async_task 表里的未處理 task 執行 update,把狀態更新為處理中, owner 更新為自己的 tid。如果這個 update 的影響行數大于 0,表示搶占到了任務,然后根據 tid 去讀取任務,再分發給線程池去處理。

在存在并發競爭的情景下,很重要的就是借助數據庫事務的ACID來達到一種 CAS 的效果;正確處理并發問題總是需要基礎的 CAS 操作或鎖。

歡迎關注我的微信公眾號: coderbee筆記,可以更及時回復你的討論。

總結

以上是生活随笔為你收集整理的java任务_Java 任务处理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。