Coverage Report

Created: 2026-06-03 06:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ruby/scheduler.c
Line
Count
Source
1
/**********************************************************************
2
3
  scheduler.c
4
5
  $Author$
6
7
  Copyright (C) 2020 Samuel Grant Dawson Williams
8
9
**********************************************************************/
10
11
#include "vm_core.h"
12
#include "eval_intern.h"
13
#include "ruby/fiber/scheduler.h"
14
#include "ruby/io.h"
15
#include "ruby/io/buffer.h"
16
17
#include "ruby/thread.h"
18
19
// For `ruby_thread_has_gvl_p`:
20
#include "internal/thread.h"
21
22
// For atomic operations:
23
#include "ruby_atomic.h"
24
25
static ID id_close;
26
static ID id_scheduler_close;
27
28
static ID id_block;
29
static ID id_unblock;
30
31
static ID id_yield;
32
33
static ID id_timeout_after;
34
static ID id_kernel_sleep;
35
static ID id_process_wait;
36
37
static ID id_io_read, id_io_pread;
38
static ID id_io_write, id_io_pwrite;
39
static ID id_io_wait;
40
static ID id_io_select;
41
static ID id_io_close;
42
43
static ID id_address_resolve;
44
45
static ID id_blocking_operation_wait;
46
static ID id_fiber_interrupt;
47
48
static ID id_fiber_schedule;
49
50
// Our custom blocking operation class
51
static VALUE rb_cFiberSchedulerBlockingOperation;
52
53
/*
54
 * Custom blocking operation structure for blocking operations
55
 * This replaces the use of Ruby procs to avoid use-after-free issues
56
 * and provides a cleaner C API for native work pools.
57
 */
58
59
typedef enum {
60
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED,    // Submitted but not started
61
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING, // Currently running
62
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED, // Finished (success/error)
63
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED  // Cancelled
64
} rb_fiber_blocking_operation_status_t;
65
66
struct rb_fiber_scheduler_blocking_operation {
67
    void *(*function)(void *);
68
    void *data;
69
70
    rb_unblock_function_t *unblock_function;
71
    void *data2;
72
73
    int flags;
74
    struct rb_fiber_scheduler_blocking_operation_state *state;
75
76
    // Execution status
77
    volatile rb_atomic_t status;
78
};
79
80
static size_t
81
blocking_operation_memsize(const void *ptr)
82
0
{
83
0
    return sizeof(rb_fiber_scheduler_blocking_operation_t);
84
0
}
85
86
static const rb_data_type_t blocking_operation_data_type = {
87
    "Fiber::Scheduler::BlockingOperation",
88
    {
89
        NULL, // nothing to mark
90
        RUBY_DEFAULT_FREE,
91
        blocking_operation_memsize,
92
    },
93
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE
94
};
95
96
/*
97
 * Allocate a new blocking operation
98
 */
99
static VALUE
100
blocking_operation_alloc(VALUE klass)
101
0
{
102
0
    rb_fiber_scheduler_blocking_operation_t *blocking_operation;
103
0
    VALUE obj = TypedData_Make_Struct(klass, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
104
105
0
    blocking_operation->function = NULL;
106
0
    blocking_operation->data = NULL;
107
0
    blocking_operation->unblock_function = NULL;
108
0
    blocking_operation->data2 = NULL;
109
0
    blocking_operation->flags = 0;
110
0
    blocking_operation->state = NULL;
111
0
    blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
112
113
0
    return obj;
114
0
}
115
116
/*
117
 * Get the blocking operation struct from a Ruby object
118
 */
119
static rb_fiber_scheduler_blocking_operation_t *
120
get_blocking_operation(VALUE obj)
121
0
{
122
0
    rb_fiber_scheduler_blocking_operation_t *blocking_operation;
123
0
    TypedData_Get_Struct(obj, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
124
0
    return blocking_operation;
125
0
}
126
127
/*
128
 * Document-method: Fiber::Scheduler::BlockingOperation#call
129
 *
130
 * Execute the blocking operation. This method releases the GVL and calls
131
 * the blocking function, then restores the errno value.
132
 *
133
 * Returns nil. The actual result is stored in the associated state object.
134
 */
135
static VALUE
136
blocking_operation_call(VALUE self)
137
0
{
138
0
    rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
139
140
0
    if (blocking_operation->status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
141
0
        rb_raise(rb_eRuntimeError, "Blocking operation has already been executed!");
142
0
    }
143
144
0
    if (blocking_operation->function == NULL) {
145
0
        rb_raise(rb_eRuntimeError, "Blocking operation has no function to execute!");
146
0
    }
147
148
0
    if (blocking_operation->state == NULL) {
149
0
        rb_raise(rb_eRuntimeError, "Blocking operation has no result object!");
150
0
    }
151
152
    // Mark as executing
153
0
    blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
154
155
    // Execute the blocking operation without GVL
156
0
    blocking_operation->state->result = rb_nogvl(blocking_operation->function, blocking_operation->data,
157
0
                                         blocking_operation->unblock_function, blocking_operation->data2,
158
0
                                         blocking_operation->flags);
159
0
    blocking_operation->state->saved_errno = rb_errno();
160
161
    // Mark as completed
162
0
    blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED;
163
164
0
    return Qnil;
165
0
}
166
167
/*
168
 * C API: Extract blocking operation struct from Ruby object (GVL required)
169
 *
170
 * This function safely extracts the opaque struct from a BlockingOperation VALUE
171
 * while holding the GVL. The returned pointer can be passed to worker threads
172
 * and used with rb_fiber_scheduler_blocking_operation_execute_opaque_nogvl.
173
 *
174
 * Returns the opaque struct pointer on success, NULL on error.
175
 * Must be called while holding the GVL.
176
 */
177
rb_fiber_scheduler_blocking_operation_t *
178
rb_fiber_scheduler_blocking_operation_extract(VALUE self)
179
0
{
180
0
    return get_blocking_operation(self);
181
0
}
182
183
/*
184
 * C API: Execute blocking operation from opaque struct (GVL not required)
185
 *
186
 * This function executes a blocking operation using the opaque struct pointer
187
 * obtained from rb_fiber_scheduler_blocking_operation_extract.
188
 * It can be called from native threads without holding the GVL.
189
 *
190
 * Returns 0 on success, -1 on error.
191
 */
192
int
193
rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
194
0
{
195
0
    if (blocking_operation == NULL) {
196
0
        return -1;
197
0
    }
198
199
0
    if (blocking_operation->function == NULL || blocking_operation->state == NULL) {
200
0
        return -1; // Invalid blocking operation
201
0
    }
202
203
    // Resolve sentinel values for unblock_function and data2:
204
0
    rb_thread_resolve_unblock_function(&blocking_operation->unblock_function, &blocking_operation->data2, GET_THREAD());
205
206
    // Atomically check if we can transition from QUEUED to EXECUTING
207
0
    rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
208
0
    if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
209
        // Already cancelled or in wrong state
210
0
        return -1;
211
0
    }
212
213
    // Now we're executing - call the function
214
0
    blocking_operation->state->result = blocking_operation->function(blocking_operation->data);
215
0
    blocking_operation->state->saved_errno = errno;
216
217
    // Atomically transition to completed (unless cancelled during execution)
218
0
    expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
219
0
    if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) == expected) {
220
        // Successfully completed
221
0
        return 0;
222
0
    } else {
223
        // Was cancelled during execution
224
0
        blocking_operation->state->saved_errno = EINTR;
225
0
        return -1;
226
0
    }
