Collector MapReduce Studio

From CSE231 Wiki
Revision as of 23:53, 23 July 2018 by Miles.p.cushing (talk | contribs) (Code To Use)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to: navigation, search

credit for this assignment: Finn Voichick and Dennis Cosgrove

Motivation

interface Collector<T,A,R> is fundamental to the MapReduce Frameworks lab. Your frameworks will be general enough such that MapReduce is just a subset of what you will support.

In this studio we will build a ClassicReducer Collector which will implement the MapReduce style of accumulating all of the emitted values per key in a List.

We will also build a custom IntSumCollector to demonstrate the desired flexibility of the Collector interface.

Background

We debated between creating our own custom Reducer<V,A,R> interface versus adopting the standard interface Collector<T,A,R> from the standard Java streams framework as the basis for the MapReduce Frameworks Lab.

While It might have been slightly less confusing at the outset if we used something like this mythical non-existant interface below:

public interface Reducer<V, A, R> {
	A createMutableContainer();
	void accumulate(A container, V item);
	A combine(A containerA, A containerB);
	R reduce(A container);
}

we chose to go with the standard Collector<T,A,R> despite the extra level of indirection it requires (returning @Functional interfaces whose single abstract methods do the work).

We provide this mythical class CollectorReducerAdapter as a Rosetta Stone of sorts in an effort to reveal what each method is responsible for:

public class CollectorReducerAdapter<V, A, R> implements Reducer<V, A, R> {
	private final Collector<V,A,R> collector;
	public CollectorReducerAdapter(Collector<V,A,R> collector) {
		this.collector = collector;
	}
	@Override
	public A createMutableContainer() {
		return collector.supplier().get();
	}
	@Override
	public void accumulate(A container, V item) {
		collector.accumulator().accept(container, item);
	}
	@Override
	public A combine(A containerA, A containerB) {
		return collector.combiner().apply(containerA, containerB);
	}
	@Override
	public R reduce(A container) {
		return collector.finisher().apply(container);
	}
}

Collector

public interface Collector<T, A, R> {

	// invoke supplier().get() to create a new mutable container
	Supplier<A> supplier();

	// invoke accumulator().accept(container, item) to add item to a container
	BiConsumer<A, T> accumulator();

	// invoke combiner().apply(containerA, containerB) to combine one container into the other
	BinaryOperator<A> combiner();

	// invoke finisher().apply(container) to reduce a container to its final form
	Function<A, R> finisher();
}

supplier get

We use supplier().get() to create a new mutable container. For classic map reduce this would be a List<V>.

mythical code analogy: container = collector.supplier().get() \leftrightarrow container = accumulatorReducer.createContainer()

accumulator accept

We use accumulator().accept(container,item) to accumulate a value. For classic map reduce this would add an item to a list.

mythical code analogy: collector.accumulator().accept(container,item); \leftrightarrow accumulatorReducer.accumulate(container,item)

combiner apply

We use combiner().apply(containerA,containerB) to combine two accumulators. You may combine containerB into containerA or containerA into containerB. Just return whichever is the combined result.

mythical code analogy: collector.combiner().apply(containerA,containerB) \leftrightarrow accumulatorReducer.combine(containerA,containerB)

finisher apply

We use finisher().apply(container) to reduce an accumulator.

mythical code analogy: collector.finisher().apply(container) \leftrightarrow r = accumulatorReducer.reduce(container)

Code To Use

interface Collector<T,A,R>

interface Supplier<T>
interface BiConsumer<T,U>
interface BinaryOperator<T>
interface Function<T,R>

class MutableInt

Knowing how to use anonymous classes will be very helpful!

Code To Implement

ClassicReducer

The classic MapReduce Collector will collect all of the emitted values in a List.

class: ClassicReducer.java Java.png
methods: supplier
accumulator
combiner
package: mapreduce.collector.studio
source folder: src/main/java

method: Supplier<List<V>> supplier() Sequential.svg (sequential implementation only)

method: BiConsumer<List<V>, V> accumulator() Sequential.svg (sequential implementation only)

method: BinaryOperator<List<V>> combiner() Sequential.svg (sequential implementation only)

IntSumCollector

MapReduce Apps like Word Count offer glaring opportunities to optimize the classic MapReduce append all of the 1s in a List and add them up later. In this section of the studio you will use MutableInt to simply add the values as they come in.

class: IntSumCollector.java Java.png
methods: supplier
accumulator
combiner
finisher
package: mapreduce.collector.intsum.studio
source folder: src/main/java

method: public Supplier<MutableInt> supplier() Sequential.svg (sequential implementation only)

method: public BiConsumer<MutableInt, Integer> accumulator() Sequential.svg (sequential implementation only)

method: public BinaryOperator<MutableInt> combiner() Sequential.svg (sequential implementation only)

method: public Function<MutableInt, Integer> finisher() Sequential.svg (sequential implementation only)

Testing Your Solution

Correctness

top level

class: CollectorStudioTestSuite.java Junit.png
package: mapreduce
source folder: src/test/java

sub

class: ClassicReducerTestSuite.java Junit.png
package: mapreduce
source folder: src/test/java
class: IntSumCollectorTestSuite.java Junit.png
package: mapreduce
source folder: src/test/java