Character Count

A job can be any combination of map and reduce stages. The following four-stage example is for demonstration purposes only; the job could be done more efficiently with a single map followed by a single reduce.

// Create the job execution, passing 'pmrfs' as the storage type.
// The argument is positional: JavaScript has no keyword arguments, so writing
// `storageType = 'pmrfs'` inside the call would be an assignment to an
// undeclared variable (a ReferenceError in strict mode / ES modules, and an
// accidental global otherwise).
const execution = new JobExecution('pmrfs');

// Register the input function. It resolves to the list of text-file URLs that
// make up the job's input.
execution.setInputFn(async function input() {
  // Everything the function needs is declared inside it, so it does not rely
  // on any variable from the enclosing scope.
  const baseUrl = "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data";
  const titles = [
    "LittleWomen",
    "Middlemarch",
    "MobyDick",
    "MyLife",
    "PrideAndPrejudice",
    "RoomWithAView",
    "TaleOfTwoCities",
    "TheAdventuresOfSherlockHolmes",
    "TheCountOfMonteCristo",
    "TheGreatGatsby",
    "TheScarletLetter",
    "Ulysses",
    "WarAndPeace",
  ];
  return titles.map((title) => `${baseUrl}/${title}.txt`);
});

// Stage 1 - tokenize the input into words
//
// `x` is consumed as an async iterable of [key, url] pairs. Each URL is
// fetched, its text is split into whitespace-separated tokens, every token is
// stripped of non-alphanumeric characters, and each non-empty result is
// emitted as (word, 1). `context` is not defined in this snippet; it is
// presumably supplied by the runtime that executes the stage.
execution.addStage(new MapStage(async function map(x) {
  for await (const [, url] of x) {
    const response = await fetch(url);
    // fetch() only rejects on network failure; without this check an HTTP
    // error page (404, 500, ...) would be tokenized as if it were the book.
    if (!response.ok) {
      throw new Error(`Failed to fetch ${url}: HTTP ${response.status}`);
    }
    const data = await response.text();
    // Split on real newlines ('\n'), not on the two-character sequence
    // backslash + "n".
    for (const line of data.split('\n')) {
      for (const token of line.split(/\s+/)) {
        // Keep ASCII letters and digits only; nothing is left to trim.
        const word = token.replace(/[^0-9a-z]/gi, '');
        if (word.length) {
          context.emit(word, 1);
        }
      }
    }
  }
  // Signal that this stage has emitted everything.
  await context.onComplete();
}));

// Stage 2 - gather each word
//
// `keyToValues` is read as an object mapping each key to an array of values.
// For every key the values are added up (starting from 0) and the pair
// (key, total) is emitted.
execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  // Declared inside the stage function so it stays self-contained.
  const add = (total, value) => total + value;

  for (const word in keyToValues) {
    context.emit(word, keyToValues[word].reduce(add, 0));
  }

  // Signal that this stage has emitted everything.
  await context.onComplete();
}));

// Stage 3 - count the characters
//
// `x` is read as an indexable list of [word, occurrences] pairs (presumably
// the (key, sum) pairs emitted by the preceding reduce stage). Every character
// of the word is emitted together with the word's occurrence count, so that a
// word seen N times contributes N to each of its characters. Emitting a
// constant 1 here would count characters over the set of distinct words
// rather than over the text itself.
execution.addStage(new MapStage(async function map(x) {
  for (let i = 0, n = x.length; i < n; i++) {
    const [word, occurrences] = x[i];
    for (let j = 0, m = word.length; j < m; j++) {
      context.emit(word.charAt(j), occurrences);
    }
  }
  // Signal that this stage has emitted everything.
  await context.onComplete();
}));

// Stage 4 - sum each character
//
// `keyToValues` is read as an object mapping each key to an array of values.
// The values of every key are folded into a single total (initial value 0),
// which is emitted as (key, total).
execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  // Declared inside the stage function so it stays self-contained.
  const add = (total, value) => total + value;

  for (const character in keyToValues) {
    const total = keyToValues[character].reduce(add, 0);
    context.emit(character, total);
  }

  // Signal that this stage has emitted everything.
  await context.onComplete();
}));

// Start the configured execution, handing it `jobRunner` (which is not
// defined in this snippet and must already be in scope).
execution.start(jobRunner);