Coverage Report

Created: 2026-03-31 07:30

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/ruby/scheduler.c
Line
Count
Source
1
/**********************************************************************
2
3
  scheduler.c
4
5
  $Author$
6
7
  Copyright (C) 2020 Samuel Grant Dawson Williams
8
9
**********************************************************************/
10
11
#include "vm_core.h"
12
#include "eval_intern.h"
13
#include "ruby/fiber/scheduler.h"
14
#include "ruby/io.h"
15
#include "ruby/io/buffer.h"
16
17
#include "ruby/thread.h"
18
19
// For `ruby_thread_has_gvl_p`:
20
#include "internal/thread.h"
21
22
// For atomic operations:
23
#include "ruby_atomic.h"
24
25
static ID id_close;
26
static ID id_scheduler_close;
27
28
static ID id_block;
29
static ID id_unblock;
30
31
static ID id_yield;
32
33
static ID id_timeout_after;
34
static ID id_kernel_sleep;
35
static ID id_process_wait;
36
37
static ID id_io_read, id_io_pread;
38
static ID id_io_write, id_io_pwrite;
39
static ID id_io_wait;
40
static ID id_io_select;
41
static ID id_io_close;
42
43
static ID id_address_resolve;
44
45
static ID id_blocking_operation_wait;
46
static ID id_fiber_interrupt;
47
48
static ID id_fiber_schedule;
49
50
// Our custom blocking operation class
51
static VALUE rb_cFiberSchedulerBlockingOperation;
52
53
/*
54
 * Custom blocking operation structure for blocking operations
55
 * This replaces the use of Ruby procs to avoid use-after-free issues
56
 * and provides a cleaner C API for native work pools.
57
 */
58
59
/* Lifecycle states stored in the `status` field of a blocking operation. */
typedef enum {
    /* Submitted but not started. */
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED = 0,
    /* Currently running. */
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING = 1,
    /* Finished (success or error). */
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED = 2,
    /* Cancelled. */
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED = 3
} rb_fiber_blocking_operation_status_t;
65
66
struct rb_fiber_scheduler_blocking_operation {
67
    void *(*function)(void *);
68
    void *data;
69
70
    rb_unblock_function_t *unblock_function;
71
    void *data2;
72
73
    int flags;
74
    struct rb_fiber_scheduler_blocking_operation_state *state;
75
76
    // Execution status
77
    volatile rb_atomic_t status;
78
};
79
80
static size_t
81
blocking_operation_memsize(const void *ptr)
82
0
{
83
0
    return sizeof(rb_fiber_scheduler_blocking_operation_t);
84
0
}
85
86
static const rb_data_type_t blocking_operation_data_type = {
87
    "Fiber::Scheduler::BlockingOperation",
88
    {
89
        NULL, // nothing to mark
90
        RUBY_DEFAULT_FREE,
91
        blocking_operation_memsize,
92
    },
93
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE
94
};
95
96
/*
97
 * Allocate a new blocking operation
98
 */
99
static VALUE
100
blocking_operation_alloc(VALUE klass)
101
0
{
102
0
    rb_fiber_scheduler_blocking_operation_t *blocking_operation;
103
0
    VALUE obj = TypedData_Make_Struct(klass, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
104
105
0
    blocking_operation->function = NULL;
106
0
    blocking_operation->data = NULL;
107
0
    blocking_operation->unblock_function = NULL;
108
0
    blocking_operation->data2 = NULL;
109
0
    blocking_operation->flags = 0;
110
0
    blocking_operation->state = NULL;
111
0
    blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
112
113
0
    return obj;
114
0
}
115
116
/*
117
 * Get the blocking operation struct from a Ruby object
118
 */
119
static rb_fiber_scheduler_blocking_operation_t *
120
get_blocking_operation(VALUE obj)
121
0
{
122
0
    rb_fiber_scheduler_blocking_operation_t *blocking_operation;
123
0
    TypedData_Get_Struct(obj, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
124
0
    return blocking_operation;
125
0
}
126
127
/*
128
 * Document-method: Fiber::Scheduler::BlockingOperation#call
129
 *
130
 * Execute the blocking operation. This method releases the GVL and calls
131
 * the blocking function, then saves the resulting errno value in the state.
132
 *
133
 * Returns nil. The actual result is stored in the associated state object.
134
 */
135
static VALUE
136
blocking_operation_call(VALUE self)
137
0
{
138
0
    rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
139
140
0
    if (blocking_operation->status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
141
0
        rb_raise(rb_eRuntimeError, "Blocking operation has already been executed!");
142
0
    }
143
144
0
    if (blocking_operation->function == NULL) {
145
0
        rb_raise(rb_eRuntimeError, "Blocking operation has no function to execute!");
146
0
    }
147
148
0
    if (blocking_operation->state == NULL) {
149
0
        rb_raise(rb_eRuntimeError, "Blocking operation has no result object!");
150
0
    }
151
152
    // Mark as executing
153
0
    blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
154
155
    // Execute the blocking operation without GVL
156
0
    blocking_operation->state->result = rb_nogvl(blocking_operation->function, blocking_operation->data,
157
0
                                         blocking_operation->unblock_function, blocking_operation->data2,
158
0
                                         blocking_operation->flags);
159
0
    blocking_operation->state->saved_errno = rb_errno();
160
161
    // Mark as completed
162
0
    blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED;
163
164
0
    return Qnil;
165
0
}
166
167
/*
168
 * C API: Extract blocking operation struct from Ruby object (GVL required)
169
 *
170
 * This function safely extracts the opaque struct from a BlockingOperation VALUE
171
 * while holding the GVL. The returned pointer can be passed to worker threads
172
 * and used with rb_fiber_scheduler_blocking_operation_execute.
173
 *
174
 * Returns the opaque struct pointer on success, NULL on error.
175
 * Must be called while holding the GVL.
176
 */
177
rb_fiber_scheduler_blocking_operation_t *
178
rb_fiber_scheduler_blocking_operation_extract(VALUE self)
179
0
{
180
0
    return get_blocking_operation(self);
181
0
}
182
183
/*
184
 * C API: Execute blocking operation from opaque struct (GVL not required)
185
 *
186
 * This function executes a blocking operation using the opaque struct pointer
187
 * obtained from rb_fiber_scheduler_blocking_operation_extract.
188
 * It can be called from native threads without holding the GVL.
189
 *
190
 * Returns 0 on success, -1 on error.
191
 */
192
int
193
rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
194
0
{
195
0
    if (blocking_operation == NULL) {
196
0
        return -1;
197
0
    }
198
199
0
    if (blocking_operation->function == NULL || blocking_operation->state == NULL) {
200
0
        return -1; // Invalid blocking operation
201
0
    }
202
203
    // Resolve sentinel values for unblock_function and data2:
204
0
    rb_thread_resolve_unblock_function(&blocking_operation->unblock_function, &blocking_operation->data2, GET_THREAD());
205
206
    // Atomically check if we can transition from QUEUED to EXECUTING
207
0
    rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
208
0
    if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
209
        // Already cancelled or in wrong state
210
0
        return -1;
211
0
    }
212
213
    // Now we're executing - call the function
214
0
    blocking_operation->state->result = blocking_operation->function(blocking_operation->data);
215
0
    blocking_operation->state->saved_errno = errno;
216
217
    // Atomically transition to completed (unless cancelled during execution)
218
0
    expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
219
0
    if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) == expected) {
220
        // Successfully completed
221
0
        return 0;
222
0
    } else {
223
        // Was cancelled during execution
224
0
        blocking_operation->state->saved_errno = EINTR;
225
0
        return -1;
226
0
    }
