日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

使用Java 8处理并行数据库流

發(fā)布時間:2023/12/3 java 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用Java 8处理并行数据库流 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

什么是并行數(shù)據(jù)庫流?

閱讀這篇文章,了解如何使用并行流和Speedment并行處理數(shù)據(jù)庫中的數(shù)據(jù)。 在許多情況下,并行流可能比通常的順序流快得多。

隨著Java 8的引入,我們得到了期待已久的Stream庫。 流的優(yōu)點之一是使流并行非常容易。 基本上,我們可以采用任何流,然后只應(yīng)用方法parallel()獲得并行流,而不是順序流。 默認情況下,并行流由公共ForkJoinPool執(zhí)行。

尖塔和公爵并行工作

因此,如果我們有工作量相對較高的工作項,那么并行流通常是有意義的。如果要在并行流管線中執(zhí)行的工作項在很大程度上未耦合并且在幾個線程相對較低。 同樣,合并并行結(jié)果的努力也必須相對較低。

Speedment是開源的ORM Java工具包和RuntimeJava工具,它將現(xiàn)有的數(shù)據(jù)庫及其表包裝到Java 8流中。 我們可以使用現(xiàn)有的數(shù)據(jù)庫并運行Speedment工具,它將生成與我們使用該工具選擇的表相對應(yīng)的POJO類。


Speedment的一項很酷的功能是,數(shù)據(jù)庫流使用標準的Stream語義支持并行性。 這樣, 與順序處理流相比,我們可以輕松地并行處理數(shù)據(jù)庫內(nèi)容并更快地產(chǎn)生結(jié)果!

加速入門

訪問GitHub上的開放源Speedment ,了解如何開始Speedment項目。 將工具連接到現(xiàn)有數(shù)據(jù)庫應(yīng)該非常容易。

在本文中,下面的示例使用以下MySQL表。

CREATE TABLE `prime_candidate` (`id` int(11) NOT NULL AUTO_INCREMENT,`value` bigint(20) NOT NULL,`prime` bit(1) DEFAULT NULL,PRIMARY KEY (`id`) ) ENGINE=InnoDB;

這個想法是人們可以在表中插入值,然后我們將編寫一個應(yīng)用程序來計算插入的值是否是質(zhì)數(shù)。 在實際情況下,我們可以使用MySQL,PostgreSQL或MariaDB數(shù)據(jù)庫中的任何表。

編寫順序流解決方案

首先,我們需要一個方法,如果值是素數(shù)則返回。 這是一種簡單的方法。 請注意, 故意使算法變慢,因此我們可以清楚地了解并行流在昂貴的操作上的效果。

public class PrimeUtil {/*** Returns if the given parameter is a prime number.** @param n the given prime number candidate* @return if the given parameter is a prime number*/static boolean isPrime(long n) {// primes are equal or greater than 2 if (n < 2) {return false;}// check if n is evenif (n % 2 == 0) {// 2 is the only even prime// all other even n:s are notreturn n == 2;}// if odd, then just check the odds// up to the square root of n// for (int i = 3; i * i <= n; i += 2) {//// Make the methods purposely slow by// checking all the way up to nfor (int i = 3; i <= n; i += 2) {if (n % i == 0) {return false;}}return true;}}

同樣,這篇文章的目的不是設(shè)計一種有效的質(zhì)數(shù)確定方法。

使用這種簡單的質(zhì)數(shù)方法,我們現(xiàn)在可以輕松編寫一個Speedment應(yīng)用程序,該應(yīng)用程序?qū)呙钄?shù)據(jù)庫表以查找未確定的質(zhì)數(shù)候選者,然后將確定它們是否為質(zhì)數(shù)并相應(yīng)地更新表。 看起來可能是這樣:

final JavapotApplication app = new JavapotApplicationBuilder().withPassword("javapot") // Replace with the real password.withLogging(LogType.STREAM).build();final Manager<PrimeCandidate> candidates = app.getOrThrow(PrimeCandidateManager.class);candidates.stream().filter(PrimeCandidate.PRIME.isNull()) // Filter out undetermined primes.map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue()))) // Sets if it is a prime or not.forEach(candidates.updater()); // Applies the Manager's updater

