Difference between revisions of "Collector Rosetta Stone"

From CSE231 Wiki
Jump to navigation Jump to search
Line 54: Line 54:
 
We use createMutableContainer() to create a new mutable container.  For classic map reduce this would be a [https://docs.oracle.com/javase/8/docs/api/java/util/List.html List<V>].
 
We use createMutableContainer() to create a new mutable container.  For classic map reduce this would be a [https://docs.oracle.com/javase/8/docs/api/java/util/List.html List<V>].
  
rosetta stone: <code>container = collector.supplier().get()</code> <math>\leftrightarrow</math> <code>container = reducer.createMutableContainer()</code>
+
rosetta stone: <code>container = collector.supplier().get()</code> <math>\leftrightarrow</math> <code>container = acr.createMutableContainer()</code>
  
 
== accumulate a.k.a. accumulator accept==
 
== accumulate a.k.a. accumulator accept==
 
We use accumulate(container,item) to accumulate a value.  For classic map reduce this would add an item to a list.
 
We use accumulate(container,item) to accumulate a value.  For classic map reduce this would add an item to a list.
  
rosetta stone: <code>collector.accumulator().accept(container,item);</code> <math>\leftrightarrow</math> <code>reducer.accumulate(container,item)</code>
+
rosetta stone: <code>collector.accumulator().accept(container,item);</code> <math>\leftrightarrow</math> <code>acr.accumulate(container,item)</code>
  
 
== combine a.k.a. combiner apply==
 
== combine a.k.a. combiner apply==
 
We use combine(containerA,containerB) to combine two accumulators.  You may combine containerB into containerA or containerA into containerB.  Just return whichever is the combined result.  
 
We use combine(containerA,containerB) to combine two accumulators.  You may combine containerB into containerA or containerA into containerB.  Just return whichever is the combined result.  
  
rosetta stone: <code>collector.combiner().apply(containerA,containerB)</code> <math>\leftrightarrow</math> <code>reducer.combine(containerA,containerB)</code>
+
rosetta stone: <code>collector.combiner().apply(containerA,containerB)</code> <math>\leftrightarrow</math> <code>acr.combine(containerA,containerB)</code>
 +
 
 +
'''Note:''' {{CollectorLink}} allows the client to combine containerB into containerA, containerA into containerB, or create a new container with the combined contents of containerA and containerB.  The only requirement is that resulting combined container is the return value.  {{AccumulatorCombinerReducerLink}} mandates that containerB be combined into containerA to take a bit of cruft out of the [[#Matrix_Map_Reduce_Framework_Assignment|Matrix MapReduce Framework exercise]].
  
 
== reduce a.k.a. finisher apply==
 
== reduce a.k.a. finisher apply==
 
We use reduce(container) to reduce an accumulator.
 
We use reduce(container) to reduce an accumulator.
  
rosetta stone: <code>collector.finisher().apply(container)</code> <math>\leftrightarrow</math> <code>r = reducer.reduce(container)</code>
+
rosetta stone: <code>collector.finisher().apply(container)</code> <math>\leftrightarrow</math> <code>r = acr.reduce(container)</code>
  
 
= Converting Back And Forth =
 
= Converting Back And Forth =

Revision as of 16:24, 23 February 2023

The interface Collector<T,A,R> serves the standard Java streams framework for MapReduce-like tasks with added in-memory processing capability a la Apache Spark.

You will use interface AccumulatorCombinerReducer<V,A,R> for our MapReduce assignments which is almost a one-to-one match with interface Collector<T,A,R> but de-ultra-uber-hyper-mega-super-lambdafied.

One To One

CSE 231s: AccumulatorCombinerReducer<V, A, R> Java Streams: Collector<T, A, R>
public interface AccumulatorCombinerReducer<V, A, R> {
	A createMutableContainer();

	void accumulate(A container, V item);

	void combine(A containerA, A containerB);

	R reduce(A container);
}
<===>

<===>

<===>

<===>
public interface Collector<T, A, R> {
    Supplier<A> supplier();

    BiConsumer<A, T> accumulator();

    BinaryOperator<A> combiner();

    Function<A, R> finisher();
}

Differences

Each interface provides a way to customize 4 of 5 of the methods required for Spark-level MapReduce functionality. interface AccumulatorCombinerReducer<V,A,R> elects to perform the operations directly. interface Collector<T,A,R> elects to return single-abstract method interfaces which perform the operations.

interface Collector<T,A,R>

interface Supplier<T> returned by supplier()
interface BiConsumer<T,U> returned by accumulator()
interface BinaryOperator<T> returned by combiner()
interface Function<T,R> returned by finisher()

Methods

createMutableContainer a.k.a. supplier get

We use createMutableContainer() to create a new mutable container. For classic map reduce this would be a List<V>.

rosetta stone: container = collector.supplier().get() container = acr.createMutableContainer()

accumulate a.k.a. accumulator accept

We use accumulate(container,item) to accumulate a value. For classic map reduce this would add an item to a list.

rosetta stone: collector.accumulator().accept(container,item); acr.accumulate(container,item)

combine a.k.a. combiner apply

We use combine(containerA,containerB) to combine two accumulators. You may combine containerB into containerA or containerA into containerB. Just return whichever is the combined result.

rosetta stone: collector.combiner().apply(containerA,containerB) acr.combine(containerA,containerB)

Note: interface Collector<T,A,R> allows the client to combine containerB into containerA, containerA into containerB, or create a new container with the combined contents of containerA and containerB. The only requirement is that resulting combined container is the return value. interface AccumulatorCombinerReducer<V,A,R> mandates that containerB be combined into containerA to take a bit of cruft out of the Matrix MapReduce Framework exercise.

reduce a.k.a. finisher apply

We use reduce(container) to reduce an accumulator.

rosetta stone: collector.finisher().apply(container) r = acr.reduce(container)

Converting Back And Forth

public class StreamUtils {
	public static <V, A, R> Collector<V, A, R> toCollector(
			AccumulatorCombinerReducer<V, A, R> accumulatorCombinerReducer) {
		return new Collector<V, A, R>() {
			@Override
			public Supplier<A> supplier() {
				return () -> accumulatorCombinerReducer.createMutableContainer();
			}

			@Override
			public BiConsumer<A, V> accumulator() {
				return (container, item) -> accumulatorCombinerReducer.accumulate(container, item);
			}

			@Override
			public BinaryOperator<A> combiner() {
				return (a, b) -> {
					accumulatorCombinerReducer.combine(a, b);
					return a;
				};
			}

			@Override
			public Function<A, R> finisher() {
				return (container) -> accumulatorCombinerReducer.reduce(container);
			}

			@Override
			public Set<Characteristics> characteristics() {
				return accumulatorCombinerReducer.collectorCharacteristics();
			}
		};
	}

	public static <V, A, R> AccumulatorCombinerReducer<V, A, R> toAccumulatorCombinerReducer(
			Collector<V, A, R> collector) {
		return new AccumulatorCombinerReducer<V, A, R>() {
			@Override
			public A createMutableContainer() {
				return collector.supplier().get();
			}

			@Override
			public void accumulate(A container, V item) {
				collector.accumulator().accept(container, item);
			}

			@Override
			public void combine(A containerA, A containerB) {
				A result = collector.combiner().apply(containerA, containerB);
				if (result != containerA) {
					throw new RuntimeException("collector must combine b into a and return a.");
				}
			}

			@Override
			public R reduce(A container) {
				return collector.finisher().apply(container);
			}

			@Override
			public Set<Characteristics> collectorCharacteristics() {
				return collector.characteristics();
			}
		};
	}

	public static AccumulatorCombinerReducer<Integer, ?, Integer> summingIntAccumulatorCombinerReducer() {
		return toAccumulatorCombinerReducer(Collectors.summingInt(Integer::intValue));
	}
}