Direct Peer Communication

Workers can communicate directly with each other by rank, using the functions sendToRank and receiveFromRank. For example, each worker can send data to its "right" neighbor (rank + 1) and receive data from its "left" neighbor (rank - 1):

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataTag = 'some_unique_tag'; // tag is a string that is globally unique for this stage/group
  const dataToSend = [rank * 2, rank * 4, rank * 6];
  const sendDataPmrfsUrl = await context.sendToRank(rank + 1, dataTag, dataToSend);
  const receivedData = await context.receiveFromRank(rank - 1, dataTag);
  
  // ... mapper logic here
  context.onComplete();
}));
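Assume ranks are numbered from 0. In the example above, the first worker then receives from rank -1 and the last worker sends to a rank that does not exist. This section does not say how the engine handles such ranks. If you want a closed ring instead, one option is to wrap neighbor ranks modulo the number of workers. The sketch below assumes the worker count is available as `size`, which this document does not show how to obtain. `ringNeighbors` is a hypothetical helper and is not part of the engine:

```javascript
// Hypothetical helper: left/right neighbors on a ring of `size` workers, so the
// last rank sends to rank 0 and rank 0 receives from the last rank.
function ringNeighbors(rank, size) {
  if (!Number.isInteger(rank) || !Number.isInteger(size) ||
      size <= 0 || rank < 0 || rank >= size) {
    throw new RangeError(`invalid rank ${rank} for size ${size}`);
  }
  return {
    right: (rank + 1) % size,       // destination for sendToRank
    left: (rank - 1 + size) % size, // source for receiveFromRank
  };
}

// Example: neighbors for every rank in a 4-worker job.
for (let rank = 0; rank < 4; rank++) {
  const { left, right } = ringNeighbors(rank, 4);
  console.log(`rank ${rank}: receive from ${left}, send to ${right}`);
}
```

Inside a map function, `right` and `left` would take the place of rank + 1 and rank - 1, for example `context.sendToRank(right, dataTag, dataToSend)` and `context.receiveFromRank(left, dataTag)`.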

You do not have to await sendToRank before calling receiveFromRank. The engine handles sends and receives that happen out of order, so you can start both calls concurrently and await them together:

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataTag = 'some_unique_tag';
  const dataToSend = [rank * 2, rank * 4, rank * 6];
  const s = context.sendToRank(rank + 1, dataTag, dataToSend);
  const r = context.receiveFromRank(rank - 1, dataTag); // not awaited yet
  const result = await Promise.all([s, r]); // result[0] is the PMRFS URL of the sent data, result[1] is the received data
  
  // ... mapper logic here
  context.onComplete();
}));
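This document does not describe the engine's internals. A toy, single-process model can still show why order does not matter. Each (sender rank, receiver rank, tag) triple maps to one pending promise. The send resolves that promise and the receive returns it, so the result is the same whichever call happens first. `ToyMailbox` is a hypothetical class for illustration only. It is not the engine's API or implementation:

```javascript
// Toy, single-process model of tag-matched point-to-point messaging.
// NOT the engine's implementation: it only shows why a receive that starts
// before the matching send still completes.
class ToyMailbox {
  constructor() {
    this.slots = new Map(); // "from->to:tag" -> { promise, resolve }
  }

  slot(from, to, tag) {
    const key = `${from}->${to}:${tag}`;
    if (!this.slots.has(key)) {
      let resolve;
      const promise = new Promise((r) => { resolve = r; });
      this.slots.set(key, { promise, resolve });
    }
    return this.slots.get(key);
  }

  // Resolving the slot wakes up a receiver that is already waiting, if any.
  async send(from, to, tag, data) {
    this.slot(from, to, tag).resolve(data);
  }

  // Returns the same promise whether or not the send has happened yet.
  receive(from, to, tag) {
    return this.slot(from, to, tag).promise;
  }
}

async function demo() {
  const mailbox = new ToyMailbox();
  const tag = 'some_unique_tag';
  // Rank 2 starts waiting for rank 1's data before rank 1 has sent anything.
  const r = mailbox.receive(1, 2, tag);
  // Rank 1 sends afterwards; the pending receive now resolves.
  const s = mailbox.send(1, 2, tag, [2, 4, 6]);
  const [, received] = await Promise.all([s, r]);
  return received;
}

demo().then((received) => console.log(received)); // received is [2, 4, 6]
```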

Direct Send

By default, sendToRank and receiveFromRank move data between workers in the same way that the map reduce engine does. For GCS and S3 jobs, the sender copies the data to GCS/S3. The receiver is notified of the resulting URL and then fetches it. For PMRFS jobs, the sender first transfers the data through the PMRFS network, which persists it to disk. The receiver is likewise notified of the resulting URL and fetches it. This indirect method does not need a WebRTC connection between sender and receiver, and it is also more reliable than a direct transfer.

You can instead require a direct WebRTC connection between sender and receiver, which avoids disk persistence. To do this, pass the direct flag in the options of both sendToRank and receiveFromRank. In this mode the data must be an ArrayBuffer, and the sender delivers it to the receiver over a WebRTC data channel:

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataTag = 'some_unique_tag'; // tag is a string that is globally unique for this stage/group
  const dataToSend = new Uint8Array([10, 20, 30, 40, 50]).buffer; // ArrayBuffer holding 5 bytes
  const options = {'direct': true};
  await context.sendToRank(rank + 1, dataTag, dataToSend, options);
  const receivedData = await context.receiveFromRank(rank - 1, dataTag, options);
  
  // ... mapper logic here
  context.onComplete();
}));

Note that both sender and receiver must be in agreement on the direct flag.
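Direct sends require an ArrayBuffer, so structured data must be serialized first. If the payload is JSON-serializable, one simple option is to encode it as UTF-8 with the standard TextEncoder and TextDecoder. `toArrayBuffer` and `fromArrayBuffer` are hypothetical helper names and are not part of the engine. Note that the ArrayBuffer constructor takes a byte length, not a list of values, which is why the raw bytes below are built through a typed array:

```javascript
// Hypothetical helpers for direct sends: serialize a JSON-compatible value into
// an ArrayBuffer and back. TextEncoder/TextDecoder are standard in browsers and
// in Node.js (v11+).
function toArrayBuffer(value) {
  const bytes = new TextEncoder().encode(JSON.stringify(value)); // Uint8Array
  // Copy into a standalone ArrayBuffer of exactly the encoded length.
  return bytes.buffer.slice(bytes.byteOffset, bytes.byteOffset + bytes.byteLength);
}

function fromArrayBuffer(buffer) {
  return JSON.parse(new TextDecoder().decode(new Uint8Array(buffer)));
}

// Raw bytes can skip JSON: `new ArrayBuffer(n)` only takes a length, so build
// the bytes with a typed array and take its buffer.
const raw = new Uint8Array([10, 20, 30, 40, 50]).buffer; // 5-byte ArrayBuffer

const payload = { rank: 3, values: [6, 12, 18] };
const buf = toArrayBuffer(payload);
console.log(buf instanceof ArrayBuffer, fromArrayBuffer(buf));
```

The sender would pass `toArrayBuffer(payload)` to sendToRank with `{'direct': true}`, and the receiver would call `fromArrayBuffer` on the result. This assumes that receiveFromRank resolves to the ArrayBuffer that was sent, which this document implies but does not state explicitly.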

Timeouts

By default, receiveFromRank times out after 10 minutes if the corresponding sendToRank has not been sent. To change this, pass a timeout option, in milliseconds, to receiveFromRank:

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataTag = 'some_unique_tag'; // tag is a string that is globally unique for this stage/group
  const dataToSend = new Uint8Array([10, 20, 30, 40, 50]).buffer; // ArrayBuffer holding 5 bytes
  await context.sendToRank(rank + 1, dataTag, dataToSend);
  const options = {'timeout': 5 * 60 * 1_000}; // 5 minute timeout
  try {
    // await so that onComplete() below is not called before the receive settles
    const receivedData = await context.receiveFromRank(rank - 1, dataTag, options);
    // receive was successful
  } catch (e) {
    // receive timed out
  }
  
  // ... mapper logic here
  context.onComplete();
}));

Retries

For indirect sends, suppose a worker (call it worker A) fails and another worker (worker B) is selected to replace it. Any sends previously addressed to worker A are then resent to worker B. As a result, all sends and receives in the following example complete as expected, even if workers fail and are replaced:

execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataToSend = new Uint8Array([10, 20, 30, 40, 50]).buffer; // ArrayBuffer holding 5 bytes
  for(let i = 0; i < 3; i++) {
    const dataTag = 'tag' + i;
    await context.sendToRank(rank + 1, dataTag, dataToSend);
    const receivedData = await context.receiveFromRank(rank - 1, dataTag);
    // ... do something with data    
  }
  
  // ... mapper logic here
  context.onComplete();
}));

This works well for indirect sends because each worker keeps track of the URLs it has written and can tell a replacement worker which of them it needs. With direct sends, however, the same pattern may not behave as expected if a worker fails during a send or receive. In direct mode, the sender deletes its copy of the data as soon as the receiver acknowledges receipt. Keeping these buffers in browser memory is not feasible, so nothing is resent when a worker is replaced. When using direct send, account for this with appropriate timeouts and retry logic.
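The retry mechanics themselves can be generic. Retrying on the receiving side is only part of the solution, though. The sender has already discarded its buffer, so a retried receive can presumably succeed only if the sender, or its replacement, sends the data again under the agreed tag. How to coordinate that is application-specific and is not described here. The sketch below covers only the retry loop. `withRetries`, `maxAttempts`, `delayMs` and `makeFlakyReceive` are hypothetical names, and the stand-in function takes the place of a real receiveFromRank call:

```javascript
// Hypothetical generic retry helper. `attempt` is any async function, e.g. a
// closure around context.receiveFromRank(src, tag, { direct: true, timeout }).
// It is tried up to `maxAttempts` times, waiting `delayMs` between tries; the
// last error is rethrown if every attempt fails.
async function withRetries(attempt, { maxAttempts = 3, delayMs = 1_000 } = {}) {
  let lastError;
  for (let i = 1; i <= maxAttempts; i++) {
    try {
      return await attempt(i);
    } catch (e) {
      lastError = e;
      if (i < maxAttempts) {
        await new Promise((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}

// Stand-in for a receive that times out `failures` times, then succeeds
// (no engine required).
function makeFlakyReceive(failures) {
  let calls = 0;
  return async () => {
    calls += 1;
    if (calls <= failures) throw new Error(`receive timed out (call ${calls})`);
    return new Uint8Array([1, 2, 3]).buffer;
  };
}

withRetries(makeFlakyReceive(2), { maxAttempts: 3, delayMs: 10 })
  .then((buf) => console.log('received', buf.byteLength, 'bytes'));
```

A fixed delay keeps the sketch short. Exponential backoff, or a per-attempt timeout passed through the receive options, are common refinements.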