227
0
}
228
229
/*
230
 * C API: Create a new blocking operation
231
 *
232
 * This creates a blocking operation that can be executed by native work pools.
233
 * The blocking operation holds references to the function and data safely.
234
 */
235
VALUE
236
rb_fiber_scheduler_blocking_operation_new(void *(*function)(void *), void *data,
237
                                         rb_unblock_function_t *unblock_function, void *data2,
238
                                         int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
239
0
{
240
0
    VALUE self = blocking_operation_alloc(rb_cFiberSchedulerBlockingOperation);
241
0
    rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
242
243
0
    blocking_operation->function = function;
244
0
    blocking_operation->data = data;
245
0
    blocking_operation->unblock_function = unblock_function;
246
0
    blocking_operation->data2 = data2;
247
0
    blocking_operation->flags = flags;
248
0
    blocking_operation->state = state;
249
250
0
    return self;
251
0
}
252
253
/*
254
 *
255
 *  Document-class: Fiber::Scheduler
256
 *
257
 *  This is not an existing class, but documentation of the interface that Scheduler
258
 *  object should comply with in order to be used as argument to Fiber.set_scheduler and handle non-blocking
259
 *  fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations
260
 *  of some concepts.
261
 *
262
 *  Scheduler's behavior and usage are expected to be as follows:
263
 *
264
 *  * When the execution in the non-blocking Fiber reaches some blocking operation (like
265
 *    sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's
266
 *    hook methods, listed below.
267
 *  * Scheduler somehow registers what the current fiber is waiting on, and yields control
268
 *    to other fibers with Fiber.yield (so the fiber would be suspended while expecting its
269
 *    wait to end, and other fibers in the same thread can perform)
270
 *  * At the end of the current thread execution, the scheduler's method #scheduler_close is called
271
 *  * The scheduler runs into a wait loop, checking all the blocked fibers (which it has
272
 *    registered on hook calls) and resuming them when the awaited resource is ready
273
 *    (e.g. I/O ready or sleep time elapsed).
274
 *
275
 *  This way concurrent execution will be achieved transparently for every
276
 *  individual Fiber's code.
277
 *
278
 *  Scheduler implementations are provided by gems, like
279
 *  Async[https://github.com/socketry/async].
280
 *
281
 *  Hook methods are:
282
 *
283
 *  * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite, #io_select, and #io_close
284
 *  * #process_wait
285
 *  * #kernel_sleep
286
 *  * #timeout_after
287
 *  * #address_resolve
288
 *  * #block and #unblock
289
 *  * #blocking_operation_wait
290
 *  * #fiber_interrupt
291
 *  * #yield
292
 *  * (the list is expanded as Ruby developers make more methods having non-blocking calls)
293
 *
294
 *  When not specified otherwise, the hook implementations are mandatory: if they are not
295
 *  implemented, the methods trying to call hook will fail. To provide backward compatibility,
296
 *  in the future hooks will be optional (if they are not implemented, due to the scheduler
297
 *  being created for the older Ruby version, the code which needs this hook will not fail,
298
 *  and will just behave in a blocking fashion).
299
 *
300
 *  It is also strongly recommended that the scheduler implements the #fiber method, which is
301
 *  delegated to by Fiber.schedule.
302
 *
303
 *  Sample _toy_ implementation of the scheduler can be found in Ruby's code, in
304
 *  <tt>test/fiber/scheduler.rb</tt>
305
 *
306
 */
307
void
308
Init_Fiber_Scheduler(void)
309
9
{
310
9
    id_close = rb_intern_const("close");
311
9
    id_scheduler_close = rb_intern_const("scheduler_close");
312
313
9
    id_block = rb_intern_const("block");
314
9
    id_unblock = rb_intern_const("unblock");
315
9
    id_yield = rb_intern_const("yield");
316
317
9
    id_timeout_after = rb_intern_const("timeout_after");
318
9
    id_kernel_sleep = rb_intern_const("kernel_sleep");
319
9
    id_process_wait = rb_intern_const("process_wait");
320
321
9
    id_io_read = rb_intern_const("io_read");
322
9
    id_io_pread = rb_intern_const("io_pread");
323
9
    id_io_write = rb_intern_const("io_write");
324
9
    id_io_pwrite = rb_intern_const("io_pwrite");
325
326
9
    id_io_wait = rb_intern_const("io_wait");
327
9
    id_io_select = rb_intern_const("io_select");
328
9
    id_io_close = rb_intern_const("io_close");
329
330
9
    id_address_resolve = rb_intern_const("address_resolve");
331
332
9
    id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
333
9
    id_fiber_interrupt = rb_intern_const("fiber_interrupt");
334
335
9
    id_fiber_schedule = rb_intern_const("fiber");
336
337
    // Define an anonymous BlockingOperation class for internal use only
338
    // This is completely hidden from Ruby code and cannot be instantiated directly
339
9
    rb_cFiberSchedulerBlockingOperation = rb_class_new(rb_cObject);
340
9
    rb_define_alloc_func(rb_cFiberSchedulerBlockingOperation, blocking_operation_alloc);
341
9
    rb_define_method(rb_cFiberSchedulerBlockingOperation, "call", blocking_operation_call, 0);
342
343
    // Register the anonymous class as a GC root so it doesn't get collected
344
9
    rb_gc_register_mark_object(rb_cFiberSchedulerBlockingOperation);
345
346
#if 0 /* for RDoc */
347
    rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject);
348
    rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0);
