Word Count Only Parallel Map Reduce Framework Assignment
Building On Previous Exercises
Be sure to complete the Word Count Mapper and IntSumListAccumulatingReducer exercises first.
Code To Investigate
WordCountOnlyParallelMapReduceFramework
You will build each of the three methods mapReduceAll(input) invokes. Much can be gleaned from the parameters and return values of these methods.
mapReduceAll(TextSection[] input) |
---|
@Override
public Map<String, Integer> mapReduceAll(TextSection[] input) throws InterruptedException, ExecutionException {
    List<List<Map.Entry<String, Integer>>> mapAllResult = mapAll(input);
    Map<String, List<Integer>> accumulateAllResult = accumulateAll(mapAllResult);
    return reduceAll(accumulateAllResult);
}
|
mapAll
The TextSection[] input:
input[0] = | new TextSection("A canner, exceedingly canny,") |
input[1] = | new TextSection("One morning remarked to his granny,") |
input[2] = | new TextSection("\"A canner can can") |
input[3] = | new TextSection("Anything that he can;") |
input[4] = | new TextSection("But a canner can't can a can, can he?\"") |
mapAll(input) would produce:
accumulateAll
accumulateAll(mapAllResult) would produce a Map<String, List<Integer>>. Below we visualize a Map with 16 buckets that places entries based on the key's hashCode().
reduceAll
reduceAll(accumulateAllResult) would produce a Map<String, Integer>. Below we visualize a Map with 16 buckets that places entries based on the key's hashCode().
Note: the keys all fall in the same location as in accumulateAllResult, but are associated with the reduced values.
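To see why a key lands in the same bucket in both maps, the sketch below computes a bucket index from nothing but the key's hashCode() and the table size. It mirrors how OpenJDK's HashMap spreads and masks the hash, which is an implementation detail rather than a guarantee of the Map interface; the visualization on this page may use a simpler scheme. BucketIndexSketch and its methods are hypothetical names used only for illustration.

```java
public class BucketIndexSketch {
    // Mixes the high bits of hashCode() into the low bits, as OpenJDK's HashMap does.
    static int spread(Object key) {
        int h = key.hashCode();
        return h ^ (h >>> 16);
    }

    // Bucket index for a table whose capacity is a power of two (16 in the visualization).
    static int bucketIndex(Object key, int capacity) {
        return spread(key) & (capacity - 1);
    }

    public static void main(String[] args) {
        // The index depends only on the key, never on the value stored with it.
        for (String word : new String[] { "canner", "can", "granny" }) {
            System.out.println(word + " -> bucket " + bucketIndex(word, 16));
        }
    }
}
```

Because the value never enters the calculation, replacing a List<Integer> with its reduced Integer leaves the key's bucket unchanged.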
Client
class: | WordCountOnlyParallelMapReduceFrameworkClient.java | CLIENT |
package: | mapreduce.framework.parallel.client | |
source folder: | student/src/main/java |
WordCountOnlyParallelMapReduceFrameworkClient |
---|
TextSection[] input = {
    new TextSection("A canner, exceedingly canny,"),
    new TextSection("One morning remarked to his granny,"),
    new TextSection("\"A canner can can"),
    new TextSection("Anything that he can;"),
    new TextSection("But a canner can't can a can, can he?\""),
};
MapReduceFramework<TextSection, String, Integer, List<Integer>, Integer> framework =
        new WordCountOnlyParallelMapReduceFramework(new WordCountMapper(), new SummingIntClassicReducer());
Map<String, Integer> map = framework.mapReduceAll(input);
for (Map.Entry<String, Integer> entry : map.entrySet()) {
    System.out.println(entry);
}
|
WordCountOnlyParallelMapReduceFrameworkClient Output |
---|
but=1 a=4 one=1 anything=1 morning=1 that=1 can=6 exceedingly=1 canner=3 his=1 granny=1 remarked=1 to=1 canny=1 can't=1 he=2 |
Code To Implement
WordCountOnlyParallelMapReduceFramework
mapper
method: public Mapper<TextSection, String, Integer> mapper()
(sequential implementation only)
This method exists to remind you to use the mapper. The type is a perfect match for the WordCountMapper.
reducer
method: public Reducer<Integer, List<Integer>, Integer> reducer()
(sequential implementation only)
This method exists to remind you to use the accumulatorCombinerReducer. The type is a perfect match for the SummingIntClassicReducer.
mapAll
method: private List<List<Map.Entry<String, Integer>>> mapAll(TextSection[] input)
(parallel implementation required)
Purpose: In parallel, map all of the TextSections using the mapper private instance variable.
Requirement: |
Use the mapper's map method |
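The general shape of a parallel map step can be sketched with the standard java.util.concurrent API. This is a minimal illustration, not the course's intended solution: it assumes plain Strings in place of TextSection, a hypothetical mapLine method in place of the mapper's map method (its tokenization is a guess), and an ExecutorService in place of whatever parallel constructs the course library provides.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MapAllSketch {
    // Hypothetical stand-in for the mapper: emits (word, 1) for each word in a line.
    // Lower-casing and keeping apostrophes are assumptions, not the real WordCountMapper.
    static List<Map.Entry<String, Integer>> mapLine(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.toLowerCase().split("[^a-z']+")) {
            if (!word.isEmpty()) {
                pairs.add(new SimpleImmutableEntry<>(word, 1));
            }
        }
        return pairs;
    }

    // Submits one task per input element, then collects the results in input order.
    static List<List<Map.Entry<String, Integer>>> mapAll(String[] input)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<List<Map.Entry<String, Integer>>>> futures = new ArrayList<>();
            for (String line : input) {
                Callable<List<Map.Entry<String, Integer>>> task = () -> mapLine(line);
                futures.add(executor.submit(task));
            }
            List<List<Map.Entry<String, Integer>>> result = new ArrayList<>();
            for (Future<List<Map.Entry<String, Integer>>> future : futures) {
                result.add(future.get()); // blocks until that task has finished
            }
            return result;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] input = { "\"A canner can can", "Anything that he can;" };
        System.out.println(mapAll(input));
    }
}
```

Each task writes only to its own list, so no synchronization is needed while mapping; the only shared step is gathering the futures afterwards.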
accumulateAll
method: private Map<String, List<Integer>> accumulateAll(List<List<Map.Entry<String, Integer>>> mapAllResults)
(sequential implementation only)
Purpose: Sequentially assemble a Map<String, List<Integer>> in which each String key is a word from a TextSection and each List<Integer> value holds one 1 for every time that word was found in a TextSection.
To do this: iterate through all of the entries, and use the accumulatorCombinerReducer's methods to create a new mutable container, and accumulate new values into that container. You may find Map.computeIfAbsent() particularly helpful for this.
Requirement: |
Use the accumulatorCombinerReducer's createMutableContainer and accumulate methods |
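The computeIfAbsent pattern described above can be sketched as follows. The static createMutableContainer and accumulate methods here are hypothetical stand-ins for the accumulatorCombinerReducer's methods of the same names, and their signatures are assumptions; only Map.computeIfAbsent is the real JDK call.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AccumulateAllSketch {
    // Hypothetical stand-in: a fresh, empty container for one key.
    static List<Integer> createMutableContainer() {
        return new ArrayList<>();
    }

    // Hypothetical stand-in: adds one mapped value to a key's container.
    static void accumulate(List<Integer> container, Integer value) {
        container.add(value);
    }

    static Map<String, List<Integer>> accumulateAll(List<List<Map.Entry<String, Integer>>> mapAllResults) {
        Map<String, List<Integer>> result = new HashMap<>();
        for (List<Map.Entry<String, Integer>> pairs : mapAllResults) {
            for (Map.Entry<String, Integer> pair : pairs) {
                // Creates the container only the first time a key is seen, then returns it.
                List<Integer> container = result.computeIfAbsent(pair.getKey(), k -> createMutableContainer());
                accumulate(container, pair.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> first = Arrays.asList(
                new SimpleImmutableEntry<>("a", 1), new SimpleImmutableEntry<>("can", 1));
        List<Map.Entry<String, Integer>> second = Arrays.asList(
                new SimpleImmutableEntry<>("can", 1), new SimpleImmutableEntry<>("can", 1));
        System.out.println(accumulateAll(Arrays.asList(first, second)));
    }
}
```

computeIfAbsent folds the "is the key present?" check and the insertion into one call, which avoids a separate containsKey and put pair.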
reduceAll
Provided
private Map<String, Integer> reduceAll(Map<String, List<Integer>> accumulateAllResult) throws InterruptedException, ExecutionException {
    Entry<String, List<Integer>>[] entries = toArrayOfEntries(accumulateAllResult);
    List<Integer> reductions = toReductions(entries);
    return toReductionMap(entries, reductions);
}
toArrayOfEntries
Provided
private static Entry<String, List<Integer>>[] toArrayOfEntries(Map<String, List<Integer>> accumulateAllResult) {
    return accumulateAllResult.entrySet().toArray(Entry[]::new);
}
toReductions
method: private List<Integer> toReductions(Entry<String, List<Integer>>[] entries)
(parallel implementation required)
Purpose: Use the reduce() method to return a list of Integers representing the reduced values of each Entry's value, in this case, a List<Integer>.
Say we have an Entry<String, List<Integer>> = ("the", [1, 1, 1, 1, 1]). We would want to reduce the 1's to a 5, and put that in our return list.
Requirement: |
Use the accumulatorCombinerReducer's reduce method |
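The ("the", [1, 1, 1, 1, 1]) example can be sketched in standard Java as below. This is an illustration under stated assumptions: the static reduce method is a hypothetical stand-in for the accumulatorCombinerReducer's reduce method, the local toArrayOfEntries is a simplified copy of the provided helper, and an ExecutorService stands in for the course's parallel constructs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ToReductionsSketch {
    // Hypothetical stand-in for the reducer's reduce method: sums the container.
    static Integer reduce(List<Integer> container) {
        int sum = 0;
        for (int value : container) {
            sum += value;
        }
        return sum;
    }

    @SuppressWarnings("unchecked")
    static Map.Entry<String, List<Integer>>[] toArrayOfEntries(Map<String, List<Integer>> map) {
        return map.entrySet().toArray(new Map.Entry[0]);
    }

    // One task per entry; results are collected in the same order as the entries array.
    static List<Integer> toReductions(Map.Entry<String, List<Integer>>[] entries)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (Map.Entry<String, List<Integer>> entry : entries) {
                Callable<Integer> task = () -> reduce(entry.getValue());
                futures.add(executor.submit(task));
            }
            List<Integer> reductions = new ArrayList<>();
            for (Future<Integer> future : futures) {
                reductions.add(future.get());
            }
            return reductions;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, List<Integer>> accumulated = new LinkedHashMap<>();
        accumulated.put("the", Arrays.asList(1, 1, 1, 1, 1));
        accumulated.put("can", Arrays.asList(1, 1));
        System.out.println(toReductions(toArrayOfEntries(accumulated))); // prints [5, 2]
    }
}
```

Keeping the output list in entry order is what lets a later step pair each reduction with its key by index.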
toReductionMap
method: Map<String, Integer> toReductionMap(Entry<String, List<Integer>>[] entries, List<Integer> reductions)
(sequential implementation only)
Tip: The index of an Entry in entries corresponds directly to the index of that entry's reduced Integer in reductions.
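The index correspondence between the two collections can be sketched as a single sequential loop. The class name and the local toArrayOfEntries helper are hypothetical and exist only to make the example self-contained.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ToReductionMapSketch {
    @SuppressWarnings("unchecked")
    static Map.Entry<String, List<Integer>>[] toArrayOfEntries(Map<String, List<Integer>> map) {
        return map.entrySet().toArray(new Map.Entry[0]);
    }

    // Pairs the key at index i with the reduction at index i.
    static Map<String, Integer> toReductionMap(Map.Entry<String, List<Integer>>[] entries,
                                               List<Integer> reductions) {
        Map<String, Integer> result = new HashMap<>();
        for (int i = 0; i < entries.length; i++) {
            result.put(entries[i].getKey(), reductions.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> accumulated = new LinkedHashMap<>();
        accumulated.put("the", Arrays.asList(1, 1, 1, 1, 1));
        accumulated.put("can", Arrays.asList(1, 1));
        // Reductions are listed in the same order as the entries: "the" -> 5, "can" -> 2.
        Map<String, Integer> counts = toReductionMap(toArrayOfEntries(accumulated), Arrays.asList(5, 2));
        System.out.println(counts.get("the") + " " + counts.get("can")); // prints 5 2
    }
}
```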
Testing Your Solution
class: | __WordCountOnlyParallelMapReduceFrameworkTestSuite.java | |
package: | mapreduce.framework.parallel.warmup | |
source folder: | testing/src/test/java |