Difference between revisions of "Collector Rosetta Stone"
Jump to navigation
Jump to search
Line 3: | Line 3: | ||
You will use {{AccumulatorCombinerReducerLink}} for our MapReduce assignments which is almost a one-to-one match with {{CollectorLink}} but de-ultra-uber-hyper-mega-super-lambdafied. | You will use {{AccumulatorCombinerReducerLink}} for our MapReduce assignments which is almost a one-to-one match with {{CollectorLink}} but de-ultra-uber-hyper-mega-super-lambdafied. | ||
+ | = One To One = | ||
{| class="wikitable" style="margin:auto" | {| class="wikitable" style="margin:auto" | ||
|+ Caption text | |+ Caption text | ||
|- | |- | ||
− | ! CSE 231s: AccumulatorCombinerReducer<V, A, R> !! | + | ! CSE 231s: AccumulatorCombinerReducer<V, A, R> !! !! Java Streams: Collector<T, A, R> |
|- | |- | ||
| <syntaxhighlight lang="java"> | | <syntaxhighlight lang="java"> | ||
Line 19: | Line 20: | ||
} | } | ||
</syntaxhighlight> || <syntaxhighlight lang="java"> | </syntaxhighlight> || <syntaxhighlight lang="java"> | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | + | ==methods== | |
− | + | === createMutableContainer a.k.a. supplier get=== | |
− | + | We use createMutableContainer() to create a new mutable container. For classic map reduce this would be a [https://docs.oracle.com/javase/8/docs/api/java/util/List.html List<V>]. | |
− | + | ||
− | + | rosetta stone: <code>container = collector.supplier().get()</code> <math>\leftrightarrow</math> <code>container = reducer.createMutableContainer()</code> | |
− | + | ||
+ | === accumulate a.k.a. accumulator accept=== | ||
+ | We use accumulate(container,item) to accumulate a value. For classic map reduce this would add an item to a list. | ||
+ | |||
+ | rosetta stone: <code>collector.accumulator().accept(container,item);</code> <math>\leftrightarrow</math> <code>reducer.accumulate(container,item)</code> | ||
+ | |||
+ | === combine a.k.a. combiner apply=== | ||
+ | We use combine(containerA,containerB) to combine two accumulators. You may combine containerB into containerA or containerA into containerB. Just return whichever is the combined result. | ||
− | + | rosetta stone: <code>collector.combiner().apply(containerA,containerB)</code> <math>\leftrightarrow</math> <code>reducer.combine(containerA,containerB)</code> | |
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | + | === reduce a.k.a. finisher apply=== | |
− | + | We use reduce(container) to reduce an accumulator. | |
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | + | rosetta stone: <code>collector.finisher().apply(container)</code> <math>\leftrightarrow</math> <code>r = reducer.reduce(container)</code> | |
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | </ | ||
− | |||
− | + | <===> | |
− | |||
− | |||
− | |||
− | + | <===> | |
− | + | <===> | |
− | + | <===> | |
− | |||
− | < | ||
− | + | </syntaxhighlight> || <syntaxhighlight lang="java"> | |
− | <syntaxhighlight lang="java"> | ||
public interface Collector<T, A, R> { | public interface Collector<T, A, R> { | ||
− | |||
− | |||
− | |||
− | |||
− | |||
Supplier<A> supplier(); | Supplier<A> supplier(); | ||
− | |||
− | |||
− | |||
− | |||
− | |||
BiConsumer<A, T> accumulator(); | BiConsumer<A, T> accumulator(); | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
BinaryOperator<A> combiner(); | BinaryOperator<A> combiner(); | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
Function<A, R> finisher(); | Function<A, R> finisher(); | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
} | } | ||
</syntaxhighlight> | </syntaxhighlight> | ||
− | + | |} | |
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | } | ||
[https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html interface Collector<T,A,R>] | [https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html interface Collector<T,A,R>] | ||
Line 153: | Line 69: | ||
: [https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html interface Function<T,R>] | : [https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html interface Function<T,R>] | ||
− | == | + | == Converting Back And Forth == |
− | + | <syntaxhighlight lang="java"> | |
− | + | public class StreamUtils { | |
+ | public static <V, A, R> Collector<V, A, R> toCollector( | ||
+ | AccumulatorCombinerReducer<V, A, R> accumulatorCombinerReducer) { | ||
return new Collector<V, A, R>() { | return new Collector<V, A, R>() { | ||
@Override | @Override | ||
public Supplier<A> supplier() { | public Supplier<A> supplier() { | ||
− | return () -> | + | return () -> accumulatorCombinerReducer.createMutableContainer(); |
} | } | ||
@Override | @Override | ||
public BiConsumer<A, V> accumulator() { | public BiConsumer<A, V> accumulator() { | ||
− | return (container, item) -> | + | return (container, item) -> accumulatorCombinerReducer.accumulate(container, item); |
} | } | ||
@Override | @Override | ||
public BinaryOperator<A> combiner() { | public BinaryOperator<A> combiner() { | ||
− | return (a, b) -> | + | return (a, b) -> { |
+ | accumulatorCombinerReducer.combine(a, b); | ||
+ | return a; | ||
+ | }; | ||
} | } | ||
@Override | @Override | ||
public Function<A, R> finisher() { | public Function<A, R> finisher() { | ||
− | return (container) -> | + | return (container) -> accumulatorCombinerReducer.reduce(container); |
} | } | ||
@Override | @Override | ||
public Set<Characteristics> characteristics() { | public Set<Characteristics> characteristics() { | ||
− | return | + | return accumulatorCombinerReducer.collectorCharacteristics(); |
} | } | ||
}; | }; | ||
} | } | ||
− | public static <V, A, R> | + | public static <V, A, R> AccumulatorCombinerReducer<V, A, R> toAccumulatorCombinerReducer( |
− | return new | + | Collector<V, A, R> collector) { |
+ | return new AccumulatorCombinerReducer<V, A, R>() { | ||
@Override | @Override | ||
public A createMutableContainer() { | public A createMutableContainer() { | ||
Line 197: | Line 119: | ||
@Override | @Override | ||
− | public | + | public void combine(A containerA, A containerB) { |
− | + | A result = collector.combiner().apply(containerA, containerB); | |
+ | if (result != containerA) { | ||
+ | throw new RuntimeException("collector must combine b into a and return a."); | ||
+ | } | ||
} | } | ||
Line 212: | Line 137: | ||
}; | }; | ||
} | } | ||
− | |||
− | + | public static AccumulatorCombinerReducer<Integer, ?, Integer> summingIntAccumulatorCombinerReducer() { | |
− | + | return toAccumulatorCombinerReducer(Collectors.summingInt(Integer::intValue)); | |
− | + | } | |
− | + | } | |
− | + | </syntaxhighlight> | |
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− |
Revision as of 16:02, 23 February 2023
The interface Collector<T,A,R> serves the standard Java streams framework for MapReduce-like tasks with added in-memory processing capability a la Apache Spark.
You will use interface AccumulatorCombinerReducer<V,A,R> for our MapReduce assignments which is almost a one-to-one match with interface Collector<T,A,R> but de-ultra-uber-hyper-mega-super-lambdafied.
One To One
CSE 231s: AccumulatorCombinerReducer<V, A, R> | Java Streams: Collector<T, A, R> | |
---|---|---|
public interface AccumulatorCombinerReducer<V, A, R> {
A createMutableContainer();
void accumulate(A container, V item);
void combine(A containerA, A containerB);
R reduce(A container);
}
|
==methods==
=== createMutableContainer a.k.a. supplier get===
We use createMutableContainer() to create a new mutable container. For classic map reduce this would be a [https://docs.oracle.com/javase/8/docs/api/java/util/List.html List<V>].
rosetta stone: <code>container = collector.supplier().get()</code> <math>\leftrightarrow</math> <code>container = reducer.createMutableContainer()</code>
=== accumulate a.k.a. accumulator accept===
We use accumulate(container,item) to accumulate a value. For classic map reduce this would add an item to a list.
rosetta stone: <code>collector.accumulator().accept(container,item);</code> <math>\leftrightarrow</math> <code>reducer.accumulate(container,item)</code>
=== combine a.k.a. combiner apply===
We use combine(containerA,containerB) to combine two accumulators. You may combine containerB into containerA or containerA into containerB. Just return whichever is the combined result.
rosetta stone: <code>collector.combiner().apply(containerA,containerB)</code> <math>\leftrightarrow</math> <code>reducer.combine(containerA,containerB)</code>
=== reduce a.k.a. finisher apply===
We use reduce(container) to reduce an accumulator.
rosetta stone: <code>collector.finisher().apply(container)</code> <math>\leftrightarrow</math> <code>r = reducer.reduce(container)</code>
<===>
<===>
<===>
<===>
|
public interface Collector<T, A, R> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
BinaryOperator<A> combiner();
Function<A, R> finisher();
}
|
Converting Back And Forth
public class StreamUtils {
public static <V, A, R> Collector<V, A, R> toCollector(
AccumulatorCombinerReducer<V, A, R> accumulatorCombinerReducer) {
return new Collector<V, A, R>() {
@Override
public Supplier<A> supplier() {
return () -> accumulatorCombinerReducer.createMutableContainer();
}
@Override
public BiConsumer<A, V> accumulator() {
return (container, item) -> accumulatorCombinerReducer.accumulate(container, item);
}
@Override
public BinaryOperator<A> combiner() {
return (a, b) -> {
accumulatorCombinerReducer.combine(a, b);
return a;
};
}
@Override
public Function<A, R> finisher() {
return (container) -> accumulatorCombinerReducer.reduce(container);
}
@Override
public Set<Characteristics> characteristics() {
return accumulatorCombinerReducer.collectorCharacteristics();
}
};
}
public static <V, A, R> AccumulatorCombinerReducer<V, A, R> toAccumulatorCombinerReducer(
Collector<V, A, R> collector) {
return new AccumulatorCombinerReducer<V, A, R>() {
@Override
public A createMutableContainer() {
return collector.supplier().get();
}
@Override
public void accumulate(A container, V item) {
collector.accumulator().accept(container, item);
}
@Override
public void combine(A containerA, A containerB) {
A result = collector.combiner().apply(containerA, containerB);
if (result != containerA) {
throw new RuntimeException("collector must combine b into a and return a.");
}
}
@Override
public R reduce(A container) {
return collector.finisher().apply(container);
}
@Override
public Set<Characteristics> collectorCharacteristics() {
return collector.characteristics();
}
};
}
public static AccumulatorCombinerReducer<Integer, ?, Integer> summingIntAccumulatorCombinerReducer() {
return toAccumulatorCombinerReducer(Collectors.summingInt(Integer::intValue));
}
}