349
    rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_process_wait, 2);
350
    rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_io_wait, 3);
351
    rb_define_method(rb_cFiberScheduler, "io_read", rb_fiber_scheduler_io_read, 4);
352
    rb_define_method(rb_cFiberScheduler, "io_write", rb_fiber_scheduler_io_write, 4);
353
    rb_define_method(rb_cFiberScheduler, "io_pread", rb_fiber_scheduler_io_pread, 5);
354
    rb_define_method(rb_cFiberScheduler, "io_pwrite", rb_fiber_scheduler_io_pwrite, 5);
355
    rb_define_method(rb_cFiberScheduler, "io_select", rb_fiber_scheduler_io_select, 4);
356
    rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_kernel_sleep, 1);
357
    rb_define_method(rb_cFiberScheduler, "address_resolve", rb_fiber_scheduler_address_resolve, 1);
358
    rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3);
359
    rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2);
360
    rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
361
    rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2);
362
    rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2);
363
    rb_define_method(rb_cFiberScheduler, "yield", rb_fiber_scheduler_yield, 0);
364
    rb_define_method(rb_cFiberScheduler, "fiber_interrupt", rb_fiber_scheduler_fiber_interrupt, 2);
365
    rb_define_method(rb_cFiberScheduler, "io_close", rb_fiber_scheduler_io_close, 1);
366
#endif
367
9
}
368
369
VALUE
370
rb_fiber_scheduler_get(void)
371
0
{
372
0
    RUBY_ASSERT(ruby_thread_has_gvl_p());
373
374
0
    rb_thread_t *thread = GET_THREAD();
375
0
    RUBY_ASSERT(thread);
376
377
0
    return thread->scheduler;
378
0
}
379
380
static void
381
verify_interface(VALUE scheduler)
382
0
{
383
0
    if (!rb_respond_to(scheduler, id_block)) {
384
0
        rb_raise(rb_eArgError, "Scheduler must implement #block");
385
0
    }
386
387
0
    if (!rb_respond_to(scheduler, id_unblock)) {
388
0
        rb_raise(rb_eArgError, "Scheduler must implement #unblock");
389
0
    }
390
391
0
    if (!rb_respond_to(scheduler, id_kernel_sleep)) {
392
0
        rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep");
393
0
    }
394
395
0
    if (!rb_respond_to(scheduler, id_io_wait)) {
396
0
        rb_raise(rb_eArgError, "Scheduler must implement #io_wait");
397
0
    }
398
399
0
    if (!rb_respond_to(scheduler, id_fiber_interrupt)) {
400
0
        rb_warn("Scheduler should implement #fiber_interrupt");
401
0
    }
402
0
}
403
404
static VALUE
405
fiber_scheduler_close(VALUE scheduler)
406
0
{
407
0
    return rb_fiber_scheduler_close(scheduler);
408
0
}
409
410
static VALUE
411
fiber_scheduler_close_ensure(VALUE _thread)
412
0
{
413
0
    rb_thread_t *thread = (rb_thread_t*)_thread;
414
0
    thread->scheduler = Qnil;
415
416
0
    return Qnil;
417
0
}
418
419
VALUE
420
rb_fiber_scheduler_set(VALUE scheduler)
421
0
{
422
0
    RUBY_ASSERT(ruby_thread_has_gvl_p());
423
424
0
    rb_thread_t *thread = GET_THREAD();
425
0
    RUBY_ASSERT(thread);
426
427
0
    if (scheduler != Qnil) {
428
0
        verify_interface(scheduler);
429
0
    }
430
431
    // We invoke Scheduler#close when setting it to something else, to ensure
432
    // the previous scheduler runs to completion before changing the scheduler.
433
    // That way, we do not need to consider interactions, e.g., of a Fiber from
434
    // the previous scheduler with the new scheduler.
435
0
    if (thread->scheduler != Qnil) {
436
        // rb_fiber_scheduler_close(thread->scheduler);
437
0
        rb_ensure(fiber_scheduler_close, thread->scheduler, fiber_scheduler_close_ensure, (VALUE)thread);
438
0
    }
439
440
0
    thread->scheduler = scheduler;
441
442
0
    return thread->scheduler;
443
0
}
444
445
static VALUE
446
fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
447
35
{
448
35
    RUBY_ASSERT(thread);
449
450
35
    if (thread->blocking == 0) {
451
0
        return thread->scheduler;
452
0
    }
453
35
    else {
454
35
        return Qnil;
455
35
    }
456
35
}
457
458
VALUE rb_fiber_scheduler_current(void)
459
35
{
460
35
    RUBY_ASSERT(ruby_thread_has_gvl_p());
461
462
35
    return fiber_scheduler_current_for_threadptr(GET_THREAD());
463
35
}
464
465
// This function is allowed to be called without holding the GVL.
466
VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
467
0
{
468
0
    return fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
469
0
}
470
471
/* Public entry point for the file-local lookup of the same purpose. */
VALUE
rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
{
    VALUE scheduler = fiber_scheduler_current_for_threadptr(thread);

    return scheduler;
}
475
476
/*
477
 *
478
 *  Document-method: Fiber::Scheduler#close
479
 *
480
 *  Called when the current thread exits. The scheduler is expected to implement this
481
 *  method in order to allow all waiting fibers to finalize their execution.
482
 *
483
 *  The suggested pattern is to implement the main event loop in the #close method.
484
 *
485
 */
