Coverage Report

Created: 2025-12-10 07:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/node/src/node_messaging.h
Line
Count
Source
1
#ifndef SRC_NODE_MESSAGING_H_
2
#define SRC_NODE_MESSAGING_H_
3
4
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6
#include "env.h"
7
#include "node_mutex.h"
8
#include "v8.h"
9
#include <deque>
10
#include <string>
11
#include <unordered_map>
12
#include <set>
13
14
namespace node {
15
namespace worker {
16
17
class MessagePortData;
18
class MessagePort;
19
20
typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList;
21
22
// Used to represent the in-flight structure of an object that is being
23
// transferred or cloned using postMessage().
24
class TransferData : public MemoryRetainer {
25
 public:
26
  // Deserialize this object on the receiving end after a .postMessage() call.
27
  // - `context` may not be the same as `env->context()`. This method should
28
  //    not produce JS objects coming from Contexts other than `context`.
29
  // - `self` is a unique_ptr for the object that this is being called on.
30
  // - The return value is treated like a `Maybe`, i.e. if `nullptr` is
31
  //   returned, any further deserialization of the message is stopped and
32
  //   control is returned to the event loop or JS as soon as possible.
33
  virtual BaseObjectPtr<BaseObject> Deserialize(
34
      Environment* env,
35
      v8::Local<v8::Context> context,
36
      std::unique_ptr<TransferData> self) = 0;
37
  // FinalizeTransferWrite() is the counterpart to
38
  // BaseObject::FinalizeTransferRead(). It is called right after the transfer
39
  // data was created, and defaults to doing nothing. After this function,
40
  // this object should not hold any more Isolate-specific data.
41
  virtual v8::Maybe<bool> FinalizeTransferWrite(
42
      v8::Local<v8::Context> context, v8::ValueSerializer* serializer);
43
};
44
45
// Represents a single communication message.
46
class Message : public MemoryRetainer {
47
 public:
48
  // Create a Message with a specific underlying payload, in the format of the
49
  // V8 ValueSerializer API. If `payload` is empty, this message indicates
50
  // that the receiving message port should close itself.
51
  explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
52
0
  ~Message() = default;
53
54
  Message(Message&& other) = default;
55
  Message& operator=(Message&& other) = default;
56
  Message& operator=(const Message&) = delete;
57
  Message(const Message&) = delete;
58
59
  // Whether this is a message indicating that the port is to be closed.
60
  // This is the last message to be received by a MessagePort.
61
  bool IsCloseMessage() const;
62
63
  // Deserialize the contained JS value. May only be called once, and only
64
  // after Serialize() has been called (e.g. by another thread).
65
  v8::MaybeLocal<v8::Value> Deserialize(
66
      Environment* env,
67
      v8::Local<v8::Context> context,
68
      v8::Local<v8::Value>* port_list = nullptr);
69
70
  // Serialize a JS value, and optionally transfer objects, into this message.
71
  // The Message object retains ownership of all transferred objects until
72
  // deserialization.
73
  // The source_port parameter, if provided, will make Serialize() throw a
74
  // "DataCloneError" DOMException if source_port is found in transfer_list.
75
  v8::Maybe<bool> Serialize(Environment* env,
76
                            v8::Local<v8::Context> context,
77
                            v8::Local<v8::Value> input,
78
                            const TransferList& transfer_list,
79
                            v8::Local<v8::Object> source_port =
80
                                v8::Local<v8::Object>());
81
82
  // Internal method of Message that is called when a new SharedArrayBuffer
83
  // object is encountered in the incoming value's structure.
84
  void AddSharedArrayBuffer(std::shared_ptr<v8::BackingStore> backing_store);
85
  // Internal method of Message that is called once serialization finishes
86
  // and that transfers ownership of `data` to this message.
87
  void AddTransferable(std::unique_ptr<TransferData>&& data);
88
  // Internal method of Message that is called when a new WebAssembly.Module
89
  // object is encountered in the incoming value's structure.
90
  uint32_t AddWASMModule(v8::CompiledWasmModule&& mod);
91
  // Internal method of Message that is called when a shared value is
92
  // encountered for the first time in the incoming value's structure.
93
  void AdoptSharedValueConveyor(v8::SharedValueConveyor&& conveyor);
94
95
  // The host objects that will be transferred, as recorded by Serialize()
96
  // (e.g. MessagePorts).
97
  // Used for warning user about posting the target MessagePort to itself,
98
  // which will as a side effect destroy the communication channel.
99
0
  const std::vector<std::unique_ptr<TransferData>>& transferables() const {
100
0
    return transferables_;
101
0
  }
102
0
  bool has_transferables() const {
103
0
    return !transferables_.empty() || !array_buffers_.empty();
104
0
  }
105
106
  void MemoryInfo(MemoryTracker* tracker) const override;
107
108
  SET_MEMORY_INFO_NAME(Message)
109
  SET_SELF_SIZE(Message)
110
111
 private:
112
  MallocedBuffer<char> main_message_buf_;
113
  // TODO(addaleax): Make this a std::variant to save storage size in the common
114
  // case (which is that all of these vectors are empty) once that is available
115
  // with C++17.
116
  std::vector<std::shared_ptr<v8::BackingStore>> array_buffers_;
117
  std::vector<std::shared_ptr<v8::BackingStore>> shared_array_buffers_;
118
  std::vector<std::unique_ptr<TransferData>> transferables_;
119
  std::vector<v8::CompiledWasmModule> wasm_modules_;
120
  std::optional<v8::SharedValueConveyor> shared_value_conveyor_;
121
122
  friend class MessagePort;
123
};
124
125
class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> {
126
 public:
127
  // Named SiblingGroup, Used for one-to-many BroadcastChannels.
128
  static std::shared_ptr<SiblingGroup> Get(const std::string& name);
129
130
  // Anonymous SiblingGroup, Used for one-to-one MessagePort pairs.
131
0
  SiblingGroup() = default;
132
  explicit SiblingGroup(const std::string& name);
133
  ~SiblingGroup();
134
135
  // Dispatches the Message to the collection of associated
136
  // ports. If there is more than one destination port and
137
  // the Message contains transferables, Dispatch will fail.
138
  // Returns Just(true) if successful and the message was
139
  // dispatched to at least one destination. Returns Just(false)
140
  // if there were no destinations. Returns Nothing<bool>()
141
  // if there was an error. If error is not nullptr, it will
142
  // be set to an error message or warning message as appropriate.
143
  v8::Maybe<bool> Dispatch(
144
      MessagePortData* source,
145
      std::shared_ptr<Message> message,
146
      std::string* error = nullptr);
147
148
  void Entangle(MessagePortData* data);
149
  void Entangle(std::initializer_list<MessagePortData*> data);
150
  void Disentangle(MessagePortData* data);
151
152
0
  const std::string& name() const { return name_; }
153
154
0
  size_t size() const { return ports_.size(); }
155
156
 private:
157
  const std::string name_;
158
  RwLock group_mutex_;  // Protects ports_.
159
  std::set<MessagePortData*> ports_;
160
161
  static void CheckSiblingGroup(const std::string& name);
162
163
  using Map =
164
      std::unordered_map<std::string, std::weak_ptr<SiblingGroup>>;
165
166
  static Mutex groups_mutex_;
167
  static Map groups_;
168
};
169
170
// This contains all data for a `MessagePort` instance that is not tied to
171
// a specific Environment/Isolate/event loop, for easier transfer between those.
172
class MessagePortData : public TransferData {
173
 public:
174
  explicit MessagePortData(MessagePort* owner);
175
  ~MessagePortData() override;
176
177
  MessagePortData(MessagePortData&& other) = delete;
178
  MessagePortData& operator=(MessagePortData&& other) = delete;
179
  MessagePortData(const MessagePortData& other) = delete;
180
  MessagePortData& operator=(const MessagePortData& other) = delete;
181
182
  // Add a message to the incoming queue and notify the receiver.
183
  // This may be called from any thread.
184
  void AddToIncomingQueue(std::shared_ptr<Message> message);
185
  v8::Maybe<bool> Dispatch(
186
      std::shared_ptr<Message> message,
187
      std::string* error = nullptr);
188
189
  // Turns `a` and `b` into siblings, i.e. connects the sending side of one
190
  // to the receiving side of the other. This is not thread-safe.
191
  static void Entangle(MessagePortData* a, MessagePortData* b);
192
193
  // Removes any possible sibling. This is thread-safe (it acquires both
194
  // `sibling_mutex_` and `mutex_`), and has to be because it is called once
195
  // the corresponding JS handle handle wants to close
196
  // which can happen on either side of a worker.
197
  void Disentangle();
198
199
  void MemoryInfo(MemoryTracker* tracker) const override;
200
  BaseObjectPtr<BaseObject> Deserialize(
201
      Environment* env,
202
      v8::Local<v8::Context> context,
203
      std::unique_ptr<TransferData> self) override;
204
205
  SET_MEMORY_INFO_NAME(MessagePortData)
206
  SET_SELF_SIZE(MessagePortData)
207
208
 private:
209
  // This mutex protects all fields below it, with the exception of
210
  // sibling_.
211
  mutable Mutex mutex_;
212
  // TODO(addaleax): Make this a std::variant<std::shared_ptr, std::unique_ptr>
213
  // once that is available with C++17, because std::shared_ptr comes with
214
  // overhead that is only necessary for BroadcastChannel.
215
  std::deque<std::shared_ptr<Message>> incoming_messages_;
216
  MessagePort* owner_ = nullptr;
217
  std::shared_ptr<SiblingGroup> group_;
218
  friend class MessagePort;
219
  friend class SiblingGroup;
220
};
221
222
// A message port that receives messages from other threads, including
223
// the uv_async_t handle that is used to notify the current event loop of
224
// new incoming messages.
225
class MessagePort : public HandleWrap {
226
 private:
227
  // Create a new MessagePort. The `context` argument specifies the Context
228
  // instance that is used for creating the values emitted from this port.
229
  // This is called by MessagePort::New(), which is the public API used for
230
  // creating MessagePort instances.
231
  MessagePort(Environment* env,
232
              v8::Local<v8::Context> context,
233
              v8::Local<v8::Object> wrap);
234
235
 public:
236
  ~MessagePort() override;
237
238
  // Create a new message port instance, optionally over an existing
239
  // `MessagePortData` object.
240
  static MessagePort* New(Environment* env,
241
                          v8::Local<v8::Context> context,
242
                          std::unique_ptr<MessagePortData> data = {},
243
                          std::shared_ptr<SiblingGroup> sibling_group = {});
244
245
  // Send a message, i.e. deliver it into the sibling's incoming queue.
246
  // If this port is closed, or if there is no sibling, this message is
247
  // serialized with transfers, then silently discarded.
248
  v8::Maybe<bool> PostMessage(Environment* env,
249
                              v8::Local<v8::Context> context,
250
                              v8::Local<v8::Value> message,
251
                              const TransferList& transfer);
252
253
  // Start processing messages on this port as a receiving end.
254
  void Start();
255
  // Stop processing messages on this port as a receiving end.
256
  void Stop();
257
258
  /* constructor */
259
  static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
260
  /* prototype methods */
261
  static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
262
  static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
263
  static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
264
  static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
265
  static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
266
267
  /* static */
268
  static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args);
269
270
  // Turns `a` and `b` into siblings, i.e. connects the sending side of one
271
  // to the receiving side of the other. This is not thread-safe.
272
  static void Entangle(MessagePort* a, MessagePort* b);
273
  static void Entangle(MessagePort* a, MessagePortData* b);
274
275
  // Detach this port's data for transferring. After this, the MessagePortData
276
  // is no longer associated with this handle, although it can still receive
277
  // messages.
278
  std::unique_ptr<MessagePortData> Detach();
279
280
  void Close(
281
      v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override;
282
283
  // Returns true if either data_ has been freed, or if the handle is being
284
  // closed. Equivalent to the [[Detached]] internal slot in the HTML Standard.
285
  //
286
  // If checking if a JavaScript MessagePort object is detached, this method
287
  // alone is often not enough, since the backing C++ MessagePort object may
288
  // have been deleted already. For all intents and purposes, an object with a
289
  // NULL pointer to the C++ MessagePort object is also detached.
290
  inline bool IsDetached() const;
291
292
  BaseObject::TransferMode GetTransferMode() const override;
293
  std::unique_ptr<TransferData> TransferForMessaging() override;
294
295
  void MemoryInfo(MemoryTracker* tracker) const override;
296
  SET_MEMORY_INFO_NAME(MessagePort)
297
  SET_SELF_SIZE(MessagePort)
298
299
 private:
300
  enum class MessageProcessingMode {
301
    kNormalOperation,
302
    kForceReadMessages
303
  };
304
305
  void OnClose() override;
306
  void OnMessage(MessageProcessingMode mode);
307
  void TriggerAsync();
308
  v8::MaybeLocal<v8::Value> ReceiveMessage(
309
      v8::Local<v8::Context> context,
310
      MessageProcessingMode mode,
311
      v8::Local<v8::Value>* port_list = nullptr);
312
313
  std::unique_ptr<MessagePortData> data_ = nullptr;
314
  bool receiving_messages_ = false;
315
  uv_async_t async_;
316
  v8::Global<v8::Function> emit_message_fn_;
317
318
  friend class MessagePortData;
319
};
320
321
// Provide a wrapper class created when a built-in JS classes that being
322
// transferable or cloneable by postMessage().
323
// See e.g. FileHandle in internal/fs/promises.js for an example.
324
class JSTransferable : public BaseObject {
325
 public:
326
  static BaseObjectPtr<JSTransferable> Wrap(Environment* env,
327
                                            v8::Local<v8::Object> target);
328
  static bool IsJSTransferable(Environment* env,
329
                               v8::Local<v8::Context> context,
330
                               v8::Local<v8::Object> object);
331
332
  JSTransferable(Environment* env,
333
                 v8::Local<v8::Object> obj,
334
                 v8::Local<v8::Object> target);
335
  ~JSTransferable();
336
337
  BaseObject::TransferMode GetTransferMode() const override;
338
  std::unique_ptr<TransferData> TransferForMessaging() override;
339
  std::unique_ptr<TransferData> CloneForMessaging() const override;
340
  v8::Maybe<std::vector<BaseObjectPtr<BaseObject>>>
341
      NestedTransferables() const override;
342
  v8::Maybe<void> FinalizeTransferRead(
343
      v8::Local<v8::Context> context,
344
      v8::ValueDeserializer* deserializer) override;
345
346
  SET_NO_MEMORY_INFO()
347
  SET_MEMORY_INFO_NAME(JSTransferable)
348
  SET_SELF_SIZE(JSTransferable)
349
350
  v8::Local<v8::Object> target() const;
351
352
 private:
353
  template <TransferMode mode>
354
  std::unique_ptr<TransferData> TransferOrClone() const;
355
356
  v8::Global<v8::Object> target_;
357
358
  class Data : public TransferData {
359
   public:
360
    Data(std::string&& deserialize_info, v8::Global<v8::Value>&& data);
361
362
    BaseObjectPtr<BaseObject> Deserialize(
363
        Environment* env,
364
        v8::Local<v8::Context> context,
365
        std::unique_ptr<TransferData> self) override;
366
    v8::Maybe<bool> FinalizeTransferWrite(
367
        v8::Local<v8::Context> context,
368
        v8::ValueSerializer* serializer) override;
369
370
    SET_NO_MEMORY_INFO()
371
    SET_MEMORY_INFO_NAME(JSTransferableTransferData)
372
    SET_SELF_SIZE(Data)
373
374
   private:
375
    std::string deserialize_info_;
376
    v8::Global<v8::Value> data_;
377
  };
378
};
379
380
v8::Local<v8::FunctionTemplate> GetMessagePortConstructorTemplate(
381
    IsolateData* isolate_data);
382
383
}  // namespace worker
384
}  // namespace node
385
386
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
387
388
389
#endif  // SRC_NODE_MESSAGING_H_