Coverage Report

Created: 2025-11-02 07:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/serenity/Userland/Libraries/LibThreading/WorkerThread.h
Line
Count
Source
1
/*
2
 * Copyright (c) 2022, Gregory Bertilson <zaggy1024@gmail.com>
3
 *
4
 * SPDX-License-Identifier: BSD-2-Clause
5
 */
6
7
#pragma once
8
9
#include <AK/Debug.h>
10
#include <AK/Variant.h>
11
#include <LibThreading/ConditionVariable.h>
12
#include <LibThreading/Mutex.h>
13
#include <LibThreading/Thread.h>
14
15
namespace Threading {
16
17
// Macro to allow single-line logging prints with fields that only exist in debug mode.
// With WORKER_THREAD_DEBUG enabled it forwards its arguments to dbgln(); otherwise it
// expands to nothing, so its arguments (e.g. m_id) are dropped before compilation.
18
#if WORKER_THREAD_DEBUG
19
#    define WORKER_LOG(args...) ({ dbgln(args); })
20
#else
21
#    define WORKER_LOG(args...)
22
#endif
23
24
template<typename ErrorType>
25
class WorkerThread {
26
    enum class State {
27
        Idle,
28
        Working,
29
        Stopped,
30
    };
31
    using WorkerTask = Function<ErrorOr<void, ErrorType>()>;
32
    using WorkerState = Variant<State, WorkerTask, ErrorType>;
33
34
public:
35
    static ErrorOr<NonnullOwnPtr<WorkerThread>> create(StringView name)
36
29
    {
37
29
        auto worker_thread = TRY(adopt_nonnull_own_or_enomem(new (nothrow) WorkerThread()));
38
29
        worker_thread->m_thread = TRY(Threading::Thread::try_create([&self = *worker_thread]() {
39
29
            WORKER_LOG("Starting worker loop {}", self.m_id);
40
41
29
            while (true) {
42
29
                self.m_mutex.lock();
43
29
                if (self.m_stop) {
44
29
                    WORKER_LOG("Exiting {}", self.m_id);
45
29
                    self.m_state = State::Stopped;
46
29
                    self.m_condition.broadcast();
47
29
                    self.m_mutex.unlock();
48
29
                    return 0;
49
29
                }
50
29
                if (self.m_state.template has<WorkerTask>()) {
51
29
                    auto task = move(self.m_state.template get<WorkerTask>());
52
29
                    self.m_state = State::Working;
53
29
                    self.m_mutex.unlock();
54
55
29
                    WORKER_LOG("Starting task on {}", self.m_id);
56
29
                    auto result = task();
57
29
                    if (result.is_error()) {
58
29
                        WORKER_LOG("Task finished on {} with error", self.m_id);
59
29
                        self.m_mutex.lock();
60
29
                        self.m_state = result.release_error();
61
29
                        self.m_condition.broadcast();
62
29
                    } else {
63
29
                        WORKER_LOG("Task finished successfully on {}", self.m_id);
64
29
                        self.m_mutex.lock();
65
29
                        self.m_state = State::Idle;
66
29
                        self.m_condition.broadcast();
67
29
                    }
68
29
                }
69
29
                WORKER_LOG("Awaiting new task in {}...", self.m_id);
70
29
                self.m_condition.wait();
71
29
                WORKER_LOG("Worker thread awoken in {}", self.m_id);
72
29
                self.m_mutex.unlock();
73
29
            }
74
75
29
            return 0;
76
29
        },
77
29
            name));
78
29
        worker_thread->m_thread->start();
79
29
        return worker_thread;
80
29
    }
81
82
    ~WorkerThread()
83
29
    {
84
29
        m_mutex.lock();
85
29
        m_stop = true;
86
29
        m_condition.broadcast();
87
58
        while (!is_in_state(State::Stopped))
88
29
            m_condition.wait();
89
29
        m_mutex.unlock();
90
29
        (void)m_thread->join();
91
29
        WORKER_LOG("Worker thread {} joined successfully", m_id);
92
29
    }
93
94
    // Returns whether the task is starting.
95
    bool start_task(WorkerTask&& task)
96
29
    {
97
29
        m_mutex.lock();
98
29
        VERIFY(!is_in_state(State::Stopped));
99
100
29
        bool start_work = false;
101
29
        if (is_in_state(State::Idle)) {
102
29
            start_work = true;
103
29
        } else if (m_state.template has<ErrorType>()) {
104
0
            WORKER_LOG("Starting task and ignoring previous error: {}", m_state.template get<ErrorType>().string_literal());
105
0
            start_work = true;
106
0
        }
107
29
        if (start_work) {
108
29
            WORKER_LOG("Queuing task on {}", m_id);
109
29
            m_state = move(task);
110
29
            m_condition.broadcast();
111
29
        }
112
113
29
        m_mutex.unlock();
114
29
        return start_work;
115
29
    }
116
117
    ErrorOr<void, ErrorType> wait_until_task_is_finished()
118
29
    {
119
29
        WORKER_LOG("Waiting for task to finish on {}...", m_id);
120
29
        m_mutex.lock();
121
39
        while (true) {
122
39
            if (m_state.template has<WorkerTask>() || is_in_state(State::Working)) {
123
10
                m_condition.wait();
124
29
            } else if (m_state.template has<ErrorType>()) {
125
27
                auto error = move(m_state.template get<ErrorType>());
126
27
                m_state = State::Idle;
127
27
                m_mutex.unlock();
128
27
                WORKER_LOG("Finished waiting with error on {}: {}", m_id, error.string_literal());
129
27
                return error;
130
27
            } else {
131
2
                m_mutex.unlock();
132
2
                WORKER_LOG("Finished waiting on {}", m_id);
133
2
                return {};
134
2
            }
135
39
        }
136
0
        m_mutex.unlock();
137
0
    }
138
139
private:
140
#if WORKER_THREAD_DEBUG
141
    static inline size_t current_id = 0;
142
#endif
143
144
    WorkerThread()
145
29
        : m_condition(m_mutex)
146
#if WORKER_THREAD_DEBUG
147
        , m_id(current_id++)
148
#endif
149
29
    {
150
29
    }
151
    WorkerThread(WorkerThread const&) = delete;
152
    WorkerThread(WorkerThread&&) = delete;
153
154
    // Must be called with the mutex locked.
155
    bool is_in_state(State state)
156
155
    {
157
155
        return m_state.template has<State>() && m_state.template get<State>() == state;
158
155
    }
159
160
    RefPtr<Threading::Thread> m_thread;
161
    Threading::Mutex m_mutex;
162
    Threading::ConditionVariable m_condition;
163
    WorkerState m_state { State::Idle };
164
    bool m_stop { false };
165
#if WORKER_THREAD_DEBUG
166
    size_t m_id;
167
#endif
168
};
169
170
#undef WORKER_LOG
171
172
}