Coverage Report

Created: 2024-04-25 06:10

/src/uWebSockets/src/Loop.h
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Authored by Alex Hultman, 2018-2020.
3
 * Intellectual property of third-party.
4
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
18
#ifndef UWS_LOOP_H
19
#define UWS_LOOP_H
20
21
/* The loop is lazily created per-thread and run with run() */
22
23
#include "LoopData.h"
24
#include <libusockets.h>
25
#include <iostream>
26
27
namespace uWS {
28
struct Loop {
29
private:
30
5.85M
    /* Wakeup callback registered with us_create_loop() by create().
     * Flips the active defer queue while holding deferMutex, then runs and clears
     * the queue that was active before the flip, without holding the mutex. */
    static void wakeupCb(us_loop_t *loop) {
31
5.85M
        LoopData *loopData = (LoopData *) us_loop_ext(loop);
32
33
        /* Swap current deferQueue; the index toggles between 0 and 1, so defer() calls made after this point append to the other queue */
34
5.85M
        loopData->deferMutex.lock();
35
5.85M
        int oldDeferQueue = loopData->currentDeferQueue;
36
5.85M
        loopData->currentDeferQueue = (loopData->currentDeferQueue + 1) % 2;
37
5.85M
        loopData->deferMutex.unlock();
38
39
        /* Drain the queue that was just retired; callbacks run after deferMutex has been released */
40
5.85M
        for (auto &x : loopData->deferQueues[oldDeferQueue]) {
41
43.9k
            x();
42
43.9k
        }
43
5.85M
        loopData->deferQueues[oldDeferQueue].clear();
44
5.85M
    }
45
46
7.49M
    /* Pre callback registered with us_create_loop() by create().
     * Calls every handler stored in LoopData::preHandlers, passing the loop cast to Loop *.
     * Presumably invoked by uSockets before each loop iteration - confirm against libusockets. */
    /* NOTE(review): a handler that calls addPreHandler()/removePreHandler() would modify
     * preHandlers while this range-for is iterating it (see the "Bug:" remark on removePreHandler). */
    static void preCb(us_loop_t *loop) {
47
7.49M
        LoopData *loopData = (LoopData *) us_loop_ext(loop);
48
49
7.49M
        for (auto &p : loopData->preHandlers) {
50
5.08M
            p.second((Loop *) loop);
51
5.08M
        }
52
7.49M
    }
53
54
7.49M
    /* Post callback registered with us_create_loop() by create().
     * Calls every handler stored in LoopData::postHandlers, then terminates the process
     * if LoopData::corkedSocket is still set. */
    /* NOTE(review): a handler that calls addPostHandler()/removePostHandler() would modify
     * postHandlers while this range-for is iterating it (see the "Bug:" remark on removePostHandler). */
    static void postCb(us_loop_t *loop) {
55
7.49M
        LoopData *loopData = (LoopData *) us_loop_ext(loop);
56
57
7.49M
        for (auto &p : loopData->postHandlers) {
58
5.08M
            p.second((Loop *) loop);
59
5.08M
        }
60
61
        /* After every event loop iteration, we must not hold the cork buffer */
62
7.49M
        if (loopData->corkedSocket) {
63
0
            /* std::endl flushes the message before std::terminate() ends the process */
            std::cerr << "Error: Cork buffer must not be held across event loop iterations!" << std::endl;
64
0
            std::terminate();
65
0
        }
66
7.49M
    }
67
68
    /* A Loop is never constructed directly: create() casts the pointer returned by
     * us_create_loop() to Loop * and uses that instead. */
    Loop() = delete;
69
    /* Teardown is done explicitly through free(), not through this destructor */
    ~Loop() = default;
70
71
19.9k
    /* Constructs a LoopData with placement new in the memory returned by us_loop_ext()
     * for this loop, and returns this so the call can be chained in create(). */
    Loop *init() {
72
19.9k
        new (us_loop_ext((us_loop_t *) this)) LoopData;
73
19.9k
        return this;
74
19.9k
    }
75
76
19.9k
    /* Creates a loop via us_create_loop(), registering wakeupCb, preCb and postCb and
     * requesting sizeof(LoopData) bytes of extension storage, then constructs the
     * LoopData in that storage through init(). Also creates LoopData::dateTimer, whose
     * callback calls LoopData::updateDate().
     * hint is forwarded to us_create_loop() unchanged; get() passes either nullptr or
     * the caller's existingNativeLoop. Returns the new loop. */
    /* NOTE(review): the result of us_create_loop() is dereferenced by ->init() without a
     * null check - confirm whether uSockets can return null here. */
    /* NOTE(review): memcpy is used below but <cstring> is not included by this header;
     * presumably it arrives through LoopData.h or libusockets.h - confirm. */
    static Loop *create(void *hint) {
77
19.9k
        Loop *loop = ((Loop *) us_create_loop(hint, wakeupCb, preCb, postCb, sizeof(LoopData)))->init();
78
79
        /* We also need some timers (should live off the one 4 second timer rather) */
80
19.9k
        LoopData *loopData = (LoopData *) us_loop_ext((struct us_loop_t *) loop);
81
19.9k
        /* The timer gets sizeof(LoopData *) bytes of extension storage; meaning of the literal 1 is defined by uSockets - TODO confirm */
        loopData->dateTimer = us_create_timer((struct us_loop_t *) loop, 1, sizeof(LoopData *));
82
19.9k
        /* Store the LoopData pointer in the timer's extension storage so the capture-less lambda below can read it back */
        memcpy(us_timer_ext(loopData->dateTimer), &loopData, sizeof(LoopData *));
83
5.68M
        /* Presumably 1000, 1000 means first fire after 1000 ms, then repeat every 1000 ms - confirm against libusockets */
        us_timer_set(loopData->dateTimer, [](struct us_timer_t *t) {
84
5.68M
            LoopData *loopData;
85
5.68M
            memcpy(&loopData, us_timer_ext(t), sizeof(LoopData *));
86
5.68M
            loopData->updateDate();
87
5.68M
        }, 1000, 1000);
88
89
19.9k
        return loop;
90
19.9k
    }
91
92
    /* What to do with loops created with existingNativeLoop? */
    /* Holder for a thread's lazily created loop (see getLazyLoop()). Its destructor calls
     * loop->free() only when both loop and cleanMe are set; get() sets cleanMe only for
     * loops it created without an existingNativeLoop, so those other loops are not freed here. */
93
    struct LoopCleaner {
94
4
        ~LoopCleaner() {
95
4
            if(loop && cleanMe) {
96
0
                loop->free();
97
0
            }
98
4
        }
99
        /* The thread's loop, or nullptr when none exists (free() resets it to nullptr) */
        Loop *loop = nullptr;
100
        /* True when the destructor is responsible for freeing loop */
        bool cleanMe = false;
101
    };
102
103
505k
    /* Returns the calling thread's LoopCleaner. It is a function-local thread_local,
     * so each thread gets its own instance, created on that thread's first call. */
    static LoopCleaner &getLazyLoop() {
104
505k
        static thread_local LoopCleaner lazyLoop;
105
505k
        return lazyLoop;
106
505k
    }
107
108
public:
109
    /* Lazily initializes a per-thread loop and returns it.
110
     * Will automatically free all initialized loops at exit. */
    /* existingNativeLoop is only used on the call that creates the thread's loop; once a
     * loop exists for this thread the argument is ignored and that loop is returned.
     * Loops created from an existingNativeLoop are not flagged for automatic free. */
111
222k
    static Loop *get(void *existingNativeLoop = nullptr) {
112
222k
        if (!getLazyLoop().loop) {
113
            /* If we are given a native loop pointer we pass that to uSockets and let it deal with it */
114
19.9k
            if (existingNativeLoop) {
115
                /* Todo: here we want to pass the pointer, not a boolean */
                /* NOTE(review): the pointer itself is passed to create() below, so the Todo above may be stale - confirm */
116
0
                getLazyLoop().loop = create(existingNativeLoop);
117
                /* We cannot register automatic free here, must be manually done */
118
19.9k
            } else {
119
19.9k
                getLazyLoop().loop = create(nullptr);
120
19.9k
                /* Let ~LoopCleaner() free this loop */
                getLazyLoop().cleanMe = true;
121
19.9k
            }
122
19.9k
        }
123
124
222k
        return getLazyLoop().loop;
125
222k
    }
126
127
    /* Freeing the default loop should be done once */
    /* Closes dateTimer, destroys the LoopData that init() constructed in place, frees the
     * underlying us_loop_t and clears the calling thread's lazy loop pointer. The Loop
     * pointer must not be used after this returns. */
    /* NOTE(review): getLazyLoop().loop is reset for the calling thread without checking
     * that it points to this loop - confirm callers only free their own thread's loop. */
128
19.9k
    void free() {
129
19.9k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
130
131
        /* Stop and free dateTimer first */
132
19.9k
        us_timer_close(loopData->dateTimer);
133
134
19.9k
        /* Explicit destructor call: LoopData was created with placement new, so it is never deleted */
        loopData->~LoopData();
135
        /* uSockets will track whether this loop is owned by us or a borrowed alien loop */
136
19.9k
        us_loop_free((us_loop_t *) this);
137
138
        /* Reset lazyLoop */
139
19.9k
        getLazyLoop().loop = nullptr;
140
19.9k
    }
141
142
17.6k
    /* Registers handler under key in LoopData::postHandlers; postCb() calls every stored
     * handler with this loop. The handler is moved into the container.
     * NOTE(review): if postHandlers is a unique-key map, emplace keeps an already present
     * entry for key and the new handler is not stored - TODO confirm against LoopData.h. */
    void addPostHandler(void *key, MoveOnlyFunction<void(Loop *)> &&handler) {
143
17.6k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
144
145
17.6k
        loopData->postHandlers.emplace(key, std::move(handler));
146
17.6k
    }
147
148
    /* Bug: what if you remove a handler while iterating them? */
    /* Erases the entry stored under key from LoopData::postHandlers. postCb() iterates
     * that same container with a range-for, which is the situation the remark above is about. */
149
17.6k
    void removePostHandler(void *key) {
150
17.6k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
151
152
17.6k
        loopData->postHandlers.erase(key);
153
17.6k
    }
154
155
17.6k
    /* Registers handler under key in LoopData::preHandlers; preCb() calls every stored
     * handler with this loop. The handler is moved into the container.
     * NOTE(review): if preHandlers is a unique-key map, emplace keeps an already present
     * entry for key and the new handler is not stored - TODO confirm against LoopData.h. */
    void addPreHandler(void *key, MoveOnlyFunction<void(Loop *)> &&handler) {
156
17.6k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
157
158
17.6k
        loopData->preHandlers.emplace(key, std::move(handler));
159
17.6k
    }
160
161
    /* Bug: what if you remove a handler while iterating them? */
    /* Erases the entry stored under key from LoopData::preHandlers. preCb() iterates
     * that same container with a range-for, which is the situation the remark above is about. */
162
17.6k
    void removePreHandler(void *key) {
163
17.6k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
164
165
17.6k
        loopData->preHandlers.erase(key);
166
17.6k
    }
167
168
    /* Defer this callback on Loop's thread of execution */
    /* Appends cb to the currently active defer queue while holding deferMutex, then calls
     * us_wakeup_loop(); wakeupCb() later runs the queued callbacks. cb is moved into the queue. */
    /* NOTE(review): lock()/unlock() are called manually, so if emplace_back throws (for
     * example on allocation failure) deferMutex stays locked - consider a scoped lock guard. */
169
49.7k
    void defer(MoveOnlyFunction<void()> &&cb) {
170
49.7k
        LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
171
172
        //if (std::thread::get_id() == ) // todo: add fast path for same thread id
173
49.7k
        loopData->deferMutex.lock();
174
49.7k
        loopData->deferQueues[loopData->currentDeferQueue].emplace_back(std::move(cb));
175
49.7k
        loopData->deferMutex.unlock();
176
177
49.7k
        /* Wake the loop after the mutex has been released */
        us_wakeup_loop((us_loop_t *) this);
178
49.7k
    }
179
180
    /* Actively block and run this loop */
    /* Thin wrapper over us_loop_run() for this loop */
181
19.9k
    void run() {
182
19.9k
        us_loop_run((us_loop_t *) this);
183
19.9k
    }
184
185
    /* Passively integrate with the underlying default loop */
186
    /* Used to seamlessly integrate with third parties such as Node.js */
    /* Thin wrapper over us_loop_integrate() for this loop */
187
0
    void integrate() {
188
0
        us_loop_integrate((us_loop_t *) this);
189
0
    }
190
191
    /* Dynamically change this */
    /* Stores silent in LoopData::noMark for this loop. What noMark suppresses is decided
     * by the code that reads it, which is not in this header - TODO confirm. */
192
6.41k
    void setSilent(bool silent) {
193
6.41k
        ((LoopData *) us_loop_ext((us_loop_t *) this))->noMark = silent;
194
6.41k
    }
195
};
196
197
/* Can be called from any thread to run the thread local loop */
/* Equivalent to Loop::get()->run(): the calling thread's loop is created on demand by
 * Loop::get() if it does not exist yet, and the call blocks inside Loop::run(). */
198
19.9k
inline void run() {
199
19.9k
    Loop::get()->run();
200
19.9k
}
201
202
}
203
204
#endif // UWS_LOOP_H