Difference between revisions of "Collector Rosetta Stone"

From CSE231 Wiki
Jump to navigation Jump to search
Line 40: Line 40:
 
</syntaxhighlight>
 
</syntaxhighlight>
 
|}
 
|}
 +
 +
= Differences =
 +
Each interface provides a way to customize 4 of 5 of the methods required for Spark-level MapReduce functionality.  {{AccumulatorCombinerReducerLink}} elects to perform the operations directly.  {{CollectorLink}} elects to return single-abstract method interfaces which perform the operations.
  
 
[https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html interface Collector<T,A,R>]
 
[https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html interface Collector<T,A,R>]
: [https://docs.oracle.com/javase/8/docs/api/java/util/function/Supplier.html interface Supplier<T>]
+
: [https://docs.oracle.com/javase/8/docs/api/java/util/function/Supplier.html interface Supplier<T>] returned by supplier()
: [https://docs.oracle.com/javase/8/docs/api/java/util/function/BiConsumer.html interface BiConsumer<T,U>]
+
: [https://docs.oracle.com/javase/8/docs/api/java/util/function/BiConsumer.html interface BiConsumer<T,U>] returned by accumulator()
: [https://docs.oracle.com/javase/8/docs/api/java/util/function/BinaryOperator.html interface BinaryOperator<T>]
+
: [https://docs.oracle.com/javase/8/docs/api/java/util/function/BinaryOperator.html interface BinaryOperator<T>] returned by combiner()
: [https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html interface Function<T,R>]
+
: [https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html interface Function<T,R>] returned by finisher()
  
==methods==
+
=Methods=
=== createMutableContainer a.k.a. supplier get===
+
== createMutableContainer a.k.a. supplier get==
 
We use createMutableContainer() to create a new mutable container.  For classic map reduce this would be a [https://docs.oracle.com/javase/8/docs/api/java/util/List.html List<V>].
 
We use createMutableContainer() to create a new mutable container.  For classic map reduce this would be a [https://docs.oracle.com/javase/8/docs/api/java/util/List.html List<V>].
  
 
rosetta stone: <code>container = collector.supplier().get()</code> <math>\leftrightarrow</math> <code>container = reducer.createMutableContainer()</code>
 
rosetta stone: <code>container = collector.supplier().get()</code> <math>\leftrightarrow</math> <code>container = reducer.createMutableContainer()</code>
  
=== accumulate a.k.a. accumulator accept===
+
== accumulate a.k.a. accumulator accept==
 
We use accumulate(container,item) to accumulate a value.  For classic map reduce this would add an item to a list.
 
We use accumulate(container,item) to accumulate a value.  For classic map reduce this would add an item to a list.
  
 
rosetta stone: <code>collector.accumulator().accept(container,item);</code> <math>\leftrightarrow</math> <code>reducer.accumulate(container,item)</code>
 
rosetta stone: <code>collector.accumulator().accept(container,item);</code> <math>\leftrightarrow</math> <code>reducer.accumulate(container,item)</code>
  
=== combine a.k.a. combiner apply===
+
== combine a.k.a. combiner apply==
 
We use combine(containerA,containerB) to combine two accumulators.  You may combine containerB into containerA or containerA into containerB.  Just return whichever is the combined result.  
 
We use combine(containerA,containerB) to combine two accumulators.  You may combine containerB into containerA or containerA into containerB.  Just return whichever is the combined result.  
  
 
rosetta stone: <code>collector.combiner().apply(containerA,containerB)</code> <math>\leftrightarrow</math> <code>reducer.combine(containerA,containerB)</code>
 
rosetta stone: <code>collector.combiner().apply(containerA,containerB)</code> <math>\leftrightarrow</math> <code>reducer.combine(containerA,containerB)</code>
  
=== reduce a.k.a. finisher apply===
+
== reduce a.k.a. finisher apply==
 
We use reduce(container) to reduce an accumulator.
 
We use reduce(container) to reduce an accumulator.
  

Revision as of 16:17, 23 February 2023

The interface Collector<T,A,R> serves the standard Java streams framework for MapReduce-like tasks with added in-memory processing capability a la Apache Spark.

You will use interface AccumulatorCombinerReducer<V,A,R> for our MapReduce assignments which is almost a one-to-one match with interface Collector<T,A,R> but de-ultra-uber-hyper-mega-super-lambdafied.

One To One

CSE 231s: AccumulatorCombinerReducer<V, A, R> Java Streams: Collector<T, A, R>
public interface AccumulatorCombinerReducer<V, A, R> {
	A createMutableContainer();

	void accumulate(A container, V item);

	void combine(A containerA, A containerB);

	R reduce(A container);
}
<===>

<===>

<===>

<===>
public interface Collector<T, A, R> {
    Supplier<A> supplier();

    BiConsumer<A, T> accumulator();

    BinaryOperator<A> combiner();

    Function<A, R> finisher();
}

Differences

Each interface provides a way to customize 4 of 5 of the methods required for Spark-level MapReduce functionality. interface AccumulatorCombinerReducer<V,A,R> elects to perform the operations directly. interface Collector<T,A,R> elects to return single-abstract method interfaces which perform the operations.

interface Collector<T,A,R>

interface Supplier<T> returned by supplier()
interface BiConsumer<T,U> returned by accumulator()
interface BinaryOperator<T> returned by combiner()
interface Function<T,R> returned by finisher()

Methods

createMutableContainer a.k.a. supplier get

We use createMutableContainer() to create a new mutable container. For classic map reduce this would be a List<V>.

rosetta stone: container = collector.supplier().get() container = reducer.createMutableContainer()

accumulate a.k.a. accumulator accept

We use accumulate(container,item) to accumulate a value. For classic map reduce this would add an item to a list.

rosetta stone: collector.accumulator().accept(container,item); reducer.accumulate(container,item)

combine a.k.a. combiner apply

We use combine(containerA,containerB) to combine two accumulators. You may combine containerB into containerA or containerA into containerB. Just return whichever is the combined result.

rosetta stone: collector.combiner().apply(containerA,containerB) reducer.combine(containerA,containerB)

reduce a.k.a. finisher apply

We use reduce(container) to reduce an accumulator.

rosetta stone: collector.finisher().apply(container) r = reducer.reduce(container)

Converting Back And Forth

public class StreamUtils {
	public static <V, A, R> Collector<V, A, R> toCollector(
			AccumulatorCombinerReducer<V, A, R> accumulatorCombinerReducer) {
		return new Collector<V, A, R>() {
			@Override
			public Supplier<A> supplier() {
				return () -> accumulatorCombinerReducer.createMutableContainer();
			}

			@Override
			public BiConsumer<A, V> accumulator() {
				return (container, item) -> accumulatorCombinerReducer.accumulate(container, item);
			}

			@Override
			public BinaryOperator<A> combiner() {
				return (a, b) -> {
					accumulatorCombinerReducer.combine(a, b);
					return a;
				};
			}

			@Override
			public Function<A, R> finisher() {
				return (container) -> accumulatorCombinerReducer.reduce(container);
			}

			@Override
			public Set<Characteristics> characteristics() {
				return accumulatorCombinerReducer.collectorCharacteristics();
			}
		};
	}

	public static <V, A, R> AccumulatorCombinerReducer<V, A, R> toAccumulatorCombinerReducer(
			Collector<V, A, R> collector) {
		return new AccumulatorCombinerReducer<V, A, R>() {
			@Override
			public A createMutableContainer() {
				return collector.supplier().get();
			}

			@Override
			public void accumulate(A container, V item) {
				collector.accumulator().accept(container, item);
			}

			@Override
			public void combine(A containerA, A containerB) {
				A result = collector.combiner().apply(containerA, containerB);
				if (result != containerA) {
					throw new RuntimeException("collector must combine b into a and return a.");
				}
			}

			@Override
			public R reduce(A container) {
				return collector.finisher().apply(container);
			}

			@Override
			public Set<Characteristics> collectorCharacteristics() {
				return collector.characteristics();
			}
		};
	}

	public static AccumulatorCombinerReducer<Integer, ?, Integer> summingIntAccumulatorCombinerReducer() {
		return toAccumulatorCombinerReducer(Collectors.summingInt(Integer::intValue));
	}
}