Direct Peer Communication
Workers can communicate directly with each other using the concept of rank, along with the functions sendToRank and receiveFromRank.
For example, for each worker to send data to its "right" neighbor and receive data from its "left" neighbor in a given job context:
execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataTag = 'some_unique_tag'; // tag is a string that is globally unique for this stage/group
  const dataToSend = [rank * 2, rank * 4, rank * 6];
  const sendDataPmrfsUrl = await context.sendToRank(rank + 1, dataTag, dataToSend);
  const receivedData = await context.receiveFromRank(rank - 1, dataTag);
  // ... mapper logic here
  context.onComplete();
}));
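The example above sends to rank + 1 and receives from rank - 1, so the last rank has no right neighbor and rank 0 has no left neighbor. This section does not describe how the engine treats an out-of-range rank. If a ring topology is wanted, one option is to wrap the neighbor ranks around. The sketch below assumes the total number of ranks is available as a hypothetical numRanks value; this section does not show an API that returns it, and ringNeighbors is an illustrative helper, not part of the engine.

```javascript
// Sketch: ring neighbors for ranks 0..numRanks-1.
// `numRanks` is a hypothetical input; this document does not show an API for it.
function ringNeighbors(rank, numRanks) {
  if (!Number.isInteger(rank) || !Number.isInteger(numRanks) || numRanks < 1 ||
      rank < 0 || rank >= numRanks) {
    throw new RangeError(`rank ${rank} is not in [0, ${numRanks})`);
  }
  return {
    // The "left" neighbor of rank 0 wraps around to the last rank.
    left: (rank - 1 + numRanks) % numRanks,
    // The "right" neighbor of the last rank wraps around to rank 0.
    right: (rank + 1) % numRanks,
  };
}

// Usage with four ranks:
console.log(ringNeighbors(0, 4)); // { left: 3, right: 1 }
console.log(ringNeighbors(3, 4)); // { left: 2, right: 0 }
```

Inside the map function, the worker would then send to the right value and receive from the left value. Every rank then sends exactly once and receives exactly once, so no receive waits on a sender that does not exist.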
Awaiting sendToRank before awaiting receiveFromRank is optional. The engine handles out-of-order sends and receives, so the following also works:
execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataTag = 'some_unique_tag';
  const dataToSend = [rank * 2, rank * 4, rank * 6];
  // Start the send and the receive without awaiting either one first
  const s = context.sendToRank(rank + 1, dataTag, dataToSend);
  const r = context.receiveFromRank(rank - 1, dataTag);
  const result = await Promise.all([s, r]); // result[0] is the PMRFS URL of the sent data, result[1] is the received data
  // ... mapper logic here
  context.onComplete();
}));
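One way to picture why ordering does not matter is a mailbox keyed by sender rank, receiver rank, and tag. A message that arrives before anyone asks for it is buffered, and a receive issued before the message arrives waits for it. The sketch below is a hypothetical in-memory model of that idea, not the engine's implementation. The Mailbox class and its methods are invented for illustration.

```javascript
// Hypothetical in-memory model of tag-matched, order-independent send/receive.
// Not the engine's implementation. Assumes each (from, to, tag) triple is used
// once, in line with tags being unique per stage/group.
class Mailbox {
  constructor() {
    this.messages = new Map(); // key -> data that arrived before a receive
    this.waiters = new Map();  // key -> resolve function of a pending receive
  }

  static key(fromRank, toRank, tag) {
    return `${fromRank}->${toRank}:${tag}`;
  }

  send(fromRank, toRank, tag, data) {
    const k = Mailbox.key(fromRank, toRank, tag);
    const resolve = this.waiters.get(k);
    if (resolve) {
      // A receive is already waiting: hand the data over directly.
      this.waiters.delete(k);
      resolve(data);
    } else {
      // No receiver yet: buffer the data until one asks for it.
      this.messages.set(k, data);
    }
    return Promise.resolve();
  }

  receive(fromRank, toRank, tag) {
    const k = Mailbox.key(fromRank, toRank, tag);
    if (this.messages.has(k)) {
      const data = this.messages.get(k);
      this.messages.delete(k);
      return Promise.resolve(data);
    }
    // Message not sent yet: wait until send() delivers it.
    return new Promise(resolve => this.waiters.set(k, resolve));
  }
}

// Usage: the receive is issued before the matching send and still resolves.
const box = new Mailbox();
const pending = box.receive(0, 1, 'some_unique_tag');
box.send(0, 1, 'some_unique_tag', [2, 4, 6]);
pending.then(data => console.log(data)); // [ 2, 4, 6 ]
```

Either side may arrive first; the tag is what pairs them, which is why each tag must be unique within the stage or group.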
Direct Send
By default, sendToRank and receiveFromRank use the same mechanism the map-reduce engine uses to transfer data between workers. For GCS and S3 jobs, senders copy data to GCS/S3, and receivers are notified of the resulting URLs, which they then fetch. For PMRFS jobs, senders first transfer the data through the PMRFS network, which includes persisting it to disk, and receivers are notified of the resulting URLs, which they then fetch. This allows workers to send to and receive from peers without a WebRTC connection between sender and receiver, and it is also the more reliable method. However, you can require a direct WebRTC connection between sender and receiver, and avoid disk persistence, by passing the direct flag in the options to both sendToRank and receiveFromRank.
With this method, the data must be an ArrayBuffer, and the sender sends it directly to the receiver over a WebRTC data channel:
execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataTag = 'some_unique_tag'; // tag is a string that is globally unique for this stage/group
  // Direct sends require an ArrayBuffer. Build it from a typed array:
  // new ArrayBuffer([...]) would create an empty buffer, since its argument is a byte length.
  const dataToSend = new Uint8Array([10, 20, 30, 40, 50]).buffer;
  const options = {'direct': true}; // pass the same flag to both sendToRank and receiveFromRank
  await context.sendToRank(rank + 1, dataTag, dataToSend, options);
  const receivedData = await context.receiveFromRank(rank - 1, dataTag, options);
  // ... mapper logic here
  context.onComplete();
}));
Note that the sender and receiver must agree on the direct flag.
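Because direct mode requires an ArrayBuffer, data such as the number arrays in the earlier examples has to be packed into bytes before sending and unpacked after receiving. A minimal sketch, assuming both sides agree on a 64-bit float encoding; encodeNumbers and decodeNumbers are hypothetical helpers, not part of the engine.

```javascript
// Hypothetical helpers for packing a numeric array into an ArrayBuffer for a
// direct send, and unpacking it on the receiving side.
// Assumes sender and receiver agree on the Float64 encoding.
function encodeNumbers(numbers) {
  // Float64Array copies the values into a freshly allocated buffer of
  // exactly numbers.length * 8 bytes, so .buffer holds only this data.
  return new Float64Array(numbers).buffer;
}

function decodeNumbers(buffer) {
  if (buffer.byteLength % Float64Array.BYTES_PER_ELEMENT !== 0) {
    throw new RangeError('buffer length is not a multiple of 8 bytes');
  }
  return Array.from(new Float64Array(buffer));
}

// Round trip, as the sender and receiver would see it:
const rank = 3;
const buffer = encodeNumbers([rank * 2, rank * 4, rank * 6]);
console.log(buffer.byteLength);     // 24
console.log(decodeNumbers(buffer)); // [ 6, 12, 18 ]
```

If the data is already a view into a larger buffer (for example, a subarray), its .buffer property refers to the whole underlying buffer, so slice out just its bytes with view.buffer.slice(view.byteOffset, view.byteOffset + view.byteLength). Typed arrays use the platform's byte order; a DataView can fix the order explicitly if that matters.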
Timeouts
receiveFromRank times out after 10 minutes if the corresponding sendToRank has not occurred by then. You can adjust this timeout by passing a timeout option to receiveFromRank. The timeout is in milliseconds:
execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataTag = 'some_unique_tag'; // tag is a string that is globally unique for this stage/group
  const dataToSend = new Uint8Array([10, 20, 30, 40, 50]).buffer;
  await context.sendToRank(rank + 1, dataTag, dataToSend);
  const options = {'timeout': 5 * 60 * 1_000}; // 5 minute timeout, in milliseconds
  try {
    // Await the receive so the stage does not complete before it settles
    const receivedData = await context.receiveFromRank(rank - 1, dataTag, options);
    // receive was successful
  } catch (e) {
    // receive timed out (or failed for another reason)
  }
  // ... mapper logic here
  context.onComplete();
}));
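The timeout option above applies to receiveFromRank. For bounding other awaited steps on the client side, a generic Promise.race wrapper is a common JavaScript pattern. This is a swapped-in technique, not an engine feature, and withTimeout and TimeoutError are hypothetical names.

```javascript
// Generic client-side timeout wrapper (Promise.race pattern).
// Not an engine API; TimeoutError and withTimeout are hypothetical names.
class TimeoutError extends Error {
  constructor(ms) {
    super(`operation timed out after ${ms} ms`);
    this.name = 'TimeoutError';
  }
}

function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new TimeoutError(ms)), ms);
  });
  // Whichever settles first wins; always clear the timer afterwards so it
  // does not keep the event loop alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Usage: a promise that never settles stands in for a stalled operation.
const stalled = new Promise(() => {});
withTimeout(stalled, 50).catch(e => console.log(e.name)); // TimeoutError
```

The wrapper only stops waiting. It does not cancel the underlying operation, which may still complete later.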
Retries
For indirect send, when a worker (call it worker A) fails and another worker (call it worker B) is selected in its place, any data previously sent to worker A is resent to worker B.
So in the following example, all sends and receives occur as expected even if workers fail and are replaced:
execution.addStage(new MapStage(async function map(x) {
  const rank = context.getRank();
  const dataToSend = new Uint8Array([10, 20, 30, 40, 50]).buffer;
  for (let i = 0; i < 3; i++) {
    const dataTag = 'tag' + i; // a distinct tag for each round
    await context.sendToRank(rank + 1, dataTag, dataToSend);
    const receivedData = await context.receiveFromRank(rank - 1, dataTag);
    // ... do something with data
  }
  // ... mapper logic here
  context.onComplete();
}));
This works well for indirect sends because each worker keeps track of the URLs it has written and can notify a replacement worker of the ones relevant to it. For direct send, however, the example above may not work as expected if a worker fails during a send or receive. With direct send, senders delete their copy of the sent data as soon as the receiver acknowledges receipt, because keeping these buffers in browser memory is not feasible, so nothing is resent when a worker is replaced. When using direct send, it is therefore important to account for this with proper timeouts and retry logic.
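A minimal sketch of such retry logic, assuming each attempt can simply be re-issued. For example, attempt might wrap a receiveFromRank call that passes its own timeout option, and a failed attempt is retried under the same tag. Whether a retried direct receive can actually be satisfied depends on whether the replacement worker re-runs its sends, which this section does not describe. withRetries and its parameters are hypothetical, and exponential backoff is a swapped-in choice, not something the engine prescribes.

```javascript
// Hypothetical retry helper: run `attempt` up to `maxAttempts` times, waiting
// `delayMs` after the first failure and doubling the wait after each later
// failure (exponential backoff). Each attempt is expected to enforce its own
// timeout, e.g. via receiveFromRank's timeout option.
function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

async function withRetries(attempt, { maxAttempts = 3, delayMs = 1000 } = {}) {
  let lastError;
  for (let i = 1; i <= maxAttempts; i++) {
    try {
      return await attempt(i);
    } catch (e) {
      lastError = e;
      if (i < maxAttempts) {
        await sleep(delayMs * 2 ** (i - 1));
      }
    }
  }
  // All attempts failed: surface the last error to the caller.
  throw lastError;
}

// Usage with a stand-in attempt that fails twice, then succeeds.
let calls = 0;
withRetries(async () => {
  calls++;
  if (calls < 3) throw new Error('timed out');
  return 'received';
}, { maxAttempts: 3, delayMs: 10 }).then(v => console.log(v, calls)); // received 3
```

Bounding the number of attempts keeps a worker from waiting forever on a peer whose data was lost, and the caller can decide what to do once the last error surfaces.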