最后一部分包含有趣的內(nèi)容。 首先,我們在“ prime”列為
使用stream().filter(PrimeCandidate.PRIME.isNull())方法stream().filter(PrimeCandidate.PRIME.isNull()) null 。 重要的是要了解,Speedment流實現(xiàn)將識別過濾謂詞,并能夠使用該謂詞來減少從數(shù)據(jù)庫中實際提取的候選者的數(shù)量(例如,“ SELECT * FROM FROM WHERE prime IS NULL”將使用)。

然后,對每個這樣的總理候選人PC,我們無論是“黃金”列設(shè)置為true ,如果pc.getValue()是一個主要的或false ,如果pc.getValue()是不是一個素數(shù)。 有趣的是, pc.setPrime()方法返回實體pc本身,使我們能夠輕松地標記多個流操作。 在最后一行,我們通過應(yīng)用candidates.updater()函數(shù)update。 candidates.updater()將檢查結(jié)果更新數(shù)據(jù)庫。 因此,該應(yīng)用程序的主要功能實際上是單行的(分為五行以提高可讀性)。

現(xiàn)在,在測試應(yīng)用程序之前,我們需要生成一些測試數(shù)據(jù)輸入。 這是使用Speedment如何完成的示例:

final JavapotApplication app = new JavapotApplicationBuilder().withPassword("javapot") // Replace with the real password.build();final Manager<PrimeCandidate> candidates = app.getOrThrow(PrimeCandidateManager.class);final Random random = new SecureRandom();// Create a bunch of new prime candidatesrandom.longs(1_100, 0, Integer.MAX_VALUE).mapToObj(new PrimeCandidateImpl()::setValue) // Sets the random value .forEach(candidates.persister()); // Applies the Manager's persister function

同樣,我們只需幾行代碼就可以完成我們的任務(wù)。

嘗試默認的并行流

如果要并行化流,則只需向以前的解決方案中添加一個方法即可:

candidates.stream().parallel() // Now indicates a parallel stream.filter(PrimeCandidate.PRIME.isNull()).map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue()))).forEach(candidates.updater()); // Applies the Manager's updater

和我們平行! 但是,默認情況下,Speedment使用Java的默認并行化行為(在Spliterators::spliteratorUnknownSize定義),該行為針對非計算密集型操作進行了優(yōu)化。 如果我們分析Java的默認并行化行為,我們將確定它將對第一個1024個工作項使用第一個線程,對隨后的2 * 1024 = 2048個工作項使用第二個線程,然后對第三個線程使用3 * 1024 = 3072個工作項線程等。

這對我們的應(yīng)用程序不利,因為每個應(yīng)用程序的成本都很高。 如果我們正在計算1100個主要候選對象,我們將僅使用兩個線程,因為第一個線程將處理前1024個項目,第二個線程將處理其余的76個項目。現(xiàn)代服務(wù)器的線程要多得多。 閱讀下一節(jié),了解如何解決此問題。

內(nèi)置并行化策略

速度有許多內(nèi)置的并行化策略,我們可以根據(jù)工作項的預(yù)期計算需求進行選擇。 這是對僅具有一種默認策略的Java 8的改進。 內(nèi)置的并行策略是:

