Character Count With Flush
Building on the last example, we can use the flush() method to flush emitted output to the next stage at regular intervals, e.g. every N records processed.
Flushing helps prevent workers from running out of memory: output is sent downstream periodically rather than held in the worker until all input has been processed.
Stage 3 below demonstrates this.
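The core idiom is a counter plus a periodic await of context.flush(). Below is a minimal sketch of the pattern, assuming the stage iterates over key/value pairs (pairs is a placeholder name, not part of the API):
const flushMod = 1_000; // flush after every 1,000 emits
let emitted = 0;
for (const [key, value] of pairs) {
  context.emit(key, value);
  emitted += 1;
  if (emitted % flushMod === 0) {
    // Hand the buffered emits to the next stage instead of holding them in memory.
    await context.flush();
  }
}
Stage 3 in the full example applies the same idea, flushing after every 1,000 input pairs rather than counting individual emits.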
const execution = new JobExecution({ storageType: 'pmrfs' });
execution.setInputFn(async function input() {
  return [
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/LittleWomen.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Middlemarch.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MobyDick.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MyLife.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/PrideAndPrejudice.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/RoomWithAView.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TaleOfTwoCities.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheAdventuresOfSherlockHolmes.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheCountOfMonteCristo.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheGreatGatsby.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheScarletLetter.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Ulysses.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/WarAndPeace.txt"
  ];
});
// Stage 1 - tokenize the input
execution.addStage(new MapStage(async function map(x) {
  for await (const kv of x) {
    const [k, url] = kv;
    const response = await fetch(url);
    const data = await response.text();
    const lines = data.split('\n');
    for (const line of lines) {
      // Split each line into word tokens and emit (token, 1) for each one.
      const tokens = line.split(/\s+/);
      for (const token of tokens) {
        if (token && token.trim().length > 0) {
          context.emit(token, 1);
        }
      }
    }
  }
  await context.onComplete();
}));
// Stage 2 - gather each word
execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  for (const key in keyToValues) {
    const sum = keyToValues[key].reduce(function (accumulator, currentValue) {
      return accumulator + currentValue;
    }, 0);
    context.emit(key, sum);
  }
  await context.onComplete();
}));
// Stage 3 - count the characters, flushing the output to the next stage after every 1,000 input pairs
execution.addStage(new MapStage(async function map(x) {
  const flushMod = 1_000;
  for (let i = 0, n = x.length; i < n; i++) {
    const [k, v] = x[i];
    // Emit (character, 1) for every character of the word.
    for (let j = 0, m = k.length; j < m; j++) {
      context.emit(k.charAt(j), 1);
    }
    // After every flushMod input pairs, push the buffered emits downstream.
    if ((i + 1) % flushMod === 0) {
      await context.flush();
    }
  }
  await context.onComplete();
}));
// Stage 4 - sum each character
execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  for (const key in keyToValues) {
    const sum = keyToValues[key].reduce(function (accumulator, currentValue) {
      return accumulator + currentValue;
    }, 0);
    context.emit(key, sum);
  }
  await context.onComplete();
}));
execution.start(jobRunner);
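If several stages need periodic flushing, the emit-and-flush bookkeeping can be pulled into a small helper. The sketch below is only an illustration: makeFlushingEmitter is a hypothetical name, and it assumes context.emit() and context.flush() behave exactly as in the stages above.
// Hypothetical helper: wraps context.emit so that every flushMod emits
// also trigger a context.flush(), keeping the worker's in-memory buffer small.
function makeFlushingEmitter(context, flushMod = 1_000) {
  let emitted = 0;
  return async function emit(key, value) {
    context.emit(key, value);
    emitted += 1;
    if (emitted % flushMod === 0) {
      await context.flush();
    }
  };
}
Inside a stage you would create the emitter once, e.g. const emit = makeFlushingEmitter(context), and then await emit(key, value) wherever context.emit(key, value) appears above.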