Line | Count | Source |
1 | | /********************************************************************** |
2 | | |
3 | | scheduler.c |
4 | | |
5 | | $Author$ |
6 | | |
7 | | Copyright (C) 2020 Samuel Grant Dawson Williams |
8 | | |
9 | | **********************************************************************/ |
10 | | |
11 | | #include "vm_core.h" |
12 | | #include "eval_intern.h" |
13 | | #include "ruby/fiber/scheduler.h" |
14 | | #include "ruby/io.h" |
15 | | #include "ruby/io/buffer.h" |
16 | | |
17 | | #include "ruby/thread.h" |
18 | | |
19 | | // For `ruby_thread_has_gvl_p`: |
20 | | #include "internal/thread.h" |
21 | | |
22 | | // For atomic operations: |
23 | | #include "ruby_atomic.h" |
24 | | |
25 | | static ID id_close; |
26 | | static ID id_scheduler_close; |
27 | | |
28 | | static ID id_block; |
29 | | static ID id_unblock; |
30 | | |
31 | | static ID id_yield; |
32 | | |
33 | | static ID id_timeout_after; |
34 | | static ID id_kernel_sleep; |
35 | | static ID id_process_wait; |
36 | | |
37 | | static ID id_io_read, id_io_pread; |
38 | | static ID id_io_write, id_io_pwrite; |
39 | | static ID id_io_wait; |
40 | | static ID id_io_select; |
41 | | static ID id_io_close; |
42 | | |
43 | | static ID id_address_resolve; |
44 | | |
45 | | static ID id_blocking_operation_wait; |
46 | | static ID id_fiber_interrupt; |
47 | | |
48 | | static ID id_fiber_schedule; |
49 | | |
50 | | // Our custom blocking operation class |
51 | | static VALUE rb_cFiberSchedulerBlockingOperation; |
52 | | |
53 | | /* |
54 | | * Custom blocking operation structure for blocking operations |
55 | | * This replaces the use of Ruby procs to avoid use-after-free issues |
56 | | * and provides a cleaner C API for native work pools. |
57 | | */ |
58 | | |
// Lifecycle of a blocking operation. Transitions are performed with atomic
// CAS (see rb_fiber_scheduler_blocking_operation_execute) so that execution
// and cancellation can race safely.
typedef enum {
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED, // Submitted but not started
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING, // Currently running
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED, // Finished (success/error)
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED // Cancelled
} rb_fiber_blocking_operation_status_t;

// A single blocking operation, wrapped as TypedData so it can be handed to a
// scheduler (or native work pool) without the use-after-free hazards of a proc.
struct rb_fiber_scheduler_blocking_operation {
    void *(*function)(void *); // The blocking function to run (invoked without the GVL).
    void *data;                // Opaque argument passed to `function`.

    rb_unblock_function_t *unblock_function; // Interrupts `function`; may hold a sentinel resolved by rb_thread_resolve_unblock_function.
    void *data2;                             // Opaque argument passed to `unblock_function`.

    int flags; // Flags forwarded to rb_nogvl.
    struct rb_fiber_scheduler_blocking_operation_state *state; // Receives `result` and `saved_errno`.

