Introduction

PeerMR is a service that makes it easy to process large datasets in a distributed fashion across a cluster of computers. PeerMR, along with connected peer or worker nodes, provides a distributed file system and compute layer over a managed pool of web browsers. Functionality is exposed via a JavaScript SDK that allows data scientists and practitioners to execute distributed machine learning and data processing jobs without the need to manage any infrastructure.

Use Cases For PeerMR

  • Processing large datasets without the need to manage any infrastructure.
  • Reducing costs for cloud customers running Hadoop, Spark, or other map-reduce-style jobs by porting them to PeerMR.
  • Training multiple machine learning models in parallel using TensorFlow.js, mljs or other JavaScript/WASM-based machine learning code.
  • Training a single TensorFlow.js model across multiple CPUs or GPUs in parallel using our fork of TensorFlow.js, which provides primitives for distributed training without any code changes to the TensorFlow.js-based code.
  • Distributed WebGPU compute. WebGPU is a new web standard that provides low-level access to the GPU. You can specify minimum constraints for the WebGPU adapters if your application requires a certain level of performance.

Non-Use Cases For PeerMR

  • Training a model in parallel using Python PyTorch or TensorFlow code. These frameworks do not run in web browsers. It is, however, possible to convert an existing PyTorch or TF model to ONNX and use the ONNX.js library with PeerMR to distribute inferencing. You can also port the training code to TensorFlow.js and distribute training with PeerMR. Our fork of TensorFlow.js provides primitives for distributed training of a single model across multiple browsers with no code changes to the TensorFlow.js-based code.
  • Distributing CUDA, ROCm or other native GPU code. However, any existing GPU code can be converted to WebGPU. WebGL is another option, but it's harder to determine if a particular web browser is leveraging an actual GPU or a software renderer and so speedups are not guaranteed.

Contact Us

Hire our team of data science and ML practitioners to help you onboard a new use case or port your existing PyTorch or TensorFlow models to PeerMR. https://www.peercompute.com/contact

Key Concepts

Compute

PeerMR provides compute to users in the form of jobs. Jobs are defined in JavaScript and executed entirely in the browser. Jobs are distributed and run in parallel across multiple browsers via map and reduce constructs. MPI-like primitives are also provided to allow direct communication between workers. A job consists of one or more stages; each stage is either a Map stage or a Reduce stage. Map stages are stateless; they send their output to the next stage immediately upon completing input processing. Reduce stages are stateful; they can only send their output on to the next stage once the entire set of known inputs is fully processed.

Storage

For storage, you can bring your own GCS or S3 bucket, or use PMRFS, a distributed file system provided by PeerMR. Architecturally, PMRFS is similar to HDFS: a single component coordinates file system operations among multiple browser nodes, which actually store and serve the data. Unlike HDFS, PMRFS persistence is ephemeral. Because clients can come and go at any time, a stored file may become unavailable if one or more of its parts goes missing. PMRFS is mainly meant for intermediate storage of data during compute; its core purpose is to provide a storage layer for the inputs and outputs of the compute layer, and while it can be used for long-term object storage, it is not meant for that purpose.

Buckets

Similar to S3 and GCS, PMRFS buckets are top level objects. Bucket names are globally unique. Each user is limited to 20 buckets.

Objects

Objects belong to a bucket and are identified by keys that look like file paths. Key names are unique within a bucket. Unlike HDFS, not all objects are immutable: objects that are intermediate inputs/outputs generated during job processing may be deleted unless the saveIntermediateOutput flag is set to true.

Blobs

Blobs are pieces of objects. They are binary data that are replicated across browsers. Blobs can become unavailable from PMRFS in multiple ways:

  • Users can delete blobs through the UI or with browser dev tools.
  • All browsers replicating a blob are disconnected from PeerMR.

If one or more blobs for an object are unavailable, the entire object is considered unavailable since the full object cannot be reconstructed. Like torrents, unavailable objects may be made available again if all blobs are brought back online.

Getting Started

Creating an Account

There are two types of accounts in PeerMR: coordinator and worker. Coordinators create and run compute jobs as well as store data in PeerMR. Coordinators pay for the compute and storage resources they use. Workers provide compute, storage and network resources to PeerMR and get paid for their contributions.

Coordinator Account

A coordinator account is required to run compute jobs and store data in PeerMR. To create a new coordinator account:

  • Click the "Sign In" button on the "Run Jobs" section of PeerMR
  • Create a new job, see the Compute Jobs section for more information.
  • Refer to the Compute Jobs section to learn how to create and run compute jobs.

Worker Account

A worker account allows browsers to provide compute, storage, and network resources to PeerMR. Worker accounts are not required by coordinators because a coordinator account can provide workers for its own jobs, whether for testing purposes or for keeping data private. Worker accounts get paid on a monthly basis based on their contributions to PeerMR. To create a new worker account:

  • Click the "Sign In" button on the "Get Paid" section of PeerMR
  • Click on the "Payments" Link in the dashvoard to follow the onboarding process with Stripe to make sure you can get paid for your workers' contributions.
  • Refer to the Contributing Resources section to learn more about resource contribution.

Billing

All billing is handled via Stripe.

Resource Types

Coordinator accounts are billed on a monthly basis according to usage of the following:

  • PMRFS Storage per GB, bytes are the smallest atomic unit.
  • Cost per hour of CPU usage, seconds are the smallest atomic unit.
  • Cost per hour of WebGPU/WebGL, seconds are the smallest atomic unit.
  • Peer-to-peer network transfer per GB, or the total ingress + egress per month of direct P2P traffic, bytes are the smallest atomic unit.
  • WebRTC TURN server network transfer per GB, or the total ingress + egress per month of traffic that must traverse a TURN server, bytes are the smallest atomic unit.

Private Mode

Only WebRTC relay traffic is billed in private mode when using PMRFS as the storage type, and no billing will be incurred if all of your workers are able to connect to each other without a relay server.

Contributing Resources

By connecting with a compatible browser to PeerMR as a worker or coordinator, you contribute resources that PeerMR uses for coordinator job processing and PMRFS. When a coordinator submits a job, it is queued for processing. If your browser meets the minimum requirements for the job and is currently not processing other PeerMR jobs, it may be selected to participate in the job. If selected, your browser will download the job and begin processing it in a web worker. During processing, your browser will use your computer's resources such as CPU, GPU, memory and network to help complete the job. When the job is complete, your browser will be given a score based on the resources it contributed to the job. At the end of the month, you will be paid for the contributions your browser(s) made to PeerMR.

Getting Paid

Payments to worker accounts are sent once a month. You will only receive a payment if you have at least one worker account that provided resources to jobs processed by PeerMR. The following are the resource types that workers get paid for; all of these are aggregated monthly across all of your workers and included in the payment:

  • PMRFS Storage per GB, bytes are the smallest atomic unit.
  • Cost per hour of CPU usage, seconds are the smallest atomic unit.
  • Cost per hour of WebGPU/WebGL, seconds are the smallest atomic unit.
  • Peer-to-peer network transfer per GB, or the total ingress + egress per month, bytes are the smallest atomic unit.

The "Payments" page in the dashboard will guide you through the onboarding process with Stripe to make sure you can get paid for your workers' contributions. The "Payments" page also has the latest pricing information with how much you earn for each resource.

Worker Scoring

Disconnections and other failures not related to bugs in PeerMR or in coordinators' jobs are disruptive to the service. PeerMR adapts to these failures and finds replacements for failed workers. However, the ideal case is that your browser stays connected to PeerMR. To help enforce this, all workers are given a score for each job they participate in, and these scores are aggregated across all jobs. During worker selection, the workers with the highest scores are given preference. It is therefore in your best interest to maximize the revenue of your workers by staying connected for the duration of a job.

Browser Requirements

Chrome and Firefox are the recommended browsers to use with PeerMR. At the time of writing, Chrome has the best support for WebGPU, so it is the recommended browser for jobs that require WebGPU, which have the highest payouts.

Command Line Runner

The command line runner is a tool that allows you to connect a worker to PeerMR from the command line. This is useful for running workers on headless servers or for running workers in a Docker container. The tool is an executable that drives headless Chrome; a Chrome installation is required.

  1. Download the executable for your appropriate platform.
  2. Run the executable with the following flags:
./peermr-worker-chrome-0.0.1-darwin-arm64 -email=<worker_account_email> -data-dir=./worker-data

Compute Jobs

A job consists of a name, JavaScript that defines the input and stage definitions, and various other metadata for executing the job.

Creating Jobs

  • Select the "Jobs" link in the menu.
  • Click the "NEW JOB" button to create a new job.
  • Enter the name and JS for the job.
  • Click the "SAVE" button to save the job.

The basic JS template for a job is the following:

// create a new `JobExecution`
const execution = new JobExecution();

// define a fn that the coordinator will execute. This fn should return a promise
// that resolves to an array. The array will be split up among the worker nodes
// and processed in parallel.
execution.setInputFn(async function input() {
});

// add a new map stage
execution.addStage(new MapStage(async function map(x) {
}));

// add a new reduce stage
execution.addStage(new ReduceStage(async function reduce(keyToValues) {
}));

// start the JobExecution
execution.start(jobRunner);

Job Execution Metadata

Metadata for a job execution gives PeerMR information needed to run the job. The metadata includes the following; a combined example follows the list:

  • storageType: One of 'pmrfs', 'gcs', or 's3'. Defines where intermediate and final outputs are stored.
  • workerCount: How many workers will run the job. Default worker count is 4. The maximum worker count is 8192.
  • groupCount: How many groups to run at a time. Default is 1. Groups can be used for A/B testing with up to 26 variations per job.
  • scripts: Third-party scripts required by the stages to run.
  • wasmBinaries: URLs of WebAssembly binaries required by the stages to run.
  • privateMode: If true, the job will run in private mode. Private mode is useful for debugging and testing jobs as well as keeping data private to only workers you've connected to your coordinator account.
  • gpu: If true, the job will only run on web browsers with WebGPU support.
  • gpuRequirements: A map of GPU requirements for the job. The map includes the following three keys: info, features, and limits. See the Decoder-Only Transformer LM example for usage.
  • saveIntermediateOutput: If true, intermediate outputs for PMRFS jobs will not be deleted after a job runs. Defaults to true. This flag is ignored for GCS and S3 jobs, whose intermediate outputs are never deleted by PeerMR.
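
For reference, here is a minimal sketch that sets several of these fields on an execution object; the values are illustrative only:

const execution = new JobExecution(storageType = 'pmrfs');
execution.workerCount = 16;        // default 4, maximum 8192
execution.groupCount = 2;          // run the job twice, as groups A and B
execution.scripts = ["https://cdn.jsdelivr.net/npm/@tensorflow/tfjs@latest/dist/tf.min.js"];
execution.privateMode = true;      // only your own workers are eligible
execution.gpu = true;              // only browsers with WebGPU support are eligible
execution.saveIntermediateOutput = true; // keep PMRFS intermediate outputs after the run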

Submitting Jobs

  • Click the "RUN" button. This opens a new dialog where you can confirm the job execution metadata prior to running the job.
  • Confirm the job execution metadata and click the "RUN" button to submit the job.

Job Selection

After confirming a job run, the job is put in a globally shared job queue for processing. Once the job is at the front of the queue, it is selected for possible execution. If there are enough workers that match the job's requirements, the job will be executed; if not, the job will stay at the front of the queue. The browser from which the job was queued is the coordinator for the job. The coordinator must stay connected to PeerMR for the job to run. If the coordinator is not connected, the job will not be started.

Private Mode

If the job execution's privateMode flag is true then only browsers that are connected to PeerMR with your coordinator account are eligible to be selected as workers. Only WebRTC relay traffic is billed in private mode, and no billing will be incurred if all of your workers are able to connect to each other without a relay server. Private mode is useful for testing your jobs before running them on the larger pool of public workers. Once you are confident that your job is working as expected, you can disable private mode and run the job in public mode. If privateMode is false then any browser connected to PeerMR is eligible to be selected as a worker provided it meets the job's gpu requirements (if any).

GPU Requirements

If the job execution's gpu flag is true then only browsers with WebGPU support are eligible to be selected as workers. Use gpuRequirements to specify the minimum WebGPU adapter requirements if your job needs certain levels of performance from GPU workers. If you do not specify gpu then PeerMR may still schedule your job on browsers with WebGPU support, but it is not guaranteed. You can check for WebGPU support at runtime in your worker map and reduce functions.
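
For example, a stage can probe for WebGPU availability at runtime using the standard navigator.gpu API; this is a minimal sketch, and the fallback behavior is up to your job:

execution.addStage(new MapStage(async function map(x) {
  // navigator.gpu is only defined in browsers with WebGPU support
  const adapter = navigator.gpu ? await navigator.gpu.requestAdapter() : null;
  if (adapter) {
    context.log('WebGPU adapter available');
  } else {
    context.log('no WebGPU adapter, falling back to CPU');
  }
  // ... stage logic here
  await context.onComplete();
}));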

Word Count Example

Let's build the canonical map reduce example of counting words in documents in PeerMR.

  • Create a JobExecution object:
const execution = new JobExecution(storageType = 'pmrfs');
  • Define the input for the job:
execution.setInputFn(async function input() {
  // the input is a function that returns an array
  // PeerMR will chunk this array based on the number of workers for a job
  // and send a chunk of key/value pairs to the first stage. 
  // The key is the index of the array.
  return [
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/LittleWomen.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Middlemarch.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MobyDick.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MyLife.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/PrideAndPrejudice.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/RoomWithAView.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TaleOfTwoCities.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheAdventuresOfSherlockHolmes.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheCountOfMonteCristo.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheGreatGatsby.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheScarletLetter.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Ulysses.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/WarAndPeace.txt"
  ];
});
  • Map stages operate on arrays of key/value pairs. Define a map stage that will fetch each document, tokenize it, and emit a count of 1 for each token:
execution.addStage(new MapStage(async function map(x) {
  // the input to a map function is an array of key/value pairs
  // x[0][0] is the first key, x[0][1] is the first value
  for await (const kv of x) {
    let [k, url] = kv; // destructure k/v pair
    const response = await fetch(url);
    const data = await response.text();
    const lines = data.split('\n');
    for (const line of lines) {
      const tokens = line.split(/\s+/);
      for (const token of tokens) {
        const str = token.replace(/[^0-9a-z]/gi, '').trim();
        if (str.length) {
          context.emit(str, 1); // context.emit will emit a k,v pair to the next stage
        }
      }
    }
  }
  // it's important to call context.onComplete after all context.emit calls
  // this signals to PeerMR that processing is complete for input x
  await context.onComplete();
}));
  • Reduce stages operate on objects mapping each key to an array of values. Define a reduce stage that sums the counts emitted by the map stage for each token:
execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  for (let key in keyToValues) {
    // key is the token emitted from the map stage
    // keyToValues[key] is an array of values emitted for key
    let sum = keyToValues[key].reduce(function (accumulator, currentValue) {
      return accumulator + currentValue
    }, 0);
    context.emit(key, sum); // context.emit will emit a k,v pair as the final output
  }
  // it's important to call context.onComplete after all context.emit calls
  // this signals to PeerMR that processing is complete for input keyToValues
  await context.onComplete();
}));
  • Start the job
execution.start(jobRunner);

Putting it all together:

const execution = new JobExecution(storageType = 'pmrfs');

execution.setInputFn(async function input() {
  return [
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/LittleWomen.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Middlemarch.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MobyDick.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MyLife.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/PrideAndPrejudice.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/RoomWithAView.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TaleOfTwoCities.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheAdventuresOfSherlockHolmes.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheCountOfMonteCristo.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheGreatGatsby.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheScarletLetter.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Ulysses.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/WarAndPeace.txt"
  ];
});

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  context.log('mapper rank: ' + rank);
  for await (const kv of x) {
    let [k, url] = kv;
    const response = await fetch(url);
    const data = await response.text();
    const lines = data.split('\n');
    for (const line of lines) {
      const tokens = line.split(/\s+/);
      for (const token of tokens) {
        const str = token.replace(/[^0-9a-z]/gi, '').trim();
        if (str.length) {
          context.emit(str, 1);
        }
      }
    }
  }
  await context.onComplete();
}));

execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  const rank = context.getRank();
  context.log('reducer rank: ' + rank);
  for (let key in keyToValues) {
    let sum = keyToValues[key].reduce(function (accumulator, currentValue) {
      return accumulator + currentValue
    }, 0);
    context.emit(key, sum);
  }
  await context.onComplete();
}));

execution.start(jobRunner);

Storage Type

As peers connected to PeerMR execute the stages of each job, they store the output of the map/reduce operations in either PMRFS, GCS or S3. GCS and S3 are faster and more reliable, but billing costs for IO and storage are typically higher than PMRFS. With PMRFS, workers store the data in the same browsers they are running in using IndexedDB and transfer the data to other peers using WebRTC. This is the less expensive storage option. PMRFS can be used in both public and private mode jobs. For public mode jobs, the data will be distributed between your coordinator, your coordinator's workers, and public workers connected to PeerMR. For private mode jobs, the data will only be distributed between your coordinator and your coordinator's workers.

Specify the storage type when creating a JobExecution, either pmrfs, gcs or s3:

const execution = new JobExecution(storageType = 'pmrfs');

GCS Setup

To use GCS, you must add your details in the Cloud Data page under the GCS section. Signed URLs are used for workers to get temporary permission to write job output to your GCS bucket. Follow the instructions here to get a private key and service account email that PeerMR uses to generate signed URLs for your bucket.

To allow workers to read/write from your bucket, set a CORS policy on the bucket so that requests coming from workers will succeed. Use the following policy to set CORS on your bucket:

[
  {
    "origin": [
      "https://www.peermr.com"
    ],
    "responseHeader": [
      "Content-Type",
      "Access-Control-Allow-Origin",
    ],
    "method": [
      "GET",
      "HEAD",
      "PUT"
    ],
    "maxAgeSeconds": 3600
  }
]

S3 Setup

To use S3, you must add your details in the Cloud Data page under the S3 section. Signed URLs are used for workers to get temporary permission to write job output to your S3 bucket.

To allow workers to read/write from your bucket, set a CORS policy on the bucket so that requests coming from workers will succeed. Use the following policy to set CORS on your bucket:

[
    {
        "AllowedHeaders": [
            "*"
        ],
        "AllowedMethods": [
            "GET",
            "HEAD",
            "PUT"
        ],
        "AllowedOrigins": [
            "https://www.peermr.com"
        ],
        "ExposeHeaders": [
            "Content-Type",
            "Access-Control-Allow-Origin"        
        ],
        "MaxAgeSeconds": 3600
    }
]

Grant public read access to your bucket so that workers can read from it:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "PublicReadGetObject",
            "Effect": "Allow",
            "Principal": "*",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::<your-bucket-name>/*"
            ]
        }
    ]
}

Workers and Groups

Workers

To specify the number of workers (the default is 4) that run a job:

execution.workerCount = 16;

There is a maximum of 8192 workers. If you specify a worker count higher than the number of available eligible workers, your job will remain queued until the specified number of workers becomes available.

Each worker has a score based on its previous history of job participation. Workers with higher scores are more likely to be chosen to participate in a job.

WebGPU

You can require that a worker has WebGPU support by setting the gpu flag:

execution.gpu = true;

When this flag is set, only workers with WebGPU support will be chosen to participate in the job.

Private Workers

When the privateMode flag is set, only your own workers will be chosen to participate in the job. You can bring up your own workers by connecting multiple tabs or browsers to PeerMR. This mode ensures data privacy, as no other workers will execute any part of the job.

In addition, while developing a job, you may want to test it with a small number of your own workers before running it with a larger group of workers.

Private mode is enabled by setting the privateMode flag:

execution.privateMode = true;

Groups

You may wish to execute your job multiple times at once and compare the results. This is easily done with groups. A group count of 2, for example, will execute the job twice, separating the intermediate and final output into groups A and B.

To specify the number of groups for your job:

execution.groupCount = 2;

Security

Execution Environment

All jobs run in a web worker. This has certain implications; for example, job script code does not have access to the DOM. Additionally, certain APIs are not available in a web worker; to use canvas, for instance, you need to use OffscreenCanvas. Refer to the Web Workers documentation for more information.
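
For instance, pixel work that would normally use a canvas element can be done with OffscreenCanvas inside a stage. A minimal sketch using only standard web APIs:

execution.addStage(new MapStage(async function map(x) {
  // the DOM is unavailable in a web worker, but OffscreenCanvas
  // provides a 2D drawing context without one
  const canvas = new OffscreenCanvas(28, 28);
  const ctx = canvas.getContext('2d');
  ctx.fillRect(0, 0, 28, 28);
  const pixels = ctx.getImageData(0, 0, 28, 28);
  context.log('read ' + pixels.data.length + ' channel values');
  await context.onComplete();
}));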

Certain functions are also not allowed in job scripts; for example, importScripts is disabled (see Importing Scripts).

Restricted Domains

XMLHttpRequest (https://developer.mozilla.org/en-US/docs/Web/API/XMLHttpRequest) and fetch are also limited. Only the GET method is supported, and only domains necessary to the functioning of the job are allowed.

Data Privacy

For jobs run in public mode (the default), any worker connected to PeerMR is eligible to process a job. This means that any data being processed is not private, even if you take care to encrypt it and decrypt it during processing. The workers will always have access to whatever data they process. Setting the privateMode of a job execution to true means that only workers you connect to PeerMR with your coordinator account are eligible to process your jobs. This is the only way to ensure data privacy in PeerMR.

Importing Scripts

Your job may require third-party libraries, or you may wish to bundle parts of your code into separate JS files. These files can be imported by specifying a list of URLs in the scripts field of the execution object:

execution.scripts = ["https://cdn.jsdelivr.net/npm/@tensorflow/tfjs@latest/dist/tf.min.js"];

Note that calling importScripts inside of job scripts is disabled, so any outside scripts should be imported this way.
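
Once a script is listed, the globals it defines are available inside your stage functions (the Digit Recognition example below relies on this to use tf). A minimal sketch, assuming the TensorFlow.js script above has been listed in execution.scripts:

execution.addStage(new MapStage(async function map(x) {
  // tf is the global defined by the imported tf.min.js script
  context.log('TensorFlow.js version: ' + tf.version.tfjs);
  await context.onComplete();
}));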

WASM Support

PeerMR supports WebAssembly (WASM) in compute jobs. This allows your job to run code written in languages like C, C++, and Rust.

To register WASM binaries for a job, add their source URLs to the wasmBinaries field of the execution object:

execution.wasmBinaries = ["https://unpkg.com/browse/@wllama/wllama@1.16.2/src/single-thread/wllama.wasm"];

Then the WASM binary can be loaded via context.getWebAssemblyURL. Please refer to WASM LERP C++ for an example.
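
A minimal sketch of loading a registered binary inside a stage; we assume here that context.getWebAssemblyURL takes the binary's source URL and resolves to a fetchable URL (refer to the WASM LERP C++ example for canonical usage):

execution.addStage(new MapStage(async function map(x) {
  // assumption: getWebAssemblyURL maps the registered source URL to a fetchable URL
  const wasmUrl = await context.getWebAssemblyURL(
    "https://unpkg.com/browse/@wllama/wllama@1.16.2/src/single-thread/wllama.wasm");
  const response = await fetch(wasmUrl);
  const {instance} = await WebAssembly.instantiate(
    await response.arrayBuffer(), {}); // pass imports here if the module needs any
  // ... call functions on instance.exports here
  await context.onComplete();
}));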

Flushing

Keep in mind that the functions defined in jobs run in web browsers, not servers. They are run in separate Web Workers to avoid interrupting the UI, but they are still limited to the memory and CPU available on the host machine. It is possible to flush emits mid-processing to keep the resultant output file sizes small and to help keep memory consumption low. This ensures that the browsers processing the next stage will not have to load very large files into memory.

The following example will flush every 10000 emits:

execution.addStage(new MapStage(async function map(x) {
  for (let i = 0, n = x.length; i < n; i++) {
    let [k, v] = x[i];
    for (let j = 0, m = k.length; j < m; j++) {
      context.emit(k.charAt(j), v);
    }
    if (i % 10000 === 0) {
      await context.flush();
    }
  }
  await context.onComplete();
}));

Serialization/Deserialization

Map stages operate on key/value pairs, while reduce stages operate on key/values pairs (a key and an array of values). JSON is used by default to serialize/deserialize data passed between stages. CSV and BSON are also supported.

To have a stage output CSV rather than the default JSON, specify the content type:

const stage = new MapStage(async function map(x) {
  // map body
});
stage.contentType = 'text/csv';

Similarly, to use BSON:

const stage = new MapStage(async function map(x) {
  // map body
});
stage.contentType = 'application/bson';

Set the second argument of execution.setInputFn to specify the content type of the input to the first stage:

execution.setInputFn(async function input() {
  return [
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/LittleWomen.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Middlemarch.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MobyDick.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MyLife.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/PrideAndPrejudice.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/RoomWithAView.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TaleOfTwoCities.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheAdventuresOfSherlockHolmes.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheCountOfMonteCristo.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheGreatGatsby.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheScarletLetter.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Ulysses.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/WarAndPeace.txt"
  ];
}, 'application/bson');

Logging

Each worker and the coordinator (your browser) keep separate logs of what is happening. When a job completes or fails, these logs are uploaded to PMRFS, GCS, or S3, depending on the storage type of the job. They contain system-level logs that are useful for debugging. You can add your own entries to these logs via context.log:

execution.addStage(new MapStage(async function map(x) {
  for await (const kv of x) {
    let [k, v] = kv; // destructure k/v pair
    context.log('key: ' + k);
  }
  await context.onComplete();
}));

Timeouts

Stage timeouts

Each stage is given 60 minutes to complete its work. If all work required for the stage is not completed within the time limit, the job will fail. This timeout may be too short or too long for your particular use case. To adjust it, set the timeout property (in seconds) on a stage before adding it to the execution:

const stage = new ReduceStage(async function reduce(keyToValues) {
  // reduce logic
});

stage.timeout = 30 * 60; // 30 minute stage timeout
execution.addStage(stage);

Worker timeouts

By default, every worker processing a map/reduce stage has a timeout of 60 minutes. This means that if a worker working on a particular stage does not send an emit within the timeout, the worker fails. When a worker fails, another available worker is selected; if there are no workers available, the job fails. This timeout may be too short or too long for your particular use case. To adjust it, call context.setWorkerTimeout(<timeout_seconds>) at the beginning of your stage:

execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  context.setWorkerTimeout(120 * 60); // two hour worker timeout
  // your logic here
  await context.onComplete();
}));

Rank

Similar to MPI, each worker processing a job is identified by a unique rank. You can use this rank to do special processing for specific workers if necessary. Get the rank of a worker in a map or reduce stage via context.getRank:

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  if (rank % 2 === 0) {
    // do something
  }
  await context.onComplete();
}));

Direct Peer Communication

Workers can communicate directly with each other using the concept of rank along with functions sendToRank and receiveFromRank. For example, to send data to "right" neighbors and receive data from "left" neighbors in a given job context:

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataTag = 'some_unique_tag'; // tag is a string that is globally unique for this stage/group
  const dataToSend = [rank * 2, rank * 4, rank * 6];
  const sendDataPmrfsUrl = await context.sendToRank(rank + 1, dataTag, dataToSend);
  const receivedData = await context.receiveFromRank(rank - 1, dataTag);
  
  // ... mapper logic here
  await context.onComplete();
}));

Awaiting on sendToRank before awaiting on receiveFromRank is optional. The engine can handle out-of-order sends/receives, so the following will also work:

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataTag = 'some_unique_tag';
  const dataToSend = [rank * 2, rank * 4, rank * 6];
  const s = context.sendToRank(rank + 1, dataTag, dataToSend);
  const r = context.receiveFromRank(rank - 1, dataTag);
  const result = await Promise.all([s, r]); // result[0] is the PMRFS URL of the sent data, result[1] is the received data
  
  // ... mapper logic here
  await context.onComplete();
}));

Direct Send

By default, sendToRank and receiveFromRank use the same method as the map reduce engine for transferring data between workers. For GCS and S3 jobs, senders copy data to GCS/S3 and receivers are notified of the resultant URLs, which they then fetch. For PMRFS jobs, senders transfer through the PMRFS network first, which includes disk persistence, and receivers are notified of the resultant URLs, which they then fetch. This allows sending and receiving between peers without requiring a WebRTC connection between sender and receiver, and it is also more reliable. However, it is possible to require a direct WebRTC connection between sender and receiver and avoid disk persistence by passing the direct flag in the options to both sendToRank and receiveFromRank. With this method, the data must be an ArrayBuffer, and the sender sends it directly to the receiver using a WebRTC data channel:

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataTag = 'some_unique_tag'; // tag is a string that is globally unique for this stage/group
  const dataToSend = new Uint8Array([10, 20, 30, 40, 50]).buffer; // direct send requires an ArrayBuffer
  const options = {'direct': true};
  await context.sendToRank(rank + 1, dataTag, dataToSend, options);
  const receivedData = await context.receiveFromRank(rank - 1, dataTag, options);
  
  // ... mapper logic here
  await context.onComplete();
}));

Note that both sender and receiver must be in agreement on the direct flag.

Timeouts

receiveFromRank will time out after 10 minutes if a corresponding sendToRank has not been sent. This timeout can be adjusted by passing a timeout option to receiveFromRank. The timeout is in milliseconds:

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataTag = 'some_unique_tag'; // tag is a string that is globally unique for this stage/group
  const dataToSend = new Uint8Array([10, 20, 30, 40, 50]).buffer;
  await context.sendToRank(rank + 1, dataTag, dataToSend);
  const options = {'timeout': 5 * 60 * 1_000}; // 5 minute timeout
  context.receiveFromRank(rank - 1, dataTag, options)
    .then(receivedData => {
        // receive was successful
      })
    .catch(e => {
      // receive timed out
    });
  
  // ... mapper logic here
  await context.onComplete();
}));

Retries

For indirect sends, when a worker (call it worker A) fails and another worker (call it worker B) is selected in its place, any previous sends addressed to the failed worker A will be resent to B. So in the following example, all sends/receives occur as expected even when workers fail and get replaced:

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataToSend = new Uint8Array([10, 20, 30, 40, 50]).buffer;
  for(let i = 0; i < 3; i++) {
    const dataTag = 'tag' + i;
    await context.sendToRank(rank + 1, dataTag, dataToSend);
    const receivedData = await context.receiveFromRank(rank - 1, dataTag);
    // ... do something with data    
  }
  
  // ... mapper logic here
  await context.onComplete();
}));

This works nicely for indirect sends because each worker keeps track of the URLs it has written and can notify a replacement worker of the set it cares about. However, for direct sends the above example may not work as expected if a worker fails during a send or receive. This is because, for direct sends, senders delete their copy of the sent data immediately after the receiver acknowledges receipt. Keeping these buffers in browser memory is not feasible, so resending does not occur when a node gets replaced. It is therefore important when using direct send to account for this with proper timeouts and retry logic.
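
A minimal sketch of such retry logic, assuming the timeout option described above also applies to direct receives; the attempt count and fallback here are illustrative:

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const options = {'direct': true, 'timeout': 60 * 1_000}; // 1 minute per attempt
  let receivedData = null;
  for (let attempt = 1; attempt <= 3 && receivedData === null; attempt++) {
    try {
      receivedData = await context.receiveFromRank(rank - 1, 'direct_tag', options);
    } catch (e) {
      context.log('direct receive attempt ' + attempt + ' timed out');
    }
  }
  if (receivedData === null) {
    // give up here, or fall back to an indirect receive without the direct flag
    context.log('direct receive failed after retries');
  }
  // ... mapper logic here
  await context.onComplete();
}));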

More Examples

Character Count

A job can be any combination of map/reduce functions. The following is a four-stage example for demonstration purposes only; the same result can be achieved more efficiently with a single map followed by a single reduce.

const execution = new JobExecution(storageType = 'pmrfs');
execution.setInputFn(async function input() {
  return [
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/LittleWomen.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Middlemarch.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MobyDick.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MyLife.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/PrideAndPrejudice.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/RoomWithAView.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TaleOfTwoCities.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheAdventuresOfSherlockHolmes.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheCountOfMonteCristo.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheGreatGatsby.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheScarletLetter.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Ulysses.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/WarAndPeace.txt"
  ];
});

// Stage 1 - tokenize the input into words
execution.addStage(new MapStage(async function map(x) {
  for await (const kv of x) {
    let [k, url] = kv;
    const response = await fetch(url);
    const data = await response.text();
    const lines = data.split('\n');
    for (const line of lines) {
      const tokens = line.split(/\s+/);
      for (const token of tokens) {
        const str = token.replace(/[^0-9a-z]/gi, '').trim();
        if (str.length) {
          context.emit(str, 1);
        }
      }
    }
  }
  await context.onComplete();
}));

// Stage 2 - gather each word
execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  for (let key in keyToValues) {
    let sum = keyToValues[key].reduce(function (accumulator, currentValue) {
      return accumulator + currentValue
    }, 0);
    context.emit(key, sum);
  }
  await context.onComplete();
}));

// Stage 3 - count the characters
execution.addStage(new MapStage(async function map(x) {
  for (let i = 0, n = x.length; i < n; i++) {
    let [k, v] = x[i];
    for (let j = 0, m = k.length; j < m; j++) {
      context.emit(k.charAt(j), 1);
    }
  }
  await context.onComplete();
}));

// Stage 4 - sum each character
execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  for (let key in keyToValues) {
    let sum = keyToValues[key].reduce(function (accumulator, currentValue) {
      return accumulator + currentValue
    }, 0);
    context.emit(key, sum);
  }
  await context.onComplete();
}));

execution.start(jobRunner);

Character Count With Flush

Building on the last example, we can use the flush() method to flush emits every N emits. Flushing is useful to prevent workers from running out of memory by periodically sending the output of computation on to the next stage rather than waiting until all outputs have been processed. Stage 3 below demonstrates this.

const execution = new JobExecution(storageType = 'pmrfs');
execution.setInputFn(async function input() {
  return [
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/LittleWomen.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Middlemarch.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MobyDick.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/MyLife.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/PrideAndPrejudice.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/RoomWithAView.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TaleOfTwoCities.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheAdventuresOfSherlockHolmes.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheCountOfMonteCristo.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheGreatGatsby.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/TheScarletLetter.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/Ulysses.txt",
    "https://storage.googleapis.com/peermr/8cb2de6c-63fe-4c53-94ed-452074972f46/_data/WarAndPeace.txt"
  ];
});

// Stage 1 - tokenize the input
execution.addStage(new MapStage(async function map(x) {
  for await (const kv of x) {
    let [k, url] = kv;
    const response = await fetch(url);
    const data = await response.text();
    const lines = data.split('\n');
    for (const line of lines) {
      const tokens = line.split(',');
      for (const token of tokens) {
        if (token && token.trim().length > 0) {
          context.emit(token, 1);
        }
      }
    }
  }
  await context.onComplete();
}));

// Stage 2 - gather each word
execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  for (let key in keyToValues) {
    let sum = keyToValues[key].reduce(function (accumulator, currentValue) {
      return accumulator + currentValue
    }, 0);
    context.emit(key, sum);
  }
  await context.onComplete();
}));

// Stage 3 - count the characters, flushing the output every 1,000 emits
execution.addStage(new MapStage(async function map(x) {
  const flushMod = 1_000;
  for (let i = 0, n = x.length; i < n; i++) {
    let [k, v] = x[i];
    for (let j = 0, m = k.length; j < m; j++) {
      context.emit(k.charAt(j), 1);
    }
    if (i % flushMod === 0) {
      await context.flush();
    }
  }
  await context.onComplete();
}));

// Stage 4 - sum each character
execution.addStage(new ReduceStage(async function reduce(keyToValues) {
  for (let key in keyToValues) {
    let sum = keyToValues[key].reduce(function (accumulator, currentValue) {
      return accumulator + currentValue
    }, 0);
    context.emit(key, sum);
  }
  await context.onComplete();
}));

execution.start(jobRunner);

Digit Recognition With TensorFlow.js

Description

OCR (Optical Character Recognition) for digit recognition is a technology used to convert images containing handwritten or printed digits into machine-readable text. Here we train an OCR model using TensorFlow.js and PeerMR.

Implementation

This is a good example of porting an existing TensorFlow.js model that runs in a single web browser to run across multiple browsers using PeerMR. This example uses the vanilla TensorFlow.js library and does not require our fork of TensorFlow.js.

The following is adapted from the TensorFlow.js lab here.

The input defines parameters for three different models. A single map function is used and three workers each train a single model. The prediction results are the final output of the job.

const execution = new JobExecution('gcs');
execution.scripts = ["https://cdn.jsdelivr.net/npm/@tensorflow/tfjs@latest/dist/tf.min.js"];

execution.setInputFn(async function inputs() {
  return [
    {
      "kernel-size-1": 5,
      "filters-1": 8,
      "strides-1": 1,
      "kernel-size-2": 5,
      "filters-2": 16,
      "strides-2": 1
    },
    {
      "kernel-size-1": 4,
      "filters-1": 8,
      "strides-1": 1,
      "kernel-size-2": 4,
      "filters-2": 16,
      "strides-2": 1
    },
    {
      "kernel-size-1": 5,
      "filters-1": 16,
      "strides-1": 1,
      "kernel-size-2": 5,
      "filters-2": 32,
      "strides-2": 1
    }
  ];
});

execution.addStage(new MapStage(async function map(mapInputs) {
  const IMAGE_WIDTH = 28;
  const IMAGE_HEIGHT = 28;
  const IMAGE_CHANNELS = 1;

  function getModel(modelParams) {
    const model = tf.sequential();

    // In the first layer of our convolutional neural network we have
    // to specify the input shape. Then we specify some parameters for
    // the convolution operation that takes place in this layer.
    model.add(tf.layers.conv2d({
      inputShape: [IMAGE_WIDTH, IMAGE_HEIGHT, IMAGE_CHANNELS],
      kernelSize: modelParams['kernel-size-1'],
      filters: modelParams['filters-1'],
      strides: modelParams['strides-1'],
      activation: 'relu',
      kernelInitializer: 'varianceScaling'
    }));

    // The MaxPooling layer acts as a sort of downsampling using max values
    // in a region instead of averaging.
    model.add(tf.layers.maxPooling2d({poolSize: [2, 2], strides: [2, 2]}));

    // Repeat another conv2d + maxPooling stack.
    // Note that we have more filters in the convolution.
    model.add(tf.layers.conv2d({
      kernelSize: modelParams['kernel-size-2'],
      filters: modelParams['filters-2'],
      strides: modelParams['strides-2'],
      activation: 'relu',
      kernelInitializer: 'varianceScaling'
    }));
    model.add(tf.layers.maxPooling2d({poolSize: [2, 2], strides: [2, 2]}));

    // Now we flatten the output from the 2D filters into a 1D vector to prepare
    // it for input into our last layer. This is common practice when feeding
    // higher dimensional data to a final classification output layer.
    model.add(tf.layers.flatten());

    // Our last layer is a dense layer which has 10 output units, one for each
    // output class (i.e. 0, 1, 2, 3, 4, 5, 6, 7, 8, 9).
    const NUM_OUTPUT_CLASSES = 10;
    model.add(tf.layers.dense({
      units: NUM_OUTPUT_CLASSES,
      kernelInitializer: 'varianceScaling',
      activation: 'softmax'
    }));

    // Choose an optimizer, loss function and accuracy metric,
    // then compile and return the model
    const optimizer = tf.train.adam();
    model.compile({
      optimizer: optimizer,
      loss: 'categoricalCrossentropy',
      metrics: ['accuracy'],
    });

    return model;
  }

  async function train(model, data) {
    const BATCH_SIZE = 512;
    const TRAIN_DATA_SIZE = 5500;
    const TEST_DATA_SIZE = 1000;

    const [trainXs, trainYs] = tf.tidy(() => {
      const d = data.nextTrainBatch(TRAIN_DATA_SIZE);
      return [
        d.xs.reshape([TRAIN_DATA_SIZE, 28, 28, 1]),
        d.labels
      ];
    });

    const [testXs, testYs] = tf.tidy(() => {
      const d = data.nextTestBatch(TEST_DATA_SIZE);
      return [
        d.xs.reshape([TEST_DATA_SIZE, 28, 28, 1]),
        d.labels
      ];
    });

    const info = await model.fit(trainXs, trainYs, {
      batchSize: BATCH_SIZE,
      validationData: [testXs, testYs],
      epochs: 10,
      shuffle: true,
      // callbacks: fitCallbacks
    });
    console.log('Final accuracy', info.history.acc);
  }

  const IMAGE_SIZE = 784;
  const NUM_CLASSES = 10;
  const NUM_DATASET_ELEMENTS = 65000;

  const NUM_TRAIN_ELEMENTS = 55000;
  const NUM_TEST_ELEMENTS = NUM_DATASET_ELEMENTS - NUM_TRAIN_ELEMENTS;

  const MNIST_IMAGES_SPRITE_PATH =
    'https://storage.googleapis.com/peermr/689a5731-c29b-468e-94a6-24b6e117d3bb/_data/mnist_images.png';
  const MNIST_LABELS_PATH =
    'https://storage.googleapis.com/peermr/689a5731-c29b-468e-94a6-24b6e117d3bb/_data/mnist_labels_uint8';

  class MnistData {
    constructor() {
      this.shuffledTrainIndex = 0;
      this.shuffledTestIndex = 0;
    }

    async load() {
      const canvas = new OffscreenCanvas(IMAGE_WIDTH, IMAGE_HEIGHT);
      const ctx = canvas.getContext('2d');
      const imgResponse = await fetch(MNIST_IMAGES_SPRITE_PATH);
      const imgBlob = await imgResponse.blob();
      const imgBitmap = await createImageBitmap(imgBlob);

      const datasetBytesBuffer =
        new ArrayBuffer(NUM_DATASET_ELEMENTS * IMAGE_SIZE * 4);
      const chunkSize = 5000;
      canvas.width = imgBitmap.width;
      canvas.height = chunkSize;

      for (let i = 0; i < NUM_DATASET_ELEMENTS / chunkSize; i++) {
        const datasetBytesView = new Float32Array(
          datasetBytesBuffer, i * IMAGE_SIZE * chunkSize * 4,
          IMAGE_SIZE * chunkSize);
        ctx.drawImage(
          imgBitmap, 0, i * chunkSize, imgBitmap.width, chunkSize, 0, 0, imgBitmap.width,
          chunkSize);

        const imageData = ctx.getImageData(0, 0, canvas.width, canvas.height);

        for (let j = 0; j < imageData.data.length / 4; j++) {
          // All channels hold an equal value since the image is grayscale, so
          // just read the red channel.
          datasetBytesView[j] = imageData.data[j * 4] / 255;
        }
      }
      this.datasetImages = new Float32Array(datasetBytesBuffer);

      const labelsResponse = await fetch(MNIST_LABELS_PATH);
      this.datasetLabels = new Uint8Array(await labelsResponse.arrayBuffer());

      // Create shuffled indices into the train/test set for when we select a
      // random dataset element for training / validation.
      this.trainIndices = tf.util.createShuffledIndices(NUM_TRAIN_ELEMENTS);
      this.testIndices = tf.util.createShuffledIndices(NUM_TEST_ELEMENTS);

      // Slice the images and labels into train and test sets.
      this.trainImages =
        this.datasetImages.slice(0, IMAGE_SIZE * NUM_TRAIN_ELEMENTS);
      this.testImages = this.datasetImages.slice(IMAGE_SIZE * NUM_TRAIN_ELEMENTS);
      this.trainLabels =
        this.datasetLabels.slice(0, NUM_CLASSES * NUM_TRAIN_ELEMENTS);
      this.testLabels =
        this.datasetLabels.slice(NUM_CLASSES * NUM_TRAIN_ELEMENTS);
    }

    nextTrainBatch(batchSize) {
      return this.nextBatch(
        batchSize, [this.trainImages, this.trainLabels], () => {
          this.shuffledTrainIndex =
            (this.shuffledTrainIndex + 1) % this.trainIndices.length;
          return this.trainIndices[this.shuffledTrainIndex];
        });
    }

    nextTestBatch(batchSize) {
      return this.nextBatch(batchSize, [this.testImages, this.testLabels], () => {
        this.shuffledTestIndex =
          (this.shuffledTestIndex + 1) % this.testIndices.length;
        return this.testIndices[this.shuffledTestIndex];
      });
    }

    nextBatch(batchSize, data, index) {
      const batchImagesArray = new Float32Array(batchSize * IMAGE_SIZE);
      const batchLabelsArray = new Uint8Array(batchSize * NUM_CLASSES);

      for (let i = 0; i < batchSize; i++) {
        const idx = index();

        const image =
          data[0].slice(idx * IMAGE_SIZE, idx * IMAGE_SIZE + IMAGE_SIZE);
        batchImagesArray.set(image, i * IMAGE_SIZE);

        const label =
          data[1].slice(idx * NUM_CLASSES, idx * NUM_CLASSES + NUM_CLASSES);
        batchLabelsArray.set(label, i * NUM_CLASSES);
      }

      const xs = tf.tensor2d(batchImagesArray, [batchSize, IMAGE_SIZE]);
      const labels = tf.tensor2d(batchLabelsArray, [batchSize, NUM_CLASSES]);

      return {xs, labels};
    }
  }

  function doPrediction(model, data, testDataSize = 500) {
    const IMAGE_WIDTH = 28;
    const IMAGE_HEIGHT = 28;
    const testData = data.nextTestBatch(testDataSize);
    const testxs = testData.xs.reshape([testDataSize, IMAGE_WIDTH, IMAGE_HEIGHT, 1]);
    const labels = testData.labels.argMax(-1);
    const preds = model.predict(testxs).argMax(-1);

    testxs.dispose();
    return [preds, labels];
  }

  async function trainAndPredict(k, modelParams) {
    context.log('loading mnist data');
    const data = new MnistData();
    await data.load();

    context.log('defining model with params: ' + JSON.stringify(modelParams));
    const model = getModel(modelParams);

    context.log('training model with params: ' + JSON.stringify(modelParams));
    await train(model, data);

    context.log('getting label predictions');
    const [preds, labels] = doPrediction(model, data);
    context.log('computing mse');
    const mse = tf.metrics.meanSquaredError(labels, preds)
    context.emit(k, mse.arraySync());
  }

  for await (const kv of mapInputs) {
    const [k, modelParams] = kv;
    await trainAndPredict(k, modelParams);
  }
  await context.onComplete();
}));

execution.workerCount = 3;
execution.start(jobRunner);

Decoder-Only Transformer LM

Description

A decoder-only Transformer LM refers to a language model architecture that only employs the decoder component of a Transformer model. In the original Transformer architecture, used in tasks like machine translation, the model consists of an encoder and a decoder. The encoder processes the input sequence, creating a contextualized representation, while the decoder generates the output sequence based on that representation.

In a decoder-only Transformer LM, the encoder part of the model is omitted, and only the decoder is used. This setup is typically employed in autoregressive language models where the task is to generate the next token in a sequence based on the previous tokens. In such models, the decoder predicts the next token given the previously generated tokens, often by attending to them through self-attention mechanisms.

Decoder-only Transformer LMs are commonly used in tasks like language generation, text completion, and dialogue generation, where the model needs to generate coherent and contextually appropriate sequences of text. These models are trained on large corpora of text data using techniques like maximum likelihood estimation, typically with teacher forcing.

Implementation

The following example uses the PeerMR fork of TensorFlow.js. This fork implements the All-Reduce pattern so that you can train models with large amounts of data across multiple web browsers. It also provides a good example of how to port TensorFlow Python code to TensorFlow.js. The original TensorFlow Python version is available here.

// adapted from: https://github.com/ml-explore/mlx-examples/tree/main/transformer_lm

// use PMRFS for storage and use 8 workers
const execution = new JobExecution('pmrfs', workerCount = 8);

// require the PeerMR fork of Tensorflow.js
execution.scripts = ["https://storage.googleapis.com/peermr.com/js/tf.es2017-5.js"];

// require a WebGPU browser
execution.gpu = true;

// not necessary for this example, but here we specify some WebGPU requirements
// to demonstrate the capability.
// workers that do not meet these requirements will not be selected to run this job.
execution.gpuRequirements = {
  info: {
    'vendor': 'apple',
  },
  features: {
    'texture-compression-astc': true
  },
  limits: {
    'maxBufferSize': 4294967296
  }
}

// coordinator browser will fetch the input and distribute in pieces to all workers
execution.setInputFn(async function inputs() {
  async function getDatasets() {
    const trainDataUrl = 'https://storage.googleapis.com/peermr/ptb/ptb.train.txt';
    context.log(`fetching train data ${trainDataUrl}`);
    const trainDataResponse = await fetch(trainDataUrl);
    const trainDataText = await trainDataResponse.text();

    // get all tokens
    context.log(`parsing vocab`);
    const eos = '<eos>';
    const vocabTokens = {};
    vocabTokens[eos] = true;
    const lines = trainDataText.split('\n');
    for (const line of lines) {
      const tokens = line.trim().split(' ');
      for (const token of tokens) {
        const t = token.trim();
        if (t.length) {
          vocabTokens[t] = true;
        }
      }
    }
    const tokenCount = Object.keys(vocabTokens).length;
    context.log(`${tokenCount} vocab tokens`);
    if (tokenCount !== 10_000) {
      throw new Error(`invalid token count ${tokenCount}`);
    }

    // assign each token a unique integer index
    context.log(`assign token indices`);
    let index = 0;
    const vocab = {};
    for (const token in vocabTokens) {
      vocab[token] = index;
      index += 1;
    }
    context.log(`${Object.keys(vocab).length} vocab size`);

    function* getData(text) {
      const lines = text.split('\n');
      for (const line of lines) {
        const tokens = line.trim().split(' ');
        for (const token of tokens) {
          const t = token.trim();
          if (t.length) {
            yield vocab[t];
          }
        }
        yield vocab[eos];
      }
    }

    const trainData = Array.from(getData(trainDataText));
    context.log(`${trainData.length} train data size`);

    return [vocab, trainData];
  }

  const [vocab, trainData] = await getDatasets();

  const workerCount = context.getWorkerCount();
  // split the training tokens evenly across workers; any remainder is dropped
  const samplesPerWorker = Math.floor(trainData.length / workerCount);
  const mapStageInputs = [];
  for (let i = 0; i < workerCount; i++) {
    const data = trainData.slice(i * samplesPerWorker, (i + 1) * samplesPerWorker);
    mapStageInputs.push({
      'vocab': vocab,
      'data': data,
    });
  }
  return mapStageInputs;
});

const stage = new MapStage(async function map(mapInputs) {
  if (mapInputs.length !== 1) {
    throw new Error('invalid mapper input length: ' + mapInputs.length);
  }

  const contextSize = 1024; // context size in tokens of the model
  const numBlocks = 12; // number of transformer blocks
  const numHeads = 16; // number of heads used for multi-head attention
  const batchSize = 2; // minibatch size
  const iterations = 1000; // train iterations
  const learningRate = 1e-3; // SGD learning rate
  const modelDims = 1024; // dimensionality of embeddings and hidden layers

  function createAdditiveCausalMask(N) {
    return tf.tidy(() => {
      const indices = tf.range(0, N);
      const mask = tf.less(tf.reshape(indices, [-1, 1]), tf.reshape(indices, [1, -1]));
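      // positions with j > i (the future) get -1e9 added so softmax gives them ~zero weight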
      return mask.cast('float32').mul(tf.scalar(-1e9));
    });
  }

  class SelfAttention extends tf.layers.Layer {
    constructor(name, numHeads, modelDims, contextSize) {
      super({name: name, inputShape: [contextSize, modelDims]});
      this.Wq = tf.layers.dense({units: modelDims, useBias: false});
      this.Wk = tf.layers.dense({units: modelDims, useBias: false});
      this.Wv = tf.layers.dense({units: modelDims, useBias: false});
      this.Wo = tf.layers.dense({units: modelDims, useBias: false});

      this.causalMask = createAdditiveCausalMask(contextSize);
      this.numHeads = numHeads;
      this.headDim = modelDims / numHeads;
      this.scale = tf.scalar(1.0 / Math.sqrt(this.headDim));
    }

    build(inputShape) {
      this.Wq.build(inputShape);
      this.Wk.build(inputShape);
      this.Wv.build(inputShape);
      this.Wo.build(inputShape);
    }

    call(inputs, kwargs) {
      return tf.tidy(() => {
        const x = inputs[0];
        let queries = this.Wq.apply(x);
        let keys = this.Wk.apply(x);
        let values = this.Wv.apply(x);

        const [B, L, D] = x.shape;
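        // split into heads: [B, L, D] -> [B, numHeads, L, headDim]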
        queries = tf.transpose(tf.reshape(queries, [B, L, this.numHeads, -1]), [0, 2, 1, 3]);
        keys = tf.transpose(tf.reshape(keys, [B, L, this.numHeads, -1]), [0, 2, 1, 3]);
        values = tf.transpose(tf.reshape(values, [B, L, this.numHeads, -1]), [0, 2, 1, 3]);

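        // scaled dot-product attention with the causal mask applied before the softmax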
        let scores = this.scale.mul(queries.matMul(tf.transpose(keys, [0, 1, 3, 2])));
        scores = tf.softmax(scores.add(this.causalMask), -1);
        values = scores.matMul(values);
        let valuesHat = tf.reshape(tf.transpose(values, [0, 2, 1, 3]), [B, L, -1]);

        return this.Wo.apply(valuesHat);
      });
    }
  }

  SelfAttention.className = 'SelfAttention';
  tf.serialization.registerClass(SelfAttention);

  class EncoderLayer extends tf.layers.Layer {
    constructor(name, numHeads, modelDims, contextSize) {
      super({name: name, inputShape: [contextSize, modelDims]});
      // if these layers were not wrapped in tf.sequential, each would need
      // build() called explicitly when this.build() is called
      this.selfAttn = tf.sequential();
      this.selfAttn.add(tf.layers.layerNormalization({epsilon: 1e-5, inputShape: [contextSize, modelDims]}));
      this.selfAttn.add(new SelfAttention(`${name}-self-attention`, numHeads, modelDims, contextSize));
      this.mlp = tf.sequential();
      this.mlp.add(tf.layers.layerNormalization({epsilon: 1e-5, inputShape: [contextSize, modelDims]}));
      this.mlp.add(tf.layers.dense({units: 4 * modelDims, activation: 'relu'}));
      this.mlp.add(tf.layers.dense({units: modelDims}));
    }

    call(inputs, kwargs) {
      return tf.tidy(() => {
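        // pre-norm blocks with residual connections around both attention and MLP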
        const x = this.selfAttn.apply(inputs);
        const x1 = tf.add(inputs, x);
        const x2 = this.mlp.apply(x1);
        return tf.add(x1, x2);
      });
    }
  }

  EncoderLayer.className = 'EncoderLayer';
  tf.serialization.registerClass(EncoderLayer);

  class TransformerLM extends tf.LayersModel {
    constructor(vocabSize, numLayers, numHeads, modelDims, contextSize) {
      const embedding = tf.layers.embedding({
        name: 'embedding',
        inputDim: vocabSize,
        outputDim: modelDims,
        inputLength: contextSize,
      });
      const trnsfrmr = tf.sequential();
      for (let i = 0; i < numLayers; i++) {
        trnsfrmr.add(new EncoderLayer(
          `transformer-encoder-layer-${i}`, numHeads, modelDims, contextSize));
      }
      const projection = tf.layers.dense({
        name: 'projection',
        units: vocabSize,
      });

      const input = tf.input({shape: [contextSize]});
      const x1 = embedding.apply(input);
      const x2 = trnsfrmr.apply(x1);
      const x3 = projection.apply(x2);

      super({
        inputs: input,
        outputs: x3,
        name: 'transformer_lm'
      });
    }
  }

  TransformerLM.className = 'TransformerLM';
  tf.serialization.registerClass(TransformerLM);

  function toSamples(contextSize, dataset, batchSize, offset) {
    return tf.tidy(() => {
      const windowSize = contextSize + 1; // include target
      const d = [];
      const upperBound = batchSize + offset;
      for (let i = offset; i < upperBound; i++) {
        d.push(dataset.slice(i, i + windowSize));
      }
      const X = tf.tensor(d);
      const rows = X.shape[0];
      const cols = X.shape[1];
      const inputs = X.slice([0, 0], [rows, cols - 1]); // all but last column
      const targets = X.slice([0, 1], [rows, cols - 1]); // all but first column

      return [inputs, targets];
    });
  }

  const [k, mapData] = mapInputs[0];
  const trainData = mapData['data'];
  const vocab = mapData['vocab'];
  const vocabSize = Object.keys(vocab).length;
  context.log(`vocab size: ${vocabSize}`);

  const transformer = new TransformerLM(
    vocabSize, numBlocks, numHeads, modelDims, contextSize
  );
  transformer.compile({
    optimizer: tf.train.sgd(learningRate),
    loss: 'sparseCategoricalCrossentropy',
  });
  transformer.build([batchSize, contextSize]);
  transformer.summary();

  const paramCount = transformer.countParams();
  context.log(`training a transformer with ${paramCount} parameters`);

  const tokenCount = trainData.length;
  const windowSize = contextSize + 1; // include target
  const sampleSize = tokenCount - windowSize + 1;
  for (let i = 0; i < iterations; i++) {
    for (let j = 0; j < sampleSize; j += batchSize) {
      const [inputs, targets] = toSamples(contextSize, trainData, batchSize, j);
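      // the PeerMR fork's trainOnBatch also takes the job context so that
      // gradients can be synchronized (All-Reduce) across workers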
      const loss = await transformer.trainOnBatch(inputs, targets, context);
      inputs.dispose();
      targets.dispose();
      if (j % 10 === 0) {
        context.log(`batch offset ${j}, loss: ${loss}`);
        context.log(tf.memory());
      }
    }
  }

  await context.onComplete();
});
stage.timeout = 8 * 60 * 60; // training is long-running; raise the stage timeout accordingly
execution.addStage(stage);

execution.start(jobRunner);

WebAssembly LERP C++

The following example demonstrates how to use a WebAssembly binary in a PeerMR job. It is based on the Emscripten LERP C++ example. The binary is a simple linear interpolation (LERP) function written in C++ and compiled to WebAssembly with Emscripten. PeerMR can then run it in a distributed fashion across multiple browsers.

First, create the C++ source lerp.cpp:

#include <emscripten/bind.h>

using namespace emscripten;

float lerp(float a, float b, float t) {
    return (1 - t) * a + t * b;
}

EMSCRIPTEN_BINDINGS(my_module) {
    function("lerp", &lerp);
}

Next, compile it to WebAssembly using Emscripten:

emcc -lembind -O0 -s ENVIRONMENT=worker -s EXPORT_NAME="LerpModule" -s MODULARIZE=1 -o lerp.js lerp.cpp

-O0 disables compiler optimizations (use -Os if you want to optimize the binary for size). -s ENVIRONMENT=worker targets web workers, which are the execution environment for PeerMR jobs. -s EXPORT_NAME="LerpModule" sets the name of the exported module factory. -s MODULARIZE=1 wraps the output in a module so it does not pollute the global namespace. -o lerp.js specifies the output file.

This will generate two files: lerp.js and lerp.wasm. The lerp.js file is a JavaScript wrapper around the WebAssembly binary. Both files are needed to run the WebAssembly binary in a browser. Make these files available as public URLs, for example by uploading them to a cloud storage service. In this example, we will use GCS and assume the bucket name is lerpbucket.
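
For example, with the gsutil CLI (assuming you have created a bucket named lerpbucket and are willing to make it publicly readable):

gsutil cp lerp.js lerp.wasm gs://lerpbucket/
gsutil iam ch allUsers:objectViewer gs://lerpbucket

Before wiring the module into a job, you can sanity-check it in a plain web worker. A minimal sketch, assuming lerp.js and lerp.wasm are served from the same directory:

// worker.js: load the Emscripten wrapper, instantiate the module, call lerp
importScripts('lerp.js');
LerpModule().then((mod) => {
  console.log(mod.lerp(0.0, 1.0, 0.5)); // expect 0.5
});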

Next, create a PeerMR job that uses the WebAssembly binary:

// use PMRFS for storage and 2 workers
const execution = new JobExecution('pmrfs');
execution.workerCount = 2;

// lerp.js will fetch the WASM binary
execution.scripts = ['https://storage.googleapis.com/lerpbucket/lerp.js'];

// register the URL of each WASM binary that the job needs
execution.wasmBinaries = ['https://storage.googleapis.com/lerpbucket/lerp.wasm'];

execution.setInputFn(async function input() {
  return [
    [0.0, 1.0, 0.5],
    [0.0, 1.0, 0.25],
    [0.0, 1.0, 0.75],
    [0.0, 1.0, 0.1],
    [0.0, 1.0, 0.9],
    [0.0, 1.0, 0.0],
    [0.0, 1.0, 1.0]
  ];
});
  
execution.addStage(new MapStage(async function map(x) {
  // import the LerpModule
  // use `context.getWebAssemblyURL` to specify the URL of the WASM binary
  const lerpModule = await LerpModule({
    locateFile: function (path) {
      if (path.endsWith('.wasm')) {
        return context.getWebAssemblyURL('lerp.wasm');
      }
      return path;
    }
  });

  // lerp each value
  for await (const kv of x) {
    const [k, v] = kv;
    context.emit(k, lerpModule.lerp(v[0], v[1], v[2]));
  }
  
  await context.onComplete();
}));

execution.start(jobRunner);