486
VALUE
487
rb_fiber_scheduler_close(VALUE scheduler)
488
0
{
489
0
    RUBY_ASSERT(ruby_thread_has_gvl_p());
490
491
0
    VALUE result;
492
493
    // The reason for calling `scheduler_close` before calling `close` is for
494
    // legacy schedulers which implement `close` and expect the user to call
495
    // it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
496
    // which should call `scheduler_close`. If it were to call `close`, it
497
    // would create an infinite loop.
498
499
0
    result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
500
0
    if (!UNDEF_P(result)) return result;
501
502
0
    result = rb_check_funcall(scheduler, id_close, 0, NULL);
503
0
    if (!UNDEF_P(result)) return result;
504
505
0
    return Qnil;
506
0
}
507
508
VALUE
509
rb_fiber_scheduler_make_timeout(struct timeval *timeout)
510
0
{
511
0
    if (timeout) {
512
0
        return rb_float_new((double)timeout->tv_sec + (0.000001 * timeout->tv_usec));
513
0
    }
514
515
0
    return Qnil;
516
0
}
517
518
/*
519
 *  Document-method: Fiber::Scheduler#kernel_sleep
520
 *  call-seq: kernel_sleep(duration = nil)
521
 *
522
 *  Invoked by Kernel#sleep and Thread::Mutex#sleep and is expected to provide
523
 *  an implementation of sleeping in a non-blocking way. Implementation might
524
 *  register the current fiber in some list of "which fiber wait until what
525
 *  moment", call Fiber.yield to pass control, and then in #close resume
526
 *  the fibers whose wait period has elapsed.
527
 *
528
 */
529
VALUE
530
rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
531
0
{
532
0
    return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
533
0
}
534
535
VALUE
536
rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
537
0
{
538
0
    return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
539
0
}
540
541
/**
542
 *  Document-method: Fiber::Scheduler#yield
543
 *  call-seq: yield
544
 *
545
 *  Yield to the scheduler, to be resumed on the next scheduling cycle.
546
 */
547
VALUE
548
rb_fiber_scheduler_yield(VALUE scheduler)
549
0
{
550
    // First try to call the scheduler's yield method, if it exists:
551
0
    VALUE result = rb_check_funcall(scheduler, id_yield, 0, NULL);
552
0
    if (!UNDEF_P(result)) return result;
553
554
    // Otherwise, we can emulate yield by sleeping for 0 seconds:
555
0
    return rb_fiber_scheduler_kernel_sleep(scheduler, RB_INT2NUM(0));
556
0
}
557
558
#if 0
559
/*
560
 *  Document-method: Fiber::Scheduler#timeout_after
561
 *  call-seq: timeout_after(duration, exception_class, *exception_arguments, &block) -> result of block
562
 *
563
 *  Invoked by Timeout.timeout to execute the given +block+ within the given
564
 *  +duration+. It can also be invoked directly by the scheduler or user code.
565
 *
566
 *  Attempt to limit the execution time of a given +block+ to the given
567
 *  +duration+ if possible. When a non-blocking operation causes the +block+'s
568
 *  execution time to exceed the specified +duration+, that non-blocking
569
 *  operation should be interrupted by raising the specified +exception_class+
570
 *  constructed with the given +exception_arguments+.
571
 *
572
 *  General execution timeouts are often considered risky. This implementation
573
 *  will only interrupt non-blocking operations. This is by design because it's
574
 *  expected that non-blocking operations can fail for a variety of
575
 *  unpredictable reasons, so applications should already be robust in handling
576
 *  these conditions and by implication timeouts.
577
 *
578
 *  However, as a result of this design, if the +block+ does not invoke any
579
 *  non-blocking operations, it will be impossible to interrupt it. If you
580
 *  desire to provide predictable points for timeouts, consider adding
581
 *  <tt>sleep(0)</tt>.
582
 *
583
 *  If the block is executed successfully, its result will be returned.
584
 *
585
 *  The exception will typically be raised using Fiber#raise.
586
 */