@FunctionalInterface public interface ParallelStrategy {/*** A Parallel Strategy that is Java's default <code>Iterator</code> to* <code>Spliterator</code> converter. It favors relatively large sets (in* the ten thousands or more) with low computational overhead.** @return a ParallelStrategy*/static ParallelStrategy computeIntensityDefault() {...}/*** A Parallel Strategy that favors relatively small to medium sets with* medium computational overhead.** @return a ParallelStrategy*/static ParallelStrategy computeIntensityMedium() {...}/*** A Parallel Strategy that favors relatively small to medium sets with high* computational overhead.** @return a ParallelStrategy*/static ParallelStrategy computeIntensityHigh() {...}/*** A Parallel Strategy that favors small sets with extremely high* computational overhead. The set will be split up in solitary elements* that are executed separately in their own thread.** @return a ParallelStrategy*/static ParallelStrategy computeIntensityExtreme() {...}<T> Spliterator<T> spliteratorUnknownSize(Iterator<? extends T> iterator, int characteristics);static ParallelStrategy of(final int... batchSizes) {return new ParallelStrategy() {@Overridepublic <T> Spliterator<T> spliteratorUnknownSize(Iterator<? extends T> iterator, int characteristics) {return ConfigurableIteratorSpliterator.of(iterator, characteristics, batchSizes);}};}

應(yīng)用并行策略

我們要做的唯一一件事就是為這樣的管理器配置并行化策略,我們很高興:

Manager<PrimeCandidate> candidatesHigh = app.configure(PrimeCandidateManager.class).withParallelStrategy(ParallelStrategy.computeIntensityHigh()).build();candidatesHigh.stream() // Better parallel performance for our case!.parallel().filter(PrimeCandidate.PRIME.isNull()).map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue()))).forEach(candidatesHigh.updater());

ParallelStrategy.computeIntensityHigh()策略會將工作項分解成更小的塊。 因為我們現(xiàn)在將使用所有可用的線程,所以這將使我們獲得更好的性能。 如果我們深入研究,可以看到該策略的定義如下:

private final static int[] BATCH_SIZES = IntStream.range(0, 8).map(ComputeIntensityUtil::toThePowerOfTwo).flatMap(ComputeIntensityUtil::repeatOnHalfAvailableProcessors).toArray();

這意味著,在具有8個線程的計算機上,它將在線程1-4上放置一個項目,在線程5-8上放置兩個項目,當(dāng)任務(wù)完成時,接下來的四個可用線程上將有四個項目,然后是八個依此類推,直到達到256,這是任何線程上的最大項目數(shù)。 顯然,對于該特定問題,此策略比Java的標準策略好得多。

這是常見的ForkJoinPool中的線程在我的8線程筆記本電腦上的樣子:

創(chuàng)建自己的并行策略

Speedment的一件很酷的事情是,我們可以很容易地編寫并行化策略,然后將其注入流中。 考慮以下自定義并行化策略:

public static class MyParallelStrategy implements ParallelStrategy {private final static int[] BATCH_SIZES = {1, 2, 4, 8};@Overridepublic <T> Spliterator<T> spliteratorUnknownSize(Iterator<? extends T> iterator, int characteristics) {return ConfigurableIteratorSpliterator.of(iterator, characteristics, BATCH_SIZES);}}

實際上,它可以表達得更短:

ParallelStrategy myParallelStrategy = ParallelStrategy.of(1, 2, 4, 8);

此策略將在第一個可用線程上放置一個工作項,在第二個可用線程上放置兩個,在第三個線程上放置四個,在第四個線程上放置八個,其中八個是數(shù)組中的最后一位。 最后一位數(shù)字將用于所有后續(xù)可用線程。 因此,訂單實際上變成了1、2、4、8、8、8、8...。現(xiàn)在,我們可以使用以下新策略:

Manager<PrimeCandidate> candidatesCustom = app.configure(PrimeCandidateManager.class).withParallelStrategy(myParallelStrategy).build();candidatesCustom.stream().parallel().filter(PrimeCandidate.PRIME.isNull()).map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue()))).forEach(candidatesCustom.updater());

瞧! 我們完全控制工作項在可用執(zhí)行線程上的布局方式。

基準測試

所有基準都使用相同的主要候選者輸入。 測試是在MacBook Pro,具有4個物理核心和8個線程的2.2 GHz Intel Core i7上進行的。

StrategySequential 265 s (One thread processed all 1100 items) Parallel Default Java 8 235 s (Because 1024 items were processed by thread 1 and 76 items by thread 2) Parallel computeIntensityHigh() 69 s (All 4 hardware cores were used)

結(jié)論

Speedment支持并行處理數(shù)據(jù)庫內(nèi)容。 Speedment支持多種并行策略,以允許充分利用執(zhí)行環(huán)境。

我們可以輕松地創(chuàng)建自己的并行策略,并在Speedment流中使用它們。 通過仔細選擇一種并行策略而不是僅使用Java的默認策略,可以顯著提高性能。

翻譯自: https://www.javacodegeeks.com/2016/10/work-parallel-database-streams-using-java-8.html

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

總結(jié)

以上是生活随笔為你收集整理的使用Java 8处理并行数据库流的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。