227
0
}
228
229
/*
230
 * C API: Create a new blocking operation
231
 *
232
 * This creates a blocking operation that can be executed by native work pools.
233
 * The blocking operation holds references to the function and data safely.
234
 */
235
VALUE
236
rb_fiber_scheduler_blocking_operation_new(void *(*function)(void *), void *data,
237
                                         rb_unblock_function_t *unblock_function, void *data2,
238
                                         int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
239
0
{
240
0
    VALUE self = blocking_operation_alloc(rb_cFiberSchedulerBlockingOperation);
241
0
    rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
242
243
0
    blocking_operation->function = function;
244
0
    blocking_operation->data = data;
245
0
    blocking_operation->unblock_function = unblock_function;
246
0
    blocking_operation->data2 = data2;
247
0
    blocking_operation->flags = flags;
248
0
    blocking_operation->state = state;
249
250
0
    return self;
251
0
}
252
253
/*
254
 *
255
 *  Document-class: Fiber::Scheduler
256
 *
257
 *  This is not an existing class, but documentation of the interface that Scheduler
258
 *  object should comply to in order to be used as argument to Fiber.scheduler and handle non-blocking
259
 *  fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations
260
 *  of some concepts.
261
 *
262
 *  Scheduler's behavior and usage are expected to be as follows:
263
 *
264
 *  * When the execution in the non-blocking Fiber reaches some blocking operation (like
265
 *    sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's
266
 *    hook methods, listed below.
267
 *  * Scheduler somehow registers what the current fiber is waiting on, and yields control
268
 *    to other fibers with Fiber.yield (so the fiber would be suspended while expecting its
269
 *    wait to end, and other fibers in the same thread can perform)
270
 *  * At the end of the current thread execution, the scheduler's method #scheduler_close is called
271
 *  * The scheduler runs into a wait loop, checking all the blocked fibers (which it has
272
 *    registered on hook calls) and resuming them when the awaited resource is ready
273
 *    (e.g. I/O ready or sleep time elapsed).
274
 *
275
 *  This way concurrent execution will be achieved transparently for every
276
 *  individual Fiber's code.
277
 *
278
 *  Scheduler implementations are provided by gems, like
279
 *  Async[https://github.com/socketry/async].
280
 *
281
 *  Hook methods are:
282
 *
283
 *  * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite #io_select, and #io_close
284
 *  * #process_wait
285
 *  * #kernel_sleep
286
 *  * #timeout_after
287
 *  * #address_resolve
288
 *  * #block and #unblock
289
 *  * #blocking_operation_wait
290
 *  * #fiber_interrupt
291
 *  * #yield
292
 *  * (the list is expanded as Ruby developers make more methods having non-blocking calls)
293
 *
294
 *  When not specified otherwise, the hook implementations are mandatory: if they are not
295
 *  implemented, the methods trying to call hook will fail. To provide backward compatibility,
296
 *  in the future hooks will be optional (if they are not implemented, due to the scheduler
297
 *  being created for the older Ruby version, the code which needs this hook will not fail,
298
 *  and will just behave in a blocking fashion).
299
 *
300
 *  It is also strongly recommended that the scheduler implements the #fiber method, which is
301
 *  delegated to by Fiber.schedule.
302
 *
303
 *  Sample _toy_ implementation of the scheduler can be found in Ruby's code, in
304
 *  <tt>test/fiber/scheduler.rb</tt>
305
 *
306
 */
307
void
308
Init_Fiber_Scheduler(void)
309
9
{
310
9
    id_close = rb_intern_const("close");
311
9
    id_scheduler_close = rb_intern_const("scheduler_close");
312
313
9
    id_block = rb_intern_const("block");
314
9
    id_unblock = rb_intern_const("unblock");
315
9
    id_yield = rb_intern_const("yield");
316
317
9
    id_timeout_after = rb_intern_const("timeout_after");
318
9
    id_kernel_sleep = rb_intern_const("kernel_sleep");
319
9
    id_process_wait = rb_intern_const("process_wait");
320
321
9
    id_io_read = rb_intern_const("io_read");
322
9
    id_io_pread = rb_intern_const("io_pread");
323
9
    id_io_write = rb_intern_const("io_write");
324
9
    id_io_pwrite = rb_intern_const("io_pwrite");
325
326
9
    id_io_wait = rb_intern_const("io_wait");
327
9
    id_io_select = rb_intern_const("io_select");
328
9
    id_io_close = rb_intern_const("io_close");
329
330
9
    id_address_resolve = rb_intern_const("address_resolve");
331
332
9
    id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
333
9
    id_fiber_interrupt = rb_intern_const("fiber_interrupt");
334
335
9
    id_fiber_schedule = rb_intern_const("fiber");
336
337
    // Define an anonymous BlockingOperation class for internal use only
338
    // This is completely hidden from Ruby code and cannot be instantiated directly
339
9
    rb_cFiberSchedulerBlockingOperation = rb_class_new(rb_cObject);
340
9
    rb_define_alloc_func(rb_cFiberSchedulerBlockingOperation, blocking_operation_alloc);
341
9
    rb_define_method(rb_cFiberSchedulerBlockingOperation, "call", blocking_operation_call, 0);
342
343
    // Register the anonymous class as a GC root so it doesn't get collected
344
9
    rb_gc_register_mark_object(rb_cFiberSchedulerBlockingOperation);
345
346
#if 0 /* for RDoc */
347
    rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject);
348
    rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0);
