MapReduce Frameworks Lab
The MapReduce assignment has many different pieces that come together to solve problems. So much so that in the end writing each individual class or method can be less challenging than simply figuring out what each piece’s role should be in the larger puzzle.
At a high level we have a framework, a mapper, a reducer, and some data we would like to process.
Let’s start with the Framework. At the end of the day, the first job of the Framework is mapAll() which unsurprisingly calls map() on the Mapper for every piece of data. That’s it. Don’t overcomplicate it. The framework leaves all of the details of what (key, value) pairs to emit to the Mapper. Just call map() for every piece of data. Granted, it does need to create an instance of Context to receive those emissions. For the SimpleFramework we end up creating a context that holds a single dictionary to handle all of the key, value pairs the Mapper throws at it. This IndividualContext leaves grouping to the next stage so it can simply associate the keys with the values and be done. For the MatrixFramework, we end up mapping and grouping at once so its GroupStageSkippingContext is a bit more complicated, but the mapAndGroupAll() method on MatrixFramework stays nice and simple. It call map() on the Mapper for each piece of data. As stated before, don’t overcomplicate it. You will want to create an n-way-split-esque separation of the data into tasks, each with its own GroupStageSkippingContext, but don’t lose sight of the goal: call map on the Mapper for each piece of the data.
Next up, let’s talk about the Mapper’s role. (Coming Soon)
Contents
- 1 Part 1 Simple Framework
- 1.1 class StudentIndividualMapContext
- 1.2 Word Count Application
- 1.3 Mutual Friends Application
- 1.4 class WordCountConcreteStaticMapReduce (optional but encouraged)
- 1.5 class MutualFriendsConcreteStaticMapReduce (optional but encouraged)
- 1.6 SimpleMapReduceFramework<E,K,V>
- 1.7 FinishAccumulatorBasedReducer
- 2 Part 2 Matrix Framework
Part 1 Simple Framework
class StudentIndividualMapContext
public void emit(K key, V value)
Word Count Application
class WordCountMapper
public void map(MapContext<String, Integer> context, String[] lineOfWords)
class WordCountReducer
public FinishAccumulator<Integer> createAccumulator()
Test
class TestWordCountApplicationWithInstructorFramework
Mutual Friends Application
class MutualFriendsMapper
public void map(MapContext<OrderedPair<AccountId>, List<AccountId>> context, Account account)
class MutualFriendsReducer
public FinishAccumulator<List<AccountId>> createAccumulator()
see: creating a custom reduction accumulator and ListIntersectionReducer<T>
Test
class TestMutualFriendsApplicationWithInstructorFramework
class WordCountConcreteStaticMapReduce (optional but encouraged)
mapAll
private static IndividualMapContext<String, Integer>[] mapAll(String[][] S, WordCountMapper mapper) throws SuspendableException
groupAll
private static Map<String, List<Integer>> groupAll(IndividualMapContext<String, Integer>[] f_of_S)
reduceAll
private static Map<String, Integer> reduceAll(Map<String, List<Integer>> grouped_f_of_S, WordCountReducer reducer) throws SuspendableException
Test
class TestWordCountConcreteStaticFramework
class MutualFriendsConcreteStaticMapReduce (optional but encouraged)
mapAll
private static IndividualMapContext<OrderedPair<AccountId>, List<AccountId>>[] mapAll(Account[] S, MutualFriendsMapper mapper) throws SuspendableException
groupAll
private static Map<OrderedPair<AccountId>, List<List<AccountId>>> groupAll(IndividualMapContext<OrderedPair<AccountId>, List<AccountId>>[] f_of_S)
reduceAll
private static Map<OrderedPair<AccountId>, List<AccountId>> reduceAll(Map<OrderedPair<AccountId>, List<List<AccountId>>> grouped_f_of_S, MutualFriendsReducer reducer) throws SuspendableException
Test
class TestMutualFriendsConcreteStaticFramework
SimpleMapReduceFramework<E,K,V>
mapAll
private IndividualMapContext<K,V>[] mapAll(E[] S, Mapper<E,K,V> mapper) throws SuspendableException
groupAll
private Map<K, List<V>> groupAll(IndividualMapContext<K,V>[] f_of_S)
reduceAll
private Map<K, V> reduceAll(Map<K, List<V>> grouped_f_of_S, Reducer<V> reducer) throws SuspendableException
Test
class TestMutualFriendsSolution
FinishAccumulatorBasedReducer
reduce
default V reduce( List<V> list ) throws SuspendableException
Part 2 Matrix Framework
class StudentGroupStageSkippingMapContext<K,V>
emit
public void emit(K key, V value)
class MatrixMapReduceFramework<E,K,V>
mapGroupAll
private GroupStageSkippingMapContext<K, V>[] mapGroupAll(E[] S, Mapper<E, K, V> mapper) throws SuspendableException
reduceAll
private Map<K,V> reduceAll(GroupStageSkippingMapContext<K, V>[] grouped_f_of_S, Reducer<V> reducer) throws SuspendableException