    // Execution status
    volatile rb_atomic_t status; // Holds an rb_fiber_blocking_operation_status_t value.
};
79 | | |
80 | | static size_t |
81 | | blocking_operation_memsize(const void *ptr) |
82 | 0 | { |
83 | 0 | return sizeof(rb_fiber_scheduler_blocking_operation_t); |
84 | 0 | } |
85 | | |
// TypedData descriptor for the hidden BlockingOperation class. The struct
// holds no VALUEs, so no mark function is needed; the default free suffices.
static const rb_data_type_t blocking_operation_data_type = {
    "Fiber::Scheduler::BlockingOperation",
    {
        NULL, // nothing to mark
        RUBY_DEFAULT_FREE,
        blocking_operation_memsize,
    },
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE
};
95 | | |
96 | | /* |
97 | | * Allocate a new blocking operation |
98 | | */ |
99 | | static VALUE |
100 | | blocking_operation_alloc(VALUE klass) |
101 | 0 | { |
102 | 0 | rb_fiber_scheduler_blocking_operation_t *blocking_operation; |
103 | 0 | VALUE obj = TypedData_Make_Struct(klass, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation); |
104 | |
|
105 | 0 | blocking_operation->function = NULL; |
106 | 0 | blocking_operation->data = NULL; |
107 | 0 | blocking_operation->unblock_function = NULL; |
108 | 0 | blocking_operation->data2 = NULL; |
109 | 0 | blocking_operation->flags = 0; |
110 | 0 | blocking_operation->state = NULL; |
111 | 0 | blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED; |
112 | |
|
113 | 0 | return obj; |
114 | 0 | } |
115 | | |
116 | | /* |
117 | | * Get the blocking operation struct from a Ruby object |
118 | | */ |
119 | | static rb_fiber_scheduler_blocking_operation_t * |
120 | | get_blocking_operation(VALUE obj) |
121 | 0 | { |
122 | 0 | rb_fiber_scheduler_blocking_operation_t *blocking_operation; |
123 | 0 | TypedData_Get_Struct(obj, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation); |
124 | 0 | return blocking_operation; |
125 | 0 | } |
126 | | |
127 | | /* |
128 | | * Document-method: Fiber::Scheduler::BlockingOperation#call |
129 | | * |
130 | | * Execute the blocking operation. This method releases the GVL and calls |
131 | | * the blocking function, then restores the errno value. |
132 | | * |
133 | | * Returns nil. The actual result is stored in the associated state object. |
134 | | */ |
135 | | static VALUE |
136 | | blocking_operation_call(VALUE self) |
137 | 0 | { |
138 | 0 | rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self); |
139 | |
|
140 | 0 | if (blocking_operation->status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) { |
141 | 0 | rb_raise(rb_eRuntimeError, "Blocking operation has already been executed!"); |
142 | 0 | } |
143 | | |
144 | 0 | if (blocking_operation->function == NULL) { |
145 | 0 | rb_raise(rb_eRuntimeError, "Blocking operation has no function to execute!"); |
146 | 0 | } |
147 | | |
148 | 0 | if (blocking_operation->state == NULL) { |
149 | 0 | rb_raise(rb_eRuntimeError, "Blocking operation has no result object!"); |
150 | 0 | } |
151 | | |
152 | | // Mark as executing |
153 | 0 | blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING; |
154 | | |
155 | | // Execute the blocking operation without GVL |
156 | 0 | blocking_operation->state->result = rb_nogvl(blocking_operation->function, blocking_operation->data, |
157 | 0 | blocking_operation->unblock_function, blocking_operation->data2, |
158 | 0 | blocking_operation->flags); |
159 | 0 | blocking_operation->state->saved_errno = rb_errno(); |
160 | | |
161 | | // Mark as completed |
162 | 0 | blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED; |
163 | |
|
164 | 0 | return Qnil; |
165 | 0 | } |
166 | | |
167 | | /* |
168 | | * C API: Extract blocking operation struct from Ruby object (GVL required) |
169 | | * |
170 | | * This function safely extracts the opaque struct from a BlockingOperation VALUE |
171 | | * while holding the GVL. The returned pointer can be passed to worker threads |
172 | | * and used with rb_fiber_scheduler_blocking_operation_execute_opaque_nogvl. |
173 | | * |
174 | | * Returns the opaque struct pointer on success, NULL on error. |
175 | | * Must be called while holding the GVL. |
176 | | */ |
177 | | rb_fiber_scheduler_blocking_operation_t * |
178 | | rb_fiber_scheduler_blocking_operation_extract(VALUE self) |
179 | 0 | { |
180 | 0 | return get_blocking_operation(self); |
181 | 0 | } |
182 | | |
183 | | /* |
184 | | * C API: Execute blocking operation from opaque struct (GVL not required) |
185 | | * |
186 | | * This function executes a blocking operation using the opaque struct pointer |
187 | | * obtained from rb_fiber_scheduler_blocking_operation_extract. |
188 | | * It can be called from native threads without holding the GVL. |
189 | | * |
190 | | * Returns 0 on success, -1 on error. |
191 | | */ |
int
rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
{
    if (blocking_operation == NULL) {
        return -1;
    }

    if (blocking_operation->function == NULL || blocking_operation->state == NULL) {
        return -1; // Invalid blocking operation
    }

    // Resolve sentinel values for unblock_function and data2:
    // NOTE(review): the header comment says this is callable without the GVL,
    // but GET_THREAD() is used here — confirm this is safe from bare native
    // threads.
    rb_thread_resolve_unblock_function(&blocking_operation->unblock_function, &blocking_operation->data2, GET_THREAD());

    // Atomically check if we can transition from QUEUED to EXECUTING
    rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
    if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
        // Already cancelled or in wrong state
        return -1;
    }

    // Now we're executing - call the function
    blocking_operation->state->result = blocking_operation->function(blocking_operation->data);
    // Capture errno immediately, before any other call can clobber it:
    blocking_operation->state->saved_errno = errno;

    // Atomically transition to completed (unless cancelled during execution)
    expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
    if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) == expected) {
        // Successfully completed
        return 0;
    } else {
        // Was cancelled during execution
        blocking_operation->state->saved_errno = EINTR;
        return -1;
    }
}
228 | | |
229 | | /* |
230 | | * C API: Create a new blocking operation |
231 | | * |
232 | | * This creates a blocking operation that can be executed by native work pools. |
233 | | * The blocking operation holds references to the function and data safely. |
234 | | */ |
235 | | VALUE |
236 | | rb_fiber_scheduler_blocking_operation_new(void *(*function)(void *), void *data, |
237 | | rb_unblock_function_t *unblock_function, void *data2, |
238 | | int flags, struct rb_fiber_scheduler_blocking_operation_state *state) |
239 | 0 | { |
240 | 0 | VALUE self = blocking_operation_alloc(rb_cFiberSchedulerBlockingOperation); |
241 | 0 | rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self); |
242 | |
|
243 | 0 | blocking_operation->function = function; |
244 | 0 | blocking_operation->data = data; |
245 | 0 | blocking_operation->unblock_function = unblock_function; |
246 | 0 | blocking_operation->data2 = data2; |
247 | 0 | blocking_operation->flags = flags; |
248 | 0 | blocking_operation->state = state; |
249 | |
|
250 | 0 | return self; |
251 | 0 | } |
252 | | |
253 | | /* |
254 | | * |
255 | | * Document-class: Fiber::Scheduler |
256 | | * |
257 | | * This is not an existing class, but documentation of the interface that Scheduler |
258 | | * object should comply to in order to be used as argument to Fiber.scheduler and handle non-blocking |
259 | | * fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations |
260 | | * of some concepts. |
261 | | * |
262 | | * Scheduler's behavior and usage are expected to be as follows: |
263 | | * |
264 | | * * When the execution in the non-blocking Fiber reaches some blocking operation (like |
265 | | * sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's |
266 | | * hook methods, listed below. |
267 | | * * Scheduler somehow registers what the current fiber is waiting on, and yields control |
268 | | * to other fibers with Fiber.yield (so the fiber would be suspended while expecting its |
269 | | * wait to end, and other fibers in the same thread can perform) |
270 | | * * At the end of the current thread execution, the scheduler's method #scheduler_close is called |
271 | | * * The scheduler runs into a wait loop, checking all the blocked fibers (which it has |
272 | | * registered on hook calls) and resuming them when the awaited resource is ready |
273 | | * (e.g. I/O ready or sleep time elapsed). |
274 | | * |
275 | | * This way concurrent execution will be achieved transparently for every |
276 | | * individual Fiber's code. |
277 | | * |
278 | | * Scheduler implementations are provided by gems, like |
279 | | * Async[https://github.com/socketry/async]. |
280 | | * |
281 | | * Hook methods are: |
282 | | * |
283 | | * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite #io_select, and #io_close |
284 | | * * #process_wait |
285 | | * * #kernel_sleep |
286 | | * * #timeout_after |
287 | | * * #address_resolve |
288 | | * * #block and #unblock |
289 | | * * #blocking_operation_wait |
290 | | * * #fiber_interrupt |
291 | | * * #yield |
292 | | * * (the list is expanded as Ruby developers make more methods having non-blocking calls) |
293 | | * |
294 | | * When not specified otherwise, the hook implementations are mandatory: if they are not |
295 | | * implemented, the methods trying to call hook will fail. To provide backward compatibility, |
296 | | * in the future hooks will be optional (if they are not implemented, due to the scheduler |
297 | | * being created for the older Ruby version, the code which needs this hook will not fail, |
298 | | * and will just behave in a blocking fashion). |
299 | | * |
300 | | * It is also strongly recommended that the scheduler implements the #fiber method, which is |
301 | | * delegated to by Fiber.schedule. |
302 | | * |
303 | | * Sample _toy_ implementation of the scheduler can be found in Ruby's code, in |
304 | | * <tt>test/fiber/scheduler.rb</tt> |
305 | | * |
306 | | */ |
// Intern every hook-method ID used by the scheduler interface and set up the
// hidden BlockingOperation class. Called once at VM boot.
void
Init_Fiber_Scheduler(void)
{
    id_close = rb_intern_const("close");
    id_scheduler_close = rb_intern_const("scheduler_close");

    id_block = rb_intern_const("block");
    id_unblock = rb_intern_const("unblock");
    id_yield = rb_intern_const("yield");

    id_timeout_after = rb_intern_const("timeout_after");
    id_kernel_sleep = rb_intern_const("kernel_sleep");
    id_process_wait = rb_intern_const("process_wait");

    id_io_read = rb_intern_const("io_read");
    id_io_pread = rb_intern_const("io_pread");
    id_io_write = rb_intern_const("io_write");
    id_io_pwrite = rb_intern_const("io_pwrite");

    id_io_wait = rb_intern_const("io_wait");
    id_io_select = rb_intern_const("io_select");
    id_io_close = rb_intern_const("io_close");

    id_address_resolve = rb_intern_const("address_resolve");

    id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
    id_fiber_interrupt = rb_intern_const("fiber_interrupt");

    // Note: Fiber.schedule delegates to the scheduler's #fiber method:
    id_fiber_schedule = rb_intern_const("fiber");

    // Define an anonymous BlockingOperation class for internal use only
    // This is completely hidden from Ruby code and cannot be instantiated directly
    rb_cFiberSchedulerBlockingOperation = rb_class_new(rb_cObject);
    rb_define_alloc_func(rb_cFiberSchedulerBlockingOperation, blocking_operation_alloc);
    rb_define_method(rb_cFiberSchedulerBlockingOperation, "call", blocking_operation_call, 0);

    // Register the anonymous class as a GC root so it doesn't get collected
    rb_gc_register_mark_object(rb_cFiberSchedulerBlockingOperation);

#if 0 /* for RDoc */
    rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject);
    rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0);
    rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_process_wait, 2);
    rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_io_wait, 3);
    rb_define_method(rb_cFiberScheduler, "io_read", rb_fiber_scheduler_io_read, 4);
    rb_define_method(rb_cFiberScheduler, "io_write", rb_fiber_scheduler_io_write, 4);
    rb_define_method(rb_cFiberScheduler, "io_pread", rb_fiber_scheduler_io_pread, 5);
    rb_define_method(rb_cFiberScheduler, "io_pwrite", rb_fiber_scheduler_io_pwrite, 5);
    rb_define_method(rb_cFiberScheduler, "io_select", rb_fiber_scheduler_io_select, 4);
    rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_kernel_sleep, 1);
    rb_define_method(rb_cFiberScheduler, "address_resolve", rb_fiber_scheduler_address_resolve, 1);
    rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3);
    rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2);
    rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
    rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2);
    rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2);
    rb_define_method(rb_cFiberScheduler, "yield", rb_fiber_scheduler_yield, 0);
    rb_define_method(rb_cFiberScheduler, "fiber_interrupt", rb_fiber_scheduler_fiber_interrupt, 2);
    rb_define_method(rb_cFiberScheduler, "io_close", rb_fiber_scheduler_io_close, 1);