349
    rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_process_wait, 2);
350
    rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_io_wait, 3);
351
    rb_define_method(rb_cFiberScheduler, "io_read", rb_fiber_scheduler_io_read, 4);
352
    rb_define_method(rb_cFiberScheduler, "io_write", rb_fiber_scheduler_io_write, 4);
353
    rb_define_method(rb_cFiberScheduler, "io_pread", rb_fiber_scheduler_io_pread, 5);
354
    rb_define_method(rb_cFiberScheduler, "io_pwrite", rb_fiber_scheduler_io_pwrite, 5);
355
    rb_define_method(rb_cFiberScheduler, "io_select", rb_fiber_scheduler_io_select, 4);
356
    rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_kernel_sleep, 1);
357
    rb_define_method(rb_cFiberScheduler, "address_resolve", rb_fiber_scheduler_address_resolve, 1);
358
    rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3);
359
    rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2);
360
    rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
361
    rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2);
362
    rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2);
363
    rb_define_method(rb_cFiberScheduler, "yield", rb_fiber_scheduler_yield, 0);
364
    rb_define_method(rb_cFiberScheduler, "fiber_interrupt", rb_fiber_scheduler_fiber_interrupt, 2);
365
    rb_define_method(rb_cFiberScheduler, "io_close", rb_fiber_scheduler_io_close, 1);
366
#endif
367
9
}
368
369
VALUE
370
rb_fiber_scheduler_get(void)
371
0
{
372
0
    RUBY_ASSERT(ruby_thread_has_gvl_p());
373
374
0
    rb_thread_t *thread = GET_THREAD();
375
0
    RUBY_ASSERT(thread);
376
377
0
    return thread->scheduler;
378
0
}
379
380
static void
381
verify_interface(VALUE scheduler)
382
0
{
383
0
    if (!rb_respond_to(scheduler, id_block)) {
384
0
        rb_raise(rb_eArgError, "Scheduler must implement #block");
385
0
    }
386
387
0
    if (!rb_respond_to(scheduler, id_unblock)) {
388
0
        rb_raise(rb_eArgError, "Scheduler must implement #unblock");
389
0
    }
390
391
0
    if (!rb_respond_to(scheduler, id_kernel_sleep)) {
392
0
        rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep");
393
0
    }
394
395
0
    if (!rb_respond_to(scheduler, id_io_wait)) {
396
0
        rb_raise(rb_eArgError, "Scheduler must implement #io_wait");
397
0
    }
398
399
0
    if (!rb_respond_to(scheduler, id_fiber_interrupt)) {
400
0
        rb_warn("Scheduler should implement #fiber_interrupt");
401
0
    }
402
0
}
403
404
static VALUE
405
fiber_scheduler_close(VALUE scheduler)
406
0
{
407
0
    return rb_fiber_scheduler_close(scheduler);
408
0
}
409
410
static VALUE
411
fiber_scheduler_close_ensure(VALUE _thread)
412
0
{
413
0
    rb_thread_t *thread = (rb_thread_t*)_thread;
414
0
    thread->scheduler = Qnil;
415
416
0
    return Qnil;
417
0
}
418
419
VALUE
420
rb_fiber_scheduler_set(VALUE scheduler)
421
0
{
422
0
    RUBY_ASSERT(ruby_thread_has_gvl_p());
423
424
0
    rb_thread_t *thread = GET_THREAD();
425
0
    RUBY_ASSERT(thread);
426
427
0
    if (scheduler != Qnil) {
428
0
        verify_interface(scheduler);
429
0
    }
430
431
    // We invoke Scheduler#close when setting it to something else, to ensure
432
    // the previous scheduler runs to completion before changing the scheduler.
433
    // That way, we do not need to consider interactions, e.g., of a Fiber from
434
    // the previous scheduler with the new scheduler.
435
0
    if (thread->scheduler != Qnil) {
436
        // rb_fiber_scheduler_close(thread->scheduler);
437
0
        rb_ensure(fiber_scheduler_close, thread->scheduler, fiber_scheduler_close_ensure, (VALUE)thread);
438
0
    }
439
440
0
    thread->scheduler = scheduler;
441
442
0
    return thread->scheduler;
443
0
}
444
445
static VALUE
446
fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
447
27
{
448
27
    RUBY_ASSERT(thread);
449
450
27
    if (thread->blocking == 0) {
451
0
        return thread->scheduler;
452
0
    }
453
27
    else {
454
27
        return Qnil;
455
27
    }
456
27
}
457
458
VALUE rb_fiber_scheduler_current(void)
459
27
{
460
27
    RUBY_ASSERT(ruby_thread_has_gvl_p());
461
462
27
    return fiber_scheduler_current_for_threadptr(GET_THREAD());
463
27
}
464
465
// This function is allowed to be called without holding the GVL.
466
VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
467
0
{
468
0
    return fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
469
0
}
470
471
VALUE rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
472
0
{
473
0
    return fiber_scheduler_current_for_threadptr(thread);
474
0
}
475
476
/*
477
 *
478
 *  Document-method: Fiber::Scheduler#close
479
 *
480
 *  Called when the current thread exits. The scheduler is expected to implement this
481
 *  method in order to allow all waiting fibers to finalize their execution.
482
 *
483
 *  The suggested pattern is to implement the main event loop in the #close method.
484
 *
485
 */
