/src/node/src/node_messaging.h
Line | Count | Source |
1 | | #ifndef SRC_NODE_MESSAGING_H_ |
2 | | #define SRC_NODE_MESSAGING_H_ |
3 | | |
4 | | #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS |
5 | | |
6 | | #include "env.h" |
7 | | #include "node_mutex.h" |
8 | | #include "v8.h" |
9 | | #include <deque> |
10 | | #include <string> |
11 | | #include <unordered_map> |
12 | | #include <set> |
13 | | |
14 | | namespace node { |
15 | | namespace worker { |
16 | | |
17 | | class MessagePortData; |
18 | | class MessagePort; |
19 | | |
20 | | typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList; |
21 | | |
22 | | // Used to represent the in-flight structure of an object that is being |
23 | | // transferred or cloned using postMessage(). |
24 | | class TransferData : public MemoryRetainer { |
25 | | public: |
26 | | // Deserialize this object on the receiving end after a .postMessage() call. |
27 | | // - `context` may not be the same as `env->context()`. This method should |
28 | | // not produce JS objects coming from Contexts other than `context`. |
29 | | // - `self` is a unique_ptr for the object that this is being called on. |
30 | | // - The return value is treated like a `Maybe`, i.e. if `nullptr` is |
31 | | // returned, any further deserialization of the message is stopped and |
32 | | // control is returned to the event loop or JS as soon as possible. |
33 | | virtual BaseObjectPtr<BaseObject> Deserialize( |
34 | | Environment* env, |
35 | | v8::Local<v8::Context> context, |
36 | | std::unique_ptr<TransferData> self) = 0; |
37 | | // FinalizeTransferWrite() is the counterpart to |
38 | | // BaseObject::FinalizeTransferRead(). It is called right after the transfer |
39 | | // data was created, and defaults to doing nothing. After this function, |
40 | | // this object should not hold any more Isolate-specific data. |
41 | | virtual v8::Maybe<bool> FinalizeTransferWrite( |
42 | | v8::Local<v8::Context> context, v8::ValueSerializer* serializer); |
43 | | }; |
44 | | |
45 | | // Represents a single communication message. |
46 | | class Message : public MemoryRetainer { |
47 | | public: |
48 | | // Create a Message with a specific underlying payload, in the format of the |
49 | | // V8 ValueSerializer API. If `payload` is empty, this message indicates |
50 | | // that the receiving message port should close itself. |
51 | | explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>()); |
52 | 0 | ~Message() = default; |
53 | | |
54 | | Message(Message&& other) = default; |
55 | | Message& operator=(Message&& other) = default; |
56 | | Message& operator=(const Message&) = delete; |
57 | | Message(const Message&) = delete; |
58 | | |
59 | | // Whether this is a message indicating that the port is to be closed. |
60 | | // This is the last message to be received by a MessagePort. |
61 | | bool IsCloseMessage() const; |
62 | | |
63 | | // Deserialize the contained JS value. May only be called once, and only |
64 | | // after Serialize() has been called (e.g. by another thread). |
65 | | v8::MaybeLocal<v8::Value> Deserialize( |
66 | | Environment* env, |
67 | | v8::Local<v8::Context> context, |
68 | | v8::Local<v8::Value>* port_list = nullptr); |
69 | | |
70 | | // Serialize a JS value, and optionally transfer objects, into this message. |
71 | | // The Message object retains ownership of all transferred objects until |
72 | | // deserialization. |
73 | | // The source_port parameter, if provided, will make Serialize() throw a |
74 | | // "DataCloneError" DOMException if source_port is found in transfer_list. |
75 | | v8::Maybe<bool> Serialize(Environment* env, |
76 | | v8::Local<v8::Context> context, |
77 | | v8::Local<v8::Value> input, |
78 | | const TransferList& transfer_list, |
79 | | v8::Local<v8::Object> source_port = |
80 | | v8::Local<v8::Object>()); |
81 | | |
82 | | // Internal method of Message that is called when a new SharedArrayBuffer |
83 | | // object is encountered in the incoming value's structure. |
84 | | void AddSharedArrayBuffer(std::shared_ptr<v8::BackingStore> backing_store); |
85 | | // Internal method of Message that is called once serialization finishes |
86 | | // and that transfers ownership of `data` to this message. |
87 | | void AddTransferable(std::unique_ptr<TransferData>&& data); |
88 | | // Internal method of Message that is called when a new WebAssembly.Module |
89 | | // object is encountered in the incoming value's structure. |
90 | | uint32_t AddWASMModule(v8::CompiledWasmModule&& mod); |
91 | | // Internal method of Message that is called when a shared value is |
92 | | // encountered for the first time in the incoming value's structure. |
93 | | void AdoptSharedValueConveyor(v8::SharedValueConveyor&& conveyor); |
94 | | |
95 | | // The host objects that will be transferred, as recorded by Serialize() |
96 | | // (e.g. MessagePorts). |
97 | | // Used for warning user about posting the target MessagePort to itself, |
98 | | // which will as a side effect destroy the communication channel. |
99 | 0 | const std::vector<std::unique_ptr<TransferData>>& transferables() const { |
100 | 0 | return transferables_; |
101 | 0 | } |
102 | 0 | bool has_transferables() const { |
103 | 0 | return !transferables_.empty() || !array_buffers_.empty(); |
104 | 0 | } |
105 | | |
106 | | void MemoryInfo(MemoryTracker* tracker) const override; |
107 | | |
108 | | SET_MEMORY_INFO_NAME(Message) |
109 | | SET_SELF_SIZE(Message) |
110 | | |
111 | | private: |
112 | | MallocedBuffer<char> main_message_buf_; |
113 | | // TODO(addaleax): Make this a std::variant to save storage size in the common |
114 | | // case (which is that all of these vectors are empty) once that is available |
115 | | // with C++17. |
116 | | std::vector<std::shared_ptr<v8::BackingStore>> array_buffers_; |
117 | | std::vector<std::shared_ptr<v8::BackingStore>> shared_array_buffers_; |
118 | | std::vector<std::unique_ptr<TransferData>> transferables_; |
119 | | std::vector<v8::CompiledWasmModule> wasm_modules_; |
120 | | std::optional<v8::SharedValueConveyor> shared_value_conveyor_; |
121 | | |
122 | | friend class MessagePort; |
123 | | }; |
124 | | |
125 | | class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> { |
126 | | public: |
127 | | // Named SiblingGroup, Used for one-to-many BroadcastChannels. |
128 | | static std::shared_ptr<SiblingGroup> Get(const std::string& name); |
129 | | |
130 | | // Anonymous SiblingGroup, Used for one-to-one MessagePort pairs. |
131 | 0 | SiblingGroup() = default; |
132 | | explicit SiblingGroup(const std::string& name); |
133 | | ~SiblingGroup(); |
134 | | |
135 | | // Dispatches the Message to the collection of associated |
136 | | // ports. If there is more than one destination port and |
137 | | // the Message contains transferables, Dispatch will fail. |
138 | | // Returns Just(true) if successful and the message was |
139 | | // dispatched to at least one destination. Returns Just(false) |
140 | | // if there were no destinations. Returns Nothing<bool>() |
141 | | // if there was an error. If error is not nullptr, it will |
142 | | // be set to an error message or warning message as appropriate. |
143 | | v8::Maybe<bool> Dispatch( |
144 | | MessagePortData* source, |
145 | | std::shared_ptr<Message> message, |
146 | | std::string* error = nullptr); |
147 | | |
148 | | void Entangle(MessagePortData* data); |
149 | | void Entangle(std::initializer_list<MessagePortData*> data); |
150 | | void Disentangle(MessagePortData* data); |
151 | | |
152 | 0 | const std::string& name() const { return name_; } |
153 | | |
154 | 0 | size_t size() const { return ports_.size(); } |
155 | | |
156 | | private: |
157 | | const std::string name_; |
158 | | RwLock group_mutex_; // Protects ports_. |
159 | | std::set<MessagePortData*> ports_; |
160 | | |
161 | | static void CheckSiblingGroup(const std::string& name); |
162 | | |
163 | | using Map = |
164 | | std::unordered_map<std::string, std::weak_ptr<SiblingGroup>>; |
165 | | |
166 | | static Mutex groups_mutex_; |
167 | | static Map groups_; |
168 | | }; |
169 | | |
170 | | // This contains all data for a `MessagePort` instance that is not tied to |
171 | | // a specific Environment/Isolate/event loop, for easier transfer between those. |
172 | | class MessagePortData : public TransferData { |
173 | | public: |
174 | | explicit MessagePortData(MessagePort* owner); |
175 | | ~MessagePortData() override; |
176 | | |
177 | | MessagePortData(MessagePortData&& other) = delete; |
178 | | MessagePortData& operator=(MessagePortData&& other) = delete; |
179 | | MessagePortData(const MessagePortData& other) = delete; |
180 | | MessagePortData& operator=(const MessagePortData& other) = delete; |
181 | | |
182 | | // Add a message to the incoming queue and notify the receiver. |
183 | | // This may be called from any thread. |
184 | | void AddToIncomingQueue(std::shared_ptr<Message> message); |
185 | | v8::Maybe<bool> Dispatch( |
186 | | std::shared_ptr<Message> message, |
187 | | std::string* error = nullptr); |
188 | | |
189 | | // Turns `a` and `b` into siblings, i.e. connects the sending side of one |
190 | | // to the receiving side of the other. This is not thread-safe. |
191 | | static void Entangle(MessagePortData* a, MessagePortData* b); |
192 | | |
193 | | // Removes any possible sibling. This is thread-safe (it acquires both |
194 | | // `sibling_mutex_` and `mutex_`), and has to be because it is called once |
195 | | // the corresponding JS handle handle wants to close |
196 | | // which can happen on either side of a worker. |
197 | | void Disentangle(); |
198 | | |
199 | | void MemoryInfo(MemoryTracker* tracker) const override; |
200 | | BaseObjectPtr<BaseObject> Deserialize( |
201 | | Environment* env, |
202 | | v8::Local<v8::Context> context, |
203 | | std::unique_ptr<TransferData> self) override; |
204 | | |
205 | | SET_MEMORY_INFO_NAME(MessagePortData) |
206 | | SET_SELF_SIZE(MessagePortData) |
207 | | |
208 | | private: |
209 | | // This mutex protects all fields below it, with the exception of |
210 | | // sibling_. |
211 | | mutable Mutex mutex_; |
212 | | // TODO(addaleax): Make this a std::variant<std::shared_ptr, std::unique_ptr> |
213 | | // once that is available with C++17, because std::shared_ptr comes with |
214 | | // overhead that is only necessary for BroadcastChannel. |
215 | | std::deque<std::shared_ptr<Message>> incoming_messages_; |
216 | | MessagePort* owner_ = nullptr; |
217 | | std::shared_ptr<SiblingGroup> group_; |
218 | | friend class MessagePort; |
219 | | friend class SiblingGroup; |
220 | | }; |
221 | | |
222 | | // A message port that receives messages from other threads, including |
223 | | // the uv_async_t handle that is used to notify the current event loop of |
224 | | // new incoming messages. |
225 | | class MessagePort : public HandleWrap { |
226 | | private: |
227 | | // Create a new MessagePort. The `context` argument specifies the Context |
228 | | // instance that is used for creating the values emitted from this port. |
229 | | // This is called by MessagePort::New(), which is the public API used for |
230 | | // creating MessagePort instances. |
231 | | MessagePort(Environment* env, |
232 | | v8::Local<v8::Context> context, |
233 | | v8::Local<v8::Object> wrap); |
234 | | |
235 | | public: |
236 | | ~MessagePort() override; |
237 | | |
238 | | // Create a new message port instance, optionally over an existing |
239 | | // `MessagePortData` object. |
240 | | static MessagePort* New(Environment* env, |
241 | | v8::Local<v8::Context> context, |
242 | | std::unique_ptr<MessagePortData> data = {}, |
243 | | std::shared_ptr<SiblingGroup> sibling_group = {}); |
244 | | |
245 | | // Send a message, i.e. deliver it into the sibling's incoming queue. |
246 | | // If this port is closed, or if there is no sibling, this message is |
247 | | // serialized with transfers, then silently discarded. |
248 | | v8::Maybe<bool> PostMessage(Environment* env, |
249 | | v8::Local<v8::Context> context, |
250 | | v8::Local<v8::Value> message, |
251 | | const TransferList& transfer); |
252 | | |
253 | | // Start processing messages on this port as a receiving end. |
254 | | void Start(); |
255 | | // Stop processing messages on this port as a receiving end. |
256 | | void Stop(); |
257 | | |
258 | | /* constructor */ |
259 | | static void New(const v8::FunctionCallbackInfo<v8::Value>& args); |
260 | | /* prototype methods */ |
261 | | static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args); |
262 | | static void Start(const v8::FunctionCallbackInfo<v8::Value>& args); |
263 | | static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args); |
264 | | static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args); |
265 | | static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args); |
266 | | |
267 | | /* static */ |
268 | | static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args); |
269 | | |
270 | | // Turns `a` and `b` into siblings, i.e. connects the sending side of one |
271 | | // to the receiving side of the other. This is not thread-safe. |
272 | | static void Entangle(MessagePort* a, MessagePort* b); |
273 | | static void Entangle(MessagePort* a, MessagePortData* b); |
274 | | |
275 | | // Detach this port's data for transferring. After this, the MessagePortData |
276 | | // is no longer associated with this handle, although it can still receive |
277 | | // messages. |
278 | | std::unique_ptr<MessagePortData> Detach(); |
279 | | |
280 | | void Close( |
281 | | v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override; |
282 | | |
283 | | // Returns true if either data_ has been freed, or if the handle is being |
284 | | // closed. Equivalent to the [[Detached]] internal slot in the HTML Standard. |
285 | | // |
286 | | // If checking if a JavaScript MessagePort object is detached, this method |
287 | | // alone is often not enough, since the backing C++ MessagePort object may |
288 | | // have been deleted already. For all intents and purposes, an object with a |
289 | | // NULL pointer to the C++ MessagePort object is also detached. |
290 | | inline bool IsDetached() const; |
291 | | |
292 | | BaseObject::TransferMode GetTransferMode() const override; |
293 | | std::unique_ptr<TransferData> TransferForMessaging() override; |
294 | | |
295 | | void MemoryInfo(MemoryTracker* tracker) const override; |
296 | | SET_MEMORY_INFO_NAME(MessagePort) |
297 | | SET_SELF_SIZE(MessagePort) |
298 | | |
299 | | private: |
300 | | enum class MessageProcessingMode { |
301 | | kNormalOperation, |
302 | | kForceReadMessages |
303 | | }; |
304 | | |
305 | | void OnClose() override; |
306 | | void OnMessage(MessageProcessingMode mode); |
307 | | void TriggerAsync(); |
308 | | v8::MaybeLocal<v8::Value> ReceiveMessage( |
309 | | v8::Local<v8::Context> context, |
310 | | MessageProcessingMode mode, |
311 | | v8::Local<v8::Value>* port_list = nullptr); |
312 | | |
313 | | std::unique_ptr<MessagePortData> data_ = nullptr; |
314 | | bool receiving_messages_ = false; |
315 | | uv_async_t async_; |
316 | | v8::Global<v8::Function> emit_message_fn_; |
317 | | |
318 | | friend class MessagePortData; |
319 | | }; |
320 | | |
321 | | // Provide a wrapper class created when a built-in JS classes that being |
322 | | // transferable or cloneable by postMessage(). |
323 | | // See e.g. FileHandle in internal/fs/promises.js for an example. |
324 | | class JSTransferable : public BaseObject { |
325 | | public: |
326 | | static BaseObjectPtr<JSTransferable> Wrap(Environment* env, |
327 | | v8::Local<v8::Object> target); |
328 | | static bool IsJSTransferable(Environment* env, |
329 | | v8::Local<v8::Context> context, |
330 | | v8::Local<v8::Object> object); |
331 | | |
332 | | JSTransferable(Environment* env, |
333 | | v8::Local<v8::Object> obj, |
334 | | v8::Local<v8::Object> target); |
335 | | ~JSTransferable(); |
336 | | |
337 | | BaseObject::TransferMode GetTransferMode() const override; |
338 | | std::unique_ptr<TransferData> TransferForMessaging() override; |
339 | | std::unique_ptr<TransferData> CloneForMessaging() const override; |
340 | | v8::Maybe<std::vector<BaseObjectPtr<BaseObject>>> |
341 | | NestedTransferables() const override; |
342 | | v8::Maybe<void> FinalizeTransferRead( |
343 | | v8::Local<v8::Context> context, |
344 | | v8::ValueDeserializer* deserializer) override; |
345 | | |
346 | | SET_NO_MEMORY_INFO() |
347 | | SET_MEMORY_INFO_NAME(JSTransferable) |
348 | | SET_SELF_SIZE(JSTransferable) |
349 | | |
350 | | v8::Local<v8::Object> target() const; |
351 | | |
352 | | private: |
353 | | template <TransferMode mode> |
354 | | std::unique_ptr<TransferData> TransferOrClone() const; |
355 | | |
356 | | v8::Global<v8::Object> target_; |
357 | | |
358 | | class Data : public TransferData { |
359 | | public: |
360 | | Data(std::string&& deserialize_info, v8::Global<v8::Value>&& data); |
361 | | |
362 | | BaseObjectPtr<BaseObject> Deserialize( |
363 | | Environment* env, |
364 | | v8::Local<v8::Context> context, |
365 | | std::unique_ptr<TransferData> self) override; |
366 | | v8::Maybe<bool> FinalizeTransferWrite( |
367 | | v8::Local<v8::Context> context, |
368 | | v8::ValueSerializer* serializer) override; |
369 | | |
370 | | SET_NO_MEMORY_INFO() |
371 | | SET_MEMORY_INFO_NAME(JSTransferableTransferData) |
372 | | SET_SELF_SIZE(Data) |
373 | | |
374 | | private: |
375 | | std::string deserialize_info_; |
376 | | v8::Global<v8::Value> data_; |
377 | | }; |
378 | | }; |
379 | | |
380 | | v8::Local<v8::FunctionTemplate> GetMessagePortConstructorTemplate( |
381 | | IsolateData* isolate_data); |
382 | | |
383 | | } // namespace worker |
384 | | } // namespace node |
385 | | |
386 | | #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS |
387 | | |
388 | | |
389 | | #endif // SRC_NODE_MESSAGING_H_ |