Coverage Report

Created: 2023-06-06 06:17

/src/uWebSockets/src/Loop.h
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Authored by Alex Hultman, 2018-2020.
3
 * Intellectual property of third-party.
4
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#ifndef UWS_LOOP_H
19
#define UWS_LOOP_H
20
21
/* The loop is lazily created per-thread and run with run() */
22
23
#include "LoopData.h"
#include <libusockets.h>

#include <cstring>
#include <exception>
#include <iostream>
#include <mutex>
#include <utility>
26
27
namespace uWS {
28
struct Loop {
29
private:
30
1.43M
    static void wakeupCb(us_loop_t *loop) {
31
1.43M
        LoopData *loopData = (LoopData *) us_loop_ext(loop);
32
33
        /* Swap current deferQueue */
34
1.43M
        loopData->deferMutex.lock();
35
1.43M
        int oldDeferQueue = loopData->currentDeferQueue;
36
1.43M
        loopData->currentDeferQueue = (loopData->currentDeferQueue + 1) % 2;
37
1.43M
        loopData->deferMutex.unlock();
38
39
        /* Drain the queue */
40
1.43M
        for (auto &x : loopData->deferQueues[oldDeferQueue]) {
41
9.43k
            x();
42
9.43k
        }
43
1.43M
        loopData->deferQueues[oldDeferQueue].clear();
44
1.43M
    }
45
46
1.74M
    static void preCb(us_loop_t *loop) {
47
1.74M
        LoopData *loopData = (LoopData *) us_loop_ext(loop);
48
49
1.74M
        for (auto &p : loopData->preHandlers) {
50
1.74M
            p.second((Loop *) loop);
51
1.74M
        }
52
1.74M
    }
53
54
1.74M
    static void postCb(us_loop_t *loop) {
55
1.74M
        LoopData *loopData = (LoopData *) us_loop_ext(loop);
56
57
1.74M
        for (auto &p : loopData->postHandlers) {
58
1.74M
            p.second((Loop *) loop);
59
1.74M
        }
60
61
        /* After every event loop iteration, we must not hold the cork buffer */
62
1.74M
        if (loopData->corkedSocket) {
63
0
            std::cerr << "Error: Cork buffer must not be held across event loop iterations!" << std::endl;
64
0
            std::terminate();
65
0
        }
66
1.74M
    }
67
68
    Loop() = delete;
69
    ~Loop() = default;
70
71
5.93k
    Loop *init() {
72
5.93k
        new (us_loop_ext((us_loop_t *) this)) LoopData;
73
5.93k
        return this;
74
5.93k
    }
75
76
5.93k
    static Loop *create(void *hint) {
77
5.93k
        Loop *loop = ((Loop *) us_create_loop(hint, wakeupCb, preCb, postCb, sizeof(LoopData)))->init();
78
79
        /* We also need some timers (should live off the one 4 second timer rather) */
80
5.93k
        LoopData *loopData = (LoopData *) us_loop_ext((struct us_loop_t *) loop);
81
5.93k
        loopData->dateTimer = us_create_timer((struct us_loop_t *) loop, 1, sizeof(LoopData *));
82
5.93k
        memcpy(us_timer_ext(loopData->dateTimer), &loopData, sizeof(LoopData *));
83
1.44M
        us_timer_set(loopData->dateTimer, [](struct us_timer_t *t) {
84
1.44M
            LoopData *loopData;
85
1.44M
            memcpy(&loopData, us_timer_ext(t), sizeof(LoopData *));
86
1.44M
            loopData->updateDate();
87
1.44M
        }, 1000, 1000);
88
89
5.93k
        return loop;
90
5.93k
    }
91
92
    /* What to do with loops created with existingNativeLoop? */
93
    struct LoopCleaner {
94
1
        ~LoopCleaner() {
95
1
            if(loop && cleanMe) {
96
0
                loop->free();
97
0
            }
98
1
        }
99
        Loop *loop = nullptr;
100
        bool cleanMe = false;
101
    };
102
103
169k
    static LoopCleaner &getLazyLoop() {
104
169k
        static thread_local LoopCleaner lazyLoop;
105
169k
        return lazyLoop;
106
169k
    }
107
108
public:
109
    /* Lazily initializes a per-thread loop and returns it.
110
     * Will automatically free all initialized loops at exit. */
111
75.8k
    static Loop *get(void *existingNativeLoop = nullptr) {
112
75.8k
        if (!getLazyLoop().loop) {
113
            /* If we are given a native loop pointer we pass that to uSockets and let it deal with it */
114
5.93k
            if (existingNativeLoop) {
115
                /* Todo: here we want to pass the pointer, not a boolean */
116
0
                getLazyLoop().loop = create(existingNativeLoop);
117
                /* We cannot register automatic free here, must be manually done */
118
5.93k
            } else {
119
5.93k
                getLazyLoop().loop = create(nullptr);
120
5.93k
                getLazyLoop().cleanMe = true;
121
5.93k
            }
122
5.93k
        }
123
124
75.8k
        return getLazyLoop().loop;
125
75.8k
    }
126
127
    /* Freeing the default loop should be done once */
128
5.93k
    void free() {
129
5.93k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
130
131
        /* Stop and free dateTimer first */
132
5.93k
        us_timer_close(loopData->dateTimer);
133
134
5.93k
        loopData->~LoopData();
135
        /* uSockets will track whether this loop is owned by us or a borrowed alien loop */
136
5.93k
        us_loop_free((us_loop_t *) this);
137
138
        /* Reset lazyLoop */
139
5.93k
        getLazyLoop().loop = nullptr;
140
5.93k
    }
141
142
5.93k
    void addPostHandler(void *key, MoveOnlyFunction<void(Loop *)> &&handler) {
143
5.93k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
144
145
5.93k
        loopData->postHandlers.emplace(key, std::move(handler));
146
5.93k
    }
147
148
    /* Bug: what if you remove a handler while iterating them? */
149
5.93k
    void removePostHandler(void *key) {
150
5.93k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
151
152
5.93k
        loopData->postHandlers.erase(key);
153
5.93k
    }
154
155
5.93k
    void addPreHandler(void *key, MoveOnlyFunction<void(Loop *)> &&handler) {
156
5.93k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
157
158
5.93k
        loopData->preHandlers.emplace(key, std::move(handler));
159
5.93k
    }
160
161
    /* Bug: what if you remove a handler while iterating them? */
162
5.93k
    void removePreHandler(void *key) {
163
5.93k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
164
165
5.93k
        loopData->preHandlers.erase(key);
166
5.93k
    }
167
168
    /* Defer this callback on Loop's thread of execution */
169
10.5k
    void defer(MoveOnlyFunction<void()> &&cb) {
170
10.5k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
171
172
        //if (std::thread::get_id() == ) // todo: add fast path for same thread id
173
10.5k
        loopData->deferMutex.lock();
174
10.5k
        loopData->deferQueues[loopData->currentDeferQueue].emplace_back(std::move(cb));
175
10.5k
        loopData->deferMutex.unlock();
176
177
10.5k
        us_wakeup_loop((us_loop_t *) this);
178
10.5k
    }
179
180
    /* Actively block and run this loop */
181
5.93k
    void run() {
182
5.93k
        us_loop_run((us_loop_t *) this);
183
5.93k
    }
184
185
    /* Passively integrate with the underlying default loop */
186
    /* Used to seamlessly integrate with third parties such as Node.js */
187
0
    void integrate() {
188
0
        us_loop_integrate((us_loop_t *) this);
189
0
    }
190
191
    /* Dynamically change this */
192
5.93k
    void setSilent(bool silent) {
193
5.93k
        ((LoopData *) us_loop_ext((us_loop_t *) this))->noMark = silent;
194
5.93k
    }
195
};
196
197
/* Can be called from any thread to run the thread local loop */
198
5.93k
inline void run() {
199
5.93k
    Loop::get()->run();
200
5.93k
}
201
202
}
203
204
#endif // UWS_LOOP_H