587
VALUE
588
rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
589
{
590
    VALUE arguments[] = {
591
        timeout, exception, message
592
    };
593
594
    return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
595
}
596
597
VALUE
598
rb_fiber_scheduler_timeout_afterv(VALUE scheduler, int argc, VALUE * argv)
599
{
600
    return rb_check_funcall(scheduler, id_timeout_after, argc, argv);
601
}
602
#endif
603
604
/*
605
 *  Document-method: Fiber::Scheduler#process_wait
606
 *  call-seq: process_wait(pid, flags)
607
 *
608
 *  Invoked by Process::Status.wait in order to wait for a specified process.
609
 *  See that method description for arguments description.
610
 *
611
 *  Suggested minimal implementation:
612
 *
613
 *      Thread.new do
614
 *        Process::Status.wait(pid, flags)
615
 *      end.value
616
 *
617
 *  This hook is optional: if it is not present in the current scheduler,
618
 *  Process::Status.wait will behave as a blocking method.
619
 *
620
 *  Expected to return a Process::Status instance.
621
 */
622
VALUE
623
rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
624
0
{
625
0
    VALUE arguments[] = {
626
0
        PIDT2NUM(pid), RB_INT2NUM(flags)
627
0
    };
628
629
0
    return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
630
0
}
631
632
/*
633
 *  Document-method: Fiber::Scheduler#block
634
 *  call-seq: block(blocker, timeout = nil)
635
 *
636
 *  Invoked by methods like Thread.join, and by Thread::Mutex, to signify that current
637
 *  Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has
638
 *  elapsed.
639
 *
640
 *  +blocker+ is what we are waiting on, informational only (for debugging and
641
 *  logging). There is no guarantee about its value.
642
 *
643
 *  Expected to return boolean, specifying whether the blocking operation was
644
 *  successful or not.
645
 */
646
VALUE
647
rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
648
0
{
649
0
    return rb_funcall(scheduler, id_block, 2, blocker, timeout);
650
0
}
651
652
/*
653
 *  Document-method: Fiber::Scheduler#unblock
654
 *  call-seq: unblock(blocker, fiber)
655
 *
656
 *  Invoked to wake up Fiber previously blocked with #block (for example, Thread::Mutex#lock
657
 *  calls #block and Thread::Mutex#unlock calls #unblock). The scheduler should use
658
 *  the +fiber+ parameter to understand which fiber is unblocked.
659
 *
660
 *  +blocker+ is what was awaited for, but it is informational only (for debugging
661
 *  and logging), and it is not guaranteed to be the same value as the +blocker+ for
662
 *  #block.
663
 *
664
 */
665
VALUE
666
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
667
0
{
668
0
    RUBY_ASSERT(rb_obj_is_fiber(fiber));
669
670
0
    VALUE result;
671
0
    enum ruby_tag_type state;
672
673
    // `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`.
674
    //
675
    // If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it.
676
0
    int saved_errno = errno;
677
678
    // We must prevent interrupts while invoking the unblock method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_fiber_interrupt`.
679
0
    rb_execution_context_t *ec = GET_EC();
680
0
    int saved_interrupt_mask = ec->interrupt_mask;
681
0
    ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
682
683
0
    EC_PUSH_TAG(ec);
684
0
    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
685
0
        result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
686
0
    }
687
0
    EC_POP_TAG();
688
689
0
    ec->interrupt_mask = saved_interrupt_mask;
690
691
0
    if (state) {
692
0
        EC_JUMP_TAG(ec, state);
693
0
    }
694
695
0
    RUBY_VM_CHECK_INTS(ec);
696
697
0
    errno = saved_errno;
698
699
0
    return result;
700
0
}
701
702
/*
703
 *  Document-method: Fiber::Scheduler#io_wait
704
 *  call-seq: io_wait(io, events, timeout)
705
 *
706
 *  Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the
707
 *  specified descriptor is ready for specified events within
708
 *  the specified +timeout+.
709
 *
710
 *  +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and
711
 *  <tt>IO::PRIORITY</tt>.
712
 *
713
 *  Suggested implementation should register which Fiber is waiting for which
714
 *  resources and immediately call Fiber.yield to pass control to other
715
 *  fibers. Then, in the #close method, the scheduler might dispatch all the
716
 *  I/O resources to fibers waiting for it.
717
 *
718
 *  Expected to return the subset of events that are ready immediately.
719
 *
720
 */
721
static VALUE
722
0
fiber_scheduler_io_wait(VALUE _argument) {
723
0
    VALUE *arguments = (VALUE*)_argument;
724
725
0
    return rb_funcallv(arguments[0], id_io_wait, 3, arguments + 1);
726
0
}
727
728
VALUE
729
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
730
0
{
731
0
    VALUE arguments[] = {
732
0
        scheduler, io, events, timeout
733
0
    };
734
735
0
    if (rb_respond_to(scheduler, id_fiber_interrupt)) {
736
0
        return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
737
0
    } else {
738
0
        return fiber_scheduler_io_wait((VALUE)&arguments);
739
0
    }
740
0
}
741
742
VALUE
743
rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
744
0
{
745
0
    return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_READABLE), rb_io_timeout(io));
746
0
}
747
748
VALUE
749
rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
750
0
{
751
0
    return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), rb_io_timeout(io));
752
0
}
753
754
/*
755
 *  Document-method: Fiber::Scheduler#io_select
756
 *  call-seq: io_select(readables, writables, exceptables, timeout)
757
 *
758
 *  Invoked by IO.select to ask whether the specified descriptors are ready for
759
 *  specified events within the specified +timeout+.
760
 *
761
 *  Expected to return the 3-tuple of Array of IOs that are ready.
762
 *
763
 */