486
VALUE
487
rb_fiber_scheduler_close(VALUE scheduler)
488
0
{
489
0
    RUBY_ASSERT(ruby_thread_has_gvl_p());
490
491
0
    VALUE result;
492
493
    // The reason for calling `scheduler_close` before calling `close` is for
494
    // legacy schedulers which implement `close` and expect the user to call
495
    // it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
496
    // which should call `scheduler_close`. If it were to call `close`, it
497
    // would create an infinite loop.
498
499
0
    result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
500
0
    if (!UNDEF_P(result)) return result;
501
502
0
    result = rb_check_funcall(scheduler, id_close, 0, NULL);
503
0
    if (!UNDEF_P(result)) return result;
504
505
0
    return Qnil;
506
0
}
507
508
VALUE
509
rb_fiber_scheduler_make_timeout(struct timeval *timeout)
510
0
{
511
0
    if (timeout) {
512
0
        return rb_float_new((double)timeout->tv_sec + (0.000001 * timeout->tv_usec));
513
0
    }
514
515
0
    return Qnil;
516
0
}
517
518
/*
519
 *  Document-method: Fiber::Scheduler#kernel_sleep
520
 *  call-seq: kernel_sleep(duration = nil)
521
 *
522
 *  Invoked by Kernel#sleep and Thread::Mutex#sleep and is expected to provide
523
 *  an implementation of sleeping in a non-blocking way. Implementation might
524
 *  register the current fiber in some list of "which fiber wait until what
525
 *  moment", call Fiber.yield to pass control, and then in #close resume
526
 *  the fibers whose wait period has elapsed.
527
 *
528
 */
529
VALUE
530
rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
531
0
{
532
0
    return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
533
0
}
534
535
VALUE
536
rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
537
0
{
538
0
    return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
539
0
}
540
541
/**
542
 *  Document-method: Fiber::Scheduler#yield
543
 *  call-seq: yield
544
 *
545
 *  Yield to the scheduler, to be resumed on the next scheduling cycle.
546
 */
547
VALUE
548
rb_fiber_scheduler_yield(VALUE scheduler)
549
0
{
550
    // First try to call the scheduler's yield method, if it exists:
551
0
    VALUE result = rb_check_funcall(scheduler, id_yield, 0, NULL);
552
0
    if (!UNDEF_P(result)) return result;
553
554
    // Otherwise, we can emulate yield by sleeping for 0 seconds:
555
0
    return rb_fiber_scheduler_kernel_sleep(scheduler, RB_INT2NUM(0));
556
0
}
557
558
#if 0
559
/*
560
 *  Document-method: Fiber::Scheduler#timeout_after
561
 *  call-seq: timeout_after(duration, exception_class, *exception_arguments, &block) -> result of block
562
 *
563
 *  Invoked by Timeout.timeout to execute the given +block+ within the given
564
 *  +duration+. It can also be invoked directly by the scheduler or user code.
565
 *
566
 *  Attempt to limit the execution time of a given +block+ to the given
567
 *  +duration+ if possible. When a non-blocking operation causes the +block+'s
568
 *  execution time to exceed the specified +duration+, that non-blocking
569
 *  operation should be interrupted by raising the specified +exception_class+
570
 *  constructed with the given +exception_arguments+.
571
 *
572
 *  General execution timeouts are often considered risky. This implementation
573
 *  will only interrupt non-blocking operations. This is by design because it's
574
 *  expected that non-blocking operations can fail for a variety of
575
 *  unpredictable reasons, so applications should already be robust in handling
576
 *  these conditions and by implication timeouts.
577
 *
578
 *  However, as a result of this design, if the +block+ does not invoke any
579
 *  non-blocking operations, it will be impossible to interrupt it. If you
580
 *  desire to provide predictable points for timeouts, consider adding
581
 *  <tt>sleep(0)</tt>.
582
 *
583
 *  If the block is executed successfully, its result will be returned.
584
 *
585
 *  The exception will typically be raised using Fiber#raise.
586
 */
