MapReduce Reducer Assignment
credit for this assignment: Finn Voichick and Dennis Cosgrove
Motivation
The interface Reducer<V,A,R> is fundamental to the MapReduce Frameworks lab. Your frameworks will be general enough that MapReduce is just a subset of what they support.
In this studio we will build a ListAccumulatingReducer, which accumulates all of the values emitted for each key in a List, in the style of classic MapReduce.
We will also build a custom IntSumEfficientReducer to demonstrate the flexibility that the Reducer interface provides.
Background
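We have chosen to base the MapReduce Frameworks lab on our own non-lambdafied Reducer<V,A,R> interface rather than on the standard Collector<T,A,R> interface from the Java streams framework. Both interfaces carry the same four operations: a Reducer exposes them as plain methods, while a Collector returns each one as a function object.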
CSE 231 Selection: Reducer
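public interface Reducer<V, A, R> {
    A createMutableContainer();

    void accumulate(A container, V item);

    A combine(A containerA, A containerB);

    R reduce(A container);

    // not shown in this selection: collectorCharacteristics(), which the Rosetta Stone below relies on
}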
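To make the four methods concrete, here is a minimal sketch of a Reducer implementation driven sequentially. It assumes a local copy of the interface above so that it compiles on its own, and DistinctCountReducer and reduceAll are hypothetical names (the reducer counts distinct strings); neither is one of the studio's classes.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReducerSketch {
    // Local copy of the Reducer interface above, so this sketch compiles on its own.
    interface Reducer<V, A, R> {
        A createMutableContainer();
        void accumulate(A container, V item);
        A combine(A containerA, A containerB);
        R reduce(A container);
    }

    // Hypothetical reducer: the container is a set, the final result is its size.
    static class DistinctCountReducer implements Reducer<String, Set<String>, Integer> {
        @Override
        public Set<String> createMutableContainer() {
            return new HashSet<>();
        }

        @Override
        public void accumulate(Set<String> container, String item) {
            container.add(item);
        }

        @Override
        public Set<String> combine(Set<String> containerA, Set<String> containerB) {
            containerA.addAll(containerB); // fold B into A, then return A
            return containerA;
        }

        @Override
        public Integer reduce(Set<String> container) {
            return container.size();
        }
    }

    // Sequential use: one container, every item accumulated into it, then reduced.
    static <V, A, R> R reduceAll(Reducer<V, A, R> reducer, List<V> items) {
        A container = reducer.createMutableContainer();
        for (V item : items) {
            reducer.accumulate(container, item);
        }
        return reducer.reduce(container);
    }

    public static void main(String[] args) {
        List<String> words = List.of("to", "be", "or", "not", "to", "be");
        System.out.println(reduceAll(new DistinctCountReducer(), words)); // 4
    }
}
```

A sequential run needs only one container, so combine goes unused here; it matters once work is split across containers.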
Java Streams Collector
public interface Collector<T, A, R> {
    // invoke supplier().get() to create a new mutable container
    Supplier<A> supplier();

    // invoke accumulator().accept(container, item) to add item to a container
    BiConsumer<A, T> accumulator();

    // invoke combiner().apply(containerA, containerB) to combine one container into the other
    BinaryOperator<A> combiner();

    // invoke finisher().apply(container) to reduce a container to its final form
    Function<A, R> finisher();

    // report properties such as CONCURRENT, UNORDERED, and IDENTITY_FINISH
    Set<Characteristics> characteristics();
}
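For comparison, a distinct-count computation can be packaged as a standard Collector with the JDK's Collector.of factory, which takes the supplier, accumulator, combiner, and finisher functions plus optional characteristics. The class and method names are illustrative, and marking the collector UNORDERED is a choice made for this sketch, since a count of distinct items does not depend on encounter order.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class DistinctCountCollector {
    // Builds a Collector from its four functions with the standard Collector.of factory.
    static Collector<String, Set<String>, Integer> distinctCount() {
        return Collector.of(
                HashSet::new,                          // supplier: a fresh container
                Set::add,                              // accumulator: add one item
                (a, b) -> { a.addAll(b); return a; },  // combiner: fold b into a
                Set::size,                             // finisher: container -> result
                Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        int n = Stream.of("to", "be", "or", "not", "to", "be").collect(distinctCount());
        System.out.println(n); // 4
    }
}
```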
Rosetta Stone
public static <V, A, R> Collector<V, A, R> toCollector(Reducer<V, A, R> reducer) {
    return new Collector<V, A, R>() {
        @Override
        public Supplier<A> supplier() {
            return () -> reducer.createMutableContainer();
        }

        @Override
        public BiConsumer<A, V> accumulator() {
            return (container, item) -> reducer.accumulate(container, item);
        }

        @Override
        public BinaryOperator<A> combiner() {
            return (a, b) -> reducer.combine(a, b);
        }

        @Override
        public Function<A, R> finisher() {
            return (container) -> reducer.reduce(container);
        }

        @Override
        public Set<Characteristics> characteristics() {
            return reducer.collectorCharacteristics();
        }
    };
}

public static <V, A, R> Reducer<V, A, R> toReducer(Collector<V, A, R> collector) {
    return new Reducer<V, A, R>() {
        @Override
        public A createMutableContainer() {
            return collector.supplier().get();
        }

        @Override
        public void accumulate(A container, V item) {
            collector.accumulator().accept(container, item);
        }

        @Override
        public A combine(A containerA, A containerB) {
            return collector.combiner().apply(containerA, containerB);
        }

        @Override
        public R reduce(A container) {
            return collector.finisher().apply(container);
        }

        @Override
        public Set<Characteristics> collectorCharacteristics() {
            return collector.characteristics();
        }
    };
}
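A minimal sketch of the toCollector direction in use follows. It assumes a local copy of Reducer whose collectorCharacteristics() defaults to an empty set; that default body is an assumption, since the real one is not shown in this section. It expresses the same mapping with Collector.of instead of an anonymous class, and LengthSumReducer is a hypothetical reducer that totals string lengths in a one-element long[] container. Because the stream is parallel, the stream framework may call combine on partial containers.

```java
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class ReducerAsCollectorSketch {
    // Local copy of the Reducer interface; the default collectorCharacteristics()
    // body is an assumption made so this sketch compiles on its own.
    interface Reducer<V, A, R> {
        A createMutableContainer();
        void accumulate(A container, V item);
        A combine(A containerA, A containerB);
        R reduce(A container);

        default Set<Collector.Characteristics> collectorCharacteristics() {
            return Collections.emptySet();
        }
    }

    // Same mapping as toCollector above, written with the Collector.of factory.
    static <V, A, R> Collector<V, A, R> toCollector(Reducer<V, A, R> reducer) {
        return Collector.of(
                reducer::createMutableContainer,
                reducer::accumulate,
                reducer::combine,
                reducer::reduce,
                reducer.collectorCharacteristics().toArray(new Collector.Characteristics[0]));
    }

    // Hypothetical reducer: total length of all strings, kept in a one-slot long array.
    static class LengthSumReducer implements Reducer<String, long[], Long> {
        public long[] createMutableContainer() { return new long[1]; }
        public void accumulate(long[] container, String item) { container[0] += item.length(); }
        public long[] combine(long[] a, long[] b) { a[0] += b[0]; return a; }
        public Long reduce(long[] container) { return container[0]; }
    }

    public static void main(String[] args) {
        // A parallel stream may split the work and combine partial containers.
        long total = Stream.of("map", "reduce", "collector")
                .parallel()
                .collect(toCollector(new LengthSumReducer()));
        System.out.println(total); // 18
    }
}
```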
methods
createMutableContainer a.k.a. supplier get
We use createMutableContainer() to create a new, empty mutable container. For classic MapReduce, this would be a List<V>.
Rosetta Stone (Collector, then Reducer):
container = collector.supplier().get();
container = reducer.createMutableContainer();
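The word new matters: a framework may hand separate containers to separate workers, so each call must return its own object. The check below is a small sketch; suppliesFreshContainers is a hypothetical helper, and the deliberately broken collector that returns one shared list is an illustrative counterexample.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class FreshContainerCheck {
    // Calls the supplier twice, as two independent workers would, and reports whether
    // each call produced a distinct container object.
    static <T, A, R> boolean suppliesFreshContainers(Collector<T, A, R> collector) {
        A first = collector.supplier().get();
        A second = collector.supplier().get();
        return first != second;
    }

    public static void main(String[] args) {
        System.out.println(suppliesFreshContainers(Collectors.toList())); // true

        // Broken on purpose: every "new" container is the same shared list.
        List<String> shared = new ArrayList<>();
        Collector<String, List<String>, List<String>> broken =
                Collector.of(() -> shared, List::add, (a, b) -> { a.addAll(b); return a; });
        System.out.println(suppliesFreshContainers(broken)); // false
    }
}
```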
accumulate a.k.a. accumulator accept
We use accumulate(container, item) to add an item to a container. For classic MapReduce, this would append the item to the list.
Rosetta Stone (Collector, then Reducer):
collector.accumulator().accept(container, item);
reducer.accumulate(container, item);
combine a.k.a. combiner apply
We use combine(containerA, containerB) to combine two containers into one. You may combine containerB into containerA or containerA into containerB; just return whichever container holds the combined result.
Rosetta Stone (Collector, then Reducer):
combined = collector.combiner().apply(containerA, containerB);
combined = reducer.combine(containerA, containerB);
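The freedom to pick a direction can be put to use. The hypothetical combine below works on word-count maps (not one of the studio's container types): it merges the smaller map into the larger one so that fewer entries are copied, and returns whichever map now holds the combined counts. Callers therefore have to use the return value rather than assume containerA was the one updated.

```java
import java.util.HashMap;
import java.util.Map;

public class CombineIntoLarger {
    // Hypothetical combine for word-count containers: fold the smaller map into the
    // larger one and return the larger map, which now holds every count.
    static Map<String, Integer> combine(Map<String, Integer> containerA, Map<String, Integer> containerB) {
        Map<String, Integer> larger = containerA.size() >= containerB.size() ? containerA : containerB;
        Map<String, Integer> smaller = (larger == containerA) ? containerB : containerA;
        smaller.forEach((word, count) -> larger.merge(word, count, Integer::sum));
        return larger;
    }

    public static void main(String[] args) {
        Map<String, Integer> a = new HashMap<>(Map.of("to", 2));
        Map<String, Integer> b = new HashMap<>(Map.of("be", 1, "or", 1, "to", 1));
        Map<String, Integer> combined = combine(a, b);
        System.out.println(combined.get("to")); // 3
        System.out.println(combined == b);      // true: b was larger, so a was folded into it
    }
}
```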
reduce a.k.a. finisher apply
We use reduce(container) to reduce a container to its final result of type R.
Rosetta Stone (Collector, then Reducer):
r = collector.finisher().apply(container);
r = reducer.reduce(container);
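Putting the four Rosetta Stone calls together, the sketch below drives a standard Collector by hand the way a framework might: two containers are filled independently, combined, and then finished. collectByHand is a hypothetical helper; Collectors.joining() is the JDK collector that concatenates CharSequence items into a String.

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CollectorByHand {
    // Drives any Collector through the four phases with the calls listed above:
    // supplier().get(), accumulator().accept(...), combiner().apply(...), finisher().apply(...).
    static <T, A, R> R collectByHand(Collector<T, A, R> collector, List<T> items) {
        int mid = items.size() / 2;
        A left = collector.supplier().get();
        for (T item : items.subList(0, mid)) {
            collector.accumulator().accept(left, item);
        }
        A right = collector.supplier().get();
        for (T item : items.subList(mid, items.size())) {
            collector.accumulator().accept(right, item);
        }
        // Keep the returned container: a combiner may fold either side into the other.
        A combined = collector.combiner().apply(left, right);
        return collector.finisher().apply(combined);
    }

    public static void main(String[] args) {
        List<CharSequence> parts = List.of("map", "-", "reduce");
        System.out.println(collectByHand(Collectors.joining(), parts)); // map-reduce
    }
}
```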
Code To Use
Code To Implement
ListAccumulatingReducer
The classic MapReduce reducer collects all of the values emitted for a key in a List.
class: ListAccumulatingReducer.java
methods: createMutableContainer, accumulate, combine
package: mapreduce.apps.reducer.listaccumulating.exercise
source folder: student/src/main/java
method: List<V> createMutableContainer()
(sequential implementation only)
method: void accumulate(List<V> container, V item)
(sequential implementation only)
method: List<V> combine(List<V> containerA, List<V> containerB)
(sequential implementation only)
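For comparison, standard Java streams can build the same kind of per-key lists with the JDK's Collectors.groupingBy and Collectors.mapping collectors. This is a sketch for intuition only: the studio's framework does not use these classes, and mapping each word to 1 is an assumed word-count example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ClassicGroupingSketch {
    // Classic MapReduce shape with standard streams: emit a 1 per word, group by key,
    // and keep every emitted 1 in a List for that key (TreeMap for sorted output).
    static Map<String, List<Integer>> emittedOnesPerWord(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(
                        word -> word,
                        TreeMap::new,
                        Collectors.mapping(word -> 1, Collectors.toList())));
    }

    public static void main(String[] args) {
        System.out.println(emittedOnesPerWord(List.of("to", "be", "or", "not", "to", "be")));
        // {be=[1, 1], not=[1], or=[1], to=[1, 1]}
    }
}
```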
IntSumEfficientReducer
MapReduce apps like Word Count offer glaring opportunities to optimize the classic MapReduce approach of appending all of the 1s to a List and adding them up later. In this section of the studio, you will use MutableInt to add the values as they come in.
class: IntSumEfficientReducer.java
methods: createMutableContainer, accumulate, combine, reduce
package: mapreduce.apps.reducer.efficient.intsum.exercise
source folder: student/src/main/java
method: MutableInt createMutableContainer()
(sequential implementation only)
method: void accumulate(MutableInt container, Integer item)
(sequential implementation only)
method: MutableInt combine(MutableInt containerA, MutableInt containerB)
(sequential implementation only)
method: reduce(MutableInt container)
(sequential implementation only)
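The same contrast exists in standard Java streams: replacing the downstream list collector with the JDK's Collectors.summingInt keeps a running total per key instead of storing every 1. This sketch only illustrates the idea with JDK collectors; it does not use MutableInt, and the class and method names are illustrative.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class EfficientSumSketch {
    // Word count that adds each 1 as it arrives instead of storing it:
    // summingInt keeps a running total per key.
    static Map<String, Integer> wordCounts(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(
                        word -> word,
                        TreeMap::new,
                        Collectors.summingInt(word -> 1)));
    }

    public static void main(String[] args) {
        System.out.println(wordCounts(List.of("to", "be", "or", "not", "to", "be")));
        // {be=2, not=1, or=1, to=2}
    }
}
```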
Testing Your Solution
Correctness
top-level suite
class: CollectorStudioTestSuite.java
package: mapreduce
source folder: testing/src/test/java

sub-suites
class: ClassicReducerTestSuite.java
package: mapreduce
source folder: testing/src/test/java

class: IntSumCollectorTestSuite.java
package: mapreduce
source folder: testing/src/test/java