764
VALUE
rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
{
    // Pack the four arguments and reuse the vector variant.
    VALUE arguments[4];
    arguments[0] = readables;
    arguments[1] = writables;
    arguments[2] = exceptables;
    arguments[3] = timeout;

    return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
}
772
773
VALUE
rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
{
    // Rewriting a single-IO select into `io_wait` was considered, but it
    // would take a fair amount of work and make it hard to preserve the
    // exact semantics of IO.select, so the arguments are forwarded untouched.
    VALUE outcome = rb_check_funcall(scheduler, id_io_select, argc, argv);

    return outcome;
}
782
783
/*
784
 *  Document-method: Fiber::Scheduler#io_read
785
 *  call-seq: io_read(io, buffer, length, offset) -> read length or -errno
786
 *
787
 *  Invoked by IO#read or IO::Buffer#read to read +length+ bytes from +io+ into a
788
 *  specified +buffer+ (see IO::Buffer) at the given +offset+.
789
 *
790
 *  The +length+ argument is the "minimum length to be read". If the IO buffer
791
 *  size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be read,
792
 *  but at least 1KiB will be. Generally, the only case where less data than
793
 *  +length+ will be read is if there is an error reading the data.
794
 *
795
 *  Specifying a +length+ of 0 is valid and means try reading at least once and
796
 *  return any available data.
797
 *
798
 *  Suggested implementation should try to read from +io+ in a non-blocking
799
 *  manner and call #io_wait if the +io+ is not ready (which will yield control
800
 *  to other fibers).
801
 *
802
 *  See IO::Buffer for an interface available to return data.
803
 *
804
 *  Expected to return number of bytes read, or, in case of an error,
805
 *  <tt>-errno</tt> (negated number corresponding to system's error code).
806
 *
807
 *  The method should be considered _experimental_.
808
 */
809
static VALUE
810
0
fiber_scheduler_io_read(VALUE _argument) {
811
0
    VALUE *arguments = (VALUE*)_argument;
812
813
0
    return rb_funcallv(arguments[0], id_io_read, 4, arguments + 1);
814
0
}
815
816
VALUE
817
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
818
0
{
819
0
    if (!rb_respond_to(scheduler, id_io_read)) {
820
0
        return RUBY_Qundef;
821
0
    }
822
823
0
    VALUE arguments[] = {
824
0
        scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
825
0
    };
826
827
0
    if (rb_respond_to(scheduler, id_fiber_interrupt)) {
828
0
        return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
829
0
    } else {
830
0
        return fiber_scheduler_io_read((VALUE)&arguments);
831
0
    }
832
0
}
833
834
/*
835
 *  Document-method: Fiber::Scheduler#io_pread
836
 *  call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno
837
 *
838
 *  Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+
839
 *  at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
840
 *  +offset+.
841
 *
842
 *  This method is semantically the same as #io_read, but it allows to specify
843
 *  the offset to read from and is often better for asynchronous IO on the same
844
 *  file.
845
 *
846
 *  The method should be considered _experimental_.
847
 */
848
static VALUE
849
0
fiber_scheduler_io_pread(VALUE _argument) {
850
0
    VALUE *arguments = (VALUE*)_argument;
851
852
0
    return rb_funcallv(arguments[0], id_io_pread, 5, arguments + 1);
853
0
}
854
855
VALUE
856
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
857
0
{
858
0
    if (!rb_respond_to(scheduler, id_io_pread)) {
859
0
        return RUBY_Qundef;
860
0
    }
861
862
0
    VALUE arguments[] = {
863
0
        scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
864
0
    };
865
866
0
    if (rb_respond_to(scheduler, id_fiber_interrupt)) {
867
0
        return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
868
0
    } else {
869
0
        return fiber_scheduler_io_pread((VALUE)&arguments);
870
0
    }
871
0
}
872
873
/*
874
 *  Document-method: Fiber::Scheduler#io_write
875
 *  call-seq: io_write(io, buffer, length, offset) -> written length or -errno
876
 *
877
 *  Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+ from
878
 *  a specified +buffer+ (see IO::Buffer) at the given +offset+.
879
 *
880
 *  The +length+ argument is the "minimum length to be written". If the IO
881
 *  buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most 8KiB
882
 *  will be written, but at least 1KiB will be. Generally, the only case where
883
 *  less data than +length+ will be written is if there is an error writing the
884
 *  data.
885
 *
886
 *  Specifying a +length+ of 0 is valid and means try writing at least once, as
887
 *  much data as possible.
888
 *
889
 *  Suggested implementation should try to write to +io+ in a non-blocking
890
 *  manner and call #io_wait if the +io+ is not ready (which will yield control
891
 *  to other fibers).
892
 *
893
 *  See IO::Buffer for an interface available to get data from buffer
894
 *  efficiently.
895
 *
896
 *  Expected to return number of bytes written, or, in case of an error,
897
 *  <tt>-errno</tt> (negated number corresponding to system's error code).
898
 *
899
 *  The method should be considered _experimental_.
900
 */