587
VALUE
588
rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
589
{
590
    VALUE arguments[] = {
591
        timeout, exception, message
592
    };
593
594
    return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
595
}
596
597
VALUE
598
rb_fiber_scheduler_timeout_afterv(VALUE scheduler, int argc, VALUE * argv)
599
{
600
    return rb_check_funcall(scheduler, id_timeout_after, argc, argv);
601
}
602
#endif
603
604
/*
605
 *  Document-method: Fiber::Scheduler#process_wait
606
 *  call-seq: process_wait(pid, flags)
607
 *
608
 *  Invoked by Process::Status.wait in order to wait for a specified process.
609
 *  See that method description for arguments description.
610
 *
611
 *  Suggested minimal implementation:
612
 *
613
 *      Thread.new do
614
 *        Process::Status.wait(pid, flags)
615
 *      end.value
616
 *
617
 *  This hook is optional: if it is not present in the current scheduler,
618
 *  Process::Status.wait will behave as a blocking method.
619
 *
620
 *  Expected to return a Process::Status instance.
621
 */
622
VALUE
623
rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
624
0
{
625
0
    VALUE arguments[] = {
626
0
        PIDT2NUM(pid), RB_INT2NUM(flags)
627
0
    };
628
629
0
    return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
630
0
}
631
632
/*
633
 *  Document-method: Fiber::Scheduler#block
634
 *  call-seq: block(blocker, timeout = nil)
635
 *
636
 *  Invoked by methods like Thread.join, and by Thread::Mutex, to signify that current
637
 *  Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has
638
 *  elapsed.
639
 *
640
 *  +blocker+ is what we are waiting on, informational only (for debugging and
641
 *  logging). There are no guarantee about its value.
642
 *
643
 *  Expected to return boolean, specifying whether the blocking operation was
644
 *  successful or not.
645
 */
646
VALUE
647
rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
648
0
{
649
0
    return rb_funcall(scheduler, id_block, 2, blocker, timeout);
650
0
}
651
652
/*
653
 *  Document-method: Fiber::Scheduler#unblock
654
 *  call-seq: unblock(blocker, fiber)
655
 *
656
 *  Invoked to wake up Fiber previously blocked with #block (for example, Thread::Mutex#lock
657
 *  calls #block and Thread::Mutex#unlock calls #unblock). The scheduler should use
658
 *  the +fiber+ parameter to understand which fiber is unblocked.
659
 *
660
 *  +blocker+ is what was awaited for, but it is informational only (for debugging
661
 *  and logging), and it is not guaranteed to be the same value as the +blocker+ for
662
 *  #block.
663
 *
664
 */
665
VALUE
666
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
667
0
{
668
0
    RUBY_ASSERT(rb_obj_is_fiber(fiber));
669
670
0
    VALUE result;
671
0
    enum ruby_tag_type state;
672
673
    // `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`.
674
    //
675
    // If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it.
676
0
    int saved_errno = errno;
677
678
    // We must prevent interrupts while invoking the unblock method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_fiber_interrupt`.
679
0
    rb_execution_context_t *ec = GET_EC();
680
0
    int saved_interrupt_mask = ec->interrupt_mask;
681
0
    ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
682
683
0
    rb_control_frame_t *volatile cfp = ec->cfp;
684
0
    EC_PUSH_TAG(ec);
685
0
    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
686
0
        result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
687
0
    }
688
0
    else {
689
0
        rb_vm_rewind_cfp(ec, cfp);
690
0
    }
691
0
    EC_POP_TAG();
692
693
0
    ec->interrupt_mask = saved_interrupt_mask;
694
695
0
    if (state) {
696
0
        EC_JUMP_TAG(ec, state);
697
0
    }
698
699
0
    RUBY_VM_CHECK_INTS(ec);
700
701
0
    errno = saved_errno;
702
703
0
    return result;
704
0
}
705
706
/*
707
 *  Document-method: Fiber::Scheduler#io_wait
708
 *  call-seq: io_wait(io, events, timeout)
709
 *
710
 *  Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the
711
 *  specified descriptor is ready for specified events within
712
 *  the specified +timeout+.
713
 *
714
 *  +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and
715
 *  <tt>IO::PRIORITY</tt>.
716
 *
717
 *  Suggested implementation should register which Fiber is waiting for which
718
 *  resources and immediately calling Fiber.yield to pass control to other
719
 *  fibers. Then, in the #close method, the scheduler might dispatch all the
720
 *  I/O resources to fibers waiting for it.
721
 *
722
 *  Expected to return the subset of events that are ready immediately.
723
 *
724
 */
725
static VALUE
726
0
fiber_scheduler_io_wait(VALUE _argument) {
727
0
    VALUE *arguments = (VALUE*)_argument;
728
729
0
    return rb_funcallv(arguments[0], id_io_wait, 3, arguments + 1);
730
0
}
731
732
VALUE
733
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
734
0
{
735
0
    VALUE arguments[] = {
736
0
        scheduler, io, events, timeout
737
0
    };
738
739
0
    if (rb_respond_to(scheduler, id_fiber_interrupt)) {
740
0
        return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
741
0
    } else {
742
0
        return fiber_scheduler_io_wait((VALUE)&arguments);
743
0
    }
744
0
}
745
746
VALUE
747
rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
748
0
{
749
0
    return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_READABLE), rb_io_timeout(io));
750
0
}
751
752
VALUE
753
rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
754
0
{
755
0
    return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), rb_io_timeout(io));
756
0
}
757
758
/*
759
 *  Document-method: Fiber::Scheduler#io_select
760
 *  call-seq: io_select(readables, writables, exceptables, timeout)
761
 *
762
 *  Invoked by IO.select to ask whether the specified descriptors are ready for
763
 *  specified events within the specified +timeout+.
764
 *
765
 *  Expected to return the 3-tuple of Array of IOs that are ready.
766
 *
767
 */
