Coverage Report

Created: 2025-11-16 07:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/serenity/Userland/Libraries/LibIPC/Connection.cpp
Line
Count
Source
1
/*
2
 * Copyright (c) 2021, Andreas Kling <kling@serenityos.org>
3
 * Copyright (c) 2022, the SerenityOS developers.
4
 *
5
 * SPDX-License-Identifier: BSD-2-Clause
6
 */
7
8
#include <AK/Vector.h>
9
#include <LibCore/EventLoop.h>
10
#include <LibCore/Socket.h>
11
#include <LibCore/Timer.h>
12
#include <LibIPC/Connection.h>
13
#include <LibIPC/Message.h>
14
#include <LibIPC/Stub.h>
15
#include <sys/select.h>
16
17
namespace IPC {
18
19
struct CoreEventLoopDeferredInvoker final : public DeferredInvoker {
20
    virtual ~CoreEventLoopDeferredInvoker() = default;
21
22
    virtual void schedule(Function<void()> callback) override
23
0
    {
24
0
        Core::deferred_invoke(move(callback));
25
0
    }
26
};
27
28
// Stores the local stub, takes ownership of the socket, remembers the local
// endpoint magic and installs a CoreEventLoopDeferredInvoker. The body then
// creates the single-shot responsiveness timer and hooks the socket's
// on_ready_to_read callback.
ConnectionBase::ConnectionBase(IPC::Stub& local_stub, NonnullOwnPtr<Core::LocalSocket> socket, u32 local_endpoint_magic)
    : m_local_stub(local_stub)
    , m_socket(move(socket))
    , m_local_endpoint_magic(local_endpoint_magic)
    , m_deferred_invoker(make<CoreEventLoopDeferredInvoker>())
{
    // NOTE(review): presumably milliseconds; confirm against Core::Timer::create_single_shot.
    constexpr int responsiveness_timeout = 3000;
    m_responsiveness_timer = Core::Timer::create_single_shot(responsiveness_timeout, [this] {
        may_have_become_unresponsive();
    });

    m_socket->on_ready_to_read = [this] {
        // Hold a strong reference to ourselves for the duration of the callback.
        NonnullRefPtr strong_this = *this;
        // FIXME: Do something about errors.
        (void)drain_messages_from_peer();
        handle_messages();
    };
}
42
43
0
// Defaulted destructor, defined out-of-line in this translation unit.
ConnectionBase::~ConnectionBase() = default;
44
45
bool ConnectionBase::is_open() const
46
0
{
47
0
    return m_socket->is_open();
48
0
}
49
50
// Replaces the current deferred invoker; the connection takes ownership of
// `deferred_invoker`.
void ConnectionBase::set_deferred_invoker(NonnullOwnPtr<DeferredInvoker> deferred_invoker)
{
    m_deferred_invoker = move(deferred_invoker);
}
54
55
// Encodes `message` and forwards the resulting buffer to the MessageBuffer
// overload. An encoding failure is propagated to the caller.
ErrorOr<void> ConnectionBase::post_message(Message const& message, MessageKind kind)
{
    auto encoded_message = TRY(message.encode());
    return post_message(move(encoded_message), kind);
}
59
60
// Transfers an already-encoded message over the socket.
//
// Returns an error without sending anything if the socket is closed. If the
// transfer fails, the connection is shut down and the transfer error is
// returned. On success the responsiveness timer is (re)started.
ErrorOr<void> ConnectionBase::post_message(MessageBuffer buffer, MessageKind kind)
{
    // NOTE: If this connection is being shut down, but has not yet been destroyed,
    //       the socket will be closed. Don't try to send more messages.
    if (!m_socket->is_open())
        return Error::from_string_literal("Trying to post_message during IPC shutdown");

    bool const is_sync_message = kind == MessageKind::Sync;
    auto transfer_result = buffer.transfer_message(*m_socket, is_sync_message);
    if (transfer_result.is_error()) {
        shutdown_with_error(transfer_result.error());
        return transfer_result.release_error();
    }

    m_responsiveness_timer->start();
    return {};
}
75
76
void ConnectionBase::shutdown()
77
0
{
78
0
    m_socket->close();
79
0
    die();
80
0
}
81
82
// Logs `error` together with this connection's address, then shuts the
// connection down.
void ConnectionBase::shutdown_with_error(Error const& error)
{
    dbgln("IPC::ConnectionBase ({:p}) had an error ({}), disconnecting.", this, error);
    shutdown();
}
87
88
void ConnectionBase::handle_messages()
89
0
{
90
0
    auto messages = move(m_unprocessed_messages);
91
0
    for (auto& message : messages) {
92
0
        if (message->endpoint_magic() == m_local_endpoint_magic) {
93
0
            auto handler_result = m_local_stub.handle(*message);
94
0
            if (handler_result.is_error()) {
95
0
                dbgln("IPC::ConnectionBase::handle_messages: {}", handler_result.error());
96
0
                continue;
97
0
            }
98
99
0
            if (auto response = handler_result.release_value()) {
100
0
                if (auto post_result = post_message(*response, MessageKind::Async); post_result.is_error()) {
101
0
                    dbgln("IPC::ConnectionBase::handle_messages: {}", post_result.error());
102
0
                }
103
0
            }
104
0
        }
105
0
    }
106
0
}
107
108
void ConnectionBase::wait_for_socket_to_become_readable()
109
0
{
110
0
    auto maybe_did_become_readable = m_socket->can_read_without_blocking(-1);
111
0
    if (maybe_did_become_readable.is_error()) {
112
0
        dbgln("ConnectionBase::wait_for_socket_to_become_readable: {}", maybe_did_become_readable.error());
113
0
        warnln("ConnectionBase::wait_for_socket_to_become_readable: {}", maybe_did_become_readable.error());
114
0
        VERIFY_NOT_REACHED();
115
0
    }
116
117
0
    VERIFY(maybe_did_become_readable.value());
118
0
}
119
120
// Collects all bytes that can currently be read from the socket without
// blocking, prefixed by any bytes stashed in m_unprocessed_bytes by an earlier
// call. File descriptors received alongside the data are adopted and queued in
// m_unprocessed_fds.
//
// EAGAIN ends the read loop. ECONNRESET or an empty read schedules a deferred
// shutdown() and ends the loop. Any other receive error is fatal.
//
// Returns the collected bytes, or an error if a shutdown was scheduled and no
// bytes at all were collected.
ErrorOr<Vector<u8>> ConnectionBase::read_as_much_as_possible_from_socket_without_blocking()
{
    Vector<u8> bytes;

    // Start with whatever was left over from the previous call.
    if (!m_unprocessed_bytes.is_empty()) {
        bytes.append(m_unprocessed_bytes.data(), m_unprocessed_bytes.size());
        m_unprocessed_bytes.clear();
    }

    u8 buffer[4096];
    Vector<int> received_fds;

    bool should_shut_down = false;
    auto schedule_shutdown = [this, &should_shut_down]() {
        should_shut_down = true;
        // The deferred callback holds a strong reference to this connection.
        m_deferred_invoker->schedule([strong_this = NonnullRefPtr(*this)] {
            strong_this->shutdown();
        });
    };

    while (m_socket->is_open()) {
        auto receive_result = m_socket->receive_message({ buffer, sizeof(buffer) }, MSG_DONTWAIT, received_fds);
        if (receive_result.is_error()) {
            auto error = receive_result.release_error();

            if (error.is_syscall()) {
                // Nothing more to read right now.
                if (error.code() == EAGAIN)
                    break;

                if (error.code() == ECONNRESET) {
                    schedule_shutdown();
                    break;
                }
            }

            dbgln("ConnectionBase::read_as_much_as_possible_from_socket_without_blocking: {}", error);
            warnln("ConnectionBase::read_as_much_as_possible_from_socket_without_blocking: {}", error);
            VERIFY_NOT_REACHED();
        }

        auto chunk = receive_result.release_value();
        if (chunk.is_empty()) {
            // An empty read is handled as end-of-stream.
            schedule_shutdown();
            break;
        }

        bytes.append(chunk.data(), chunk.size());
        for (auto const& fd : received_fds)
            m_unprocessed_fds.enqueue(IPC::File::adopt_fd(fd));
    }

    if (bytes.is_empty()) {
        if (should_shut_down)
            return Error::from_string_literal("IPC connection EOF");
        return bytes;
    }

    // We got data, so the peer is responsive.
    m_responsiveness_timer->stop();
    did_become_responsive();
    return bytes;
}
178
179
// Reads everything currently available from the socket, parses as many
// complete messages as possible into m_unprocessed_messages and stashes any
// trailing bytes in m_unprocessed_bytes for the next call.
//
// If at least one message is queued afterwards, a deferred handle_messages()
// call is scheduled. Errors from reading or from copying the leftover bytes
// are propagated; finding m_unprocessed_bytes already populated shuts the
// connection down and returns an error.
ErrorOr<void> ConnectionBase::drain_messages_from_peer()
{
    auto bytes = TRY(read_as_much_as_possible_from_socket_without_blocking());

    size_t parsed_up_to = 0;
    try_parse_messages(bytes, parsed_up_to);

    bool const has_leftover_bytes = parsed_up_to < bytes.size();
    if (has_leftover_bytes) {
        // Sometimes we might receive a partial message. That's okay, just stash away
        // the unprocessed bytes and we'll prepend them to the next incoming message
        // in the next run of this function.
        auto leftover = TRY(ByteBuffer::copy(bytes.span().slice(parsed_up_to)));
        if (!m_unprocessed_bytes.is_empty()) {
            shutdown();
            return Error::from_string_literal("drain_messages_from_peer: Already have unprocessed bytes");
        }
        m_unprocessed_bytes = move(leftover);
    }

    if (m_unprocessed_messages.is_empty())
        return {};

    // The deferred callback holds a strong reference to this connection.
    m_deferred_invoker->schedule([strong_this = NonnullRefPtr(*this)] {
        strong_this->handle_messages();
    });
    return {};
}
205
206
// Waits until a message with the given endpoint magic and message id is in the
// unprocessed queue, then removes and returns it. Returns a null OwnPtr if the
// socket is closed or draining the peer fails before such a message arrives.
OwnPtr<IPC::Message> ConnectionBase::wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id)
{
    while (true) {
        // Double check we don't already have the event waiting for us.
        // Otherwise we might end up blocked for a while for no reason.
        for (size_t queue_index = 0; queue_index < m_unprocessed_messages.size(); ++queue_index) {
            auto& candidate = m_unprocessed_messages[queue_index];
            if (candidate->endpoint_magic() == endpoint_magic && candidate->message_id() == message_id)
                return m_unprocessed_messages.take(queue_index);
        }

        if (!m_socket->is_open())
            return {};

        wait_for_socket_to_become_readable();
        if (drain_messages_from_peer().is_error())
            return {};
    }
}
228
229
// Parses consecutive length-prefixed messages out of `bytes`, starting at
// `index`, and appends each decoded message to m_unprocessed_messages.
//
// Each message is a u32 size followed by that many payload bytes. Parsing
// stops at a zero size, at a message whose payload is not fully present
// (`index` is left on its size prefix in both cases), or at a payload that
// fails to decode (`index` is left just past its size prefix).
void ConnectionBase::try_parse_messages(Vector<u8> const& bytes, size_t& index)
{
    u32 message_size = 0;

    // Keep going only while more than a bare size prefix remains.
    while (index + sizeof(message_size) < bytes.size()) {
        memcpy(&message_size, bytes.data() + index, sizeof(message_size));

        auto const bytes_after_prefix = bytes.size() - index - sizeof(uint32_t);
        if (message_size == 0 || bytes_after_prefix < message_size)
            break;

        index += sizeof(message_size);
        auto payload = ReadonlyBytes { bytes.data() + index, message_size };

        auto message = try_parse_message(payload, m_unprocessed_fds);
        if (!message) {
            dbgln("Failed to parse IPC message:");
            dbgln("{:hex-dump}", payload);
            break;
        }

        m_unprocessed_messages.append(message.release_nonnull());
        index += message_size;
    }
}
249
250
}