901
static VALUE
902
0
fiber_scheduler_io_write(VALUE _argument) {
903
0
    VALUE *arguments = (VALUE*)_argument;
904
905
0
    return rb_funcallv(arguments[0], id_io_write, 4, arguments + 1);
906
0
}
907
908
VALUE
909
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
910
0
{
911
0
    if (!rb_respond_to(scheduler, id_io_write)) {
912
0
        return RUBY_Qundef;
913
0
    }
914
915
0
    VALUE arguments[] = {
916
0
        scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
917
0
    };
918
919
0
    if (rb_respond_to(scheduler, id_fiber_interrupt)) {
920
0
        return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
921
0
    } else {
922
0
        return fiber_scheduler_io_write((VALUE)&arguments);
923
0
    }
924
0
}
925
926
/*
927
 *  Document-method: Fiber::Scheduler#io_pwrite
928
 *  call-seq: io_pwrite(io, buffer, from, length, offset) -> written length or -errno
929
 *
930
 *  Invoked by IO#pwrite or IO::Buffer#pwrite to write +length+ bytes to +io+
931
 *  at offset +from+ from a specified +buffer+ (see IO::Buffer) at the given
932
 *  +offset+.
933
 *
934
 *  This method is semantically the same as #io_write, but it allows to specify
935
 *  the offset to write to and is often better for asynchronous IO on the same
936
 *  file.
937
 *
938
 *  The method should be considered _experimental_.
939
 *
940
 */
941
static VALUE
942
0
fiber_scheduler_io_pwrite(VALUE _argument) {
943
0
    VALUE *arguments = (VALUE*)_argument;
944
945
0
    return rb_funcallv(arguments[0], id_io_pwrite, 5, arguments + 1);
946
0
}
947
948
VALUE
949
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
950
0
{
951
952
953
0
    if (!rb_respond_to(scheduler, id_io_pwrite)) {
954
0
        return RUBY_Qundef;
955
0
    }
956
957
0
    VALUE arguments[] = {
958
0
        scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
959
0
    };
960
961
0
    if (rb_respond_to(scheduler, id_fiber_interrupt)) {
962
0
        return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
963
0
    } else {
964
0
        return fiber_scheduler_io_pwrite((VALUE)&arguments);
965
0
    }
966
0
}
967
968
VALUE
969
rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
970
0
{
971
0
    VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
972
973
0
    VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0);
974
975
0
    rb_io_buffer_free_locked(buffer);
976
977
0
    return result;
978
0
}
979
980
VALUE
981
rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
982
0
{
983
0
    VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
984
985
0
    VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0);
986
987
0
    rb_io_buffer_free_locked(buffer);
988
989
0
    return result;
990
0
}
991
992
VALUE
993
rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
994
0
{
995
0
    VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
996
997
0
    VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, buffer, length, 0);
998
999
0
    rb_io_buffer_free_locked(buffer);
1000
1001
0
    return result;
1002
0
}
1003
1004
VALUE
1005
rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
1006
0
{
1007
0
    VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
1008
1009
0
    VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, buffer, length, 0);
1010
1011
0
    rb_io_buffer_free_locked(buffer);
1012
1013
0
    return result;
1014
0
}
1015
1016
/*
1017
 *  Document-method: Fiber::Scheduler#io_close
1018
 *  call-seq: io_close(fd)
1019
 *
1020
 *  Invoked by Ruby's core methods to notify scheduler that the IO object is closed. Note that
1021
 *  the method will receive an integer file descriptor of the closed object, not an object
1022
 *  itself.
1023
 */
1024
VALUE
1025
rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
1026
0
{
1027
0
    VALUE arguments[] = {io};
1028
1029
0
    return rb_check_funcall(scheduler, id_io_close, 1, arguments);
1030
0
}
1031
1032
/*
1033
 *  Document-method: Fiber::Scheduler#address_resolve
1034
 *  call-seq: address_resolve(hostname) -> array_of_strings or nil
1035
 *
1036
 *  Invoked by any method that performs a non-reverse DNS lookup. The most
1037
 *  notable method is Addrinfo.getaddrinfo, but there are many others.
1038
 *
1039
 *  The method is expected to return an array of strings corresponding to ip
1040
 *  addresses the +hostname+ is resolved to, or +nil+ if it can not be resolved.
1041
 *
1042
 *  Fairly exhaustive list of all possible call-sites:
1043
 *
1044
 *  - Addrinfo.getaddrinfo
1045
 *  - Addrinfo.tcp
1046
 *  - Addrinfo.udp
1047
 *  - Addrinfo.ip
1048
 *  - Addrinfo.new
1049
 *  - Addrinfo.marshal_load
1050
 *  - SOCKSSocket.new
1051
 *  - TCPServer.new
1052
 *  - TCPSocket.new
1053
 *  - IPSocket.getaddress
1054
 *  - TCPSocket.gethostbyname
1055
 *  - UDPSocket#connect
1056
 *  - UDPSocket#bind
1057
 *  - UDPSocket#send
1058
 *  - Socket.getaddrinfo
1059
 *  - Socket.gethostbyname
1060
 *  - Socket.pack_sockaddr_in
1061
 *  - Socket.sockaddr_in
1062
 *  - Socket.unpack_sockaddr_in
1063
 */
1064
VALUE
1065
rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
1066
0
{
1067
0
    VALUE arguments[] = {
1068
0
        hostname
1069
0
    };
1070
1071
0
    return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
1072
0
}
1073
1074
/*
1075
 *  Document-method: Fiber::Scheduler#blocking_operation_wait
1076
 *  call-seq: blocking_operation_wait(blocking_operation)
1077
 *
1078
 *  Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
1079
 *  The blocking_operation is an opaque object that encapsulates the blocking operation
1080
 *  and responds to a <tt>#call</tt> method without any arguments.
1081
 *
1082
 *  If the scheduler doesn't implement this method, or if the scheduler doesn't execute
1083
 *  the blocking operation, Ruby will fall back to the non-scheduler implementation.
1084
 *
1085
 *  Minimal suggested implementation is:
1086
 *
1087
 *     def blocking_operation_wait(blocking_operation)
1088
 *       Thread.new { blocking_operation.call }.join
1089
 *     end
1090
 */