768
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
769
0
{
770
0
    VALUE arguments[] = {
771
0
        readables, writables, exceptables, timeout
772
0
    };
773
774
0
    return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
775
0
}
776
777
VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
778
0
{
779
    // I wondered about extracting argv, and checking if there is only a single
780
    // IO instance, and instead calling `io_wait`. However, it would require a
781
    // decent amount of work and it would be hard to preserve the exact
782
    // semantics of IO.select.
783
784
0
    return rb_check_funcall(scheduler, id_io_select, argc, argv);
785
0
}
786
787
/*
788
 *  Document-method: Fiber::Scheduler#io_read
789
 *  call-seq: io_read(io, buffer, length, offset) -> read length or -errno
790
 *
791
 *  Invoked by IO#read or IO#Buffer.read to read +length+ bytes from +io+ into a
792
 *  specified +buffer+ (see IO::Buffer) at the given +offset+.
793
 *
794
 *  The +length+ argument is the "minimum length to be read". If the IO buffer
795
 *  size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be read,
796
 *  but at least 1KiB will be. Generally, the only case where less data than
797
 *  +length+ will be read is if there is an error reading the data.
798
 *
799
 *  Specifying a +length+ of 0 is valid and means try reading at least once and
800
 *  return any available data.
801
 *
802
 *  Suggested implementation should try to read from +io+ in a non-blocking
803
 *  manner and call #io_wait if the +io+ is not ready (which will yield control
804
 *  to other fibers).
805
 *
806
 *  See IO::Buffer for an interface available to return data.
807
 *
808
 *  Expected to return number of bytes read, or, in case of an error,
809
 *  <tt>-errno</tt> (negated number corresponding to system's error code).
810
 *
811
 *  The method should be considered _experimental_.
812
 */
813
static VALUE
814
0
fiber_scheduler_io_read(VALUE _argument) {
815
0
    VALUE *arguments = (VALUE*)_argument;
816
817
0
    return rb_funcallv(arguments[0], id_io_read, 4, arguments + 1);
818
0
}
819
820
VALUE
821
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
822
0
{
823
0
    if (!rb_respond_to(scheduler, id_io_read)) {
824
0
        return RUBY_Qundef;
825
0
    }
826
827
0
    VALUE arguments[] = {
828
0
        scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
829
0
    };
830
831
0
    if (rb_respond_to(scheduler, id_fiber_interrupt)) {
832
0
        return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
833
0
    } else {
834
0
        return fiber_scheduler_io_read((VALUE)&arguments);
835
0
    }
836
0
}
837
838
/*
839
 *  Document-method: Fiber::Scheduler#io_pread
840
 *  call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno
841
 *
842
 *  Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+
843
 *  at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
844
 *  +offset+.
845
 *
846
 *  This method is semantically the same as #io_read, but it allows to specify
847
 *  the offset to read from and is often better for asynchronous IO on the same
848
 *  file.
849
 *
850
 *  The method should be considered _experimental_.
851
 */
852
static VALUE
853
0
fiber_scheduler_io_pread(VALUE _argument) {
854
0
    VALUE *arguments = (VALUE*)_argument;
855
856
0
    return rb_funcallv(arguments[0], id_io_pread, 5, arguments + 1);
857
0
}
858
859
VALUE
860
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
861
0
{
862
0
    if (!rb_respond_to(scheduler, id_io_pread)) {
863
0
        return RUBY_Qundef;
864
0
    }
865
866
0
    VALUE arguments[] = {
867
0
        scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
868
0
    };
869
870
0
    if (rb_respond_to(scheduler, id_fiber_interrupt)) {
871
0
        return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
872
0
    } else {
873
0
        return fiber_scheduler_io_pread((VALUE)&arguments);
874
0
    }
875
0
}
876
877
/*
878
 *  Document-method: Fiber::Scheduler#io_write
879
 *  call-seq: io_write(io, buffer, length, offset) -> written length or -errno
880
 *
881
 *  Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+ from
882
 *  from a specified +buffer+ (see IO::Buffer) at the given +offset+.
883
 *
884
 *  The +length+ argument is the "minimum length to be written". If the IO
885
 *  buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most 8KiB
886
 *  will be written, but at least 1KiB will be. Generally, the only case where
887
 *  less data than +length+ will be written is if there is an error writing the
888
 *  data.
889
 *
890
 *  Specifying a +length+ of 0 is valid and means try writing at least once, as
891
 *  much data as possible.
892
 *
893
 *  Suggested implementation should try to write to +io+ in a non-blocking
894
 *  manner and call #io_wait if the +io+ is not ready (which will yield control
895
 *  to other fibers).
896
 *
897
 *  See IO::Buffer for an interface available to get data from buffer
898
 *  efficiently.
899
 *
900
 *  Expected to return number of bytes written, or, in case of an error,
901
 *  <tt>-errno</tt> (negated number corresponding to system's error code).
902
 *
903
 *  The method should be considered _experimental_.
904
 */
