zipkin 自定义采样率_分组,采样和批处理– Java 8中的自定义收集器
zipkin 自定義采樣率
在第一篇文章的后續(xù)部分,這一次我們將編寫一些更有用的自定義收集器:用于按給定的標(biāo)準(zhǔn)進(jìn)行分組,采樣輸入,批量處理以及在固定大小的窗口上滑動(dòng)。
分組(計(jì)數(shù)事件,直方圖)
假設(shè)您有一些項(xiàng)目的集合,并且想要計(jì)算每個(gè)項(xiàng)目(相對于equals() )出現(xiàn)在此集合中的次數(shù)。 這可以使用Apache Commons Collections中的CollectionUtils.getCardinalityMap()來實(shí)現(xiàn)。 此方法采用Iterable<T>并返回Map<T, Integer> ,計(jì)算每個(gè)項(xiàng)目出現(xiàn)在集合中的次數(shù)。 但是,有時(shí)我們不使用equals()而是按輸入T的任意屬性分組。 例如,假設(shè)我們有一個(gè)Person對象列表,我們想計(jì)算男性與女性的數(shù)量(即Map<Sex, Integer> )或年齡分布。 有一個(gè)內(nèi)置的收集器Collectors.groupingBy(Function<T, K> classifier) –但是,它從鍵返回一個(gè)映射到映射到該鍵的所有項(xiàng)。 看到:
import static java.util.stream.Collectors.groupingBy;//...final List<Person> people = //... final Map<Sex, List<Person>> bySex = people.stream().collect(groupingBy(Person::getSex));這很有價(jià)值,但是在我們的例子中,不必要地構(gòu)建了兩個(gè)List<Person> 。 我只想知道人數(shù)。 沒有內(nèi)置的這種收集器,但是我們可以用一種非常簡單的方式來組成它:
import static java.util.stream.Collectors.counting; import static java.util.stream.Collectors.groupingBy;//...final Map<Sex, Long> bySex = people.stream().collect(groupingBy(Person::getSex, HashMap::new, counting()));這個(gè)重載版本的groupingBy()具有三個(gè)參數(shù)。 如前所述,第一個(gè)是鍵( 分類器 )功能。 第二個(gè)參數(shù)創(chuàng)建了一個(gè)新地圖,我們很快就會(huì)看到它為什么有用的原因。 counting()是一個(gè)嵌套的收集器,它將所有同性的人帶到一起,并將它們組合在一起-在我們的示例中,它們只是在到達(dá)時(shí)對其進(jìn)行計(jì)數(shù)。 能夠選擇地圖實(shí)現(xiàn)非常有用,例如在構(gòu)建年齡直方圖時(shí)。 我們想知道在給定年齡下有多少人-但年齡值應(yīng)排序:
final TreeMap<Integer, Long> byAge = people.stream().collect(groupingBy(Person::getAge, TreeMap::new, counting()));byAge.forEach((age, count) ->System.out.println(age + ":\t" + count));我們最終得到了一個(gè)從年齡(已排序)到具有該年齡的人數(shù)的TreeMap 。
采樣,批處理和滑動(dòng)窗口
Scala中的IterableLike.sliding()方法允許通過固定大小的滑動(dòng)窗口查看集合。 該窗口從開始處開始,在每次迭代中移動(dòng)給定數(shù)量的項(xiàng)目。 Java 8中缺少的這種功能允許使用多種有用的運(yùn)算符,例如計(jì)算移動(dòng)平均值 ,將大集合分成批處理(與Guava中的Lists.partition()比較)或每第n個(gè)元素進(jìn)行采樣。 我們將為Java 8實(shí)現(xiàn)具有類似行為的收集器。 讓我們從單元測試開始,它應(yīng)該簡要描述我們想要實(shí)現(xiàn)的目標(biāo):
import static com.nurkiewicz.CustomCollectors.sliding@Unroll class CustomCollectorsSpec extends Specification {def "Sliding window of #input with size #size and step of 1 is #output"() {expect:input.stream().collect(sliding(size)) == outputwhere:input | size | output[] | 5 | [][1] | 1 | [[1]][1, 2] | 1 | [[1], [2]][1, 2] | 2 | [[1, 2]][1, 2] | 3 | [[1, 2]]1..3 | 3 | [[1, 2, 3]]1..4 | 2 | [[1, 2], [2, 3], [3, 4]]1..4 | 3 | [[1, 2, 3], [2, 3, 4]]1..7 | 3 | [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7]]1..7 | 6 | [1..6, 2..7]}def "Sliding window of #input with size #size and no overlapping is #output"() {expect:input.stream().collect(sliding(size, size)) == outputwhere:input | size | output[] | 5 | []1..3 | 2 | [[1, 2], [3]]1..4 | 4 | [1..4]1..4 | 5 | [1..4]1..7 | 3 | [1..3, 4..6, [7]]1..6 | 2 | [[1, 2], [3, 4], [5, 6]]}def "Sliding window of #input with size #size and some overlapping is #output"() {expect:input.stream().collect(sliding(size, 2)) == outputwhere:input | size | output[] | 5 | []1..4 | 5 | [[1, 2, 3, 4]]1..7 | 3 | [1..3, 3..5, 5..7]1..6 | 4 | [1..4, 3..6]1..9 | 4 | [1..4, 3..6, 5..8, 7..9]1..10 | 4 | [1..4, 3..6, 5..8, 7..10]1..11 | 4 | [1..4, 3..6, 5..8, 7..10, 9..11]}def "Sliding window of #input with size #size and gap of #gap is #output"() {expect:input.stream().collect(sliding(size, size + gap)) == outputwhere:input | size | gap | output[] | 5 | 1 | []1..9 | 4 | 2 | [1..4, 7..9]1..10 | 4 | 2 | [1..4, 7..10]1..11 | 4 | 2 | [1..4, 7..10]1..12 | 4 | 2 | [1..4, 7..10]1..13 | 4 | 2 | [1..4, 7..10, [13]]1..13 | 5 | 1 | [1..5, 7..11, [13]]1..12 | 5 | 3 | [1..5, 9..12]1..13 | 5 | 3 | [1..5, 9..13]}def "Sampling #input taking every #nth th element is #output"() {expect:input.stream().collect(sliding(1, nth)) == outputwhere:input | nth | output[] | 1 | [][] | 5 | []1..3 | 5 | [[1]]1..6 | 2 | [[1], [3], [5]]1..10 | 5 | [[1], [6]]1..100 | 30 | [[1], [31], [61], [91]]} }在Spock中使用數(shù)據(jù)驅(qū)動(dòng)的測試,我成功地立即編寫了將近40個(gè)測試用例,簡潔地描述了所有需求。 我希望這些對您來說都是清楚的,即使您以前從未看過這種語法。 我已經(jīng)假設(shè)存在方便的工廠方法:
public class CustomCollectors {public static <T> Collector<T, ?, List<List<T>>> sliding(int size) {return new SlidingCollector<>(size, 1);}public static <T> Collector<T, ?, List<List<T>>> sliding(int size, int step) {return new SlidingCollector<>(size, step);}}收藏家接連收到物品的事實(shí)使工作更加困難。 當(dāng)然,首先收集整個(gè)列表并在列表上滑動(dòng)會(huì)比較容易,但是卻很浪費(fèi)。 讓我們迭代構(gòu)建結(jié)果。 我什至不假裝通常可以并行執(zhí)行此任務(wù),所以我將不實(shí)現(xiàn)combiner() :
public class SlidingCollector<T> implements Collector<T, List<List<T>>, List<List<T>>> {private final int size;private final int step;private final int window;private final Queue<T> buffer = new ArrayDeque<>();private int totalIn = 0;public SlidingCollector(int size, int step) {this.size = size;this.step = step;this.window = max(size, step);}@Overridepublic Supplier<List<List<T>>> supplier() {return ArrayList::new;}@Overridepublic BiConsumer<List<List<T>>, T> accumulator() {return (lists, t) -> {buffer.offer(t);++totalIn;if (buffer.size() == window) {dumpCurrent(lists);shiftBy(step);}};}@Overridepublic Function<List<List<T>>, List<List<T>>> finisher() {return lists -> {if (!buffer.isEmpty()) {final int totalOut = estimateTotalOut();if (totalOut > lists.size()) {dumpCurrent(lists);}}return lists;};}private int estimateTotalOut() {return max(0, (totalIn + step - size - 1) / step) + 1;}private void dumpCurrent(List<List<T>> lists) {final List<T> batch = buffer.stream().limit(size).collect(toList());lists.add(batch);}private void shiftBy(int by) {for (int i = 0; i < by; i++) {buffer.remove();}}@Overridepublic BinaryOperator<List<List<T>>> combiner() {return (l1, l2) -> {throw new UnsupportedOperationException("Combining not possible");};}@Overridepublic Set<Characteristics> characteristics() {return EnumSet.noneOf(Characteristics.class);}}我花了很多時(shí)間來編寫此實(shí)現(xiàn),尤其是正確的finisher()所以請不要害怕。 關(guān)鍵部分是一個(gè)buffer ,它可以收集項(xiàng)目,直到可以形成一個(gè)滑動(dòng)窗口為止。 然后丟棄“最舊”的物品,并step向前滑動(dòng)窗口。 我對這種實(shí)現(xiàn)并不特別滿意,但是測試正在通過。 sliding(N) (與sliding(N, 1)同義詞)將允許計(jì)算N項(xiàng)目的移動(dòng)平均值。 sliding(N, N)將輸入分成大小為N批次。 sliding(1, N)獲取第N個(gè)元素(樣本)。 希望您會(huì)發(fā)現(xiàn)這個(gè)收藏家有用,喜歡!
翻譯自: https://www.javacodegeeks.com/2014/07/grouping-sampling-and-batching-custom-collectors-in-java-8.html
zipkin 自定義采樣率
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的zipkin 自定义采样率_分组,采样和批处理– Java 8中的自定义收集器的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微信4.0安卓版下载安装(微信4.0安卓
- 下一篇: linux启动进程和关闭进程的命令(li