1091
VALUE
rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
{
    // Bail out before allocating anything if the scheduler has no
    // `blocking_operation_wait` hook.
    if (!rb_respond_to(scheduler, id_blocking_operation_wait)) {
        return Qundef;
    }

    // Wrap the native callback and its context in a BlockingOperation object
    // and hand it to the scheduler.
    VALUE wrapper = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);
    VALUE outcome = rb_funcall(scheduler, id_blocking_operation_wait, 1, wrapper);

    // Snapshot the status before wiping the operation, so we can tell whether
    // the scheduler actually ran it.
    rb_fiber_scheduler_blocking_operation_t *op = get_blocking_operation(wrapper);
    rb_atomic_t final_status = RUBY_ATOMIC_LOAD(op->status);

    // The operation must not be usable after this call returns: drop every
    // pointer it holds to caller-owned data.
    op->function = NULL;
    op->state = NULL;
    op->data = NULL;
    op->data2 = NULL;
    op->unblock_function = NULL;

    // Still QUEUED means the scheduler never executed it; Qundef signals the
    // caller to fall back to rb_nogvl.
    return (final_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) ? Qundef : outcome;
}
1121
1122
/*
1123
 * Document-method: Fiber::Scheduler#fiber_interrupt
1124
 * call-seq: fiber_interrupt(fiber, exception)
1125
 *
1126
 * Invoked by Ruby's core methods to notify the scheduler that the blocked fiber should be interrupted
1127
 * with an exception. For example, IO#close uses this method to interrupt fibers that are performing
1128
 * blocking IO operations.
1129
 *
1130
 */
1131
VALUE
rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
{
    VALUE argv[2];
    argv[0] = fiber;
    argv[1] = exception;

    VALUE outcome;
    enum ruby_tag_type state;

    // Interrupts must be held off while the `fiber_interrupt` hook runs:
    // otherwise an interrupt arriving during user code could leave fibers
    // permanently blocked. See also `rb_fiber_scheduler_unblock`.
    rb_execution_context_t *ec = GET_EC();
    int previous_mask = ec->interrupt_mask;
    ec->interrupt_mask |= PENDING_INTERRUPT_MASK;

    // Run the hook under a tag so that a non-local exit still passes through
    // the mask restoration below.
    EC_PUSH_TAG(ec);
    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
        outcome = rb_check_funcall(scheduler, id_fiber_interrupt, 2, argv);
    }
    EC_POP_TAG();

    // Restore the caller's mask on both the normal and the non-local path.
    ec->interrupt_mask = previous_mask;

    // Re-raise whatever non-local exit the hook produced; `outcome` is only
    // read when the hook returned normally.
    if (state != TAG_NONE) {
        EC_JUMP_TAG(ec, state);
    }

    // With the original mask back in place, check for pending interrupts.
    RUBY_VM_CHECK_INTS(ec);

    return outcome;
}
1161
1162
/*
1163
 *  Document-method: Fiber::Scheduler#fiber
1164
 *  call-seq: fiber(&block)
1165
 *
1166
 *  Implementation of the Fiber.schedule. The method is <em>expected</em> to immediately
1167
 *  run the given block of code in a separate non-blocking fiber, and to return that Fiber.
1168
 *
1169
 *  Minimal suggested implementation is:
1170
 *
1171
 *     def fiber(&block)
1172
 *       fiber = Fiber.new(blocking: false, &block)
1173
 *       fiber.resume
1174
 *       fiber
1175
 *     end
1176
 */
1177
VALUE
1178
rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
1179
0
{
1180
0
    return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
1181
0
}
1182
1183
/*
1184
 * C API: Cancel a blocking operation
1185
 *
1186
 * This function cancels a blocking operation. If the operation is queued,
1187
 * it just marks it as cancelled. If it's executing, it marks it as cancelled
1188
 * and calls the unblock function to interrupt the operation.
1189
 *
1190
 * Returns 1 if unblock function was called, 0 if just marked cancelled, -1 on error.
1191
 */
1192
int
1193
rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
1194
0
{
1195
0
    if (blocking_operation == NULL) {
1196
0
        return -1;
1197
0
    }
1198
1199
0
    rb_atomic_t current_state = RUBY_ATOMIC_LOAD(blocking_operation->status);
1200
1201
0
    switch (current_state) {
1202
0
        case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED:
1203
            // Work hasn't started - just mark as cancelled:
1204
0
            if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == current_state) {
1205
                // Successfully cancelled before execution:
1206
0
                return 0;
1207
0
            }
1208
            // Fall through if state changed between load and CAS
1209
1210
0
        case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING:
1211
            // Work is running - mark cancelled AND call unblock function
1212
0
            if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) != current_state) {
1213
                // State changed between load and CAS - operation may have completed:
1214
0
                return 0;
1215
0
            }
1216
            // Otherwise, we successfully marked it as cancelled, so we can call the unblock function:
1217
0
            rb_unblock_function_t *unblock_function = blocking_operation->unblock_function;
1218
0
            if (unblock_function) {
1219
0
                RUBY_ASSERT(unblock_function != (rb_unblock_function_t *)-1 && "unblock_function is still sentinel value -1, should have been resolved earlier");
1220
0
                blocking_operation->unblock_function(blocking_operation->data2);
1221
0
            }
1222
            // Cancelled during execution (unblock function called):
1223
0
            return 1;
1224
1225
0
        case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED:
1226
0
        case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED:
1227
            // Already finished or cancelled:
1228
0
            return 0;
1229
0
    }
1230
1231
0
    return 0;
1232
0
}