905
static VALUE
906
0
fiber_scheduler_io_write(VALUE _argument) {
907
0
    VALUE *arguments = (VALUE*)_argument;
908
909
0
    return rb_funcallv(arguments[0], id_io_write, 4, arguments + 1);
910
0
}
911
912
VALUE
913
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
914
0
{
915
0
    if (!rb_respond_to(scheduler, id_io_write)) {
916
0
        return RUBY_Qundef;
917
0
    }
918
919
0
    VALUE arguments[] = {
920
0
        scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
921
0
    };
922
923
0
    if (rb_respond_to(scheduler, id_fiber_interrupt)) {
924
0
        return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
925
0
    } else {
926
0
        return fiber_scheduler_io_write((VALUE)&arguments);
927
0
    }
928
0
}
929
930
/*
931
 *  Document-method: Fiber::Scheduler#io_pwrite
932
 *  call-seq: io_pwrite(io, buffer, from, length, offset) -> written length or -errno
933
 *
934
 *  Invoked by IO#pwrite or IO::Buffer#pwrite to write +length+ bytes to +io+
935
 *  at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
936
 *  +offset+.
937
 *
938
 *  This method is semantically the same as #io_write, but it allows to specify
939
 *  the offset to write to and is often better for asynchronous IO on the same
940
 *  file.
941
 *
942
 *  The method should be considered _experimental_.
943
 *
944
 */
945
static VALUE
946
0
fiber_scheduler_io_pwrite(VALUE _argument) {
947
0
    VALUE *arguments = (VALUE*)_argument;
948
949
0
    return rb_funcallv(arguments[0], id_io_pwrite, 5, arguments + 1);
950
0
}
951
952
VALUE
953
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
954
0
{
955
956
957
0
    if (!rb_respond_to(scheduler, id_io_pwrite)) {
958
0
        return RUBY_Qundef;
959
0
    }
960
961
0
    VALUE arguments[] = {
962
0
        scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
963
0
    };
964
965
0
    if (rb_respond_to(scheduler, id_fiber_interrupt)) {
966
0
        return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
967
0
    } else {
968
0
        return fiber_scheduler_io_pwrite((VALUE)&arguments);
969
0
    }
970
0
}
971
972
VALUE
973
rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
974
0
{
975
0
    VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
976
977
0
    VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0);
978
979
0
    rb_io_buffer_free_locked(buffer);
980
981
0
    return result;
982
0
}
983
984
VALUE
985
rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
986
0
{
987
0
    VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
988
989
0
    VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0);
990
991
0
    rb_io_buffer_free_locked(buffer);
992
993
0
    return result;
994
0
}
995
996
VALUE
997
rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
998
0
{
999
0
    VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
1000
1001
0
    VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, buffer, length, 0);
1002
1003
0
    rb_io_buffer_free_locked(buffer);
1004
1005
0
    return result;
1006
0
}
1007
1008
VALUE
1009
rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
1010
0
{
1011
0
    VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
1012
1013
0
    VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, buffer, length, 0);
1014
1015
0
    rb_io_buffer_free_locked(buffer);
1016
1017
0
    return result;
1018
0
}
1019
1020
/*
1021
 *  Document-method: Fiber::Scheduler#io_close
1022
 *  call-seq: io_close(fd)
1023
 *
1024
 *  Invoked by Ruby's core methods to notify scheduler that the IO object is closed. Note that
1025
 *  the method will receive an integer file descriptor of the closed object, not an object
1026
 *  itself.
1027
 */
1028
VALUE
1029
rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
1030
0
{
1031
0
    VALUE arguments[] = {io};
1032
1033
0
    return rb_check_funcall(scheduler, id_io_close, 1, arguments);
1034
0
}
1035
1036
/*
1037
 *  Document-method: Fiber::Scheduler#address_resolve
1038
 *  call-seq: address_resolve(hostname) -> array_of_strings or nil
1039
 *
1040
 *  Invoked by any method that performs a non-reverse DNS lookup. The most
1041
 *  notable method is Addrinfo.getaddrinfo, but there are many other.
1042
 *
1043
 *  The method is expected to return an array of strings corresponding to ip
1044
 *  addresses the +hostname+ is resolved to, or +nil+ if it can not be resolved.
1045
 *
1046
 *  Fairly exhaustive list of all possible call-sites:
1047
 *
1048
 *  - Addrinfo.getaddrinfo
1049
 *  - Addrinfo.tcp
1050
 *  - Addrinfo.udp
1051
 *  - Addrinfo.ip
1052
 *  - Addrinfo.new
1053
 *  - Addrinfo.marshal_load
1054
 *  - SOCKSSocket.new
1055
 *  - TCPServer.new
1056
 *  - TCPSocket.new
1057
 *  - IPSocket.getaddress
1058
 *  - TCPSocket.gethostbyname
1059
 *  - UDPSocket#connect
1060
 *  - UDPSocket#bind
1061
 *  - UDPSocket#send
1062
 *  - Socket.getaddrinfo
1063
 *  - Socket.gethostbyname
1064
 *  - Socket.pack_sockaddr_in
1065
 *  - Socket.sockaddr_in
1066
 *  - Socket.unpack_sockaddr_in
1067
 */
1068
VALUE
1069
rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
1070
0
{
1071
0
    VALUE arguments[] = {
1072
0
        hostname
1073
0
    };
1074
1075
0
    return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
1076
0
}
1077
1078
/*
1079
 *  Document-method: Fiber::Scheduler#blocking_operation_wait
1080
 *  call-seq: blocking_operation_wait(blocking_operation)
1081
 *
1082
 *  Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
1083
 *  The blocking_operation is an opaque object that encapsulates the blocking operation
1084
 *  and responds to a <tt>#call</tt> method without any arguments.
1085
 *
1086
 *  If the scheduler doesn't implement this method, or if the scheduler doesn't execute
1087
 *  the blocking operation, Ruby will fall back to the non-scheduler implementation.
1088
 *
1089
 *  Minimal suggested implementation is:
1090
 *
1091
 *     def blocking_operation_wait(blocking_operation)
1092
 *       Thread.new { blocking_operation.call }.join
1093
 *     end
1094
 */