#endif
}
368 | | |
369 | | VALUE |
370 | | rb_fiber_scheduler_get(void) |
371 | 0 | { |
372 | 0 | RUBY_ASSERT(ruby_thread_has_gvl_p()); |
373 | |
|
374 | 0 | rb_thread_t *thread = GET_THREAD(); |
375 | 0 | RUBY_ASSERT(thread); |
376 | |
|
377 | 0 | return thread->scheduler; |
378 | 0 | } |
379 | | |
380 | | static void |
381 | | verify_interface(VALUE scheduler) |
382 | 0 | { |
383 | 0 | if (!rb_respond_to(scheduler, id_block)) { |
384 | 0 | rb_raise(rb_eArgError, "Scheduler must implement #block"); |
385 | 0 | } |
386 | | |
387 | 0 | if (!rb_respond_to(scheduler, id_unblock)) { |
388 | 0 | rb_raise(rb_eArgError, "Scheduler must implement #unblock"); |
389 | 0 | } |
390 | | |
391 | 0 | if (!rb_respond_to(scheduler, id_kernel_sleep)) { |
392 | 0 | rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep"); |
393 | 0 | } |
394 | | |
395 | 0 | if (!rb_respond_to(scheduler, id_io_wait)) { |
396 | 0 | rb_raise(rb_eArgError, "Scheduler must implement #io_wait"); |
397 | 0 | } |
398 | | |
399 | 0 | if (!rb_respond_to(scheduler, id_fiber_interrupt)) { |
400 | 0 | rb_warn("Scheduler should implement #fiber_interrupt"); |
401 | 0 | } |
402 | 0 | } |
403 | | |
// Trampoline with the VALUE -> VALUE signature required by rb_ensure;
// forwards to rb_fiber_scheduler_close.
static VALUE
fiber_scheduler_close(VALUE scheduler)
{
    return rb_fiber_scheduler_close(scheduler);
}
409 | | |
410 | | static VALUE |
411 | | fiber_scheduler_close_ensure(VALUE _thread) |
412 | 0 | { |
413 | 0 | rb_thread_t *thread = (rb_thread_t*)_thread; |
414 | 0 | thread->scheduler = Qnil; |
415 | |
|
416 | 0 | return Qnil; |
417 | 0 | } |
418 | | |
// Install `scheduler` (or Qnil to clear) on the current thread, closing any
// previously-installed scheduler first. Requires the GVL. Raises
// ArgumentError if a non-nil scheduler is missing a mandatory hook.
VALUE
rb_fiber_scheduler_set(VALUE scheduler)
{
    RUBY_ASSERT(ruby_thread_has_gvl_p());

    rb_thread_t *thread = GET_THREAD();
    RUBY_ASSERT(thread);

    if (scheduler != Qnil) {
        verify_interface(scheduler);
    }

    // We invoke Scheduler#close when setting it to something else, to ensure
    // the previous scheduler runs to completion before changing the scheduler.
    // That way, we do not need to consider interactions, e.g., of a Fiber from
    // the previous scheduler with the new scheduler.
    if (thread->scheduler != Qnil) {
        // rb_fiber_scheduler_close(thread->scheduler);
        // Even if #close raises, the ensure callback clears thread->scheduler:
        rb_ensure(fiber_scheduler_close, thread->scheduler, fiber_scheduler_close_ensure, (VALUE)thread);
    }

    thread->scheduler = scheduler;

    return thread->scheduler;
}
444 | | |
445 | | static VALUE |
446 | | fiber_scheduler_current_for_threadptr(rb_thread_t *thread) |
447 | 35 | { |
448 | 35 | RUBY_ASSERT(thread); |
449 | | |
450 | 35 | if (thread->blocking == 0) { |
451 | 0 | return thread->scheduler; |
452 | 0 | } |
453 | 35 | else { |
454 | 35 | return Qnil; |
455 | 35 | } |
456 | 35 | } |
457 | | |
458 | | VALUE rb_fiber_scheduler_current(void) |
459 | 35 | { |
460 | 35 | RUBY_ASSERT(ruby_thread_has_gvl_p()); |
461 | | |
462 | 35 | return fiber_scheduler_current_for_threadptr(GET_THREAD()); |
463 | 35 | } |
464 | | |
465 | | // This function is allowed to be called without holding the GVL. |
466 | | VALUE rb_fiber_scheduler_current_for_thread(VALUE thread) |
467 | 0 | { |
468 | 0 | return fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread)); |
469 | 0 | } |
470 | | |
// Exported wrapper around the static implementation above, for callers that
// already hold an rb_thread_t pointer.
VALUE rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
{
    return fiber_scheduler_current_for_threadptr(thread);
}
475 | | |
476 | | /* |
477 | | * |
478 | | * Document-method: Fiber::Scheduler#close |
479 | | * |
480 | | * Called when the current thread exits. The scheduler is expected to implement this |
481 | | * method in order to allow all waiting fibers to finalize their execution. |
482 | | * |
483 | | * The suggested pattern is to implement the main event loop in the #close method. |
484 | | * |
485 | | */ |
486 | | VALUE |
487 | | rb_fiber_scheduler_close(VALUE scheduler) |
488 | 0 | { |
489 | 0 | RUBY_ASSERT(ruby_thread_has_gvl_p()); |
490 | |
|
491 | 0 | VALUE result; |
492 | | |
493 | | // The reason for calling `scheduler_close` before calling `close` is for |
494 | | // legacy schedulers which implement `close` and expect the user to call |
495 | | // it. Subsequently, that method would call `Fiber.set_scheduler(nil)` |
496 | | // which should call `scheduler_close`. If it were to call `close`, it |
497 | | // would create an infinite loop. |
498 | |
|
499 | 0 | result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL); |
500 | 0 | if (!UNDEF_P(result)) return result; |
501 | | |
502 | 0 | result = rb_check_funcall(scheduler, id_close, 0, NULL); |
503 | 0 | if (!UNDEF_P(result)) return result; |
504 | | |
505 | 0 | return Qnil; |
506 | 0 | } |
507 | | |
508 | | VALUE |
509 | | rb_fiber_scheduler_make_timeout(struct timeval *timeout) |
510 | 0 | { |
511 | 0 | if (timeout) { |
512 | 0 | return rb_float_new((double)timeout->tv_sec + (0.000001 * timeout->tv_usec)); |
513 | 0 | } |
514 | | |
515 | 0 | return Qnil; |
516 | 0 | } |
517 | | |
518 | | /* |
519 | | * Document-method: Fiber::Scheduler#kernel_sleep |
520 | | * call-seq: kernel_sleep(duration = nil) |
521 | | * |
522 | | * Invoked by Kernel#sleep and Thread::Mutex#sleep and is expected to provide |
523 | | * an implementation of sleeping in a non-blocking way. Implementation might |
524 | | * register the current fiber in some list of "which fiber wait until what |
525 | | * moment", call Fiber.yield to pass control, and then in #close resume |
526 | | * the fibers whose wait period has elapsed. |
527 | | * |
528 | | */ |
// Invoke the mandatory `kernel_sleep` hook with a single timeout argument.
VALUE
rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
{
    return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
}
534 | | |
// Variadic form of rb_fiber_scheduler_kernel_sleep: forwards argc/argv to the
// `kernel_sleep` hook unchanged.
VALUE
rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
{
    return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
}
540 | | |
541 | | /** |
542 | | * Document-method: Fiber::Scheduler#yield |
543 | | * call-seq: yield |
544 | | * |
545 | | * Yield to the scheduler, to be resumed on the next scheduling cycle. |
546 | | */ |
547 | | VALUE |
548 | | rb_fiber_scheduler_yield(VALUE scheduler) |
549 | 0 | { |
550 | | // First try to call the scheduler's yield method, if it exists: |
551 | 0 | VALUE result = rb_check_funcall(scheduler, id_yield, 0, NULL); |
552 | 0 | if (!UNDEF_P(result)) return result; |
553 | | |
554 | | // Otherwise, we can emulate yield by sleeping for 0 seconds: |
555 | 0 | return rb_fiber_scheduler_kernel_sleep(scheduler, RB_INT2NUM(0)); |
556 | 0 | } |
557 | | |
558 | | #if 0 |
559 | | /* |
560 | | * Document-method: Fiber::Scheduler#timeout_after |
561 | | * call-seq: timeout_after(duration, exception_class, *exception_arguments, &block) -> result of block |
562 | | * |
563 | | * Invoked by Timeout.timeout to execute the given +block+ within the given |
564 | | * +duration+. It can also be invoked directly by the scheduler or user code. |
565 | | * |
566 | | * Attempt to limit the execution time of a given +block+ to the given |
567 | | * +duration+ if possible. When a non-blocking operation causes the +block+'s |
568 | | * execution time to exceed the specified +duration+, that non-blocking |
569 | | * operation should be interrupted by raising the specified +exception_class+ |
570 | | * constructed with the given +exception_arguments+. |
571 | | * |
572 | | * General execution timeouts are often considered risky. This implementation |
573 | | * will only interrupt non-blocking operations. This is by design because it's |
574 | | * expected that non-blocking operations can fail for a variety of |
575 | | * unpredictable reasons, so applications should already be robust in handling |
576 | | * these conditions and by implication timeouts. |
577 | | * |
578 | | * However, as a result of this design, if the +block+ does not invoke any |
579 | | * non-blocking operations, it will be impossible to interrupt it. If you |
580 | | * desire to provide predictable points for timeouts, consider adding |
581 | | * <tt>sleep(0)</tt>. |
582 | | * |
583 | | * If the block is executed successfully, its result will be returned. |
584 | | * |
585 | | * The exception will typically be raised using Fiber#raise. |
586 | | */ |
587 | | VALUE |
588 | | rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message) |
589 | | { |
590 | | VALUE arguments[] = { |
591 | | timeout, exception, message |
592 | | }; |
593 | | |
594 | | return rb_check_funcall(scheduler, id_timeout_after, 3, arguments); |
595 | | } |
596 | | |
597 | | VALUE |
598 | | rb_fiber_scheduler_timeout_afterv(VALUE scheduler, int argc, VALUE * argv) |
599 | | { |
600 | | return rb_check_funcall(scheduler, id_timeout_after, argc, argv); |
601 | | } |
602 | | #endif |
603 | | |
604 | | /* |
605 | | * Document-method: Fiber::Scheduler#process_wait |
606 | | * call-seq: process_wait(pid, flags) |
607 | | * |
608 | | * Invoked by Process::Status.wait in order to wait for a specified process. |
609 | | * See that method description for arguments description. |
610 | | * |
611 | | * Suggested minimal implementation: |
612 | | * |
613 | | * Thread.new do |
614 | | * Process::Status.wait(pid, flags) |
615 | | * end.value |
616 | | * |
617 | | * This hook is optional: if it is not present in the current scheduler, |
618 | | * Process::Status.wait will behave as a blocking method. |
619 | | * |
620 | | * Expected to return a Process::Status instance. |
621 | | */ |
622 | | VALUE |
623 | | rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags) |
624 | 0 | { |
625 | 0 | VALUE arguments[] = { |
626 | 0 | PIDT2NUM(pid), RB_INT2NUM(flags) |
627 | 0 | }; |
628 | |
|
629 | 0 | return rb_check_funcall(scheduler, id_process_wait, 2, arguments); |
630 | 0 | } |
631 | | |
632 | | /* |
633 | | * Document-method: Fiber::Scheduler#block |
634 | | * call-seq: block(blocker, timeout = nil) |
635 | | * |
636 | | * Invoked by methods like Thread.join, and by Thread::Mutex, to signify that current |
637 | | * Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has |
638 | | * elapsed. |
639 | | * |
640 | | * +blocker+ is what we are waiting on, informational only (for debugging and |
641 | | * logging). There are no guarantee about its value. |
642 | | * |
643 | | * Expected to return boolean, specifying whether the blocking operation was |
644 | | * successful or not. |
645 | | */ |
// Invoke the mandatory `block` hook: the current fiber blocks on `blocker`
// until #unblock or until `timeout` elapses.
VALUE
rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
{
    return rb_funcall(scheduler, id_block, 2, blocker, timeout);
}
651 | | |
652 | | /* |
653 | | * Document-method: Fiber::Scheduler#unblock |
654 | | * call-seq: unblock(blocker, fiber) |
655 | | * |
656 | | * Invoked to wake up Fiber previously blocked with #block (for example, Thread::Mutex#lock |
657 | | * calls #block and Thread::Mutex#unlock calls #unblock). The scheduler should use |
658 | | * the +fiber+ parameter to understand which fiber is unblocked. |
659 | | * |
660 | | * +blocker+ is what was awaited for, but it is informational only (for debugging |
661 | | * and logging), and it is not guaranteed to be the same value as the +blocker+ for |
662 | | * #block. |
663 | | * |
664 | | */ |
// Invoke the mandatory `unblock` hook to wake `fiber`, preserving errno and
// masking interrupts around the user-code call.
VALUE
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
{
    RUBY_ASSERT(rb_obj_is_fiber(fiber));

    VALUE result;
    enum ruby_tag_type state;

    // `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`.
    //
    // If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it.
    int saved_errno = errno;

    // We must prevent interrupts while invoking the unblock method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_fiber_interrupt`.
    rb_execution_context_t *ec = GET_EC();
    int saved_interrupt_mask = ec->interrupt_mask;
    ec->interrupt_mask |= PENDING_INTERRUPT_MASK;

    EC_PUSH_TAG(ec);
    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
        result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
    }
    EC_POP_TAG();

    // Restore interruptibility before possibly re-raising:
    ec->interrupt_mask = saved_interrupt_mask;

    if (state) {
        // A non-local exit occurred inside #unblock; propagate it now that the
        // interrupt mask is restored. `result` is never read on this path.
        EC_JUMP_TAG(ec, state);
    }

    // Deliver any interrupts that became pending while masked:
    RUBY_VM_CHECK_INTS(ec);

    errno = saved_errno;

    return result;
}
701 | | |
702 | | /* |
703 | | * Document-method: Fiber::Scheduler#io_wait |
704 | | * call-seq: io_wait(io, events, timeout) |
705 | | * |
706 | | * Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the |
707 | | * specified descriptor is ready for specified events within |
708 | | * the specified +timeout+. |
709 | | * |
710 | | * +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and |
711 | | * <tt>IO::PRIORITY</tt>. |
712 | | * |
713 | | * Suggested implementation should register which Fiber is waiting for which |
714 | | * resources and immediately calling Fiber.yield to pass control to other |
715 | | * fibers. Then, in the #close method, the scheduler might dispatch all the |
716 | | * I/O resources to fibers waiting for it. |
717 | | * |
718 | | * Expected to return the subset of events that are ready immediately. |
719 | | * |
720 | | */ |
721 | | static VALUE |
722 | 0 | fiber_scheduler_io_wait(VALUE _argument) { |
723 | 0 | VALUE *arguments = (VALUE*)_argument; |
724 | |
|
725 | 0 | return rb_funcallv(arguments[0], id_io_wait, 3, arguments + 1); |
726 | 0 | } |
727 | | |
728 | | VALUE |
729 | | rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout) |
730 | 0 | { |
731 | 0 | VALUE arguments[] = { |
732 | 0 | scheduler, io, events, timeout |
733 | 0 | }; |
734 | |
|
735 | 0 | if (rb_respond_to(scheduler, id_fiber_interrupt)) { |
736 | 0 | return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments); |
737 | 0 | } else { |
738 | 0 | return fiber_scheduler_io_wait((VALUE)&arguments); |
739 | 0 | } |
740 | 0 | } |
741 | | |
742 | | VALUE |
743 | | rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io) |
744 | 0 | { |
745 | 0 | return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_READABLE), rb_io_timeout(io)); |
746 | 0 | } |
747 | | |
748 | | VALUE |
749 | | rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io) |
750 | 0 | { |
751 | 0 | return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), rb_io_timeout(io)); |
752 | 0 | } |
753 | | |
754 | | /* |
755 | | * Document-method: Fiber::Scheduler#io_select |
756 | | * call-seq: io_select(readables, writables, exceptables, timeout) |
757 | | * |
758 | | * Invoked by IO.select to ask whether the specified descriptors are ready for |
759 | | * specified events within the specified +timeout+. |
760 | | * |
761 | | * Expected to return the 3-tuple of Array of IOs that are ready. |
762 | | * |
763 | | */ |
764 | | VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout) |
765 | 0 | { |
766 | 0 | VALUE arguments[] = { |
767 | 0 | readables, writables, exceptables, timeout |
768 | 0 | }; |
769 | |
|
770 | 0 | return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments); |
771 | 0 | } |
772 | | |
773 | | VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv) |
774 | 0 | { |
775 | | // I wondered about extracting argv, and checking if there is only a single |
776 | | // IO instance, and instead calling `io_wait`. However, it would require a |
777 | | // decent amount of work and it would be hard to preserve the exact |
778 | | // semantics of IO.select. |
779 | |
|
780 | 0 | return rb_check_funcall(scheduler, id_io_select, argc, argv); |
781 | 0 | } |
782 | | |
783 | | /* |
784 | | * Document-method: Fiber::Scheduler#io_read |
785 | | * call-seq: io_read(io, buffer, length, offset) -> read length or -errno |
786 | | * |
787 | | * Invoked by IO#read or IO#Buffer.read to read +length+ bytes from +io+ into a |
788 | | * specified +buffer+ (see IO::Buffer) at the given +offset+. |
789 | | * |
790 | | * The +length+ argument is the "minimum length to be read". If the IO buffer |
791 | | * size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be read, |
792 | | * but at least 1KiB will be. Generally, the only case where less data than |
793 | | * +length+ will be read is if there is an error reading the data. |
794 | | * |
795 | | * Specifying a +length+ of 0 is valid and means try reading at least once and |
796 | | * return any available data. |
797 | | * |
798 | | * Suggested implementation should try to read from +io+ in a non-blocking |
799 | | * manner and call #io_wait if the +io+ is not ready (which will yield control |
800 | | * to other fibers). |
801 | | * |
802 | | * See IO::Buffer for an interface available to return data. |
803 | | * |
804 | | * Expected to return number of bytes read, or, in case of an error, |
805 | | * <tt>-errno</tt> (negated number corresponding to system's error code). |
806 | | * |
807 | | * The method should be considered _experimental_. |
808 | | */ |
809 | | static VALUE |
810 | 0 | fiber_scheduler_io_read(VALUE _argument) { |
811 | 0 | VALUE *arguments = (VALUE*)_argument; |
812 | |
|
813 | 0 | return rb_funcallv(arguments[0], id_io_read, 4, arguments + 1); |
814 | 0 | } |
815 | | |
816 | | VALUE |
817 | | rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset) |
818 | 0 | { |
819 | 0 | if (!rb_respond_to(scheduler, id_io_read)) { |
820 | 0 | return RUBY_Qundef; |
821 | 0 | } |
822 | | |
823 | 0 | VALUE arguments[] = { |
824 | 0 | scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset) |
825 | 0 | }; |
826 | |
|
827 | 0 | if (rb_respond_to(scheduler, id_fiber_interrupt)) { |
828 | 0 | return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments); |
829 | 0 | } else { |
830 | 0 | return fiber_scheduler_io_read((VALUE)&arguments); |
831 | 0 | } |
832 | 0 | } |
833 | | |
834 | | /* |
835 | | * Document-method: Fiber::Scheduler#io_pread |
836 | | * call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno |
837 | | * |
838 | | * Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+ |
839 | | * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given |
840 | | * +offset+. |
841 | | * |
 * This method is semantically the same as #io_read, but it allows specifying
 * the offset to read from and is often better for asynchronous IO on the same
 * file.
845 | | * |
846 | | * The method should be considered _experimental_. |
847 | | */ |
848 | | static VALUE |
849 | 0 | fiber_scheduler_io_pread(VALUE _argument) { |
850 | 0 | VALUE *arguments = (VALUE*)_argument; |
851 | |
|
852 | 0 | return rb_funcallv(arguments[0], id_io_pread, 5, arguments + 1); |
853 | 0 | } |
854 | | |
855 | | VALUE |
856 | | rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset) |
857 | 0 | { |
858 | 0 | if (!rb_respond_to(scheduler, id_io_pread)) { |
859 | 0 | return RUBY_Qundef; |
860 | 0 | } |
861 | | |
862 | 0 | VALUE arguments[] = { |
863 | 0 | scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) |
864 | 0 | }; |
865 | |
|
866 | 0 | if (rb_respond_to(scheduler, id_fiber_interrupt)) { |
867 | 0 | return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments); |
868 | 0 | } else { |
869 | 0 | return fiber_scheduler_io_pread((VALUE)&arguments); |
870 | 0 | } |
871 | 0 | } |
872 | | |
873 | | /* |
874 | | * Document-method: Fiber::Scheduler#io_write |
875 | | * call-seq: io_write(io, buffer, length, offset) -> written length or -errno |
876 | | * |
 * Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+
 * from a specified +buffer+ (see IO::Buffer) at the given +offset+.
879 | | * |
880 | | * The +length+ argument is the "minimum length to be written". If the IO |
881 | | * buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most 8KiB |
882 | | * will be written, but at least 1KiB will be. Generally, the only case where |
883 | | * less data than +length+ will be written is if there is an error writing the |
884 | | * data. |
885 | | * |
886 | | * Specifying a +length+ of 0 is valid and means try writing at least once, as |
887 | | * much data as possible. |
888 | | * |
889 | | * Suggested implementation should try to write to +io+ in a non-blocking |
890 | | * manner and call #io_wait if the +io+ is not ready (which will yield control |
891 | | * to other fibers). |
892 | | * |
893 | | * See IO::Buffer for an interface available to get data from buffer |
894 | | * efficiently. |
895 | | * |
896 | | * Expected to return number of bytes written, or, in case of an error, |
897 | | * <tt>-errno</tt> (negated number corresponding to system's error code). |
898 | | * |
899 | | * The method should be considered _experimental_. |
900 | | */ |
901 | | static VALUE |
902 | 0 | fiber_scheduler_io_write(VALUE _argument) { |
903 | 0 | VALUE *arguments = (VALUE*)_argument; |
904 | |
|
905 | 0 | return rb_funcallv(arguments[0], id_io_write, 4, arguments + 1); |
906 | 0 | } |
907 | | |
908 | | VALUE |
909 | | rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset) |
910 | 0 | { |
911 | 0 | if (!rb_respond_to(scheduler, id_io_write)) { |
912 | 0 | return RUBY_Qundef; |
913 | 0 | } |
914 | | |
915 | 0 | VALUE arguments[] = { |
916 | 0 | scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset) |
917 | 0 | }; |
918 | |
|
919 | 0 | if (rb_respond_to(scheduler, id_fiber_interrupt)) { |
920 | 0 | return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments); |
921 | 0 | } else { |
922 | 0 | return fiber_scheduler_io_write((VALUE)&arguments); |
923 | 0 | } |
924 | 0 | } |
925 | | |
926 | | /* |
927 | | * Document-method: Fiber::Scheduler#io_pwrite |
928 | | * call-seq: io_pwrite(io, buffer, from, length, offset) -> written length or -errno |
929 | | * |
930 | | * Invoked by IO#pwrite or IO::Buffer#pwrite to write +length+ bytes to +io+ |
931 | | * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given |
932 | | * +offset+. |
933 | | * |
 * This method is semantically the same as #io_write, but it allows specifying
 * the offset to write to and is often better for asynchronous IO on the same
 * file.
937 | | * |
938 | | * The method should be considered _experimental_. |
939 | | * |
940 | | */ |
941 | | static VALUE |
942 | 0 | fiber_scheduler_io_pwrite(VALUE _argument) { |
943 | 0 | VALUE *arguments = (VALUE*)_argument; |
944 | |
|
945 | 0 | return rb_funcallv(arguments[0], id_io_pwrite, 5, arguments + 1); |
946 | 0 | } |
947 | | |
948 | | VALUE |
949 | | rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset) |
950 | 0 | { |
951 | | |
952 | |
|
953 | 0 | if (!rb_respond_to(scheduler, id_io_pwrite)) { |
954 | 0 | return RUBY_Qundef; |
955 | 0 | } |
956 | | |
957 | 0 | VALUE arguments[] = { |
958 | 0 | scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) |
959 | 0 | }; |
960 | |
|
961 | 0 | if (rb_respond_to(scheduler, id_fiber_interrupt)) { |
962 | 0 | return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments); |
963 | 0 | } else { |
964 | 0 | return fiber_scheduler_io_pwrite((VALUE)&arguments); |
965 | 0 | } |
966 | 0 | } |
967 | | |
968 | | VALUE |
969 | | rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length) |
970 | 0 | { |
971 | 0 | VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED); |
972 | |
|
973 | 0 | VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0); |
974 | |
|
975 | 0 | rb_io_buffer_free_locked(buffer); |
976 | |
|
977 | 0 | return result; |
978 | 0 | } |
979 | | |
980 | | VALUE |
981 | | rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length) |
982 | 0 | { |
983 | 0 | VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY); |
984 | |
|
985 | 0 | VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0); |
986 | |
|
987 | 0 | rb_io_buffer_free_locked(buffer); |
988 | |
|
989 | 0 | return result; |
990 | 0 | } |
991 | | |
992 | | VALUE |
993 | | rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length) |
994 | 0 | { |
995 | 0 | VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED); |
996 | |
|
997 | 0 | VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, buffer, length, 0); |
998 | |
|
999 | 0 | rb_io_buffer_free_locked(buffer); |
1000 | |
|
1001 | 0 | return result; |
1002 | 0 | } |
1003 | | |
1004 | | VALUE |
1005 | | rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length) |
1006 | 0 | { |
1007 | 0 | VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY); |
1008 | |
|
1009 | 0 | VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, buffer, length, 0); |
1010 | |
|
1011 | 0 | rb_io_buffer_free_locked(buffer); |
1012 | |
|
1013 | 0 | return result; |
1014 | 0 | } |
1015 | | |
1016 | | /* |
1017 | | * Document-method: Fiber::Scheduler#io_close |
1018 | | * call-seq: io_close(fd) |
1019 | | * |
1020 | | * Invoked by Ruby's core methods to notify scheduler that the IO object is closed. Note that |
1021 | | * the method will receive an integer file descriptor of the closed object, not an object |
1022 | | * itself. |
1023 | | */ |
1024 | | VALUE |
1025 | | rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io) |
1026 | 0 | { |
1027 | 0 | VALUE arguments[] = {io}; |
1028 | |
|
1029 | 0 | return rb_check_funcall(scheduler, id_io_close, 1, arguments); |
1030 | 0 | } |
1031 | | |
1032 | | /* |
1033 | | * Document-method: Fiber::Scheduler#address_resolve |
1034 | | * call-seq: address_resolve(hostname) -> array_of_strings or nil |
1035 | | * |
1036 | | * Invoked by any method that performs a non-reverse DNS lookup. The most |
 * notable method is Addrinfo.getaddrinfo, but there are many others.
1038 | | * |
1039 | | * The method is expected to return an array of strings corresponding to ip |
1040 | | * addresses the +hostname+ is resolved to, or +nil+ if it can not be resolved. |
1041 | | * |
1042 | | * Fairly exhaustive list of all possible call-sites: |
1043 | | * |
1044 | | * - Addrinfo.getaddrinfo |
1045 | | * - Addrinfo.tcp |
1046 | | * - Addrinfo.udp |
1047 | | * - Addrinfo.ip |
1048 | | * - Addrinfo.new |
1049 | | * - Addrinfo.marshal_load |
1050 | | * - SOCKSSocket.new |
1051 | | * - TCPServer.new |
1052 | | * - TCPSocket.new |
1053 | | * - IPSocket.getaddress |
1054 | | * - TCPSocket.gethostbyname |
1055 | | * - UDPSocket#connect |
1056 | | * - UDPSocket#bind |
1057 | | * - UDPSocket#send |
1058 | | * - Socket.getaddrinfo |
1059 | | * - Socket.gethostbyname |
1060 | | * - Socket.pack_sockaddr_in |
1061 | | * - Socket.sockaddr_in |
1062 | | * - Socket.unpack_sockaddr_in |
1063 | | */ |
1064 | | VALUE |
1065 | | rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname) |
1066 | 0 | { |
1067 | 0 | VALUE arguments[] = { |
1068 | 0 | hostname |
1069 | 0 | }; |
1070 | |
|
1071 | 0 | return rb_check_funcall(scheduler, id_address_resolve, 1, arguments); |
1072 | 0 | } |
1073 | | |
1074 | | /* |
1075 | | * Document-method: Fiber::Scheduler#blocking_operation_wait |
1076 | | * call-seq: blocking_operation_wait(blocking_operation) |
1077 | | * |
1078 | | * Invoked by Ruby's core methods to run a blocking operation in a non-blocking way. |
1079 | | * The blocking_operation is an opaque object that encapsulates the blocking operation |
1080 | | * and responds to a <tt>#call</tt> method without any arguments. |
1081 | | * |
1082 | | * If the scheduler doesn't implement this method, or if the scheduler doesn't execute |
1083 | | * the blocking operation, Ruby will fall back to the non-scheduler implementation. |
1084 | | * |
1085 | | * Minimal suggested implementation is: |
1086 | | * |
1087 | | * def blocking_operation_wait(blocking_operation) |
1088 | | * Thread.new { blocking_operation.call }.join |
1089 | | * end |
1090 | | */ |
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
{
    // Check if scheduler supports blocking_operation_wait before creating the object
    if (!rb_respond_to(scheduler, id_blocking_operation_wait)) {
        return Qundef;
    }

    // Create a new BlockingOperation with the blocking operation
    VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);

    // Hand the operation object to the scheduler's `blocking_operation_wait`
    // hook, which is expected to execute it and block until it finishes.
    VALUE result = rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation);
    // NOTE(review): if the hook raises, rb_funcall does not return and the
    // invalidation below is skipped — confirm the operation's lifetime is
    // otherwise bounded in that case.

    // Get the operation data to check if it was executed
    rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(blocking_operation);
    rb_atomic_t current_status = RUBY_ATOMIC_LOAD(operation->status);

    // Invalidate the operation now that we're done with it, so a scheduler
    // that retained the Ruby object cannot re-run the function or touch the
    // caller-supplied pointers after this frame returns:
    operation->function = NULL;
    operation->state = NULL;
    operation->data = NULL;
    operation->data2 = NULL;
    operation->unblock_function = NULL;

    // If the blocking operation was never executed, return Qundef to signal the caller to use rb_nogvl instead
    if (current_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
        return Qundef;
    }

    return result;
}
1121 | | |
1122 | | /* |
1123 | | * Document-method: Fiber::Scheduler#fiber_interrupt |
1124 | | * call-seq: fiber_interrupt(fiber, exception) |
1125 | | * |
1126 | | * Invoked by Ruby's core methods to notify the scheduler that the blocked fiber should be interrupted |
1127 | | * with an exception. For example, IO#close uses this method to interrupt fibers that are performing |
1128 | | * blocking IO operations. |
1129 | | * |
1130 | | */ |
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
{
    VALUE arguments[] = {
        fiber, exception
    };

    VALUE result;
    enum ruby_tag_type state;

    // We must prevent interrupts while invoking the fiber_interrupt method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_unblock`.
    rb_execution_context_t *ec = GET_EC();
    int saved_interrupt_mask = ec->interrupt_mask;
    ec->interrupt_mask |= PENDING_INTERRUPT_MASK;

    // Run the hook under a tag so that any non-local exit (raise/throw) still
    // flows through the mask restoration below:
    EC_PUSH_TAG(ec);
    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
        // Qundef if the scheduler does not implement `fiber_interrupt`:
        result = rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
    }
    EC_POP_TAG();

    ec->interrupt_mask = saved_interrupt_mask;

    // Re-raise whatever non-local exit interrupted the protected region:
    if (state) {
        EC_JUMP_TAG(ec, state);
    }

    // Deliver any interrupts that were deferred while the mask was raised:
    RUBY_VM_CHECK_INTS(ec);

    return result;
}
1161 | | |
1162 | | /* |
1163 | | * Document-method: Fiber::Scheduler#fiber |
1164 | | * call-seq: fiber(&block) |
1165 | | * |
1166 | | * Implementation of the Fiber.schedule. The method is <em>expected</em> to immediately |
1167 | | * run the given block of code in a separate non-blocking fiber, and to return that Fiber. |
1168 | | * |
1169 | | * Minimal suggested implementation is: |
1170 | | * |
1171 | | * def fiber(&block) |
1172 | | * fiber = Fiber.new(blocking: false, &block) |
1173 | | * fiber.resume |
1174 | | * fiber |
1175 | | * end |
1176 | | */ |
1177 | | VALUE |
1178 | | rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat) |
1179 | 0 | { |
1180 | 0 | return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat); |
1181 | 0 | } |
1182 | | |
1183 | | /* |
1184 | | * C API: Cancel a blocking operation |
1185 | | * |
1186 | | * This function cancels a blocking operation. If the operation is queued, |
1187 | | * it just marks it as cancelled. If it's executing, it marks it as cancelled |
1188 | | * and calls the unblock function to interrupt the operation. |
1189 | | * |
1190 | | * Returns 1 if unblock function was called, 0 if just marked cancelled, -1 on error. |
1191 | | */ |
int
rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
{
    if (blocking_operation == NULL) {
        return -1;
    }

    rb_atomic_t current_state = RUBY_ATOMIC_LOAD(blocking_operation->status);

    switch (current_state) {
      case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED:
        // Work hasn't started - just mark as cancelled:
        if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == current_state) {
            // Successfully cancelled before execution:
            return 0;
        }
        // Fall through if state changed between load and CAS
        // NOTE(review): `current_state` still holds QUEUED here, so the CAS in
        // the EXECUTING case below will compare against the stale value, fail,
        // and return 0 — confirm that is the intended outcome when the
        // operation starts executing mid-cancel.

      case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING:
        // Work is running - mark cancelled AND call unblock function
        if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) != current_state) {
            // State changed between load and CAS - operation may have completed:
            return 0;
        }
        // Otherwise, we successfully marked it as cancelled, so we can call the unblock function:
        rb_unblock_function_t *unblock_function = blocking_operation->unblock_function;
        if (unblock_function) {
            RUBY_ASSERT(unblock_function != (rb_unblock_function_t *)-1 && "unblock_function is still sentinel value -1, should have been resolved earlier");
            blocking_operation->unblock_function(blocking_operation->data2);
        }
        // Cancelled during execution (unblock function called):
        return 1;

      case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED:
      case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED:
        // Already finished or cancelled:
        return 0;
    }

    // Unreachable with the current enum, but keeps the compiler satisfied:
    return 0;
}