Java EE 7批处理和魔兽世界–第1部分
這是我在上一個JavaOne上的會議之一。 這篇文章將擴(kuò)展主題并使用Batch JSR-352 API進(jìn)入一個實(shí)際的應(yīng)用程序。 此應(yīng)用程序與MMORPG 魔獸世界集成。
由于JSR-352是Java EE世界中的新規(guī)范,所以我認(rèn)為許多人不知道如何正確使用它。 確定本規(guī)范適用的用例也可能是一個挑戰(zhàn)。 希望該示例可以幫助您更好地理解用例。
抽象
《魔獸世界》是一款全球超過800萬玩家玩的游戲。 該服務(wù)按地區(qū)提供:美國(US) ,歐洲(EU) ,中國和韓國。 每個區(qū)域都有一組稱為Realm的服務(wù)器,您可以使用這些服務(wù)器進(jìn)行連接以玩游戲。 對于此示例,我們僅研究美國和歐盟地區(qū)。
該游戲最有趣的功能之一是允許您使用拍賣行買賣稱為“ 物品”的游戲內(nèi)商品。 每個領(lǐng)域都有兩個拍賣行 。 平均每個領(lǐng)域交易約70.000 項(xiàng) 。 讓我們計算一些數(shù)字:
- 512 境界 ( 美國和歐盟 )
- 每個領(lǐng)域 7萬個 物品
- 整個商品超過3500萬
數(shù)據(jù)
《魔獸世界》的另一個有趣之處在于,開發(fā)人員提供了REST API來訪問大多數(shù)游戲內(nèi)信息,包括拍賣行的數(shù)據(jù)。 在此處檢查完整的API。
拍賣行的數(shù)據(jù)分兩步獲得。 首先,我們需要查詢對應(yīng)的Auction House Realm REST端點(diǎn),以獲取對JSON文件的引用。 接下來,我們需要訪問該URL并下載包含所有拍賣行 物品信息的文件。 這是一個例子:
http://eu.battle.net/api/wow/auction/data/aggra-portugues
應(yīng)用程序
我們的目標(biāo)是建立一個下載拍賣行的應(yīng)用程序,對其進(jìn)行處理并提取指標(biāo)。 這些指標(biāo)將建立商品價格隨時間變化的歷史記錄。 誰知道? 也許借助這些信息,我們可以預(yù)測價格波動并在最佳時間購買或出售商品 。
設(shè)置
對于設(shè)置,我們將在Java EE 7中使用一些其他功能:
- Java EE 7
- 角JS
- 角度ng-grid
- UI引導(dǎo)程序
- 谷歌圖表
- 野蠅
職位
批處理JSR-352作業(yè)將執(zhí)行主要工作。 作業(yè)是封裝整個批處理過程的實(shí)體。 作業(yè)將通過作業(yè)規(guī)范語言連接在一起。 使用JSR-352 ,作業(yè)只是這些步驟的容器。 它組合了邏輯上屬于流程的多個步驟。
我們將把業(yè)務(wù)登錄分為三個工作:
- 準(zhǔn)備 –創(chuàng)建所需的所有支持?jǐn)?shù)據(jù)。 列出領(lǐng)域 ,創(chuàng)建文件夾以復(fù)制文件。
- 文件 –查詢領(lǐng)域以檢查是否有新文件要處理。
- 處理 –下載文件,處理數(shù)據(jù),提取指標(biāo)。
編碼
后端–具有Java 8的Java EE 7
大多數(shù)代碼將在后端。 我們需要Batch JSR-352 ,但我們還將使用Java EE的許多其他技術(shù):例如JPA , JAX-RS , CDI和JSON-P 。
由于“ 準(zhǔn)備工作”僅用于初始化應(yīng)用程序資源以進(jìn)行處理,因此我將跳過它,而深入到最有趣的部分。
文件作業(yè)
文件作業(yè)是AbstractBatchlet的實(shí)現(xiàn)。 批處理是批處理規(guī)范中可用的最簡單的處理樣式。 這是一個面向任務(wù)的步驟,其中任務(wù)被調(diào)用一次,執(zhí)行并返回退出狀態(tài)。 對于執(zhí)行各種非面向項(xiàng)目的任務(wù),例如執(zhí)行命令或執(zhí)行文件傳輸,此類型最有用。 在這種情況下,我們的Batchlet將在每個Realm上對每個域發(fā)出REST請求,以進(jìn)行迭代,并使用包含要處理的數(shù)據(jù)的文件檢索URL。 這是代碼:
LoadAuctionFilesBatchlet
@Named public class LoadAuctionFilesBatchlet extends AbstractBatchlet {@Injectprivate WoWBusiness woWBusiness;@Inject@BatchProperty(name = "region")private String region;@Inject@BatchProperty(name = "target")private String target;@Overridepublic String process() throws Exception {List<Realm> realmsByRegion = woWBusiness.findRealmsByRegion(Realm.Region.valueOf(region));realmsByRegion.parallelStream().forEach(this::getRealmAuctionFileInformation);return "COMPLETED";}void getRealmAuctionFileInformation(Realm realm) {try {Client client = ClientBuilder.newClient();Files files = client.target(target + realm.getSlug()).request(MediaType.TEXT_PLAIN).async().get(Files.class).get(2, TimeUnit.SECONDS);files.getFiles().forEach(auctionFile -> createAuctionFile(realm, auctionFile));} catch (Exception e) {getLogger(this.getClass().getName()).log(Level.INFO, "Could not get files for " + realm.getRealmDetail());}}void createAuctionFile(Realm realm, AuctionFile auctionFile) {auctionFile.setRealm(realm);auctionFile.setFileName("auctions." + auctionFile.getLastModified() + ".json");auctionFile.setFileStatus(FileStatus.LOADED);if (!woWBusiness.checkIfAuctionFileExists(auctionFile)) {woWBusiness.createAuctionFile(auctionFile);}} }關(guān)于此的一個很酷的事情是Java 8的使用parallelStream()一次調(diào)用多個REST請求很容易! 您真的可以注意到其中的區(qū)別。 如果您想嘗試一下,只需運(yùn)行示例,然后用stream()替換parallelStream() stream()并檢出即可。 在我的機(jī)器上,使用parallelStream()可使任務(wù)執(zhí)行速度提高約5或6倍。
更新資料
通常,我不會使用這種方法。 我這樣做了,因?yàn)椴糠诌壿嬌婕罢{(diào)用慢速的REST請求,而parallelStreams確實(shí)在這里閃耀。 可以使用批處理分區(qū)執(zhí)行此操作,但是很難實(shí)現(xiàn)。 我們還需要每次都在服務(wù)器池中收集新數(shù)據(jù),因此,如果跳過一個或兩個文件,這并不可怕。 請記住,如果您不想錯過任何一條記錄,塊處理樣式將更適合。 感謝Simon Simonelli引起我的注意。
由于美國和歐盟的領(lǐng)域要求調(diào)用不同的REST端點(diǎn),因此它們非常適合分區(qū)。 分區(qū)意味著該任務(wù)將運(yùn)行到多個線程中。 每個分區(qū)一個線程。 在這種情況下,我們有兩個分區(qū)。
要完成作業(yè)定義,我們需要提供一個JoB XML文件。 這需要放置在META-INF/batch-jobs目錄中。 這是此作業(yè)的files-job.xml :
files-job.xml
<job id="loadRealmAuctionFileJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"><step id="loadRealmAuctionFileStep"><batchlet ref="loadAuctionFilesBatchlet"><properties><property name="region" value="#{partitionPlan['region']}"/><property name="target" value="#{partitionPlan['target']}"/></properties></batchlet><partition><plan partitions="2"><properties partition="0"><property name="region" value="US"/><property name="target" value="http://us.battle.net/api/wow/auction/data/"/></properties><properties partition="1"><property name="region" value="EU"/><property name="target" value="http://eu.battle.net/api/wow/auction/data/"/></properties></plan></partition></step> </job>在files-job.xml我們需要定義我們Batchlet在batchlet元素。 對于分區(qū),只需定義partition元素并為每個plan分配不同的properties 。 然后,可以使用這些properties使用表達(dá)式#{partitionPlan['region']}和#{partitionPlan['target']}將值后期綁定到LoadAuctionFilesBatchlet 。 這是一種非常簡單的表達(dá)式綁定機(jī)制,僅適用于簡單的屬性和字符串。
處理作業(yè)
現(xiàn)在,我們要處理領(lǐng)域拍賣數(shù)據(jù)文件。 使用上一份工作中的信息,我們現(xiàn)在可以下載文件并對數(shù)據(jù)進(jìn)行某些處理。 JSON文件具有以下結(jié)構(gòu):
item-auctions-sample.json
{"realm": {"name": "Grim Batol","slug": "grim-batol"},"alliance": {"auctions": [{"auc": 279573567, // Auction Id"item": 22792, // Item for sale Id"owner": "Miljanko", // Seller Name"ownerRealm": "GrimBatol", // Realm"bid": 3800000, // Bid Value"buyout": 4000000, // Buyout Value"quantity": 20, // Numbers of items in the Auction"timeLeft": "LONG", // Time left for the Auction"rand": 0,"seed": 1069994368},{"auc": 278907544,"item": 40195,"owner": "Mongobank","ownerRealm": "GrimBatol","bid": 38000,"buyout": 40000,"quantity": 1,"timeLeft": "VERY_LONG","rand": 0,"seed": 1978036736}]},"horde": {"auctions": [{"auc": 278268046,"item": 4306,"owner": "Thuglifer","ownerRealm": "GrimBatol","bid": 570000,"buyout": 600000,"quantity": 20,"timeLeft": "VERY_LONG","rand": 0,"seed": 1757531904},{"auc": 278698948,"item": 4340,"owner": "Celticpala","ownerRealm": "Aggra(Português)","bid": 1000000,"buyout": 1000000,"quantity": 10,"timeLeft": "LONG","rand": 0,"seed": 0}]} }該文件包含從其下載的領(lǐng)域的拍賣列表。 在每個記錄中,我們可以檢查待售物品,價格,賣方和拍賣結(jié)束前的剩余時間。 拍賣的算法按拍賣行類型進(jìn)行匯總: Alliance和Horde 。
對于process-job我們要讀取JSON文件,轉(zhuǎn)換數(shù)據(jù)并將其保存到數(shù)據(jù)庫。 這可以通過塊處理來實(shí)現(xiàn)。 塊是一種ETL(提取–轉(zhuǎn)換–加載)樣式的處理,適合處理大量數(shù)據(jù)。 塊一次讀取一個數(shù)據(jù),并在事務(wù)內(nèi)創(chuàng)建要寫出的塊。 從ItemReader讀入一項(xiàng),交給ItemProcessor并進(jìn)行聚合。 一旦讀取的項(xiàng)目數(shù)等于提交間隔,就通過ItemWriter寫入整個塊,然后提交事務(wù)。
ItemReader
實(shí)際文件太大,以致無法將它們完全加載到內(nèi)存中,否則可能會耗盡它。 相反,我們使用JSON-P API以流方式解析數(shù)據(jù)。
AuctionDataItemReader
@Named public class AuctionDataItemReader extends AbstractAuctionFileProcess implements ItemReader {private JsonParser parser;private AuctionHouse auctionHouse;@Injectprivate JobContext jobContext;@Injectprivate WoWBusiness woWBusiness;@Overridepublic void open(Serializable checkpoint) throws Exception {setParser(Json.createParser(openInputStream(getContext().getFileToProcess(FolderType.FI_TMP))));AuctionFile fileToProcess = getContext().getFileToProcess();fileToProcess.setFileStatus(FileStatus.PROCESSING);woWBusiness.updateAuctionFile(fileToProcess);}@Overridepublic void close() throws Exception {AuctionFile fileToProcess = getContext().getFileToProcess();fileToProcess.setFileStatus(FileStatus.PROCESSED);woWBusiness.updateAuctionFile(fileToProcess);}@Overridepublic Object readItem() throws Exception {while (parser.hasNext()) {JsonParser.Event event = parser.next();Auction auction = new Auction();switch (event) {case KEY_NAME:updateAuctionHouseIfNeeded(auction);if (readAuctionItem(auction)) {return auction;}break;}}return null;}@Overridepublic Serializable checkpointInfo() throws Exception {return null;}protected void updateAuctionHouseIfNeeded(Auction auction) {if (parser.getString().equalsIgnoreCase(AuctionHouse.ALLIANCE.toString())) {auctionHouse = AuctionHouse.ALLIANCE;} else if (parser.getString().equalsIgnoreCase(AuctionHouse.HORDE.toString())) {auctionHouse = AuctionHouse.HORDE;} else if (parser.getString().equalsIgnoreCase(AuctionHouse.NEUTRAL.toString())) {auctionHouse = AuctionHouse.NEUTRAL;}auction.setAuctionHouse(auctionHouse);}protected boolean readAuctionItem(Auction auction) {if (parser.getString().equalsIgnoreCase("auc")) {parser.next();auction.setAuctionId(parser.getLong());parser.next();parser.next();auction.setItemId(parser.getInt());parser.next();parser.next();parser.next();parser.next();auction.setOwnerRealm(parser.getString());parser.next();parser.next();auction.setBid(parser.getInt());parser.next();parser.next();auction.setBuyout(parser.getInt());parser.next();parser.next();auction.setQuantity(parser.getInt());return true;}return false;}public void setParser(JsonParser parser) {this.parser = parser;} }要打開JSON Parse流,我們需要Json.createParser并傳遞輸入流的引用。 要讀取元素,我們只需要調(diào)用hasNext()和next()方法。 這將返回一個JsonParser.Event ,它使我們能夠檢查解析器在流中的位置。 從Batch API ItemReader的readItem()方法中讀取并返回元素。 當(dāng)沒有更多元素可讀取時,返回null以完成處理。 注意,我們還實(shí)現(xiàn)了從ItemReader open和close的方法。 這些用于初始化和清理資源。 它們只執(zhí)行一次。
ItemProcessor
ItemProcessor是可選的。 它用于轉(zhuǎn)換讀取的數(shù)據(jù)。 在這種情況下,我們需要向競價添加其他信息。
AuctionDataItemProcessor
@Named public class AuctionDataItemProcessor extends AbstractAuctionFileProcess implements ItemProcessor {@Overridepublic Object processItem(Object item) throws Exception {Auction auction = (Auction) item;auction.setRealm(getContext().getRealm());auction.setAuctionFile(getContext().getFileToProcess());return auction;} }ItemWriter
最后,我們只需要將數(shù)據(jù)寫到數(shù)據(jù)庫中即可:
AuctionDataItemWriter
@Named public class AuctionDataItemWriter extends AbstractItemWriter {@PersistenceContextprotected EntityManager em;@Overridepublic void writeItems(List<Object> items) throws Exception {items.forEach(em::persist);} }在我的計算機(jī)上,具有70 k記錄文件的整個過程大約需要20秒。 我確實(shí)注意到了一些非常有趣的事情。 在編寫此代碼之前,我使用的是注入的EJB,它通過persist操作來調(diào)用方法。 這總共花費(fèi)了30秒,因此注入EntityManager并執(zhí)行持久操作可以直接為我節(jié)省三分之一的處理時間。 我只能推測該延遲是由于堆棧調(diào)用的增加而造成的,其中EJB攔截器位于中間。 這是在Wildfly中發(fā)生的。 我將對此進(jìn)行進(jìn)一步調(diào)查。
要定義塊,我們需要將其添加到process-job.xml文件中:
process-job.xml
<step id="processFile" next="moveFileToProcessed"><chunk item-count="100"><reader ref="auctionDataItemReader"/><processor ref="auctionDataItemProcessor"/><writer ref="auctionDataItemWriter"/></chunk> </step>在item-count屬性中,我們定義每個處理塊中可以容納多少個元素。 這意味著每100個事務(wù)就會提交一次。 這對于保持較小的事務(wù)大小和檢查數(shù)據(jù)很有用。 如果我們需要停止然后重新開始操作,我們可以這樣做而不必再次處理每個項(xiàng)目。 我們必須自己編寫邏輯代碼。 該示例中不包括此功能,但以后會做。
跑步
要運(yùn)行作業(yè),我們需要獲得JobOperator的引用。 JobOperator提供了一個界面來管理作業(yè)處理的各個方面,包括操作命令(例如開始,重新啟動和停止),以及與作業(yè)存儲庫相關(guān)的命令,例如檢索作業(yè)和步驟執(zhí)行。
要運(yùn)行先前的files-job.xml Job,我們執(zhí)行:
執(zhí)行工作
JobOperator jobOperator = BatchRuntime.getJobOperator(); jobOperator.start("files-job", new Properties());請注意,我們使用Job xml文件的名稱,而沒有擴(kuò)展名到JobOperator 。
下一步
我們?nèi)匀恍枰獏R總數(shù)據(jù)以提取指標(biāo)并將其顯示在網(wǎng)頁中。 這篇文章已經(jīng)很長了,因此我將在以后的文章中介紹以下步驟。 無論如何,該部分的代碼已經(jīng)在Github存儲庫中。 檢查資源部分。
資源資源
您可以從我的github存儲庫中克隆完整的工作副本,然后將其部署到Wildfly。 您可以在此處找到說明進(jìn)行部署。
翻譯自: https://www.javacodegeeks.com/2014/10/java-ee-7-batch-processing-and-world-of-warcraft-part-1.html
總結(jié)
以上是生活随笔為你收集整理的Java EE 7批处理和魔兽世界–第1部分的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 城市道路路内停车泊位设置规范(城市道路路
- 下一篇: Java开发人员应该知道的5种错误跟踪工