Word Count Example
Let's build the canonical MapReduce example in PeerMR: counting the words in a set of documents.
- Create a `JobExecution` object:
// Create the job, using 'pmrfs' as the storage type. The value is passed
// positionally: JavaScript has no keyword arguments, and writing
// `storageType = 'pmrfs'` would instead assign to an undeclared global
// (a ReferenceError in strict mode and ES modules).
const execution = new JobExecution('pmrfs');
- Define the input for the job:
execution.setInputFn(async function input() {
  // The input function returns an array. PeerMR chunks this array according to
  // the number of workers in the job and sends each chunk to the first stage as
  // key/value pairs, where the key is the element's index in the array.
  const baseUrl = 'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/';
  const books = [
    'LittleWomen',
    'Middlemarch',
    'MobyDick',
    'MyLife',
    'PrideAndPrejudice',
    'RoomWithAView',
    'TaleOfTwoCities',
    'TheAdventuresOfSherlockHolmes',
    'TheCountOfMonteCristo',
    'TheGreatGatsby',
    'TheScarletLetter',
    'Ulysses',
    'WarAndPeace',
  ];
  // One URL per book, e.g. `${baseUrl}MobyDick.txt`.
  return books.map((book) => `${baseUrl}${book}.txt`);
});
- Map stages operate on arrays of key/value pairs. Define a map stage that fetches each text file, tokenizes it, and emits a count of 1 for each token:
execution.addStage(new MapStage(async function map(x) {
  // The input to a map function is an array of key/value pairs:
  // x[0][0] is the first key, x[0][1] is the first value.
  // Here each value is the URL of a text file returned by the input function;
  // the key (the array index) is not needed, so it is skipped when destructuring.
  for await (const [, url] of x) {
    const response = await fetch(url);
    if (!response.ok) {
      // Fail loudly instead of counting the words of an HTTP error page.
      throw new Error(`Failed to fetch ${url}: HTTP ${response.status}`);
    }
    const data = await response.text();
    // Split the whole document on runs of whitespace (spaces, tabs, newlines).
    for (const token of data.split(/\s+/)) {
      // Keep only ASCII letters and digits, e.g. "don't," -> "dont".
      // Tokens are case-sensitive: "The" and "the" are counted separately.
      const word = token.replace(/[^0-9a-z]/gi, '');
      if (word.length > 0) {
        context.emit(word, 1); // context.emit will emit a k,v pair to the next stage
      }
    }
  }
  // It's important to call context.onComplete after all context.emit calls:
  // this signals to PeerMR that processing is complete for input x.
  await context.onComplete();
}));
- Reduce stages operate on objects that map each key to an array of values. Define a reduce stage that sums, for each token, the counts emitted by the mappers:
execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  // keyToValues maps each key (a token emitted by the map stage) to the array of
  // values emitted for that key. Object.entries visits only the object's own
  // keys, unlike for...in, which would also walk inherited enumerable properties.
  for (const [key, values] of Object.entries(keyToValues)) {
    const sum = values.reduce((accumulator, currentValue) => accumulator + currentValue, 0);
    context.emit(key, sum); // context.emit will emit a k,v pair as the final output
  }
  // It's important to call context.onComplete after all context.emit calls:
  // this signals to PeerMR that processing is complete for input keyToValues.
  await context.onComplete();
}));
- Start the job:
// Start the job. `jobRunner` is not defined in this snippet; presumably it is
// supplied by the PeerMR environment — TODO confirm.
execution.start(jobRunner);
Putting it all together:
// Pass the storage type positionally; `storageType = 'pmrfs'` would assign to an
// undeclared global (a ReferenceError in strict mode and ES modules).
const execution = new JobExecution('pmrfs');
execution.setInputFn(async function input() {
  // Every input file lives under the same storage prefix; only the book name varies.
  const baseUrl = 'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/';
  const books = [
    'LittleWomen',
    'Middlemarch',
    'MobyDick',
    'MyLife',
    'PrideAndPrejudice',
    'RoomWithAView',
    'TaleOfTwoCities',
    'TheAdventuresOfSherlockHolmes',
    'TheCountOfMonteCristo',
    'TheGreatGatsby',
    'TheScarletLetter',
    'Ulysses',
    'WarAndPeace',
  ];
  return books.map((book) => `${baseUrl}${book}.txt`);
});
execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  context.log(`mapper rank: ${rank}`);
  // x is an array of [key, url] pairs; the key (array index) is not needed.
  for await (const [, url] of x) {
    const response = await fetch(url);
    if (!response.ok) {
      // Fail loudly instead of counting the words of an HTTP error page.
      throw new Error(`Failed to fetch ${url}: HTTP ${response.status}`);
    }
    const data = await response.text();
    // Split the whole document on runs of whitespace (spaces, tabs, newlines).
    for (const token of data.split(/\s+/)) {
      // Keep only ASCII letters and digits; tokens remain case-sensitive.
      const word = token.replace(/[^0-9a-z]/gi, '');
      if (word.length > 0) {
        context.emit(word, 1);
      }
    }
  }
  // Signal completion only after every emit for this input.
  await context.onComplete();
}));
execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  const rank = context.getRank();
  context.log(`reducer rank: ${rank}`);
  // Sum the values emitted for each key; Object.entries visits own keys only.
  for (const [key, values] of Object.entries(keyToValues)) {
    const sum = values.reduce((accumulator, currentValue) => accumulator + currentValue, 0);
    context.emit(key, sum);
  }
  // Signal completion only after every emit for this input.
  await context.onComplete();
}));
// Start the job. `jobRunner` is not defined in this example; presumably it is
// supplied by the PeerMR environment — TODO confirm.
execution.start(jobRunner);