Word Count Example

Let's build the canonical MapReduce example, counting the words in a set of documents, with PeerMR.

  • Create a JobExecution object:
// Pass the storage type positionally. JavaScript has no keyword arguments, so
// writing `storageType = 'pmrfs'` inside the call would assign to an undeclared
// variable (an implicit global, or a ReferenceError in strict mode / ES modules).
const execution = new JobExecution('pmrfs');
  • Define the input for the job:
execution.setInputFn(async function input() {
  // The input function resolves to an array. PeerMR splits that array into
  // chunks according to the number of workers for the job and sends each
  // chunk of key/value pairs to the first stage; each key is the element's
  // index in the array.
  const documentUrls = [
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/LittleWomen.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Middlemarch.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MobyDick.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MyLife.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/PrideAndPrejudice.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/RoomWithAView.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TaleOfTwoCities.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheAdventuresOfSherlockHolmes.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheCountOfMonteCristo.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheGreatGatsby.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheScarletLetter.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Ulysses.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/WarAndPeace.txt',
  ];
  return documentUrls;
});
  • Map stages operate on arrays of key/value pairs. Define a map stage that will fetch each text file, tokenize it, and emit a count of 1 for each token:
execution.addStage(new MapStage(async function map(x) {
  // The input to a map function is an array of key/value pairs:
  // x[0][0] is the first key, x[0][1] is the first value.
  for await (const kv of x) {
    const [, url] = kv; // destructure the k/v pair; the key (array index) is not needed here
    const response = await fetch(url);
    if (!response.ok) {
      // fail loudly instead of counting the words of an HTTP error page
      throw new Error(`failed to fetch ${url}: HTTP ${response.status}`);
    }
    const data = await response.text();
    // '\n' is the newline character; '\\n' would split on a literal backslash followed by "n"
    const lines = data.split('\n');
    for (const line of lines) {
      const tokens = line.split(/\s+/);
      for (const token of tokens) {
        // keep only ASCII letters and digits so punctuation does not create distinct tokens
        const str = token.replace(/[^0-9a-z]/gi, '').trim();
        if (str.length) {
          context.emit(str, 1); // context.emit will emit a k,v pair to the next stage
        }
      }
    }
  }
  // It's important to call context.onComplete after all context.emit calls:
  // this signals to PeerMR that processing is complete for input x.
  await context.onComplete();
}));
  • Reduce stages operate on objects that map each key to an array of values. Define a reduce stage that sums, for each token, the counts emitted by the mappers:
execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  // Each property name is a token emitted by the map stage; its value is the
  // array of values emitted for that token.
  for (const token in keyToValues) {
    const total = keyToValues[token].reduce(
      (runningTotal, count) => runningTotal + count,
      0
    );
    context.emit(token, total); // emitted k,v pairs form the final output
  }
  // Call context.onComplete only after every context.emit call: it signals to
  // PeerMR that processing is complete for input keyToValues.
  await context.onComplete();
}));
  • Start the job:
// Start the job. NOTE(review): jobRunner is not defined in this example;
// presumably it is supplied by the PeerMR environment — confirm.
execution.start(jobRunner);

Putting it all together:

// Pass the storage type positionally. JavaScript has no keyword arguments, so
// writing `storageType = 'pmrfs'` inside the call would assign to an undeclared
// variable (an implicit global, or a ReferenceError in strict mode / ES modules).
const execution = new JobExecution('pmrfs');

execution.setInputFn(async function input() {
  // URLs of the documents whose words will be counted.
  const documentUrls = [
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/LittleWomen.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Middlemarch.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MobyDick.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MyLife.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/PrideAndPrejudice.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/RoomWithAView.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TaleOfTwoCities.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheAdventuresOfSherlockHolmes.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheCountOfMonteCristo.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheGreatGatsby.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheScarletLetter.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Ulysses.txt',
    'https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/WarAndPeace.txt',
  ];
  return documentUrls;
});

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  context.log('mapper rank: ' + rank);
  // x is iterated as a sequence of [key, value] pairs; only the value (a URL) is used
  for await (const kv of x) {
    const [, url] = kv;
    const response = await fetch(url);
    if (!response.ok) {
      // fail loudly instead of counting the words of an HTTP error page
      throw new Error(`failed to fetch ${url}: HTTP ${response.status}`);
    }
    const data = await response.text();
    // '\n' is the newline character; '\\n' would split on a literal backslash followed by "n"
    const lines = data.split('\n');
    for (const line of lines) {
      const tokens = line.split(/\s+/);
      for (const token of tokens) {
        // keep only ASCII letters and digits so punctuation does not create distinct tokens
        const str = token.replace(/[^0-9a-z]/gi, '').trim();
        if (str.length) {
          context.emit(str, 1);
        }
      }
    }
  }
  // called once, after every context.emit call for this input
  await context.onComplete();
}));

execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  context.log('reducer rank: ' + context.getRank());
  // Sum the values collected under each key and emit one total per key.
  for (const token in keyToValues) {
    const total = keyToValues[token].reduce(
      (runningTotal, count) => runningTotal + count,
      0
    );
    context.emit(token, total);
  }
  // called once, after every context.emit call for this input
  await context.onComplete();
}));

// Start the job. NOTE(review): jobRunner is not defined in this example;
// presumably it is supplied by the PeerMR environment — confirm.
execution.start(jobRunner);