Coverage Report

Created: 2025-07-04 06:41

/src/uWebSockets/src/Loop.h
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Authored by Alex Hultman, 2018-2020.
3
 * Intellectual property of third-party.
4
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#ifndef UWS_LOOP_H
19
#define UWS_LOOP_H
20
21
/* The loop is lazily created per-thread and run with run() */
22
23
#include "LoopData.h"
24
#include <libusockets.h>
25
#include <iostream>
26
27
namespace uWS {
28
29
/* A prepared message is dependent on the Loop, so it belongs here */
30
struct PreparedMessage {
31
    /* These should be a single allocation along with the PreparedMessage itself (they are static) */
32
    std::string originalMessage, compressedMessage;
33
    bool compressed;
34
    int opCode;
35
};
36
37
struct Loop {
38
private:
39
12.9M
    static void wakeupCb(us_loop_t *loop) {
40
12.9M
        LoopData *loopData = (LoopData *) us_loop_ext(loop);
41
42
        /* Swap current deferQueue */
43
12.9M
        loopData->deferMutex.lock();
44
12.9M
        int oldDeferQueue = loopData->currentDeferQueue;
45
12.9M
        loopData->currentDeferQueue = (loopData->currentDeferQueue + 1) % 2;
46
12.9M
        loopData->deferMutex.unlock();
47
48
        /* Drain the queue */
49
12.9M
        for (auto &x : loopData->deferQueues[oldDeferQueue]) {
50
31.6k
            x();
51
31.6k
        }
52
12.9M
        loopData->deferQueues[oldDeferQueue].clear();
53
12.9M
    }
54
55
16.0M
    static void preCb(us_loop_t *loop) {
56
16.0M
        LoopData *loopData = (LoopData *) us_loop_ext(loop);
57
58
16.0M
        for (auto &p : loopData->preHandlers) {
59
12.8M
            p.second((Loop *) loop);
60
12.8M
        }
61
16.0M
    }
62
63
16.0M
    static void postCb(us_loop_t *loop) {
64
16.0M
        LoopData *loopData = (LoopData *) us_loop_ext(loop);
65
66
16.0M
        for (auto &p : loopData->postHandlers) {
67
12.8M
            p.second((Loop *) loop);
68
12.8M
        }
69
70
        /* After every event loop iteration, we must not hold the cork buffer */
71
16.0M
        if (loopData->corkedSocket) {
72
0
            std::cerr << "Error: Cork buffer must not be held across event loop iterations!" << std::endl;
73
0
            std::terminate();
74
0
        }
75
16.0M
    }
76
77
    Loop() = delete;
78
    ~Loop() = default;
79
80
20.7k
    Loop *init() {
81
20.7k
        new (us_loop_ext((us_loop_t *) this)) LoopData;
82
20.7k
        return this;
83
20.7k
    }
84
85
20.7k
    static Loop *create(void *hint) {
86
20.7k
        Loop *loop = ((Loop *) us_create_loop(hint, wakeupCb, preCb, postCb, sizeof(LoopData)))->init();
87
88
        /* We also need some timers (these should rather live off the single 4-second timer) */
89
20.7k
        LoopData *loopData = (LoopData *) us_loop_ext((struct us_loop_t *) loop);
90
20.7k
        loopData->dateTimer = us_create_timer((struct us_loop_t *) loop, 1, sizeof(LoopData *));
91
20.7k
        memcpy(us_timer_ext(loopData->dateTimer), &loopData, sizeof(LoopData *));
92
12.9M
        us_timer_set(loopData->dateTimer, [](struct us_timer_t *t) {
93
12.9M
            LoopData *loopData;
94
12.9M
            memcpy(&loopData, us_timer_ext(t), sizeof(LoopData *));
95
12.9M
            loopData->updateDate();
96
12.9M
        }, 1000, 1000);
97
98
20.7k
        return loop;
99
20.7k
    }
100
101
    /* What to do with loops created with existingNativeLoop? */
102
    struct LoopCleaner {
103
4
        ~LoopCleaner() {
104
4
            if(loop && cleanMe) {
105
0
                loop->free();
106
0
            }
107
4
        }
108
        Loop *loop = nullptr;
109
        bool cleanMe = false;
110
    };
111
112
486k
    static LoopCleaner &getLazyLoop() {
113
486k
        static thread_local LoopCleaner lazyLoop;
114
486k
        return lazyLoop;
115
486k
    }
116
117
public:
118
119
    /* Preformatted messages need the Loop */
120
0
    PreparedMessage prepareMessage(std::string_view message, int opCode, bool compress = true) {
121
0
        /* The message could be formatted right here, but this optimization is not done yet */
122
0
        PreparedMessage preparedMessage;
123
0
        preparedMessage.compressed = compress;
124
0
        preparedMessage.opCode = opCode;
125
0
        preparedMessage.originalMessage = message;
126
0
127
0
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
128
0
129
0
        if (compress) {
130
0
            /* Initialize loop's deflate inflate streams */
131
0
            if (!loopData->zlibContext) {
132
0
                loopData->zlibContext = new ZlibContext;
133
0
                loopData->inflationStream = new InflationStream(CompressOptions::DEDICATED_DECOMPRESSOR);
134
0
                loopData->deflationStream = new DeflationStream(CompressOptions::DEDICATED_COMPRESSOR);
135
0
            }
136
0
137
0
            preparedMessage.compressedMessage = loopData->deflationStream->deflate(loopData->zlibContext, {preparedMessage.originalMessage.data(), preparedMessage.originalMessage.length()}, true);
138
0
        }
139
0
140
0
        return preparedMessage;
141
0
    }
142
143
    /* Lazily initializes a per-thread loop and returns it.
144
     * Will automatically free all initialized loops at exit. */
145
211k
    static Loop *get(void *existingNativeLoop = nullptr) {
146
211k
        if (!getLazyLoop().loop) {
147
            /* If we are given a native loop pointer we pass that to uSockets and let it deal with it */
148
20.7k
            if (existingNativeLoop) {
149
                /* Todo: here we want to pass the pointer, not a boolean */
150
0
                getLazyLoop().loop = create(existingNativeLoop);
151
                /* We cannot register automatic free here, must be manually done */
152
20.7k
            } else {
153
20.7k
                getLazyLoop().loop = create(nullptr);
154
20.7k
                getLazyLoop().cleanMe = true;
155
20.7k
            }
156
20.7k
        }
157
158
211k
        return getLazyLoop().loop;
159
211k
    }
160
161
    /* Freeing the default loop should be done once */
162
20.7k
    void free() {
163
20.7k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
164
165
        /* Stop and free dateTimer first */
166
20.7k
        us_timer_close(loopData->dateTimer);
167
168
20.7k
        loopData->~LoopData();
169
        /* uSockets will track whether this loop is owned by us or a borrowed alien loop */
170
20.7k
        us_loop_free((us_loop_t *) this);
171
172
        /* Reset lazyLoop */
173
20.7k
        getLazyLoop().loop = nullptr;
174
20.7k
    }
175
176
17.6k
    void addPostHandler(void *key, MoveOnlyFunction<void(Loop *)> &&handler) {
177
17.6k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
178
179
17.6k
        loopData->postHandlers.emplace(key, std::move(handler));
180
17.6k
    }
181
182
    /* Bug: what if you remove a handler while iterating them? */
183
17.6k
    void removePostHandler(void *key) {
184
17.6k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
185
186
17.6k
        loopData->postHandlers.erase(key);
187
17.6k
    }
188
189
17.6k
    void addPreHandler(void *key, MoveOnlyFunction<void(Loop *)> &&handler) {
190
17.6k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
191
192
17.6k
        loopData->preHandlers.emplace(key, std::move(handler));
193
17.6k
    }
194
195
    /* Bug: what if you remove a handler while iterating them? */
196
17.6k
    void removePreHandler(void *key) {
197
17.6k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
198
199
17.6k
        loopData->preHandlers.erase(key);
200
17.6k
    }
201
202
    /* Defer this callback on Loop's thread of execution */
203
36.7k
    void defer(MoveOnlyFunction<void()> &&cb) {
204
36.7k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
205
206
        //if (std::thread::get_id() == ) // todo: add fast path for same thread id
207
36.7k
        loopData->deferMutex.lock();
208
36.7k
        loopData->deferQueues[loopData->currentDeferQueue].emplace_back(std::move(cb));
209
36.7k
        loopData->deferMutex.unlock();
210
211
36.7k
        us_wakeup_loop((us_loop_t *) this);
212
36.7k
    }
213
214
    /* Actively block and run this loop */
215
20.7k
    void run() {
216
20.7k
        us_loop_run((us_loop_t *) this);
217
20.7k
    }
218
219
    /* Passively integrate with the underlying default loop */
220
    /* Used to seamlessly integrate with third parties such as Node.js */
221
0
    void integrate() {
222
0
        us_loop_integrate((us_loop_t *) this);
223
0
    }
224
225
    /* Dynamically change this */
226
6.08k
    void setSilent(bool silent) {
227
6.08k
        ((LoopData *) us_loop_ext((us_loop_t *) this))->noMark = silent;
228
6.08k
    }
229
};
230
231
/* Can be called from any thread to run the thread local loop */
232
20.7k
inline void run() {
233
20.7k
    Loop::get()->run();
234
20.7k
}
235
236
}
237
238
#endif // UWS_LOOP_H