1095
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
1096
0
{
1097
    // Check if scheduler supports blocking_operation_wait before creating the object
1098
0
    if (!rb_respond_to(scheduler, id_blocking_operation_wait)) {
1099
0
        return Qundef;
1100
0
    }
1101
1102
    // Create a new BlockingOperation with the blocking operation
1103
0
    VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);
1104
1105
0
    VALUE result = rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation);
1106
1107
    // Get the operation data to check if it was executed
1108
0
    rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(blocking_operation);
1109
0
    rb_atomic_t current_status = RUBY_ATOMIC_LOAD(operation->status);
1110
1111
    // Invalidate the operation now that we're done with it
1112
0
    operation->function = NULL;
1113
0
    operation->state = NULL;
1114
0
    operation->data = NULL;
1115
0
    operation->data2 = NULL;
1116
0
    operation->unblock_function = NULL;
1117
1118
    // Ensure that the blocking operation remains visible until this point:
1119
0
    RB_GC_GUARD(blocking_operation);
1120
1121
    // If the blocking operation was never executed, return Qundef to signal the caller to use rb_nogvl instead
1122
0
    if (current_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
1123
0
        return Qundef;
1124
0
    }
1125
1126
0
    return result;
1127
0
}
1128
1129
/*
1130
 * Document-method: Fiber::Scheduler#fiber_interrupt
1131
 * call-seq: fiber_interrupt(fiber, exception)
1132
 *
1133
 * Invoked by Ruby's core methods to notify the scheduler that the blocked fiber should be interrupted
1134
 * with an exception. For example, IO#close uses this method to interrupt fibers that are performing
1135
 * blocking IO operations.
1136
 *
1137
 */
1138
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
1139
0
{
1140
0
    VALUE arguments[] = {
1141
0
        fiber, exception
1142
0
    };
1143
1144
0
    VALUE result;
1145
0
    enum ruby_tag_type state;
1146
1147
    // We must prevent interrupts while invoking the fiber_interrupt method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_unblock`.
1148
0
    rb_execution_context_t *ec = GET_EC();
1149
0
    int saved_interrupt_mask = ec->interrupt_mask;
1150
0
    ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
1151
1152
0
    rb_control_frame_t *volatile cfp = ec->cfp;
1153
0
    EC_PUSH_TAG(ec);
1154
0
    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
1155
0
        result = rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
1156
0
    }
1157
0
    else {
1158
0
        rb_vm_rewind_cfp(ec, cfp);
1159
0
    }
1160
0
    EC_POP_TAG();
1161
1162
0
    ec->interrupt_mask = saved_interrupt_mask;
1163
1164
0
    if (state) {
1165
0
        EC_JUMP_TAG(ec, state);
1166
0
    }
1167
1168
0
    RUBY_VM_CHECK_INTS(ec);
1169
1170
0
    return result;
1171
0
}
1172
1173
/*
1174
 *  Document-method: Fiber::Scheduler#fiber
1175
 *  call-seq: fiber(&block)
1176
 *
1177
 *  Implementation of the Fiber.schedule. The method is <em>expected</em> to immediately
1178
 *  run the given block of code in a separate non-blocking fiber, and to return that Fiber.
1179
 *
1180
 *  Minimal suggested implementation is:
1181
 *
1182
 *     def fiber(&block)
1183
 *       fiber = Fiber.new(blocking: false, &block)
1184
 *       fiber.resume
1185
 *       fiber
1186
 *     end
1187
 */
1188
VALUE
1189
rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
1190
0
{
1191
0
    return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
1192
0
}
1193
1194
/*
1195
 * C API: Cancel a blocking operation
1196
 *
1197
 * This function cancels a blocking operation. If the operation is queued,
1198
 * it just marks it as cancelled. If it's executing, it marks it as cancelled
1199
 * and calls the unblock function to interrupt the operation.
1200
 *
1201
 * Returns 1 if unblock function was called, 0 if just marked cancelled, -1 on error.
1202
 */
1203
int
1204
rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
1205
0
{
1206
0
    if (blocking_operation == NULL) {
1207
0
        return -1;
1208
0
    }
1209
1210
0
    rb_atomic_t current_state = RUBY_ATOMIC_LOAD(blocking_operation->status);
1211
1212
0
    switch (current_state) {
1213
0
        case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED:
1214
            // Work hasn't started - just mark as cancelled:
1215
0
            if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == current_state) {
1216
                // Successfully cancelled before execution:
1217
0
                return 0;
1218
0
            }
1219
            // Fall through if state changed between load and CAS
1220
1221
0
        case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING:
1222
            // Work is running - mark cancelled AND call unblock function
1223
0
            if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) != current_state) {
1224
                // State changed between load and CAS - operation may have completed:
1225
0
                return 0;
1226
0
            }
1227
            // Otherwise, we successfully marked it as cancelled, so we can call the unblock function:
1228
0
            rb_unblock_function_t *unblock_function = blocking_operation->unblock_function;
1229
0
            if (unblock_function) {
1230
0
                RUBY_ASSERT(unblock_function != (rb_unblock_function_t *)-1 && "unblock_function is still sentinel value -1, should have been resolved earlier");
1231
0
                blocking_operation->unblock_function(blocking_operation->data2);
1232
0
            }
1233
            // Cancelled during execution (unblock function called):
1234
0
            return 1;
1235
1236
0
        case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED:
1237
0
        case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED:
1238
            // Already finished or cancelled:
1239
0
            return 0;
1240
0
    }
1241
1242
0
    return 0;
1243
0
}