|
Ruby
1.9.3p537(2014-02-19revision0)
|
00001 /********************************************************************** 00002 00003 thread.c - 00004 00005 $Author$ 00006 00007 Copyright (C) 2004-2007 Koichi Sasada 00008 00009 **********************************************************************/ 00010 00011 /* 00012 YARV Thread Design 00013 00014 model 1: Userlevel Thread 00015 Same as traditional ruby thread. 00016 00017 model 2: Native Thread with Global VM lock 00018 Using pthread (or Windows thread) and Ruby threads run concurrent. 00019 00020 model 3: Native Thread with fine grain lock 00021 Using pthread and Ruby threads run concurrent or parallel. 00022 00023 ------------------------------------------------------------------------ 00024 00025 model 2: 00026 A thread has mutex (GVL: Global VM Lock or Giant VM Lock) can run. 00027 When thread scheduling, running thread release GVL. If running thread 00028 try blocking operation, this thread must release GVL and another 00029 thread can continue this flow. After blocking operation, thread 00030 must check interrupt (RUBY_VM_CHECK_INTS). 00031 00032 Every VM can run parallel. 00033 00034 Ruby threads are scheduled by OS thread scheduler. 00035 00036 ------------------------------------------------------------------------ 00037 00038 model 3: 00039 Every threads run concurrent or parallel and to access shared object 00040 exclusive access control is needed. For example, to access String 00041 object or Array object, fine grain lock must be locked every time. 00042 */ 00043 00044 00045 /* 00046 * FD_SET, FD_CLR and FD_ISSET have a small sanity check when using glibc 00047 * 2.15 or later and set _FORTIFY_SOURCE > 0. 00048 * However, the implementation is wrong. Even though Linux's select(2) 00049 * support large fd size (>FD_SETSIZE), it wrongly assume fd is always 00050 * less than FD_SETSIZE (i.e. 1024). And then when enabling HAVE_RB_FD_INIT, 00051 * it doesn't work correctly and makes program abort. Therefore we need to 00052 * disable FORTY_SOURCE until glibc fixes it. 00053 */ 00054 #undef _FORTIFY_SOURCE 00055 #undef __USE_FORTIFY_LEVEL 00056 #define __USE_FORTIFY_LEVEL 0 00057 00058 /* for model 2 */ 00059 00060 #include "eval_intern.h" 00061 #include "gc.h" 00062 #include "internal.h" 00063 #include "ruby/io.h" 00064 00065 #ifndef USE_NATIVE_THREAD_PRIORITY 00066 #define USE_NATIVE_THREAD_PRIORITY 0 00067 #define RUBY_THREAD_PRIORITY_MAX 3 00068 #define RUBY_THREAD_PRIORITY_MIN -3 00069 #endif 00070 00071 #ifndef THREAD_DEBUG 00072 #define THREAD_DEBUG 0 00073 #endif 00074 00075 VALUE rb_cMutex; 00076 VALUE rb_cBarrier; 00077 00078 static void sleep_timeval(rb_thread_t *th, struct timeval time); 00079 static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec); 00080 static void sleep_forever(rb_thread_t *th, int nodeadlock); 00081 static double timeofday(void); 00082 static int rb_threadptr_dead(rb_thread_t *th); 00083 00084 static void rb_check_deadlock(rb_vm_t *vm); 00085 00086 #define eKillSignal INT2FIX(0) 00087 #define eTerminateSignal INT2FIX(1) 00088 static volatile int system_working = 1; 00089 00090 #define closed_stream_error GET_VM()->special_exceptions[ruby_error_closed_stream] 00091 00092 inline static void 00093 st_delete_wrap(st_table *table, st_data_t key) 00094 { 00095 st_delete(table, &key, 0); 00096 } 00097 00098 /********************************************************************************/ 00099 00100 #define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION 00101 00102 struct rb_blocking_region_buffer { 00103 enum rb_thread_status prev_status; 00104 struct rb_unblock_callback oldubf; 00105 }; 00106 00107 static void set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, 00108 struct rb_unblock_callback *old); 00109 static void reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old); 00110 00111 static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region); 00112 00113 #define RB_GC_SAVE_MACHINE_CONTEXT(th) \ 00114 do { \ 00115 rb_gc_save_machine_context(th); \ 00116 SET_MACHINE_STACK_END(&(th)->machine_stack_end); \ 00117 } while (0) 00118 00119 #define GVL_UNLOCK_BEGIN() do { \ 00120 rb_thread_t *_th_stored = GET_THREAD(); \ 00121 RB_GC_SAVE_MACHINE_CONTEXT(_th_stored); \ 00122 gvl_release(_th_stored->vm); 00123 00124 #define GVL_UNLOCK_END() \ 00125 gvl_acquire(_th_stored->vm, _th_stored); \ 00126 rb_thread_set_current(_th_stored); \ 00127 } while(0) 00128 00129 #define blocking_region_begin(th, region, func, arg) \ 00130 do { \ 00131 (region)->prev_status = (th)->status; \ 00132 set_unblock_function((th), (func), (arg), &(region)->oldubf); \ 00133 (th)->blocking_region_buffer = (region); \ 00134 (th)->status = THREAD_STOPPED; \ 00135 thread_debug("enter blocking region (%p)\n", (void *)(th)); \ 00136 RB_GC_SAVE_MACHINE_CONTEXT(th); \ 00137 gvl_release((th)->vm); \ 00138 } while (0) 00139 00140 #define BLOCKING_REGION(exec, ubf, ubfarg) do { \ 00141 rb_thread_t *__th = GET_THREAD(); \ 00142 struct rb_blocking_region_buffer __region; \ 00143 blocking_region_begin(__th, &__region, (ubf), (ubfarg)); \ 00144 exec; \ 00145 blocking_region_end(__th, &__region); \ 00146 RUBY_VM_CHECK_INTS(); \ 00147 } while(0) 00148 00149 #if THREAD_DEBUG 00150 #ifdef HAVE_VA_ARGS_MACRO 00151 void rb_thread_debug(const char *file, int line, const char *fmt, ...); 00152 #define thread_debug(fmt, ...) rb_thread_debug(__FILE__, __LINE__, fmt, ##__VA_ARGS__) 00153 #define POSITION_FORMAT "%s:%d:" 00154 #define POSITION_ARGS ,file, line 00155 #else 00156 void rb_thread_debug(const char *fmt, ...); 00157 #define thread_debug rb_thread_debug 00158 #define POSITION_FORMAT 00159 #define POSITION_ARGS 00160 #endif 00161 00162 # if THREAD_DEBUG < 0 00163 static int rb_thread_debug_enabled; 00164 00165 /* 00166 * call-seq: 00167 * Thread.DEBUG -> num 00168 * 00169 * Returns the thread debug level. Available only if compiled with 00170 * THREAD_DEBUG=-1. 00171 */ 00172 00173 static VALUE 00174 rb_thread_s_debug(void) 00175 { 00176 return INT2NUM(rb_thread_debug_enabled); 00177 } 00178 00179 /* 00180 * call-seq: 00181 * Thread.DEBUG = num 00182 * 00183 * Sets the thread debug level. Available only if compiled with 00184 * THREAD_DEBUG=-1. 00185 */ 00186 00187 static VALUE 00188 rb_thread_s_debug_set(VALUE self, VALUE val) 00189 { 00190 rb_thread_debug_enabled = RTEST(val) ? NUM2INT(val) : 0; 00191 return val; 00192 } 00193 # else 00194 # define rb_thread_debug_enabled THREAD_DEBUG 00195 # endif 00196 #else 00197 #define thread_debug if(0)printf 00198 #endif 00199 00200 #ifndef __ia64 00201 #define thread_start_func_2(th, st, rst) thread_start_func_2(th, st) 00202 #endif 00203 NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start, 00204 VALUE *register_stack_start)); 00205 static void timer_thread_function(void *); 00206 00207 #if defined(_WIN32) 00208 #include "thread_win32.c" 00209 00210 #define DEBUG_OUT() \ 00211 WaitForSingleObject(&debug_mutex, INFINITE); \ 00212 printf(POSITION_FORMAT"%p - %s" POSITION_ARGS, GetCurrentThreadId(), buf); \ 00213 fflush(stdout); \ 00214 ReleaseMutex(&debug_mutex); 00215 00216 #elif defined(HAVE_PTHREAD_H) 00217 #include "thread_pthread.c" 00218 00219 #define DEBUG_OUT() \ 00220 pthread_mutex_lock(&debug_mutex); \ 00221 printf(POSITION_FORMAT"%#"PRIxVALUE" - %s" POSITION_ARGS, (VALUE)pthread_self(), buf); \ 00222 fflush(stdout); \ 00223 pthread_mutex_unlock(&debug_mutex); 00224 00225 #else 00226 #error "unsupported thread type" 00227 #endif 00228 00229 #if THREAD_DEBUG 00230 static int debug_mutex_initialized = 1; 00231 static rb_thread_lock_t debug_mutex; 00232 00233 void 00234 rb_thread_debug( 00235 #ifdef HAVE_VA_ARGS_MACRO 00236 const char *file, int line, 00237 #endif 00238 const char *fmt, ...) 00239 { 00240 va_list args; 00241 char buf[BUFSIZ]; 00242 00243 if (!rb_thread_debug_enabled) return; 00244 00245 if (debug_mutex_initialized == 1) { 00246 debug_mutex_initialized = 0; 00247 native_mutex_initialize(&debug_mutex); 00248 } 00249 00250 va_start(args, fmt); 00251 vsnprintf(buf, BUFSIZ, fmt, args); 00252 va_end(args); 00253 00254 DEBUG_OUT(); 00255 } 00256 #endif 00257 00258 void 00259 rb_vm_gvl_destroy(rb_vm_t *vm) 00260 { 00261 gvl_release(vm); 00262 gvl_destroy(vm); 00263 } 00264 00265 void 00266 rb_thread_lock_unlock(rb_thread_lock_t *lock) 00267 { 00268 native_mutex_unlock(lock); 00269 } 00270 00271 void 00272 rb_thread_lock_destroy(rb_thread_lock_t *lock) 00273 { 00274 native_mutex_destroy(lock); 00275 } 00276 00277 static void 00278 set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, 00279 struct rb_unblock_callback *old) 00280 { 00281 check_ints: 00282 RUBY_VM_CHECK_INTS(); /* check signal or so */ 00283 native_mutex_lock(&th->interrupt_lock); 00284 if (th->interrupt_flag) { 00285 native_mutex_unlock(&th->interrupt_lock); 00286 goto check_ints; 00287 } 00288 else { 00289 if (old) *old = th->unblock; 00290 th->unblock.func = func; 00291 th->unblock.arg = arg; 00292 } 00293 native_mutex_unlock(&th->interrupt_lock); 00294 } 00295 00296 static void 00297 reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old) 00298 { 00299 native_mutex_lock(&th->interrupt_lock); 00300 th->unblock = *old; 00301 native_mutex_unlock(&th->interrupt_lock); 00302 } 00303 00304 void 00305 rb_threadptr_interrupt(rb_thread_t *th) 00306 { 00307 native_mutex_lock(&th->interrupt_lock); 00308 RUBY_VM_SET_INTERRUPT(th); 00309 if (th->unblock.func) { 00310 (th->unblock.func)(th->unblock.arg); 00311 } 00312 else { 00313 /* none */ 00314 } 00315 native_mutex_unlock(&th->interrupt_lock); 00316 } 00317 00318 00319 static int 00320 terminate_i(st_data_t key, st_data_t val, rb_thread_t *main_thread) 00321 { 00322 VALUE thval = key; 00323 rb_thread_t *th; 00324 GetThreadPtr(thval, th); 00325 00326 if (th != main_thread) { 00327 thread_debug("terminate_i: %p\n", (void *)th); 00328 rb_threadptr_interrupt(th); 00329 th->thrown_errinfo = eTerminateSignal; 00330 th->status = THREAD_TO_KILL; 00331 } 00332 else { 00333 thread_debug("terminate_i: main thread (%p)\n", (void *)th); 00334 } 00335 return ST_CONTINUE; 00336 } 00337 00338 typedef struct rb_mutex_struct 00339 { 00340 rb_thread_lock_t lock; 00341 rb_thread_cond_t cond; 00342 struct rb_thread_struct volatile *th; 00343 int cond_waiting; 00344 struct rb_mutex_struct *next_mutex; 00345 } rb_mutex_t; 00346 00347 static void rb_mutex_abandon_all(rb_mutex_t *mutexes); 00348 static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th); 00349 static void rb_mutex_abandon_locking_mutex(rb_thread_t *th); 00350 static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th); 00351 00352 void 00353 rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) 00354 { 00355 const char *err; 00356 rb_mutex_t *mutex; 00357 rb_mutex_t *mutexes = th->keeping_mutexes; 00358 00359 while (mutexes) { 00360 mutex = mutexes; 00361 /* rb_warn("mutex #<%p> remains to be locked by terminated thread", 00362 mutexes); */ 00363 mutexes = mutex->next_mutex; 00364 err = rb_mutex_unlock_th(mutex, th); 00365 if (err) rb_bug("invalid keeping_mutexes: %s", err); 00366 } 00367 } 00368 00369 void 00370 rb_thread_terminate_all(void) 00371 { 00372 rb_thread_t *th = GET_THREAD(); /* main thread */ 00373 rb_vm_t *vm = th->vm; 00374 00375 if (vm->main_thread != th) { 00376 rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)", 00377 (void *)vm->main_thread, (void *)th); 00378 } 00379 00380 /* unlock all locking mutexes */ 00381 rb_threadptr_unlock_all_locking_mutexes(th); 00382 00383 thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th); 00384 st_foreach(vm->living_threads, terminate_i, (st_data_t)th); 00385 vm->inhibit_thread_creation = 1; 00386 00387 while (!rb_thread_alone()) { 00388 PUSH_TAG(); 00389 if (EXEC_TAG() == 0) { 00390 rb_thread_schedule(); 00391 } 00392 else { 00393 /* ignore exception */ 00394 } 00395 POP_TAG(); 00396 } 00397 } 00398 00399 static void 00400 thread_cleanup_func_before_exec(void *th_ptr) 00401 { 00402 rb_thread_t *th = th_ptr; 00403 th->status = THREAD_KILLED; 00404 th->machine_stack_start = th->machine_stack_end = 0; 00405 #ifdef __ia64 00406 th->machine_register_stack_start = th->machine_register_stack_end = 0; 00407 #endif 00408 } 00409 00410 static void 00411 thread_cleanup_func(void *th_ptr, int atfork) 00412 { 00413 rb_thread_t *th = th_ptr; 00414 00415 th->locking_mutex = Qfalse; 00416 thread_cleanup_func_before_exec(th_ptr); 00417 00418 /* 00419 * Unfortunately, we can't release native threading resource at fork 00420 * because libc may have unstable locking state therefore touching 00421 * a threading resource may cause a deadlock. 00422 */ 00423 if (atfork) 00424 return; 00425 00426 native_mutex_destroy(&th->interrupt_lock); 00427 native_thread_destroy(th); 00428 } 00429 00430 static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *); 00431 00432 void 00433 ruby_thread_init_stack(rb_thread_t *th) 00434 { 00435 native_thread_init_stack(th); 00436 } 00437 00438 static int 00439 thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start) 00440 { 00441 int state; 00442 VALUE args = th->first_args; 00443 rb_proc_t *proc; 00444 rb_thread_t *join_th; 00445 rb_thread_t *main_th; 00446 VALUE errinfo = Qnil; 00447 # ifdef USE_SIGALTSTACK 00448 void rb_register_sigaltstack(rb_thread_t *th); 00449 00450 rb_register_sigaltstack(th); 00451 # endif 00452 00453 ruby_thread_set_native(th); 00454 00455 th->machine_stack_start = stack_start; 00456 #ifdef __ia64 00457 th->machine_register_stack_start = register_stack_start; 00458 #endif 00459 thread_debug("thread start: %p\n", (void *)th); 00460 00461 gvl_acquire(th->vm, th); 00462 { 00463 thread_debug("thread start (get lock): %p\n", (void *)th); 00464 rb_thread_set_current(th); 00465 00466 TH_PUSH_TAG(th); 00467 if ((state = EXEC_TAG()) == 0) { 00468 SAVE_ROOT_JMPBUF(th, { 00469 if (!th->first_func) { 00470 GetProcPtr(th->first_proc, proc); 00471 th->errinfo = Qnil; 00472 th->local_lfp = proc->block.lfp; 00473 th->local_svar = Qnil; 00474 th->value = rb_vm_invoke_proc(th, proc, proc->block.self, 00475 (int)RARRAY_LEN(args), RARRAY_PTR(args), 0); 00476 } 00477 else { 00478 th->value = (*th->first_func)((void *)args); 00479 } 00480 }); 00481 } 00482 else { 00483 errinfo = th->errinfo; 00484 if (NIL_P(errinfo)) errinfo = rb_errinfo(); 00485 if (state == TAG_FATAL) { 00486 /* fatal error within this thread, need to stop whole script */ 00487 } 00488 else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) { 00489 if (th->safe_level >= 4) { 00490 th->errinfo = rb_exc_new3(rb_eSecurityError, 00491 rb_sprintf("Insecure exit at level %d", th->safe_level)); 00492 errinfo = Qnil; 00493 } 00494 } 00495 else if (th->safe_level < 4 && 00496 (th->vm->thread_abort_on_exception || 00497 th->abort_on_exception || RTEST(ruby_debug))) { 00498 /* exit on main_thread */ 00499 } 00500 else { 00501 errinfo = Qnil; 00502 } 00503 th->value = Qnil; 00504 } 00505 00506 th->status = THREAD_KILLED; 00507 thread_debug("thread end: %p\n", (void *)th); 00508 00509 main_th = th->vm->main_thread; 00510 if (th != main_th) { 00511 if (TYPE(errinfo) == T_OBJECT) { 00512 /* treat with normal error object */ 00513 rb_threadptr_raise(main_th, 1, &errinfo); 00514 } 00515 } 00516 TH_POP_TAG(); 00517 00518 /* locking_mutex must be Qfalse */ 00519 if (th->locking_mutex != Qfalse) { 00520 rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")", 00521 (void *)th, th->locking_mutex); 00522 } 00523 00524 /* delete self other than main thread from living_threads */ 00525 if (th != main_th) { 00526 st_delete_wrap(th->vm->living_threads, th->self); 00527 } 00528 00529 /* wake up joining threads */ 00530 join_th = th->join_list_head; 00531 while (join_th) { 00532 if (join_th == main_th) errinfo = Qnil; 00533 rb_threadptr_interrupt(join_th); 00534 switch (join_th->status) { 00535 case THREAD_STOPPED: case THREAD_STOPPED_FOREVER: 00536 join_th->status = THREAD_RUNNABLE; 00537 default: break; 00538 } 00539 join_th = join_th->join_list_next; 00540 } 00541 00542 rb_threadptr_unlock_all_locking_mutexes(th); 00543 if (th != main_th) rb_check_deadlock(th->vm); 00544 00545 if (!th->root_fiber) { 00546 rb_thread_recycle_stack_release(th->stack); 00547 th->stack = 0; 00548 } 00549 } 00550 if (th->vm->main_thread == th) { 00551 ruby_cleanup(state); 00552 } 00553 else { 00554 thread_cleanup_func(th, FALSE); 00555 gvl_release(th->vm); 00556 } 00557 00558 return 0; 00559 } 00560 00561 static VALUE 00562 thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) 00563 { 00564 rb_thread_t *th; 00565 int err; 00566 00567 if (OBJ_FROZEN(GET_THREAD()->thgroup)) { 00568 rb_raise(rb_eThreadError, 00569 "can't start a new thread (frozen ThreadGroup)"); 00570 } 00571 GetThreadPtr(thval, th); 00572 00573 /* setup thread environment */ 00574 th->first_func = fn; 00575 th->first_proc = fn ? Qfalse : rb_block_proc(); 00576 th->first_args = args; /* GC: shouldn't put before above line */ 00577 00578 th->priority = GET_THREAD()->priority; 00579 th->thgroup = GET_THREAD()->thgroup; 00580 00581 native_mutex_initialize(&th->interrupt_lock); 00582 if (GET_VM()->event_hooks != NULL) 00583 th->event_flags |= RUBY_EVENT_VM; 00584 00585 /* kick thread */ 00586 st_insert(th->vm->living_threads, thval, (st_data_t) th->thread_id); 00587 err = native_thread_create(th); 00588 if (err) { 00589 st_delete_wrap(th->vm->living_threads, th->self); 00590 th->status = THREAD_KILLED; 00591 rb_raise(rb_eThreadError, "can't create Thread (%d)", err); 00592 } 00593 return thval; 00594 } 00595 00596 /* :nodoc: */ 00597 static VALUE 00598 thread_s_new(int argc, VALUE *argv, VALUE klass) 00599 { 00600 rb_thread_t *th; 00601 VALUE thread = rb_thread_alloc(klass); 00602 00603 if (GET_VM()->inhibit_thread_creation) 00604 rb_raise(rb_eThreadError, "can't alloc thread"); 00605 00606 rb_obj_call_init(thread, argc, argv); 00607 GetThreadPtr(thread, th); 00608 if (!th->first_args) { 00609 rb_raise(rb_eThreadError, "uninitialized thread - check `%s#initialize'", 00610 rb_class2name(klass)); 00611 } 00612 return thread; 00613 } 00614 00615 /* 00616 * call-seq: 00617 * Thread.start([args]*) {|args| block } -> thread 00618 * Thread.fork([args]*) {|args| block } -> thread 00619 * 00620 * Basically the same as <code>Thread::new</code>. However, if class 00621 * <code>Thread</code> is subclassed, then calling <code>start</code> in that 00622 * subclass will not invoke the subclass's <code>initialize</code> method. 00623 */ 00624 00625 static VALUE 00626 thread_start(VALUE klass, VALUE args) 00627 { 00628 return thread_create_core(rb_thread_alloc(klass), args, 0); 00629 } 00630 00631 /* :nodoc: */ 00632 static VALUE 00633 thread_initialize(VALUE thread, VALUE args) 00634 { 00635 rb_thread_t *th; 00636 if (!rb_block_given_p()) { 00637 rb_raise(rb_eThreadError, "must be called with a block"); 00638 } 00639 GetThreadPtr(thread, th); 00640 if (th->first_args) { 00641 VALUE proc = th->first_proc, line, loc; 00642 const char *file; 00643 if (!proc || !RTEST(loc = rb_proc_location(proc))) { 00644 rb_raise(rb_eThreadError, "already initialized thread"); 00645 } 00646 file = RSTRING_PTR(RARRAY_PTR(loc)[0]); 00647 if (NIL_P(line = RARRAY_PTR(loc)[1])) { 00648 rb_raise(rb_eThreadError, "already initialized thread - %s", 00649 file); 00650 } 00651 rb_raise(rb_eThreadError, "already initialized thread - %s:%d", 00652 file, NUM2INT(line)); 00653 } 00654 return thread_create_core(thread, args, 0); 00655 } 00656 00657 VALUE 00658 rb_thread_create(VALUE (*fn)(ANYARGS), void *arg) 00659 { 00660 return thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn); 00661 } 00662 00663 00664 /* +infty, for this purpose */ 00665 #define DELAY_INFTY 1E30 00666 00667 struct join_arg { 00668 rb_thread_t *target, *waiting; 00669 double limit; 00670 int forever; 00671 }; 00672 00673 static VALUE 00674 remove_from_join_list(VALUE arg) 00675 { 00676 struct join_arg *p = (struct join_arg *)arg; 00677 rb_thread_t *target_th = p->target, *th = p->waiting; 00678 00679 if (target_th->status != THREAD_KILLED) { 00680 rb_thread_t **pth = &target_th->join_list_head; 00681 00682 while (*pth) { 00683 if (*pth == th) { 00684 *pth = th->join_list_next; 00685 break; 00686 } 00687 pth = &(*pth)->join_list_next; 00688 } 00689 } 00690 00691 return Qnil; 00692 } 00693 00694 static VALUE 00695 thread_join_sleep(VALUE arg) 00696 { 00697 struct join_arg *p = (struct join_arg *)arg; 00698 rb_thread_t *target_th = p->target, *th = p->waiting; 00699 double now, limit = p->limit; 00700 00701 while (target_th->status != THREAD_KILLED) { 00702 if (p->forever) { 00703 sleep_forever(th, 1); 00704 } 00705 else { 00706 now = timeofday(); 00707 if (now > limit) { 00708 thread_debug("thread_join: timeout (thid: %p)\n", 00709 (void *)target_th->thread_id); 00710 return Qfalse; 00711 } 00712 sleep_wait_for_interrupt(th, limit - now); 00713 } 00714 thread_debug("thread_join: interrupted (thid: %p)\n", 00715 (void *)target_th->thread_id); 00716 } 00717 return Qtrue; 00718 } 00719 00720 static VALUE 00721 thread_join(rb_thread_t *target_th, double delay) 00722 { 00723 rb_thread_t *th = GET_THREAD(); 00724 struct join_arg arg; 00725 00726 arg.target = target_th; 00727 arg.waiting = th; 00728 arg.limit = timeofday() + delay; 00729 arg.forever = delay == DELAY_INFTY; 00730 00731 thread_debug("thread_join (thid: %p)\n", (void *)target_th->thread_id); 00732 00733 if (target_th->status != THREAD_KILLED) { 00734 th->join_list_next = target_th->join_list_head; 00735 target_th->join_list_head = th; 00736 if (!rb_ensure(thread_join_sleep, (VALUE)&arg, 00737 remove_from_join_list, (VALUE)&arg)) { 00738 return Qnil; 00739 } 00740 } 00741 00742 thread_debug("thread_join: success (thid: %p)\n", 00743 (void *)target_th->thread_id); 00744 00745 if (target_th->errinfo != Qnil) { 00746 VALUE err = target_th->errinfo; 00747 00748 if (FIXNUM_P(err)) { 00749 /* */ 00750 } 00751 else if (TYPE(target_th->errinfo) == T_NODE) { 00752 rb_exc_raise(rb_vm_make_jump_tag_but_local_jump( 00753 GET_THROWOBJ_STATE(err), GET_THROWOBJ_VAL(err))); 00754 } 00755 else { 00756 /* normal exception */ 00757 rb_exc_raise(err); 00758 } 00759 } 00760 return target_th->self; 00761 } 00762 00763 /* 00764 * call-seq: 00765 * thr.join -> thr 00766 * thr.join(limit) -> thr 00767 * 00768 * The calling thread will suspend execution and run <i>thr</i>. Does not 00769 * return until <i>thr</i> exits or until <i>limit</i> seconds have passed. If 00770 * the time limit expires, <code>nil</code> will be returned, otherwise 00771 * <i>thr</i> is returned. 00772 * 00773 * Any threads not joined will be killed when the main program exits. If 00774 * <i>thr</i> had previously raised an exception and the 00775 * <code>abort_on_exception</code> and <code>$DEBUG</code> flags are not set 00776 * (so the exception has not yet been processed) it will be processed at this 00777 * time. 00778 * 00779 * a = Thread.new { print "a"; sleep(10); print "b"; print "c" } 00780 * x = Thread.new { print "x"; Thread.pass; print "y"; print "z" } 00781 * x.join # Let x thread finish, a will be killed on exit. 00782 * 00783 * <em>produces:</em> 00784 * 00785 * axyz 00786 * 00787 * The following example illustrates the <i>limit</i> parameter. 00788 * 00789 * y = Thread.new { 4.times { sleep 0.1; puts 'tick... ' }} 00790 * puts "Waiting" until y.join(0.15) 00791 * 00792 * <em>produces:</em> 00793 * 00794 * tick... 00795 * Waiting 00796 * tick... 00797 * Waitingtick... 00798 * 00799 * 00800 * tick... 00801 */ 00802 00803 static VALUE 00804 thread_join_m(int argc, VALUE *argv, VALUE self) 00805 { 00806 rb_thread_t *target_th; 00807 double delay = DELAY_INFTY; 00808 VALUE limit; 00809 00810 GetThreadPtr(self, target_th); 00811 00812 rb_scan_args(argc, argv, "01", &limit); 00813 if (!NIL_P(limit)) { 00814 delay = rb_num2dbl(limit); 00815 } 00816 00817 return thread_join(target_th, delay); 00818 } 00819 00820 /* 00821 * call-seq: 00822 * thr.value -> obj 00823 * 00824 * Waits for <i>thr</i> to complete (via <code>Thread#join</code>) and returns 00825 * its value. 00826 * 00827 * a = Thread.new { 2 + 2 } 00828 * a.value #=> 4 00829 */ 00830 00831 static VALUE 00832 thread_value(VALUE self) 00833 { 00834 rb_thread_t *th; 00835 GetThreadPtr(self, th); 00836 thread_join(th, DELAY_INFTY); 00837 return th->value; 00838 } 00839 00840 /* 00841 * Thread Scheduling 00842 */ 00843 00844 static struct timeval 00845 double2timeval(double d) 00846 { 00847 struct timeval time; 00848 00849 time.tv_sec = (int)d; 00850 time.tv_usec = (int)((d - (int)d) * 1e6); 00851 if (time.tv_usec < 0) { 00852 time.tv_usec += (int)1e6; 00853 time.tv_sec -= 1; 00854 } 00855 return time; 00856 } 00857 00858 static void 00859 sleep_forever(rb_thread_t *th, int deadlockable) 00860 { 00861 enum rb_thread_status prev_status = th->status; 00862 enum rb_thread_status status = deadlockable ? THREAD_STOPPED_FOREVER : THREAD_STOPPED; 00863 00864 th->status = status; 00865 do { 00866 if (deadlockable) { 00867 th->vm->sleeper++; 00868 rb_check_deadlock(th->vm); 00869 } 00870 native_sleep(th, 0); 00871 if (deadlockable) { 00872 th->vm->sleeper--; 00873 } 00874 RUBY_VM_CHECK_INTS(); 00875 } while (th->status == status); 00876 th->status = prev_status; 00877 } 00878 00879 static void 00880 getclockofday(struct timeval *tp) 00881 { 00882 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 00883 struct timespec ts; 00884 00885 if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) { 00886 tp->tv_sec = ts.tv_sec; 00887 tp->tv_usec = ts.tv_nsec / 1000; 00888 } else 00889 #endif 00890 { 00891 gettimeofday(tp, NULL); 00892 } 00893 } 00894 00895 static void 00896 sleep_timeval(rb_thread_t *th, struct timeval tv) 00897 { 00898 struct timeval to, tvn; 00899 enum rb_thread_status prev_status = th->status; 00900 00901 getclockofday(&to); 00902 to.tv_sec += tv.tv_sec; 00903 if ((to.tv_usec += tv.tv_usec) >= 1000000) { 00904 to.tv_sec++; 00905 to.tv_usec -= 1000000; 00906 } 00907 00908 th->status = THREAD_STOPPED; 00909 do { 00910 native_sleep(th, &tv); 00911 RUBY_VM_CHECK_INTS(); 00912 getclockofday(&tvn); 00913 if (to.tv_sec < tvn.tv_sec) break; 00914 if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break; 00915 thread_debug("sleep_timeval: %ld.%.6ld > %ld.%.6ld\n", 00916 (long)to.tv_sec, (long)to.tv_usec, 00917 (long)tvn.tv_sec, (long)tvn.tv_usec); 00918 tv.tv_sec = to.tv_sec - tvn.tv_sec; 00919 if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) { 00920 --tv.tv_sec; 00921 tv.tv_usec += 1000000; 00922 } 00923 } while (th->status == THREAD_STOPPED); 00924 th->status = prev_status; 00925 } 00926 00927 void 00928 rb_thread_sleep_forever(void) 00929 { 00930 thread_debug("rb_thread_sleep_forever\n"); 00931 sleep_forever(GET_THREAD(), 0); 00932 } 00933 00934 static void 00935 rb_thread_sleep_deadly(void) 00936 { 00937 thread_debug("rb_thread_sleep_deadly\n"); 00938 sleep_forever(GET_THREAD(), 1); 00939 } 00940 00941 static double 00942 timeofday(void) 00943 { 00944 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 00945 struct timespec tp; 00946 00947 if (clock_gettime(CLOCK_MONOTONIC, &tp) == 0) { 00948 return (double)tp.tv_sec + (double)tp.tv_nsec * 1e-9; 00949 } else 00950 #endif 00951 { 00952 struct timeval tv; 00953 gettimeofday(&tv, NULL); 00954 return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6; 00955 } 00956 } 00957 00958 static void 00959 sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec) 00960 { 00961 sleep_timeval(th, double2timeval(sleepsec)); 00962 } 00963 00964 static void 00965 sleep_for_polling(rb_thread_t *th) 00966 { 00967 struct timeval time; 00968 time.tv_sec = 0; 00969 time.tv_usec = 100 * 1000; /* 0.1 sec */ 00970 sleep_timeval(th, time); 00971 } 00972 00973 void 00974 rb_thread_wait_for(struct timeval time) 00975 { 00976 rb_thread_t *th = GET_THREAD(); 00977 sleep_timeval(th, time); 00978 } 00979 00980 void 00981 rb_thread_polling(void) 00982 { 00983 RUBY_VM_CHECK_INTS(); 00984 if (!rb_thread_alone()) { 00985 rb_thread_t *th = GET_THREAD(); 00986 sleep_for_polling(th); 00987 } 00988 } 00989 00990 /* 00991 * CAUTION: This function causes thread switching. 00992 * rb_thread_check_ints() check ruby's interrupts. 00993 * some interrupt needs thread switching/invoke handlers, 00994 * and so on. 00995 */ 00996 00997 void 00998 rb_thread_check_ints(void) 00999 { 01000 RUBY_VM_CHECK_INTS(); 01001 } 01002 01003 /* 01004 * Hidden API for tcl/tk wrapper. 01005 * There is no guarantee to perpetuate it. 01006 */ 01007 int 01008 rb_thread_check_trap_pending(void) 01009 { 01010 return rb_signal_buff_size() != 0; 01011 } 01012 01013 /* This function can be called in blocking region. */ 01014 int 01015 rb_thread_interrupted(VALUE thval) 01016 { 01017 rb_thread_t *th; 01018 GetThreadPtr(thval, th); 01019 return RUBY_VM_INTERRUPTED(th); 01020 } 01021 01022 void 01023 rb_thread_sleep(int sec) 01024 { 01025 rb_thread_wait_for(rb_time_timeval(INT2FIX(sec))); 01026 } 01027 01028 static void rb_threadptr_execute_interrupts_common(rb_thread_t *); 01029 01030 static void 01031 rb_thread_schedule_limits(unsigned long limits_us) 01032 { 01033 thread_debug("rb_thread_schedule\n"); 01034 if (!rb_thread_alone()) { 01035 rb_thread_t *th = GET_THREAD(); 01036 01037 if (th->running_time_us >= limits_us) { 01038 thread_debug("rb_thread_schedule/switch start\n"); 01039 RB_GC_SAVE_MACHINE_CONTEXT(th); 01040 gvl_yield(th->vm, th); 01041 rb_thread_set_current(th); 01042 thread_debug("rb_thread_schedule/switch done\n"); 01043 } 01044 } 01045 } 01046 01047 void 01048 rb_thread_schedule(void) 01049 { 01050 rb_thread_schedule_limits(0); 01051 01052 if (UNLIKELY(GET_THREAD()->interrupt_flag)) { 01053 rb_threadptr_execute_interrupts_common(GET_THREAD()); 01054 } 01055 } 01056 01057 /* blocking region */ 01058 01059 static inline void 01060 blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region) 01061 { 01062 gvl_acquire(th->vm, th); 01063 rb_thread_set_current(th); 01064 thread_debug("leave blocking region (%p)\n", (void *)th); 01065 remove_signal_thread_list(th); 01066 th->blocking_region_buffer = 0; 01067 reset_unblock_function(th, ®ion->oldubf); 01068 if (th->status == THREAD_STOPPED) { 01069 th->status = region->prev_status; 01070 } 01071 } 01072 01073 struct rb_blocking_region_buffer * 01074 rb_thread_blocking_region_begin(void) 01075 { 01076 rb_thread_t *th = GET_THREAD(); 01077 struct rb_blocking_region_buffer *region = ALLOC(struct rb_blocking_region_buffer); 01078 blocking_region_begin(th, region, ubf_select, th); 01079 return region; 01080 } 01081 01082 void 01083 rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region) 01084 { 01085 int saved_errno = errno; 01086 rb_thread_t *th = GET_THREAD(); 01087 blocking_region_end(th, region); 01088 xfree(region); 01089 RUBY_VM_CHECK_INTS(); 01090 errno = saved_errno; 01091 } 01092 01093 /* 01094 * rb_thread_blocking_region - permit concurrent/parallel execution. 01095 * 01096 * This function does: 01097 * (1) release GVL. 01098 * Other Ruby threads may run in parallel. 01099 * (2) call func with data1. 01100 * (3) acquire GVL. 01101 * Other Ruby threads can not run in parallel any more. 01102 * 01103 * If another thread interrupts this thread (Thread#kill, signal delivery, 01104 * VM-shutdown request, and so on), `ubf()' is called (`ubf()' means 01105 * "un-blocking function"). `ubf()' should interrupt `func()' execution. 01106 * 01107 * There are built-in ubfs and you can specify these ubfs. 01108 * However, we can not guarantee our built-in ubfs interrupt 01109 * your `func()' correctly. Be careful to use rb_thread_blocking_region(). 01110 * 01111 * * RUBY_UBF_IO: ubf for IO operation 01112 * * RUBY_UBF_PROCESS: ubf for process operation 01113 * 01114 * NOTE: You can not execute most of Ruby C API and touch Ruby 01115 * objects in `func()' and `ubf()', including raising an 01116 * exception, because current thread doesn't acquire GVL 01117 * (cause synchronization problem). If you need to do it, 01118 * read source code of C APIs and confirm by yourself. 01119 * 01120 * NOTE: In short, this API is difficult to use safely. I recommend you 01121 * use other ways if you have. We lack experiences to use this API. 01122 * Please report your problem related on it. 01123 * 01124 * Safe C API: 01125 * * rb_thread_interrupted() - check interrupt flag 01126 * * ruby_xalloc(), ruby_xrealloc(), ruby_xfree() - 01127 * if they called without GVL, acquire GVL automatically. 01128 */ 01129 VALUE 01130 rb_thread_blocking_region( 01131 rb_blocking_function_t *func, void *data1, 01132 rb_unblock_function_t *ubf, void *data2) 01133 { 01134 VALUE val; 01135 rb_thread_t *th = GET_THREAD(); 01136 int saved_errno = 0; 01137 01138 th->waiting_fd = -1; 01139 if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) { 01140 ubf = ubf_select; 01141 data2 = th; 01142 } 01143 01144 BLOCKING_REGION({ 01145 val = func(data1); 01146 saved_errno = errno; 01147 }, ubf, data2); 01148 errno = saved_errno; 01149 01150 return val; 01151 } 01152 01153 VALUE 01154 rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) 01155 { 01156 VALUE val; 01157 rb_thread_t *th = GET_THREAD(); 01158 int saved_errno = 0; 01159 01160 th->waiting_fd = fd; 01161 BLOCKING_REGION({ 01162 val = func(data1); 01163 saved_errno = errno; 01164 }, ubf_select, th); 01165 th->waiting_fd = -1; 01166 errno = saved_errno; 01167 01168 return val; 01169 } 01170 01171 /* alias of rb_thread_blocking_region() */ 01172 01173 VALUE 01174 rb_thread_call_without_gvl( 01175 rb_blocking_function_t *func, void *data1, 01176 rb_unblock_function_t *ubf, void *data2) 01177 { 01178 return rb_thread_blocking_region(func, data1, ubf, data2); 01179 } 01180 01181 /* 01182 * rb_thread_call_with_gvl - re-enter into Ruby world while releasing GVL. 01183 * 01184 *** 01185 *** This API is EXPERIMENTAL! 01186 *** We do not guarantee that this API remains in ruby 1.9.2 or later. 01187 *** 01188 * 01189 * While releasing GVL using rb_thread_blocking_region() or 01190 * rb_thread_call_without_gvl(), you can not access Ruby values or invoke methods. 01191 * If you need to access it, you must use this function rb_thread_call_with_gvl(). 01192 * 01193 * This function rb_thread_call_with_gvl() does: 01194 * (1) acquire GVL. 01195 * (2) call passed function `func'. 01196 * (3) release GVL. 01197 * (4) return a value which is returned at (2). 01198 * 01199 * NOTE: You should not return Ruby object at (2) because such Object 01200 * will not marked. 01201 * 01202 * NOTE: If an exception is raised in `func', this function "DOES NOT" 01203 * protect (catch) the exception. If you have any resources 01204 * which should free before throwing exception, you need use 01205 * rb_protect() in `func' and return a value which represents 01206 * exception is raised. 01207 * 01208 * NOTE: This functions should not be called by a thread which 01209 * is not created as Ruby thread (created by Thread.new or so). 01210 * In other words, this function *DOES NOT* associate 01211 * NON-Ruby thread to Ruby thread. 01212 */ 01213 void * 01214 rb_thread_call_with_gvl(void *(*func)(void *), void *data1) 01215 { 01216 rb_thread_t *th = ruby_thread_from_native(); 01217 struct rb_blocking_region_buffer *brb; 01218 struct rb_unblock_callback prev_unblock; 01219 void *r; 01220 01221 if (th == 0) { 01222 /* Error is occurred, but we can't use rb_bug() 01223 * because this thread is not Ruby's thread. 01224 * What should we do? 01225 */ 01226 01227 fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n"); 01228 exit(EXIT_FAILURE); 01229 } 01230 01231 brb = (struct rb_blocking_region_buffer *)th->blocking_region_buffer; 01232 prev_unblock = th->unblock; 01233 01234 if (brb == 0) { 01235 rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL."); 01236 } 01237 01238 blocking_region_end(th, brb); 01239 /* enter to Ruby world: You can access Ruby values, methods and so on. */ 01240 r = (*func)(data1); 01241 /* leave from Ruby world: You can not access Ruby values, etc. */ 01242 blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg); 01243 return r; 01244 } 01245 01246 /* 01247 * ruby_thread_has_gvl_p - check if current native thread has GVL. 01248 * 01249 *** 01250 *** This API is EXPERIMENTAL! 01251 *** We do not guarantee that this API remains in ruby 1.9.2 or later. 01252 *** 01253 */ 01254 01255 int 01256 ruby_thread_has_gvl_p(void) 01257 { 01258 rb_thread_t *th = ruby_thread_from_native(); 01259 01260 if (th && th->blocking_region_buffer == 0) { 01261 return 1; 01262 } 01263 else { 01264 return 0; 01265 } 01266 } 01267 01268 /* 01269 * call-seq: 01270 * Thread.pass -> nil 01271 * 01272 * Give the thread scheduler a hint to pass execution to another thread. 01273 * A running thread may or may not switch, it depends on OS and processor. 01274 */ 01275 01276 static VALUE 01277 thread_s_pass(VALUE klass) 01278 { 01279 rb_thread_schedule(); 01280 return Qnil; 01281 } 01282 01283 /* 01284 * 01285 */ 01286 01287 static void 01288 rb_threadptr_execute_interrupts_common(rb_thread_t *th) 01289 { 01290 rb_atomic_t interrupt; 01291 01292 if (th->raised_flag) return; 01293 01294 while ((interrupt = ATOMIC_EXCHANGE(th->interrupt_flag, 0)) != 0) { 01295 enum rb_thread_status status = th->status; 01296 int timer_interrupt = interrupt & 0x01; 01297 int finalizer_interrupt = interrupt & 0x04; 01298 int sig; 01299 01300 th->status = THREAD_RUNNABLE; 01301 01302 /* signal handling */ 01303 if (th == th->vm->main_thread) { 01304 while ((sig = rb_get_next_signal()) != 0) { 01305 rb_signal_exec(th, sig); 01306 } 01307 } 01308 01309 /* exception from another thread */ 01310 if (th->thrown_errinfo) { 01311 VALUE err = th->thrown_errinfo; 01312 th->thrown_errinfo = 0; 01313 thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err); 01314 01315 if (err == eKillSignal || err == eTerminateSignal) { 01316 th->errinfo = INT2FIX(TAG_FATAL); 01317 TH_JUMP_TAG(th, TAG_FATAL); 01318 } 01319 else { 01320 rb_exc_raise(err); 01321 } 01322 } 01323 th->status = status; 01324 01325 if (finalizer_interrupt) { 01326 rb_gc_finalize_deferred(); 01327 } 01328 01329 if (timer_interrupt) { 01330 unsigned long limits_us = 250 * 1000; 01331 01332 if (th->priority > 0) 01333 limits_us <<= th->priority; 01334 else 01335 limits_us >>= -th->priority; 01336 01337 if (status == THREAD_RUNNABLE) 01338 th->running_time_us += TIME_QUANTUM_USEC; 01339 01340 EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0); 01341 01342 rb_thread_schedule_limits(limits_us); 01343 } 01344 } 01345 } 01346 01347 void 01348 rb_threadptr_execute_interrupts(rb_thread_t *th) 01349 { 01350 rb_threadptr_execute_interrupts_common(th); 01351 } 01352 01353 void 01354 rb_thread_execute_interrupts(VALUE thval) 01355 { 01356 rb_thread_t *th; 01357 GetThreadPtr(thval, th); 01358 rb_threadptr_execute_interrupts_common(th); 01359 } 01360 01361 void 01362 rb_gc_mark_threads(void) 01363 { 01364 rb_bug("deprecated function rb_gc_mark_threads is called"); 01365 } 01366 01367 /*****************************************************/ 01368 01369 static void 01370 rb_threadptr_ready(rb_thread_t *th) 01371 { 01372 rb_threadptr_interrupt(th); 01373 } 01374 01375 static VALUE 01376 rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv) 01377 { 01378 VALUE exc; 01379 01380 again: 01381 if (rb_threadptr_dead(th)) { 01382 return Qnil; 01383 } 01384 01385 if (th->thrown_errinfo != 0 || th->raised_flag) { 01386 rb_thread_schedule(); 01387 goto again; 01388 } 01389 01390 exc = rb_make_exception(argc, argv); 01391 th->thrown_errinfo = exc; 01392 rb_threadptr_ready(th); 01393 return Qnil; 01394 } 01395 01396 void 01397 rb_threadptr_signal_raise(rb_thread_t *th, int sig) 01398 { 01399 VALUE argv[2]; 01400 01401 argv[0] = rb_eSignal; 01402 argv[1] = INT2FIX(sig); 01403 rb_threadptr_raise(th->vm->main_thread, 2, argv); 01404 } 01405 01406 void 01407 rb_threadptr_signal_exit(rb_thread_t *th) 01408 { 01409 VALUE argv[2]; 01410 01411 argv[0] = rb_eSystemExit; 01412 argv[1] = rb_str_new2("exit"); 01413 rb_threadptr_raise(th->vm->main_thread, 2, argv); 01414 } 01415 01416 #if defined(POSIX_SIGNAL) && defined(SIGSEGV) && defined(HAVE_SIGALTSTACK) 01417 #define USE_SIGALTSTACK 01418 #endif 01419 01420 void 01421 ruby_thread_stack_overflow(rb_thread_t *th) 01422 { 01423 th->raised_flag = 0; 01424 #ifdef USE_SIGALTSTACK 01425 rb_exc_raise(sysstack_error); 01426 #else 01427 th->errinfo = sysstack_error; 01428 TH_JUMP_TAG(th, TAG_RAISE); 01429 #endif 01430 } 01431 01432 int 01433 rb_threadptr_set_raised(rb_thread_t *th) 01434 { 01435 if (th->raised_flag & RAISED_EXCEPTION) { 01436 return 1; 01437 } 01438 th->raised_flag |= RAISED_EXCEPTION; 01439 return 0; 01440 } 01441 01442 int 01443 rb_threadptr_reset_raised(rb_thread_t *th) 01444 { 01445 if (!(th->raised_flag & RAISED_EXCEPTION)) { 01446 return 0; 01447 } 01448 th->raised_flag &= ~RAISED_EXCEPTION; 01449 return 1; 01450 } 01451 01452 #define THREAD_IO_WAITING_P(th) ( \ 01453 ((th)->status == THREAD_STOPPED || \ 01454 (th)->status == THREAD_STOPPED_FOREVER) && \ 01455 (th)->blocking_region_buffer && \ 01456 (th)->unblock.func == ubf_select && \ 01457 1) 01458 01459 static int 01460 thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data) 01461 { 01462 int fd = (int)data; 01463 rb_thread_t *th; 01464 GetThreadPtr((VALUE)key, th); 01465 01466 if (THREAD_IO_WAITING_P(th)) { 01467 native_mutex_lock(&th->interrupt_lock); 01468 if (THREAD_IO_WAITING_P(th) && th->waiting_fd == fd) { 01469 th->thrown_errinfo = th->vm->special_exceptions[ruby_error_closed_stream]; 01470 RUBY_VM_SET_INTERRUPT(th); 01471 (th->unblock.func)(th->unblock.arg); 01472 } 01473 native_mutex_unlock(&th->interrupt_lock); 01474 } 01475 return ST_CONTINUE; 01476 } 01477 01478 void 01479 rb_thread_fd_close(int fd) 01480 { 01481 st_foreach(GET_THREAD()->vm->living_threads, thread_fd_close_i, (st_index_t)fd); 01482 } 01483 01484 /* 01485 * call-seq: 01486 * thr.raise 01487 * thr.raise(string) 01488 * thr.raise(exception [, string [, array]]) 01489 * 01490 * Raises an exception (see <code>Kernel::raise</code>) from <i>thr</i>. The 01491 * caller does not have to be <i>thr</i>. 01492 * 01493 * Thread.abort_on_exception = true 01494 * a = Thread.new { sleep(200) } 01495 * a.raise("Gotcha") 01496 * 01497 * <em>produces:</em> 01498 * 01499 * prog.rb:3: Gotcha (RuntimeError) 01500 * from prog.rb:2:in `initialize' 01501 * from prog.rb:2:in `new' 01502 * from prog.rb:2 01503 */ 01504 01505 static VALUE 01506 thread_raise_m(int argc, VALUE *argv, VALUE self) 01507 { 01508 rb_thread_t *th; 01509 GetThreadPtr(self, th); 01510 rb_threadptr_raise(th, argc, argv); 01511 return Qnil; 01512 } 01513 01514 01515 /* 01516 * call-seq: 01517 * thr.exit -> thr or nil 01518 * thr.kill -> thr or nil 01519 * thr.terminate -> thr or nil 01520 * 01521 * Terminates <i>thr</i> and schedules another thread to be run. If this thread 01522 * is already marked to be killed, <code>exit</code> returns the 01523 * <code>Thread</code>. If this is the main thread, or the last thread, exits 01524 * the process. 01525 */ 01526 01527 VALUE 01528 rb_thread_kill(VALUE thread) 01529 { 01530 rb_thread_t *th; 01531 01532 GetThreadPtr(thread, th); 01533 01534 if (th != GET_THREAD() && th->safe_level < 4) { 01535 rb_secure(4); 01536 } 01537 if (th->status == THREAD_TO_KILL || th->status == THREAD_KILLED) { 01538 return thread; 01539 } 01540 if (th == th->vm->main_thread) { 01541 rb_exit(EXIT_SUCCESS); 01542 } 01543 01544 thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id); 01545 01546 rb_threadptr_interrupt(th); 01547 th->thrown_errinfo = eKillSignal; 01548 th->status = THREAD_TO_KILL; 01549 01550 return thread; 01551 } 01552 01553 01554 /* 01555 * call-seq: 01556 * Thread.kill(thread) -> thread 01557 * 01558 * Causes the given <em>thread</em> to exit (see <code>Thread::exit</code>). 01559 * 01560 * count = 0 01561 * a = Thread.new { loop { count += 1 } } 01562 * sleep(0.1) #=> 0 01563 * Thread.kill(a) #=> #<Thread:0x401b3d30 dead> 01564 * count #=> 93947 01565 * a.alive? #=> false 01566 */ 01567 01568 static VALUE 01569 rb_thread_s_kill(VALUE obj, VALUE th) 01570 { 01571 return rb_thread_kill(th); 01572 } 01573 01574 01575 /* 01576 * call-seq: 01577 * Thread.exit -> thread 01578 * 01579 * Terminates the currently running thread and schedules another thread to be 01580 * run. If this thread is already marked to be killed, <code>exit</code> 01581 * returns the <code>Thread</code>. If this is the main thread, or the last 01582 * thread, exit the process. 01583 */ 01584 01585 static VALUE 01586 rb_thread_exit(void) 01587 { 01588 return rb_thread_kill(GET_THREAD()->self); 01589 } 01590 01591 01592 /* 01593 * call-seq: 01594 * thr.wakeup -> thr 01595 * 01596 * Marks <i>thr</i> as eligible for scheduling (it may still remain blocked on 01597 * I/O, however). Does not invoke the scheduler (see <code>Thread#run</code>). 01598 * 01599 * c = Thread.new { Thread.stop; puts "hey!" } 01600 * sleep 0.1 while c.status!='sleep' 01601 * c.wakeup 01602 * c.join 01603 * 01604 * <em>produces:</em> 01605 * 01606 * hey! 01607 */ 01608 01609 VALUE 01610 rb_thread_wakeup(VALUE thread) 01611 { 01612 if (!RTEST(rb_thread_wakeup_alive(thread))) { 01613 rb_raise(rb_eThreadError, "killed thread"); 01614 } 01615 return thread; 01616 } 01617 01618 VALUE 01619 rb_thread_wakeup_alive(VALUE thread) 01620 { 01621 rb_thread_t *th; 01622 GetThreadPtr(thread, th); 01623 01624 if (th->status == THREAD_KILLED) { 01625 return Qnil; 01626 } 01627 rb_threadptr_ready(th); 01628 if (th->status != THREAD_TO_KILL) { 01629 th->status = THREAD_RUNNABLE; 01630 } 01631 return thread; 01632 } 01633 01634 01635 /* 01636 * call-seq: 01637 * thr.run -> thr 01638 * 01639 * Wakes up <i>thr</i>, making it eligible for scheduling. 01640 * 01641 * a = Thread.new { puts "a"; Thread.stop; puts "c" } 01642 * sleep 0.1 while a.status!='sleep' 01643 * puts "Got here" 01644 * a.run 01645 * a.join 01646 * 01647 * <em>produces:</em> 01648 * 01649 * a 01650 * Got here 01651 * c 01652 */ 01653 01654 VALUE 01655 rb_thread_run(VALUE thread) 01656 { 01657 rb_thread_wakeup(thread); 01658 rb_thread_schedule(); 01659 return thread; 01660 } 01661 01662 01663 /* 01664 * call-seq: 01665 * Thread.stop -> nil 01666 * 01667 * Stops execution of the current thread, putting it into a ``sleep'' state, 01668 * and schedules execution of another thread. 01669 * 01670 * a = Thread.new { print "a"; Thread.stop; print "c" } 01671 * sleep 0.1 while a.status!='sleep' 01672 * print "b" 01673 * a.run 01674 * a.join 01675 * 01676 * <em>produces:</em> 01677 * 01678 * abc 01679 */ 01680 01681 VALUE 01682 rb_thread_stop(void) 01683 { 01684 if (rb_thread_alone()) { 01685 rb_raise(rb_eThreadError, 01686 "stopping only thread\n\tnote: use sleep to stop forever"); 01687 } 01688 rb_thread_sleep_deadly(); 01689 return Qnil; 01690 } 01691 01692 static int 01693 thread_list_i(st_data_t key, st_data_t val, void *data) 01694 { 01695 VALUE ary = (VALUE)data; 01696 rb_thread_t *th; 01697 GetThreadPtr((VALUE)key, th); 01698 01699 switch (th->status) { 01700 case THREAD_RUNNABLE: 01701 case THREAD_STOPPED: 01702 case THREAD_STOPPED_FOREVER: 01703 case THREAD_TO_KILL: 01704 rb_ary_push(ary, th->self); 01705 default: 01706 break; 01707 } 01708 return ST_CONTINUE; 01709 } 01710 01711 /********************************************************************/ 01712 01713 /* 01714 * call-seq: 01715 * Thread.list -> array 01716 * 01717 * Returns an array of <code>Thread</code> objects for all threads that are 01718 * either runnable or stopped. 01719 * 01720 * Thread.new { sleep(200) } 01721 * Thread.new { 1000000.times {|i| i*i } } 01722 * Thread.new { Thread.stop } 01723 * Thread.list.each {|t| p t} 01724 * 01725 * <em>produces:</em> 01726 * 01727 * #<Thread:0x401b3e84 sleep> 01728 * #<Thread:0x401b3f38 run> 01729 * #<Thread:0x401b3fb0 sleep> 01730 * #<Thread:0x401bdf4c run> 01731 */ 01732 01733 VALUE 01734 rb_thread_list(void) 01735 { 01736 VALUE ary = rb_ary_new(); 01737 st_foreach(GET_THREAD()->vm->living_threads, thread_list_i, ary); 01738 return ary; 01739 } 01740 01741 VALUE 01742 rb_thread_current(void) 01743 { 01744 return GET_THREAD()->self; 01745 } 01746 01747 /* 01748 * call-seq: 01749 * Thread.current -> thread 01750 * 01751 * Returns the currently executing thread. 01752 * 01753 * Thread.current #=> #<Thread:0x401bdf4c run> 01754 */ 01755 01756 static VALUE 01757 thread_s_current(VALUE klass) 01758 { 01759 return rb_thread_current(); 01760 } 01761 01762 VALUE 01763 rb_thread_main(void) 01764 { 01765 return GET_THREAD()->vm->main_thread->self; 01766 } 01767 01768 /* 01769 * call-seq: 01770 * Thread.main -> thread 01771 * 01772 * Returns the main thread. 01773 */ 01774 01775 static VALUE 01776 rb_thread_s_main(VALUE klass) 01777 { 01778 return rb_thread_main(); 01779 } 01780 01781 01782 /* 01783 * call-seq: 01784 * Thread.abort_on_exception -> true or false 01785 * 01786 * Returns the status of the global ``abort on exception'' condition. The 01787 * default is <code>false</code>. When set to <code>true</code>, or if the 01788 * global <code>$DEBUG</code> flag is <code>true</code> (perhaps because the 01789 * command line option <code>-d</code> was specified) all threads will abort 01790 * (the process will <code>exit(0)</code>) if an exception is raised in any 01791 * thread. See also <code>Thread::abort_on_exception=</code>. 01792 */ 01793 01794 static VALUE 01795 rb_thread_s_abort_exc(void) 01796 { 01797 return GET_THREAD()->vm->thread_abort_on_exception ? Qtrue : Qfalse; 01798 } 01799 01800 01801 /* 01802 * call-seq: 01803 * Thread.abort_on_exception= boolean -> true or false 01804 * 01805 * When set to <code>true</code>, all threads will abort if an exception is 01806 * raised. Returns the new state. 01807 * 01808 * Thread.abort_on_exception = true 01809 * t1 = Thread.new do 01810 * puts "In new thread" 01811 * raise "Exception from thread" 01812 * end 01813 * sleep(1) 01814 * puts "not reached" 01815 * 01816 * <em>produces:</em> 01817 * 01818 * In new thread 01819 * prog.rb:4: Exception from thread (RuntimeError) 01820 * from prog.rb:2:in `initialize' 01821 * from prog.rb:2:in `new' 01822 * from prog.rb:2 01823 */ 01824 01825 static VALUE 01826 rb_thread_s_abort_exc_set(VALUE self, VALUE val) 01827 { 01828 rb_secure(4); 01829 GET_THREAD()->vm->thread_abort_on_exception = RTEST(val); 01830 return val; 01831 } 01832 01833 01834 /* 01835 * call-seq: 01836 * thr.abort_on_exception -> true or false 01837 * 01838 * Returns the status of the thread-local ``abort on exception'' condition for 01839 * <i>thr</i>. The default is <code>false</code>. See also 01840 * <code>Thread::abort_on_exception=</code>. 01841 */ 01842 01843 static VALUE 01844 rb_thread_abort_exc(VALUE thread) 01845 { 01846 rb_thread_t *th; 01847 GetThreadPtr(thread, th); 01848 return th->abort_on_exception ? Qtrue : Qfalse; 01849 } 01850 01851 01852 /* 01853 * call-seq: 01854 * thr.abort_on_exception= boolean -> true or false 01855 * 01856 * When set to <code>true</code>, causes all threads (including the main 01857 * program) to abort if an exception is raised in <i>thr</i>. The process will 01858 * effectively <code>exit(0)</code>. 01859 */ 01860 01861 static VALUE 01862 rb_thread_abort_exc_set(VALUE thread, VALUE val) 01863 { 01864 rb_thread_t *th; 01865 rb_secure(4); 01866 01867 GetThreadPtr(thread, th); 01868 th->abort_on_exception = RTEST(val); 01869 return val; 01870 } 01871 01872 01873 /* 01874 * call-seq: 01875 * thr.group -> thgrp or nil 01876 * 01877 * Returns the <code>ThreadGroup</code> which contains <i>thr</i>, or nil if 01878 * the thread is not a member of any group. 01879 * 01880 * Thread.main.group #=> #<ThreadGroup:0x4029d914> 01881 */ 01882 01883 VALUE 01884 rb_thread_group(VALUE thread) 01885 { 01886 rb_thread_t *th; 01887 VALUE group; 01888 GetThreadPtr(thread, th); 01889 group = th->thgroup; 01890 01891 if (!group) { 01892 group = Qnil; 01893 } 01894 return group; 01895 } 01896 01897 static const char * 01898 thread_status_name(enum rb_thread_status status) 01899 { 01900 switch (status) { 01901 case THREAD_RUNNABLE: 01902 return "run"; 01903 case THREAD_STOPPED: 01904 case THREAD_STOPPED_FOREVER: 01905 return "sleep"; 01906 case THREAD_TO_KILL: 01907 return "aborting"; 01908 case THREAD_KILLED: 01909 return "dead"; 01910 default: 01911 return "unknown"; 01912 } 01913 } 01914 01915 static int 01916 rb_threadptr_dead(rb_thread_t *th) 01917 { 01918 return th->status == THREAD_KILLED; 01919 } 01920 01921 01922 /* 01923 * call-seq: 01924 * thr.status -> string, false or nil 01925 * 01926 * Returns the status of <i>thr</i>: ``<code>sleep</code>'' if <i>thr</i> is 01927 * sleeping or waiting on I/O, ``<code>run</code>'' if <i>thr</i> is executing, 01928 * ``<code>aborting</code>'' if <i>thr</i> is aborting, <code>false</code> if 01929 * <i>thr</i> terminated normally, and <code>nil</code> if <i>thr</i> 01930 * terminated with an exception. 01931 * 01932 * a = Thread.new { raise("die now") } 01933 * b = Thread.new { Thread.stop } 01934 * c = Thread.new { Thread.exit } 01935 * d = Thread.new { sleep } 01936 * d.kill #=> #<Thread:0x401b3678 aborting> 01937 * a.status #=> nil 01938 * b.status #=> "sleep" 01939 * c.status #=> false 01940 * d.status #=> "aborting" 01941 * Thread.current.status #=> "run" 01942 */ 01943 01944 static VALUE 01945 rb_thread_status(VALUE thread) 01946 { 01947 rb_thread_t *th; 01948 GetThreadPtr(thread, th); 01949 01950 if (rb_threadptr_dead(th)) { 01951 if (!NIL_P(th->errinfo) && !FIXNUM_P(th->errinfo) 01952 /* TODO */ ) { 01953 return Qnil; 01954 } 01955 return Qfalse; 01956 } 01957 return rb_str_new2(thread_status_name(th->status)); 01958 } 01959 01960 01961 /* 01962 * call-seq: 01963 * thr.alive? -> true or false 01964 * 01965 * Returns <code>true</code> if <i>thr</i> is running or sleeping. 01966 * 01967 * thr = Thread.new { } 01968 * thr.join #=> #<Thread:0x401b3fb0 dead> 01969 * Thread.current.alive? #=> true 01970 * thr.alive? #=> false 01971 */ 01972 01973 static VALUE 01974 rb_thread_alive_p(VALUE thread) 01975 { 01976 rb_thread_t *th; 01977 GetThreadPtr(thread, th); 01978 01979 if (rb_threadptr_dead(th)) 01980 return Qfalse; 01981 return Qtrue; 01982 } 01983 01984 /* 01985 * call-seq: 01986 * thr.stop? -> true or false 01987 * 01988 * Returns <code>true</code> if <i>thr</i> is dead or sleeping. 01989 * 01990 * a = Thread.new { Thread.stop } 01991 * b = Thread.current 01992 * a.stop? #=> true 01993 * b.stop? #=> false 01994 */ 01995 01996 static VALUE 01997 rb_thread_stop_p(VALUE thread) 01998 { 01999 rb_thread_t *th; 02000 GetThreadPtr(thread, th); 02001 02002 if (rb_threadptr_dead(th)) 02003 return Qtrue; 02004 if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER) 02005 return Qtrue; 02006 return Qfalse; 02007 } 02008 02009 /* 02010 * call-seq: 02011 * thr.safe_level -> integer 02012 * 02013 * Returns the safe level in effect for <i>thr</i>. Setting thread-local safe 02014 * levels can help when implementing sandboxes which run insecure code. 02015 * 02016 * thr = Thread.new { $SAFE = 3; sleep } 02017 * Thread.current.safe_level #=> 0 02018 * thr.safe_level #=> 3 02019 */ 02020 02021 static VALUE 02022 rb_thread_safe_level(VALUE thread) 02023 { 02024 rb_thread_t *th; 02025 GetThreadPtr(thread, th); 02026 02027 return INT2NUM(th->safe_level); 02028 } 02029 02030 /* 02031 * call-seq: 02032 * thr.inspect -> string 02033 * 02034 * Dump the name, id, and status of _thr_ to a string. 02035 */ 02036 02037 static VALUE 02038 rb_thread_inspect(VALUE thread) 02039 { 02040 const char *cname = rb_obj_classname(thread); 02041 rb_thread_t *th; 02042 const char *status; 02043 VALUE str; 02044 02045 GetThreadPtr(thread, th); 02046 status = thread_status_name(th->status); 02047 str = rb_sprintf("#<%s:%p %s>", cname, (void *)thread, status); 02048 OBJ_INFECT(str, thread); 02049 02050 return str; 02051 } 02052 02053 VALUE 02054 rb_thread_local_aref(VALUE thread, ID id) 02055 { 02056 rb_thread_t *th; 02057 st_data_t val; 02058 02059 GetThreadPtr(thread, th); 02060 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 02061 rb_raise(rb_eSecurityError, "Insecure: thread locals"); 02062 } 02063 if (!th->local_storage) { 02064 return Qnil; 02065 } 02066 if (st_lookup(th->local_storage, id, &val)) { 02067 return (VALUE)val; 02068 } 02069 return Qnil; 02070 } 02071 02072 /* 02073 * call-seq: 02074 * thr[sym] -> obj or nil 02075 * 02076 * Attribute Reference---Returns the value of a thread-local variable, using 02077 * either a symbol or a string name. If the specified variable does not exist, 02078 * returns <code>nil</code>. 02079 * 02080 * [ 02081 * Thread.new { Thread.current["name"] = "A" }, 02082 * Thread.new { Thread.current[:name] = "B" }, 02083 * Thread.new { Thread.current["name"] = "C" } 02084 * ].each do |th| 02085 * th.join 02086 * puts "#{th.inspect}: #{th[:name]}" 02087 * end 02088 * 02089 * <em>produces:</em> 02090 * 02091 * #<Thread:0x00000002a54220 dead>: A 02092 * #<Thread:0x00000002a541a8 dead>: B 02093 * #<Thread:0x00000002a54130 dead>: C 02094 */ 02095 02096 static VALUE 02097 rb_thread_aref(VALUE thread, VALUE id) 02098 { 02099 return rb_thread_local_aref(thread, rb_to_id(id)); 02100 } 02101 02102 VALUE 02103 rb_thread_local_aset(VALUE thread, ID id, VALUE val) 02104 { 02105 rb_thread_t *th; 02106 GetThreadPtr(thread, th); 02107 02108 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 02109 rb_raise(rb_eSecurityError, "Insecure: can't modify thread locals"); 02110 } 02111 if (OBJ_FROZEN(thread)) { 02112 rb_error_frozen("thread locals"); 02113 } 02114 if (!th->local_storage) { 02115 th->local_storage = st_init_numtable(); 02116 } 02117 if (NIL_P(val)) { 02118 st_delete_wrap(th->local_storage, id); 02119 return Qnil; 02120 } 02121 st_insert(th->local_storage, id, val); 02122 return val; 02123 } 02124 02125 /* 02126 * call-seq: 02127 * thr[sym] = obj -> obj 02128 * 02129 * Attribute Assignment---Sets or creates the value of a thread-local variable, 02130 * using either a symbol or a string. See also <code>Thread#[]</code>. 02131 */ 02132 02133 static VALUE 02134 rb_thread_aset(VALUE self, VALUE id, VALUE val) 02135 { 02136 return rb_thread_local_aset(self, rb_to_id(id), val); 02137 } 02138 02139 /* 02140 * call-seq: 02141 * thr.key?(sym) -> true or false 02142 * 02143 * Returns <code>true</code> if the given string (or symbol) exists as a 02144 * thread-local variable. 02145 * 02146 * me = Thread.current 02147 * me[:oliver] = "a" 02148 * me.key?(:oliver) #=> true 02149 * me.key?(:stanley) #=> false 02150 */ 02151 02152 static VALUE 02153 rb_thread_key_p(VALUE self, VALUE key) 02154 { 02155 rb_thread_t *th; 02156 ID id = rb_to_id(key); 02157 02158 GetThreadPtr(self, th); 02159 02160 if (!th->local_storage) { 02161 return Qfalse; 02162 } 02163 if (st_lookup(th->local_storage, id, 0)) { 02164 return Qtrue; 02165 } 02166 return Qfalse; 02167 } 02168 02169 static int 02170 thread_keys_i(ID key, VALUE value, VALUE ary) 02171 { 02172 rb_ary_push(ary, ID2SYM(key)); 02173 return ST_CONTINUE; 02174 } 02175 02176 static int 02177 vm_living_thread_num(rb_vm_t *vm) 02178 { 02179 return (int)vm->living_threads->num_entries; 02180 } 02181 02182 int 02183 rb_thread_alone(void) 02184 { 02185 int num = 1; 02186 if (GET_THREAD()->vm->living_threads) { 02187 num = vm_living_thread_num(GET_THREAD()->vm); 02188 thread_debug("rb_thread_alone: %d\n", num); 02189 } 02190 return num == 1; 02191 } 02192 02193 /* 02194 * call-seq: 02195 * thr.keys -> array 02196 * 02197 * Returns an an array of the names of the thread-local variables (as Symbols). 02198 * 02199 * thr = Thread.new do 02200 * Thread.current[:cat] = 'meow' 02201 * Thread.current["dog"] = 'woof' 02202 * end 02203 * thr.join #=> #<Thread:0x401b3f10 dead> 02204 * thr.keys #=> [:dog, :cat] 02205 */ 02206 02207 static VALUE 02208 rb_thread_keys(VALUE self) 02209 { 02210 rb_thread_t *th; 02211 VALUE ary = rb_ary_new(); 02212 GetThreadPtr(self, th); 02213 02214 if (th->local_storage) { 02215 st_foreach(th->local_storage, thread_keys_i, ary); 02216 } 02217 return ary; 02218 } 02219 02220 /* 02221 * call-seq: 02222 * thr.priority -> integer 02223 * 02224 * Returns the priority of <i>thr</i>. Default is inherited from the 02225 * current thread which creating the new thread, or zero for the 02226 * initial main thread; higher-priority thread will run more frequently 02227 * than lower-priority threads (but lower-priority threads can also run). 02228 * 02229 * This is just hint for Ruby thread scheduler. It may be ignored on some 02230 * platform. 02231 * 02232 * Thread.current.priority #=> 0 02233 */ 02234 02235 static VALUE 02236 rb_thread_priority(VALUE thread) 02237 { 02238 rb_thread_t *th; 02239 GetThreadPtr(thread, th); 02240 return INT2NUM(th->priority); 02241 } 02242 02243 02244 /* 02245 * call-seq: 02246 * thr.priority= integer -> thr 02247 * 02248 * Sets the priority of <i>thr</i> to <i>integer</i>. Higher-priority threads 02249 * will run more frequently than lower-priority threads (but lower-priority 02250 * threads can also run). 02251 * 02252 * This is just hint for Ruby thread scheduler. It may be ignored on some 02253 * platform. 02254 * 02255 * count1 = count2 = 0 02256 * a = Thread.new do 02257 * loop { count1 += 1 } 02258 * end 02259 * a.priority = -1 02260 * 02261 * b = Thread.new do 02262 * loop { count2 += 1 } 02263 * end 02264 * b.priority = -2 02265 * sleep 1 #=> 1 02266 * count1 #=> 622504 02267 * count2 #=> 5832 02268 */ 02269 02270 static VALUE 02271 rb_thread_priority_set(VALUE thread, VALUE prio) 02272 { 02273 rb_thread_t *th; 02274 int priority; 02275 GetThreadPtr(thread, th); 02276 02277 rb_secure(4); 02278 02279 #if USE_NATIVE_THREAD_PRIORITY 02280 th->priority = NUM2INT(prio); 02281 native_thread_apply_priority(th); 02282 #else 02283 priority = NUM2INT(prio); 02284 if (priority > RUBY_THREAD_PRIORITY_MAX) { 02285 priority = RUBY_THREAD_PRIORITY_MAX; 02286 } 02287 else if (priority < RUBY_THREAD_PRIORITY_MIN) { 02288 priority = RUBY_THREAD_PRIORITY_MIN; 02289 } 02290 th->priority = priority; 02291 #endif 02292 return INT2NUM(th->priority); 02293 } 02294 02295 /* for IO */ 02296 02297 #if defined(NFDBITS) && defined(HAVE_RB_FD_INIT) 02298 02299 /* 02300 * several Unix platforms support file descriptors bigger than FD_SETSIZE 02301 * in select(2) system call. 02302 * 02303 * - Linux 2.2.12 (?) 02304 * - NetBSD 1.2 (src/sys/kern/sys_generic.c:1.25) 02305 * select(2) documents how to allocate fd_set dynamically. 02306 * http://netbsd.gw.com/cgi-bin/man-cgi?select++NetBSD-4.0 02307 * - FreeBSD 2.2 (src/sys/kern/sys_generic.c:1.19) 02308 * - OpenBSD 2.0 (src/sys/kern/sys_generic.c:1.4) 02309 * select(2) documents how to allocate fd_set dynamically. 02310 * http://www.openbsd.org/cgi-bin/man.cgi?query=select&manpath=OpenBSD+4.4 02311 * - HP-UX documents how to allocate fd_set dynamically. 02312 * http://docs.hp.com/en/B2355-60105/select.2.html 02313 * - Solaris 8 has select_large_fdset 02314 * 02315 * When fd_set is not big enough to hold big file descriptors, 02316 * it should be allocated dynamically. 02317 * Note that this assumes fd_set is structured as bitmap. 02318 * 02319 * rb_fd_init allocates the memory. 02320 * rb_fd_term free the memory. 02321 * rb_fd_set may re-allocates bitmap. 02322 * 02323 * So rb_fd_set doesn't reject file descriptors bigger than FD_SETSIZE. 02324 */ 02325 02326 void 02327 rb_fd_init(rb_fdset_t *fds) 02328 { 02329 fds->maxfd = 0; 02330 fds->fdset = ALLOC(fd_set); 02331 FD_ZERO(fds->fdset); 02332 } 02333 02334 void 02335 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) 02336 { 02337 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 02338 02339 if (size < sizeof(fd_set)) 02340 size = sizeof(fd_set); 02341 dst->maxfd = src->maxfd; 02342 dst->fdset = xmalloc(size); 02343 memcpy(dst->fdset, src->fdset, size); 02344 } 02345 02346 void 02347 rb_fd_term(rb_fdset_t *fds) 02348 { 02349 if (fds->fdset) xfree(fds->fdset); 02350 fds->maxfd = 0; 02351 fds->fdset = 0; 02352 } 02353 02354 void 02355 rb_fd_zero(rb_fdset_t *fds) 02356 { 02357 if (fds->fdset) 02358 MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS)); 02359 } 02360 02361 static void 02362 rb_fd_resize(int n, rb_fdset_t *fds) 02363 { 02364 size_t m = howmany(n + 1, NFDBITS) * sizeof(fd_mask); 02365 size_t o = howmany(fds->maxfd, NFDBITS) * sizeof(fd_mask); 02366 02367 if (m < sizeof(fd_set)) m = sizeof(fd_set); 02368 if (o < sizeof(fd_set)) o = sizeof(fd_set); 02369 02370 if (m > o) { 02371 fds->fdset = xrealloc(fds->fdset, m); 02372 memset((char *)fds->fdset + o, 0, m - o); 02373 } 02374 if (n >= fds->maxfd) fds->maxfd = n + 1; 02375 } 02376 02377 void 02378 rb_fd_set(int n, rb_fdset_t *fds) 02379 { 02380 rb_fd_resize(n, fds); 02381 FD_SET(n, fds->fdset); 02382 } 02383 02384 void 02385 rb_fd_clr(int n, rb_fdset_t *fds) 02386 { 02387 if (n >= fds->maxfd) return; 02388 FD_CLR(n, fds->fdset); 02389 } 02390 02391 int 02392 rb_fd_isset(int n, const rb_fdset_t *fds) 02393 { 02394 if (n >= fds->maxfd) return 0; 02395 return FD_ISSET(n, fds->fdset) != 0; /* "!= 0" avoids FreeBSD PR 91421 */ 02396 } 02397 02398 void 02399 rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max) 02400 { 02401 size_t size = howmany(max, NFDBITS) * sizeof(fd_mask); 02402 02403 if (size < sizeof(fd_set)) size = sizeof(fd_set); 02404 dst->maxfd = max; 02405 dst->fdset = xrealloc(dst->fdset, size); 02406 memcpy(dst->fdset, src, size); 02407 } 02408 02409 static void 02410 rb_fd_rcopy(fd_set *dst, rb_fdset_t *src) 02411 { 02412 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 02413 02414 if (size > sizeof(fd_set)) { 02415 rb_raise(rb_eArgError, "too large fdsets"); 02416 } 02417 memcpy(dst, rb_fd_ptr(src), sizeof(fd_set)); 02418 } 02419 02420 void 02421 rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src) 02422 { 02423 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 02424 02425 if (size < sizeof(fd_set)) 02426 size = sizeof(fd_set); 02427 dst->maxfd = src->maxfd; 02428 dst->fdset = xrealloc(dst->fdset, size); 02429 memcpy(dst->fdset, src->fdset, size); 02430 } 02431 02432 int 02433 rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout) 02434 { 02435 fd_set *r = NULL, *w = NULL, *e = NULL; 02436 if (readfds) { 02437 rb_fd_resize(n - 1, readfds); 02438 r = rb_fd_ptr(readfds); 02439 } 02440 if (writefds) { 02441 rb_fd_resize(n - 1, writefds); 02442 w = rb_fd_ptr(writefds); 02443 } 02444 if (exceptfds) { 02445 rb_fd_resize(n - 1, exceptfds); 02446 e = rb_fd_ptr(exceptfds); 02447 } 02448 return select(n, r, w, e, timeout); 02449 } 02450 02451 #undef FD_ZERO 02452 #undef FD_SET 02453 #undef FD_CLR 02454 #undef FD_ISSET 02455 02456 #define FD_ZERO(f) rb_fd_zero(f) 02457 #define FD_SET(i, f) rb_fd_set((i), (f)) 02458 #define FD_CLR(i, f) rb_fd_clr((i), (f)) 02459 #define FD_ISSET(i, f) rb_fd_isset((i), (f)) 02460 02461 #elif defined(_WIN32) 02462 02463 void 02464 rb_fd_init(rb_fdset_t *set) 02465 { 02466 set->capa = FD_SETSIZE; 02467 set->fdset = ALLOC(fd_set); 02468 FD_ZERO(set->fdset); 02469 } 02470 02471 void 02472 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) 02473 { 02474 rb_fd_init(dst); 02475 rb_fd_dup(dst, src); 02476 } 02477 02478 static void 02479 rb_fd_rcopy(fd_set *dst, rb_fdset_t *src) 02480 { 02481 int max = rb_fd_max(src); 02482 02483 /* we assume src is the result of select() with dst, so dst should be 02484 * larger or equal than src. */ 02485 if (max > FD_SETSIZE || max > dst->fd_count) { 02486 rb_raise(rb_eArgError, "too large fdsets"); 02487 } 02488 02489 memcpy(dst->fd_array, src->fdset->fd_array, max); 02490 dst->fd_count = max; 02491 } 02492 02493 void 02494 rb_fd_term(rb_fdset_t *set) 02495 { 02496 xfree(set->fdset); 02497 set->fdset = NULL; 02498 set->capa = 0; 02499 } 02500 02501 void 02502 rb_fd_set(int fd, rb_fdset_t *set) 02503 { 02504 unsigned int i; 02505 SOCKET s = rb_w32_get_osfhandle(fd); 02506 02507 for (i = 0; i < set->fdset->fd_count; i++) { 02508 if (set->fdset->fd_array[i] == s) { 02509 return; 02510 } 02511 } 02512 if (set->fdset->fd_count >= (unsigned)set->capa) { 02513 set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE; 02514 set->fdset = xrealloc(set->fdset, sizeof(unsigned int) + sizeof(SOCKET) * set->capa); 02515 } 02516 set->fdset->fd_array[set->fdset->fd_count++] = s; 02517 } 02518 02519 #undef FD_ZERO 02520 #undef FD_SET 02521 #undef FD_CLR 02522 #undef FD_ISSET 02523 02524 #define FD_ZERO(f) rb_fd_zero(f) 02525 #define FD_SET(i, f) rb_fd_set((i), (f)) 02526 #define FD_CLR(i, f) rb_fd_clr((i), (f)) 02527 #define FD_ISSET(i, f) rb_fd_isset((i), (f)) 02528 02529 #else 02530 #define rb_fd_rcopy(d, s) (*(d) = *(s)) 02531 #endif 02532 02533 #if defined(__CYGWIN__) 02534 static long 02535 cmp_tv(const struct timeval *a, const struct timeval *b) 02536 { 02537 long d = (a->tv_sec - b->tv_sec); 02538 return (d != 0) ? d : (a->tv_usec - b->tv_usec); 02539 } 02540 02541 static int 02542 subtract_tv(struct timeval *rest, const struct timeval *wait) 02543 { 02544 if (rest->tv_sec < wait->tv_sec) { 02545 return 0; 02546 } 02547 while (rest->tv_usec < wait->tv_usec) { 02548 if (rest->tv_sec <= wait->tv_sec) { 02549 return 0; 02550 } 02551 rest->tv_sec -= 1; 02552 rest->tv_usec += 1000 * 1000; 02553 } 02554 rest->tv_sec -= wait->tv_sec; 02555 rest->tv_usec -= wait->tv_usec; 02556 return rest->tv_sec != 0 || rest->tv_usec != 0; 02557 } 02558 #endif 02559 02560 static int 02561 do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except, 02562 struct timeval *timeout) 02563 { 02564 int result, lerrno; 02565 rb_fdset_t UNINITIALIZED_VAR(orig_read); 02566 rb_fdset_t UNINITIALIZED_VAR(orig_write); 02567 rb_fdset_t UNINITIALIZED_VAR(orig_except); 02568 double limit = 0; 02569 struct timeval wait_rest; 02570 # if defined(__CYGWIN__) 02571 struct timeval start_time; 02572 # endif 02573 02574 if (timeout) { 02575 # if defined(__CYGWIN__) 02576 gettimeofday(&start_time, NULL); 02577 limit = (double)start_time.tv_sec + (double)start_time.tv_usec*1e-6; 02578 # else 02579 limit = timeofday(); 02580 # endif 02581 limit += (double)timeout->tv_sec+(double)timeout->tv_usec*1e-6; 02582 wait_rest = *timeout; 02583 timeout = &wait_rest; 02584 } 02585 02586 if (read) 02587 rb_fd_init_copy(&orig_read, read); 02588 if (write) 02589 rb_fd_init_copy(&orig_write, write); 02590 if (except) 02591 rb_fd_init_copy(&orig_except, except); 02592 02593 retry: 02594 lerrno = 0; 02595 02596 #if defined(__CYGWIN__) 02597 { 02598 int finish = 0; 02599 /* polling duration: 100ms */ 02600 struct timeval wait_100ms, *wait; 02601 wait_100ms.tv_sec = 0; 02602 wait_100ms.tv_usec = 100 * 1000; /* 100 ms */ 02603 02604 do { 02605 wait = (timeout == 0 || cmp_tv(&wait_100ms, timeout) < 0) ? &wait_100ms : timeout; 02606 BLOCKING_REGION({ 02607 do { 02608 result = rb_fd_select(n, read, write, except, wait); 02609 if (result < 0) lerrno = errno; 02610 if (result != 0) break; 02611 02612 if (read) 02613 rb_fd_dup(read, &orig_read); 02614 if (write) 02615 rb_fd_dup(write, &orig_write); 02616 if (except) 02617 rb_fd_dup(except, &orig_except); 02618 if (timeout) { 02619 struct timeval elapsed; 02620 gettimeofday(&elapsed, NULL); 02621 subtract_tv(&elapsed, &start_time); 02622 gettimeofday(&start_time, NULL); 02623 if (!subtract_tv(timeout, &elapsed)) { 02624 finish = 1; 02625 break; 02626 } 02627 if (cmp_tv(&wait_100ms, timeout) > 0) wait = timeout; 02628 } 02629 } while (__th->interrupt_flag == 0); 02630 }, 0, 0); 02631 } while (result == 0 && !finish); 02632 } 02633 #elif defined(_WIN32) 02634 { 02635 rb_thread_t *th = GET_THREAD(); 02636 BLOCKING_REGION({ 02637 result = native_fd_select(n, read, write, except, timeout, th); 02638 if (result < 0) lerrno = errno; 02639 }, ubf_select, th); 02640 } 02641 #else 02642 BLOCKING_REGION({ 02643 result = rb_fd_select(n, read, write, except, timeout); 02644 if (result < 0) lerrno = errno; 02645 }, ubf_select, GET_THREAD()); 02646 #endif 02647 02648 errno = lerrno; 02649 02650 if (result < 0) { 02651 switch (errno) { 02652 case EINTR: 02653 #ifdef ERESTART 02654 case ERESTART: 02655 #endif 02656 if (read) 02657 rb_fd_dup(read, &orig_read); 02658 if (write) 02659 rb_fd_dup(write, &orig_write); 02660 if (except) 02661 rb_fd_dup(except, &orig_except); 02662 02663 if (timeout) { 02664 double d = limit - timeofday(); 02665 02666 wait_rest.tv_sec = (unsigned int)d; 02667 wait_rest.tv_usec = (int)((d-(double)wait_rest.tv_sec)*1e6); 02668 if (wait_rest.tv_sec < 0) wait_rest.tv_sec = 0; 02669 if (wait_rest.tv_usec < 0) wait_rest.tv_usec = 0; 02670 } 02671 02672 goto retry; 02673 default: 02674 break; 02675 } 02676 } 02677 02678 if (read) 02679 rb_fd_term(&orig_read); 02680 if (write) 02681 rb_fd_term(&orig_write); 02682 if (except) 02683 rb_fd_term(&orig_except); 02684 02685 return result; 02686 } 02687 02688 static void 02689 rb_thread_wait_fd_rw(int fd, int read) 02690 { 02691 int result = 0; 02692 int events = read ? RB_WAITFD_IN : RB_WAITFD_OUT; 02693 02694 thread_debug("rb_thread_wait_fd_rw(%d, %s)\n", fd, read ? "read" : "write"); 02695 02696 if (fd < 0) { 02697 rb_raise(rb_eIOError, "closed stream"); 02698 } 02699 if (rb_thread_alone()) return; 02700 while (result <= 0) { 02701 result = rb_wait_for_single_fd(fd, events, NULL); 02702 02703 if (result < 0) { 02704 rb_sys_fail(0); 02705 } 02706 } 02707 02708 thread_debug("rb_thread_wait_fd_rw(%d, %s): done\n", fd, read ? "read" : "write"); 02709 } 02710 02711 void 02712 rb_thread_wait_fd(int fd) 02713 { 02714 rb_thread_wait_fd_rw(fd, 1); 02715 } 02716 02717 int 02718 rb_thread_fd_writable(int fd) 02719 { 02720 rb_thread_wait_fd_rw(fd, 0); 02721 return TRUE; 02722 } 02723 02724 int 02725 rb_thread_select(int max, fd_set * read, fd_set * write, fd_set * except, 02726 struct timeval *timeout) 02727 { 02728 rb_fdset_t fdsets[3]; 02729 rb_fdset_t *rfds = NULL; 02730 rb_fdset_t *wfds = NULL; 02731 rb_fdset_t *efds = NULL; 02732 int retval; 02733 02734 if (read) { 02735 rfds = &fdsets[0]; 02736 rb_fd_init(rfds); 02737 rb_fd_copy(rfds, read, max); 02738 } 02739 if (write) { 02740 wfds = &fdsets[1]; 02741 rb_fd_init(wfds); 02742 rb_fd_copy(wfds, write, max); 02743 } 02744 if (except) { 02745 efds = &fdsets[2]; 02746 rb_fd_init(efds); 02747 rb_fd_copy(efds, except, max); 02748 } 02749 02750 retval = rb_thread_fd_select(max, rfds, wfds, efds, timeout); 02751 02752 if (rfds) { 02753 rb_fd_rcopy(read, rfds); 02754 rb_fd_term(rfds); 02755 } 02756 if (wfds) { 02757 rb_fd_rcopy(write, wfds); 02758 rb_fd_term(wfds); 02759 } 02760 if (efds) { 02761 rb_fd_rcopy(except, efds); 02762 rb_fd_term(efds); 02763 } 02764 02765 return retval; 02766 } 02767 02768 int 02769 rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except, 02770 struct timeval *timeout) 02771 { 02772 if (!read && !write && !except) { 02773 if (!timeout) { 02774 rb_thread_sleep_forever(); 02775 return 0; 02776 } 02777 rb_thread_wait_for(*timeout); 02778 return 0; 02779 } 02780 02781 if (read) { 02782 rb_fd_resize(max - 1, read); 02783 } 02784 if (write) { 02785 rb_fd_resize(max - 1, write); 02786 } 02787 if (except) { 02788 rb_fd_resize(max - 1, except); 02789 } 02790 return do_select(max, read, write, except, timeout); 02791 } 02792 02793 /* 02794 * poll() is supported by many OSes, but so far Linux is the only 02795 * one we know of that supports using poll() in all places select() 02796 * would work. 02797 */ 02798 #if defined(HAVE_POLL) && defined(linux) 02799 # define USE_POLL 02800 #endif 02801 02802 #ifdef USE_POLL 02803 02804 /* The same with linux kernel. TODO: make platform independent definition. */ 02805 #define POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR) 02806 #define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR) 02807 #define POLLEX_SET (POLLPRI) 02808 02809 #define TIMET_MAX (~(time_t)0 <= 0 ? (time_t)((~(unsigned_time_t)0) >> 1) : (time_t)(~(unsigned_time_t)0)) 02810 #define TIMET_MIN (~(time_t)0 <= 0 ? (time_t)(((unsigned_time_t)1) << (sizeof(time_t) * CHAR_BIT - 1)) : (time_t)0) 02811 02812 #ifndef HAVE_PPOLL 02813 /* TODO: don't ignore sigmask */ 02814 int ppoll(struct pollfd *fds, nfds_t nfds, 02815 const struct timespec *ts, const sigset_t *sigmask) 02816 { 02817 int timeout_ms; 02818 02819 if (ts) { 02820 int tmp, tmp2; 02821 02822 if (ts->tv_sec > TIMET_MAX/1000) 02823 timeout_ms = -1; 02824 else { 02825 tmp = ts->tv_sec * 1000; 02826 tmp2 = ts->tv_nsec / (1000 * 1000); 02827 if (TIMET_MAX - tmp < tmp2) 02828 timeout_ms = -1; 02829 else 02830 timeout_ms = tmp + tmp2; 02831 } 02832 } else 02833 timeout_ms = -1; 02834 02835 return poll(fds, nfds, timeout_ms); 02836 } 02837 #endif 02838 02839 /* 02840 * returns a mask of events 02841 */ 02842 int 02843 rb_wait_for_single_fd(int fd, int events, struct timeval *tv) 02844 { 02845 struct pollfd fds; 02846 int result, lerrno; 02847 double limit = 0; 02848 struct timespec ts; 02849 struct timespec *timeout = NULL; 02850 02851 if (tv) { 02852 ts.tv_sec = tv->tv_sec; 02853 ts.tv_nsec = tv->tv_usec * 1000; 02854 limit = timeofday(); 02855 limit += (double)tv->tv_sec + (double)tv->tv_usec * 1e-6; 02856 timeout = &ts; 02857 } 02858 02859 fds.fd = fd; 02860 fds.events = (short)events; 02861 02862 retry: 02863 lerrno = 0; 02864 BLOCKING_REGION({ 02865 result = ppoll(&fds, 1, timeout, NULL); 02866 if (result < 0) lerrno = errno; 02867 }, ubf_select, GET_THREAD()); 02868 02869 if (result < 0) { 02870 errno = lerrno; 02871 switch (errno) { 02872 case EINTR: 02873 #ifdef ERESTART 02874 case ERESTART: 02875 #endif 02876 if (timeout) { 02877 double d = limit - timeofday(); 02878 02879 ts.tv_sec = (long)d; 02880 ts.tv_nsec = (long)((d - (double)ts.tv_sec) * 1e9); 02881 if (ts.tv_sec < 0) 02882 ts.tv_sec = 0; 02883 if (ts.tv_nsec < 0) 02884 ts.tv_nsec = 0; 02885 } 02886 goto retry; 02887 } 02888 return -1; 02889 } 02890 02891 if (fds.revents & POLLNVAL) { 02892 errno = EBADF; 02893 return -1; 02894 } 02895 02896 /* 02897 * POLLIN, POLLOUT have a different meanings from select(2)'s read/write bit. 02898 * Therefore we need fix it up. 02899 */ 02900 result = 0; 02901 if (fds.revents & POLLIN_SET) 02902 result |= RB_WAITFD_IN; 02903 if (fds.revents & POLLOUT_SET) 02904 result |= RB_WAITFD_OUT; 02905 if (fds.revents & POLLEX_SET) 02906 result |= RB_WAITFD_PRI; 02907 02908 return result; 02909 } 02910 #else /* ! USE_POLL - implement rb_io_poll_fd() using select() */ 02911 static rb_fdset_t *init_set_fd(int fd, rb_fdset_t *fds) 02912 { 02913 rb_fd_init(fds); 02914 rb_fd_set(fd, fds); 02915 02916 return fds; 02917 } 02918 02919 struct select_args { 02920 union { 02921 int fd; 02922 int error; 02923 } as; 02924 rb_fdset_t *read; 02925 rb_fdset_t *write; 02926 rb_fdset_t *except; 02927 struct timeval *tv; 02928 }; 02929 02930 static VALUE 02931 select_single(VALUE ptr) 02932 { 02933 struct select_args *args = (struct select_args *)ptr; 02934 int r; 02935 02936 r = rb_thread_fd_select(args->as.fd + 1, 02937 args->read, args->write, args->except, args->tv); 02938 if (r == -1) 02939 args->as.error = errno; 02940 if (r > 0) { 02941 r = 0; 02942 if (args->read && rb_fd_isset(args->as.fd, args->read)) 02943 r |= RB_WAITFD_IN; 02944 if (args->write && rb_fd_isset(args->as.fd, args->write)) 02945 r |= RB_WAITFD_OUT; 02946 if (args->except && rb_fd_isset(args->as.fd, args->except)) 02947 r |= RB_WAITFD_PRI; 02948 } 02949 return (VALUE)r; 02950 } 02951 02952 static VALUE 02953 select_single_cleanup(VALUE ptr) 02954 { 02955 struct select_args *args = (struct select_args *)ptr; 02956 02957 if (args->read) rb_fd_term(args->read); 02958 if (args->write) rb_fd_term(args->write); 02959 if (args->except) rb_fd_term(args->except); 02960 02961 return (VALUE)-1; 02962 } 02963 02964 int 02965 rb_wait_for_single_fd(int fd, int events, struct timeval *tv) 02966 { 02967 rb_fdset_t rfds, wfds, efds; 02968 struct select_args args; 02969 int r; 02970 VALUE ptr = (VALUE)&args; 02971 02972 args.as.fd = fd; 02973 args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; 02974 args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; 02975 args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL; 02976 args.tv = tv; 02977 02978 r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr); 02979 if (r == -1) 02980 errno = args.as.error; 02981 02982 return r; 02983 } 02984 #endif /* ! USE_POLL */ 02985 02986 /* 02987 * for GC 02988 */ 02989 02990 #ifdef USE_CONSERVATIVE_STACK_END 02991 void 02992 rb_gc_set_stack_end(VALUE **stack_end_p) 02993 { 02994 VALUE stack_end; 02995 *stack_end_p = &stack_end; 02996 } 02997 #endif 02998 02999 void 03000 rb_gc_save_machine_context(rb_thread_t *th) 03001 { 03002 FLUSH_REGISTER_WINDOWS; 03003 #ifdef __ia64 03004 th->machine_register_stack_end = rb_ia64_bsp(); 03005 #endif 03006 setjmp(th->machine_regs); 03007 } 03008 03009 /* 03010 * 03011 */ 03012 03013 void 03014 rb_threadptr_check_signal(rb_thread_t *mth) 03015 { 03016 /* mth must be main_thread */ 03017 if (rb_signal_buff_size() > 0) { 03018 /* wakeup main thread */ 03019 rb_threadptr_interrupt(mth); 03020 } 03021 } 03022 03023 static void 03024 timer_thread_function(void *arg) 03025 { 03026 rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */ 03027 03028 /* for time slice */ 03029 RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread); 03030 03031 /* check signal */ 03032 rb_threadptr_check_signal(vm->main_thread); 03033 03034 #if 0 03035 /* prove profiler */ 03036 if (vm->prove_profile.enable) { 03037 rb_thread_t *th = vm->running_thread; 03038 03039 if (vm->during_gc) { 03040 /* GC prove profiling */ 03041 } 03042 } 03043 #endif 03044 } 03045 03046 void 03047 rb_thread_stop_timer_thread(int close_anyway) 03048 { 03049 if (timer_thread_id && native_stop_timer_thread(close_anyway)) { 03050 native_reset_timer_thread(); 03051 } 03052 } 03053 03054 void 03055 rb_thread_reset_timer_thread(void) 03056 { 03057 native_reset_timer_thread(); 03058 } 03059 03060 void 03061 rb_thread_start_timer_thread(void) 03062 { 03063 system_working = 1; 03064 rb_thread_create_timer_thread(); 03065 } 03066 03067 static int 03068 clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy) 03069 { 03070 int i; 03071 VALUE lines = (VALUE)val; 03072 03073 for (i = 0; i < RARRAY_LEN(lines); i++) { 03074 if (RARRAY_PTR(lines)[i] != Qnil) { 03075 RARRAY_PTR(lines)[i] = INT2FIX(0); 03076 } 03077 } 03078 return ST_CONTINUE; 03079 } 03080 03081 static void 03082 clear_coverage(void) 03083 { 03084 VALUE coverages = rb_get_coverages(); 03085 if (RTEST(coverages)) { 03086 st_foreach(RHASH_TBL(coverages), clear_coverage_i, 0); 03087 } 03088 } 03089 03090 static void 03091 rb_thread_atfork_internal(int (*atfork)(st_data_t, st_data_t, st_data_t)) 03092 { 03093 rb_thread_t *th = GET_THREAD(); 03094 rb_vm_t *vm = th->vm; 03095 VALUE thval = th->self; 03096 vm->main_thread = th; 03097 03098 gvl_atfork(th->vm); 03099 st_foreach(vm->living_threads, atfork, (st_data_t)th); 03100 st_clear(vm->living_threads); 03101 st_insert(vm->living_threads, thval, (st_data_t)th->thread_id); 03102 vm->sleeper = 0; 03103 clear_coverage(); 03104 } 03105 03106 static int 03107 terminate_atfork_i(st_data_t key, st_data_t val, st_data_t current_th) 03108 { 03109 VALUE thval = key; 03110 rb_thread_t *th; 03111 GetThreadPtr(thval, th); 03112 03113 if (th != (rb_thread_t *)current_th) { 03114 rb_mutex_abandon_keeping_mutexes(th); 03115 rb_mutex_abandon_locking_mutex(th); 03116 thread_cleanup_func(th, TRUE); 03117 } 03118 return ST_CONTINUE; 03119 } 03120 03121 void 03122 rb_thread_atfork(void) 03123 { 03124 rb_thread_atfork_internal(terminate_atfork_i); 03125 GET_THREAD()->join_list_head = 0; 03126 03127 /* We don't want reproduce CVE-2003-0900. */ 03128 rb_reset_random_seed(); 03129 } 03130 03131 static int 03132 terminate_atfork_before_exec_i(st_data_t key, st_data_t val, st_data_t current_th) 03133 { 03134 VALUE thval = key; 03135 rb_thread_t *th; 03136 GetThreadPtr(thval, th); 03137 03138 if (th != (rb_thread_t *)current_th) { 03139 thread_cleanup_func_before_exec(th); 03140 } 03141 return ST_CONTINUE; 03142 } 03143 03144 void 03145 rb_thread_atfork_before_exec(void) 03146 { 03147 rb_thread_atfork_internal(terminate_atfork_before_exec_i); 03148 } 03149 03150 struct thgroup { 03151 int enclosed; 03152 VALUE group; 03153 }; 03154 03155 static size_t 03156 thgroup_memsize(const void *ptr) 03157 { 03158 return ptr ? sizeof(struct thgroup) : 0; 03159 } 03160 03161 static const rb_data_type_t thgroup_data_type = { 03162 "thgroup", 03163 {NULL, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,}, 03164 }; 03165 03166 /* 03167 * Document-class: ThreadGroup 03168 * 03169 * <code>ThreadGroup</code> provides a means of keeping track of a number of 03170 * threads as a group. A <code>Thread</code> can belong to only one 03171 * <code>ThreadGroup</code> at a time; adding a thread to a new group will 03172 * remove it from any previous group. 03173 * 03174 * Newly created threads belong to the same group as the thread from which they 03175 * were created. 03176 */ 03177 03178 static VALUE 03179 thgroup_s_alloc(VALUE klass) 03180 { 03181 VALUE group; 03182 struct thgroup *data; 03183 03184 group = TypedData_Make_Struct(klass, struct thgroup, &thgroup_data_type, data); 03185 data->enclosed = 0; 03186 data->group = group; 03187 03188 return group; 03189 } 03190 03191 struct thgroup_list_params { 03192 VALUE ary; 03193 VALUE group; 03194 }; 03195 03196 static int 03197 thgroup_list_i(st_data_t key, st_data_t val, st_data_t data) 03198 { 03199 VALUE thread = (VALUE)key; 03200 VALUE ary = ((struct thgroup_list_params *)data)->ary; 03201 VALUE group = ((struct thgroup_list_params *)data)->group; 03202 rb_thread_t *th; 03203 GetThreadPtr(thread, th); 03204 03205 if (th->thgroup == group) { 03206 rb_ary_push(ary, thread); 03207 } 03208 return ST_CONTINUE; 03209 } 03210 03211 /* 03212 * call-seq: 03213 * thgrp.list -> array 03214 * 03215 * Returns an array of all existing <code>Thread</code> objects that belong to 03216 * this group. 03217 * 03218 * ThreadGroup::Default.list #=> [#<Thread:0x401bdf4c run>] 03219 */ 03220 03221 static VALUE 03222 thgroup_list(VALUE group) 03223 { 03224 VALUE ary = rb_ary_new(); 03225 struct thgroup_list_params param; 03226 03227 param.ary = ary; 03228 param.group = group; 03229 st_foreach(GET_THREAD()->vm->living_threads, thgroup_list_i, (st_data_t) & param); 03230 return ary; 03231 } 03232 03233 03234 /* 03235 * call-seq: 03236 * thgrp.enclose -> thgrp 03237 * 03238 * Prevents threads from being added to or removed from the receiving 03239 * <code>ThreadGroup</code>. New threads can still be started in an enclosed 03240 * <code>ThreadGroup</code>. 03241 * 03242 * ThreadGroup::Default.enclose #=> #<ThreadGroup:0x4029d914> 03243 * thr = Thread::new { Thread.stop } #=> #<Thread:0x402a7210 sleep> 03244 * tg = ThreadGroup::new #=> #<ThreadGroup:0x402752d4> 03245 * tg.add thr 03246 * 03247 * <em>produces:</em> 03248 * 03249 * ThreadError: can't move from the enclosed thread group 03250 */ 03251 03252 static VALUE 03253 thgroup_enclose(VALUE group) 03254 { 03255 struct thgroup *data; 03256 03257 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 03258 data->enclosed = 1; 03259 03260 return group; 03261 } 03262 03263 03264 /* 03265 * call-seq: 03266 * thgrp.enclosed? -> true or false 03267 * 03268 * Returns <code>true</code> if <em>thgrp</em> is enclosed. See also 03269 * ThreadGroup#enclose. 03270 */ 03271 03272 static VALUE 03273 thgroup_enclosed_p(VALUE group) 03274 { 03275 struct thgroup *data; 03276 03277 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 03278 if (data->enclosed) 03279 return Qtrue; 03280 return Qfalse; 03281 } 03282 03283 03284 /* 03285 * call-seq: 03286 * thgrp.add(thread) -> thgrp 03287 * 03288 * Adds the given <em>thread</em> to this group, removing it from any other 03289 * group to which it may have previously belonged. 03290 * 03291 * puts "Initial group is #{ThreadGroup::Default.list}" 03292 * tg = ThreadGroup.new 03293 * t1 = Thread.new { sleep } 03294 * t2 = Thread.new { sleep } 03295 * puts "t1 is #{t1}" 03296 * puts "t2 is #{t2}" 03297 * tg.add(t1) 03298 * puts "Initial group now #{ThreadGroup::Default.list}" 03299 * puts "tg group now #{tg.list}" 03300 * 03301 * <em>produces:</em> 03302 * 03303 * Initial group is #<Thread:0x401bdf4c> 03304 * t1 is #<Thread:0x401b3c90> 03305 * t2 is #<Thread:0x401b3c18> 03306 * Initial group now #<Thread:0x401b3c18>#<Thread:0x401bdf4c> 03307 * tg group now #<Thread:0x401b3c90> 03308 */ 03309 03310 static VALUE 03311 thgroup_add(VALUE group, VALUE thread) 03312 { 03313 rb_thread_t *th; 03314 struct thgroup *data; 03315 03316 rb_secure(4); 03317 GetThreadPtr(thread, th); 03318 03319 if (OBJ_FROZEN(group)) { 03320 rb_raise(rb_eThreadError, "can't move to the frozen thread group"); 03321 } 03322 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 03323 if (data->enclosed) { 03324 rb_raise(rb_eThreadError, "can't move to the enclosed thread group"); 03325 } 03326 03327 if (!th->thgroup) { 03328 return Qnil; 03329 } 03330 03331 if (OBJ_FROZEN(th->thgroup)) { 03332 rb_raise(rb_eThreadError, "can't move from the frozen thread group"); 03333 } 03334 TypedData_Get_Struct(th->thgroup, struct thgroup, &thgroup_data_type, data); 03335 if (data->enclosed) { 03336 rb_raise(rb_eThreadError, 03337 "can't move from the enclosed thread group"); 03338 } 03339 03340 th->thgroup = group; 03341 return group; 03342 } 03343 03344 03345 /* 03346 * Document-class: Mutex 03347 * 03348 * Mutex implements a simple semaphore that can be used to coordinate access to 03349 * shared data from multiple concurrent threads. 03350 * 03351 * Example: 03352 * 03353 * require 'thread' 03354 * semaphore = Mutex.new 03355 * 03356 * a = Thread.new { 03357 * semaphore.synchronize { 03358 * # access shared resource 03359 * } 03360 * } 03361 * 03362 * b = Thread.new { 03363 * semaphore.synchronize { 03364 * # access shared resource 03365 * } 03366 * } 03367 * 03368 */ 03369 03370 #define GetMutexPtr(obj, tobj) \ 03371 TypedData_Get_Struct((obj), rb_mutex_t, &mutex_data_type, (tobj)) 03372 03373 #define mutex_mark NULL 03374 03375 static void 03376 mutex_free(void *ptr) 03377 { 03378 if (ptr) { 03379 rb_mutex_t *mutex = ptr; 03380 if (mutex->th) { 03381 /* rb_warn("free locked mutex"); */ 03382 const char *err = rb_mutex_unlock_th(mutex, mutex->th); 03383 if (err) rb_bug("%s", err); 03384 } 03385 native_mutex_destroy(&mutex->lock); 03386 native_cond_destroy(&mutex->cond); 03387 } 03388 ruby_xfree(ptr); 03389 } 03390 03391 static size_t 03392 mutex_memsize(const void *ptr) 03393 { 03394 return ptr ? sizeof(rb_mutex_t) : 0; 03395 } 03396 03397 static const rb_data_type_t mutex_data_type = { 03398 "mutex", 03399 {mutex_mark, mutex_free, mutex_memsize,}, 03400 }; 03401 03402 VALUE 03403 rb_obj_is_mutex(VALUE obj) 03404 { 03405 if (rb_typeddata_is_kind_of(obj, &mutex_data_type)) { 03406 return Qtrue; 03407 } 03408 else { 03409 return Qfalse; 03410 } 03411 } 03412 03413 static VALUE 03414 mutex_alloc(VALUE klass) 03415 { 03416 VALUE volatile obj; 03417 rb_mutex_t *mutex; 03418 03419 obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex); 03420 native_mutex_initialize(&mutex->lock); 03421 native_cond_initialize(&mutex->cond, RB_CONDATTR_CLOCK_MONOTONIC); 03422 return obj; 03423 } 03424 03425 /* 03426 * call-seq: 03427 * Mutex.new -> mutex 03428 * 03429 * Creates a new Mutex 03430 */ 03431 static VALUE 03432 mutex_initialize(VALUE self) 03433 { 03434 return self; 03435 } 03436 03437 VALUE 03438 rb_mutex_new(void) 03439 { 03440 return mutex_alloc(rb_cMutex); 03441 } 03442 03443 /* 03444 * call-seq: 03445 * mutex.locked? -> true or false 03446 * 03447 * Returns +true+ if this lock is currently held by some thread. 03448 */ 03449 VALUE 03450 rb_mutex_locked_p(VALUE self) 03451 { 03452 rb_mutex_t *mutex; 03453 GetMutexPtr(self, mutex); 03454 return mutex->th ? Qtrue : Qfalse; 03455 } 03456 03457 static void 03458 mutex_locked(rb_thread_t *th, VALUE self) 03459 { 03460 rb_mutex_t *mutex; 03461 GetMutexPtr(self, mutex); 03462 03463 if (th->keeping_mutexes) { 03464 mutex->next_mutex = th->keeping_mutexes; 03465 } 03466 th->keeping_mutexes = mutex; 03467 } 03468 03469 /* 03470 * call-seq: 03471 * mutex.try_lock -> true or false 03472 * 03473 * Attempts to obtain the lock and returns immediately. Returns +true+ if the 03474 * lock was granted. 03475 */ 03476 VALUE 03477 rb_mutex_trylock(VALUE self) 03478 { 03479 rb_mutex_t *mutex; 03480 VALUE locked = Qfalse; 03481 GetMutexPtr(self, mutex); 03482 03483 native_mutex_lock(&mutex->lock); 03484 if (mutex->th == 0) { 03485 mutex->th = GET_THREAD(); 03486 locked = Qtrue; 03487 03488 mutex_locked(GET_THREAD(), self); 03489 } 03490 native_mutex_unlock(&mutex->lock); 03491 03492 return locked; 03493 } 03494 03495 static int 03496 lock_func(rb_thread_t *th, rb_mutex_t *mutex, int timeout_ms) 03497 { 03498 int interrupted = 0; 03499 int err = 0; 03500 03501 mutex->cond_waiting++; 03502 for (;;) { 03503 if (!mutex->th) { 03504 mutex->th = th; 03505 break; 03506 } 03507 if (RUBY_VM_INTERRUPTED(th)) { 03508 interrupted = 1; 03509 break; 03510 } 03511 if (err == ETIMEDOUT) { 03512 interrupted = 2; 03513 break; 03514 } 03515 03516 if (timeout_ms) { 03517 struct timespec timeout_rel; 03518 struct timespec timeout; 03519 03520 timeout_rel.tv_sec = 0; 03521 timeout_rel.tv_nsec = timeout_ms * 1000 * 1000; 03522 timeout = native_cond_timeout(&mutex->cond, timeout_rel); 03523 err = native_cond_timedwait(&mutex->cond, &mutex->lock, &timeout); 03524 } 03525 else { 03526 native_cond_wait(&mutex->cond, &mutex->lock); 03527 err = 0; 03528 } 03529 } 03530 mutex->cond_waiting--; 03531 03532 return interrupted; 03533 } 03534 03535 static void 03536 lock_interrupt(void *ptr) 03537 { 03538 rb_mutex_t *mutex = (rb_mutex_t *)ptr; 03539 native_mutex_lock(&mutex->lock); 03540 if (mutex->cond_waiting > 0) 03541 native_cond_broadcast(&mutex->cond); 03542 native_mutex_unlock(&mutex->lock); 03543 } 03544 03545 /* 03546 * At maximum, only one thread can use cond_timedwait and watch deadlock 03547 * periodically. Multiple polling thread (i.e. concurrent deadlock check) 03548 * introduces new race conditions. [Bug #6278] [ruby-core:44275] 03549 */ 03550 rb_thread_t *patrol_thread = NULL; 03551 03552 /* 03553 * call-seq: 03554 * mutex.lock -> self 03555 * 03556 * Attempts to grab the lock and waits if it isn't available. 03557 * Raises +ThreadError+ if +mutex+ was locked by the current thread. 03558 */ 03559 VALUE 03560 rb_mutex_lock(VALUE self) 03561 { 03562 03563 if (rb_mutex_trylock(self) == Qfalse) { 03564 rb_mutex_t *mutex; 03565 rb_thread_t *th = GET_THREAD(); 03566 GetMutexPtr(self, mutex); 03567 03568 if (mutex->th == GET_THREAD()) { 03569 rb_raise(rb_eThreadError, "deadlock; recursive locking"); 03570 } 03571 03572 while (mutex->th != th) { 03573 int interrupted; 03574 enum rb_thread_status prev_status = th->status; 03575 int timeout_ms = 0; 03576 struct rb_unblock_callback oldubf; 03577 03578 set_unblock_function(th, lock_interrupt, mutex, &oldubf); 03579 th->status = THREAD_STOPPED_FOREVER; 03580 th->locking_mutex = self; 03581 03582 native_mutex_lock(&mutex->lock); 03583 th->vm->sleeper++; 03584 /* 03585 * Carefully! while some contended threads are in lock_func(), 03586 * vm->sleepr is unstable value. we have to avoid both deadlock 03587 * and busy loop. 03588 */ 03589 if ((vm_living_thread_num(th->vm) == th->vm->sleeper) && 03590 !patrol_thread) { 03591 timeout_ms = 100; 03592 patrol_thread = th; 03593 } 03594 03595 GVL_UNLOCK_BEGIN(); 03596 interrupted = lock_func(th, mutex, timeout_ms); 03597 native_mutex_unlock(&mutex->lock); 03598 GVL_UNLOCK_END(); 03599 03600 if (patrol_thread == th) 03601 patrol_thread = NULL; 03602 03603 reset_unblock_function(th, &oldubf); 03604 03605 th->locking_mutex = Qfalse; 03606 if (mutex->th && interrupted == 2) { 03607 rb_check_deadlock(th->vm); 03608 } 03609 if (th->status == THREAD_STOPPED_FOREVER) { 03610 th->status = prev_status; 03611 } 03612 th->vm->sleeper--; 03613 03614 if (mutex->th == th) mutex_locked(th, self); 03615 03616 if (interrupted) { 03617 RUBY_VM_CHECK_INTS(); 03618 } 03619 } 03620 } 03621 return self; 03622 } 03623 03624 static const char * 03625 rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th) 03626 { 03627 const char *err = NULL; 03628 rb_mutex_t *th_mutex; 03629 03630 native_mutex_lock(&mutex->lock); 03631 03632 if (mutex->th == 0) { 03633 err = "Attempt to unlock a mutex which is not locked"; 03634 } 03635 else if (mutex->th != th) { 03636 err = "Attempt to unlock a mutex which is locked by another thread"; 03637 } 03638 else { 03639 mutex->th = 0; 03640 if (mutex->cond_waiting > 0) 03641 native_cond_signal(&mutex->cond); 03642 } 03643 03644 native_mutex_unlock(&mutex->lock); 03645 03646 if (!err) { 03647 th_mutex = th->keeping_mutexes; 03648 if (th_mutex == mutex) { 03649 th->keeping_mutexes = mutex->next_mutex; 03650 } 03651 else { 03652 while (1) { 03653 rb_mutex_t *tmp_mutex; 03654 tmp_mutex = th_mutex->next_mutex; 03655 if (tmp_mutex == mutex) { 03656 th_mutex->next_mutex = tmp_mutex->next_mutex; 03657 break; 03658 } 03659 th_mutex = tmp_mutex; 03660 } 03661 } 03662 mutex->next_mutex = NULL; 03663 } 03664 03665 return err; 03666 } 03667 03668 /* 03669 * call-seq: 03670 * mutex.unlock -> self 03671 * 03672 * Releases the lock. 03673 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread. 03674 */ 03675 VALUE 03676 rb_mutex_unlock(VALUE self) 03677 { 03678 const char *err; 03679 rb_mutex_t *mutex; 03680 GetMutexPtr(self, mutex); 03681 03682 err = rb_mutex_unlock_th(mutex, GET_THREAD()); 03683 if (err) rb_raise(rb_eThreadError, "%s", err); 03684 03685 return self; 03686 } 03687 03688 static void 03689 rb_mutex_abandon_keeping_mutexes(rb_thread_t *th) 03690 { 03691 if (th->keeping_mutexes) { 03692 rb_mutex_abandon_all(th->keeping_mutexes); 03693 } 03694 th->keeping_mutexes = NULL; 03695 } 03696 03697 static void 03698 rb_mutex_abandon_locking_mutex(rb_thread_t *th) 03699 { 03700 rb_mutex_t *mutex; 03701 03702 if (!th->locking_mutex) return; 03703 03704 GetMutexPtr(th->locking_mutex, mutex); 03705 if (mutex->th == th) 03706 rb_mutex_abandon_all(mutex); 03707 th->locking_mutex = Qfalse; 03708 } 03709 03710 static void 03711 rb_mutex_abandon_all(rb_mutex_t *mutexes) 03712 { 03713 rb_mutex_t *mutex; 03714 03715 while (mutexes) { 03716 mutex = mutexes; 03717 mutexes = mutex->next_mutex; 03718 mutex->th = 0; 03719 mutex->next_mutex = 0; 03720 } 03721 } 03722 03723 static VALUE 03724 rb_mutex_sleep_forever(VALUE time) 03725 { 03726 rb_thread_sleep_deadly(); 03727 return Qnil; 03728 } 03729 03730 static VALUE 03731 rb_mutex_wait_for(VALUE time) 03732 { 03733 const struct timeval *t = (struct timeval *)time; 03734 rb_thread_wait_for(*t); 03735 return Qnil; 03736 } 03737 03738 VALUE 03739 rb_mutex_sleep(VALUE self, VALUE timeout) 03740 { 03741 time_t beg, end; 03742 struct timeval t; 03743 03744 if (!NIL_P(timeout)) { 03745 t = rb_time_interval(timeout); 03746 } 03747 rb_mutex_unlock(self); 03748 beg = time(0); 03749 if (NIL_P(timeout)) { 03750 rb_ensure(rb_mutex_sleep_forever, Qnil, rb_mutex_lock, self); 03751 } 03752 else { 03753 rb_ensure(rb_mutex_wait_for, (VALUE)&t, rb_mutex_lock, self); 03754 } 03755 end = time(0) - beg; 03756 return INT2FIX(end); 03757 } 03758 03759 /* 03760 * call-seq: 03761 * mutex.sleep(timeout = nil) -> number 03762 * 03763 * Releases the lock and sleeps +timeout+ seconds if it is given and 03764 * non-nil or forever. Raises +ThreadError+ if +mutex+ wasn't locked by 03765 * the current thread. 03766 */ 03767 static VALUE 03768 mutex_sleep(int argc, VALUE *argv, VALUE self) 03769 { 03770 VALUE timeout; 03771 03772 rb_scan_args(argc, argv, "01", &timeout); 03773 return rb_mutex_sleep(self, timeout); 03774 } 03775 03776 /* 03777 * call-seq: 03778 * mutex.synchronize { ... } -> result of the block 03779 * 03780 * Obtains a lock, runs the block, and releases the lock when the block 03781 * completes. See the example under +Mutex+. 03782 */ 03783 03784 VALUE 03785 rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg) 03786 { 03787 rb_mutex_lock(mutex); 03788 return rb_ensure(func, arg, rb_mutex_unlock, mutex); 03789 } 03790 03791 /* 03792 * Document-class: Barrier 03793 */ 03794 static void 03795 barrier_mark(void *ptr) 03796 { 03797 rb_gc_mark((VALUE)ptr); 03798 } 03799 03800 static const rb_data_type_t barrier_data_type = { 03801 "barrier", 03802 {barrier_mark, 0, 0,}, 03803 }; 03804 03805 static VALUE 03806 barrier_alloc(VALUE klass) 03807 { 03808 return TypedData_Wrap_Struct(klass, &barrier_data_type, (void *)mutex_alloc(0)); 03809 } 03810 03811 #define GetBarrierPtr(obj) ((VALUE)rb_check_typeddata((obj), &barrier_data_type)) 03812 03813 VALUE 03814 rb_barrier_new(void) 03815 { 03816 VALUE barrier = barrier_alloc(rb_cBarrier); 03817 rb_mutex_lock((VALUE)DATA_PTR(barrier)); 03818 return barrier; 03819 } 03820 03821 VALUE 03822 rb_barrier_wait(VALUE self) 03823 { 03824 VALUE mutex = GetBarrierPtr(self); 03825 rb_mutex_t *m; 03826 03827 if (!mutex) return Qfalse; 03828 GetMutexPtr(mutex, m); 03829 if (m->th == GET_THREAD()) return Qfalse; 03830 rb_mutex_lock(mutex); 03831 if (DATA_PTR(self)) return Qtrue; 03832 rb_mutex_unlock(mutex); 03833 return Qfalse; 03834 } 03835 03836 VALUE 03837 rb_barrier_release(VALUE self) 03838 { 03839 return rb_mutex_unlock(GetBarrierPtr(self)); 03840 } 03841 03842 VALUE 03843 rb_barrier_destroy(VALUE self) 03844 { 03845 VALUE mutex = GetBarrierPtr(self); 03846 DATA_PTR(self) = 0; 03847 return rb_mutex_unlock(mutex); 03848 } 03849 03850 /* variables for recursive traversals */ 03851 static ID recursive_key; 03852 03853 /* 03854 * Returns the current "recursive list" used to detect recursion. 03855 * This list is a hash table, unique for the current thread and for 03856 * the current __callee__. 03857 */ 03858 03859 static VALUE 03860 recursive_list_access(void) 03861 { 03862 volatile VALUE hash = rb_thread_local_aref(rb_thread_current(), recursive_key); 03863 VALUE sym = ID2SYM(rb_frame_this_func()); 03864 VALUE list; 03865 if (NIL_P(hash) || TYPE(hash) != T_HASH) { 03866 hash = rb_hash_new(); 03867 OBJ_UNTRUST(hash); 03868 rb_thread_local_aset(rb_thread_current(), recursive_key, hash); 03869 list = Qnil; 03870 } 03871 else { 03872 list = rb_hash_aref(hash, sym); 03873 } 03874 if (NIL_P(list) || TYPE(list) != T_HASH) { 03875 list = rb_hash_new(); 03876 OBJ_UNTRUST(list); 03877 rb_hash_aset(hash, sym, list); 03878 } 03879 return list; 03880 } 03881 03882 /* 03883 * Returns Qtrue iff obj_id (or the pair <obj, paired_obj>) is already 03884 * in the recursion list. 03885 * Assumes the recursion list is valid. 03886 */ 03887 03888 static VALUE 03889 recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id) 03890 { 03891 #if SIZEOF_LONG == SIZEOF_VOIDP 03892 #define OBJ_ID_EQL(obj_id, other) ((obj_id) == (other)) 03893 #elif SIZEOF_LONG_LONG == SIZEOF_VOIDP 03894 #define OBJ_ID_EQL(obj_id, other) (RB_TYPE_P((obj_id), T_BIGNUM) ? \ 03895 rb_big_eql((obj_id), (other)) : ((obj_id) == (other))) 03896 #endif 03897 03898 VALUE pair_list = rb_hash_lookup2(list, obj_id, Qundef); 03899 if (pair_list == Qundef) 03900 return Qfalse; 03901 if (paired_obj_id) { 03902 if (TYPE(pair_list) != T_HASH) { 03903 if (!OBJ_ID_EQL(paired_obj_id, pair_list)) 03904 return Qfalse; 03905 } 03906 else { 03907 if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id))) 03908 return Qfalse; 03909 } 03910 } 03911 return Qtrue; 03912 } 03913 03914 /* 03915 * Pushes obj_id (or the pair <obj_id, paired_obj_id>) in the recursion list. 03916 * For a single obj_id, it sets list[obj_id] to Qtrue. 03917 * For a pair, it sets list[obj_id] to paired_obj_id if possible, 03918 * otherwise list[obj_id] becomes a hash like: 03919 * {paired_obj_id_1 => true, paired_obj_id_2 => true, ... } 03920 * Assumes the recursion list is valid. 03921 */ 03922 03923 static void 03924 recursive_push(VALUE list, VALUE obj, VALUE paired_obj) 03925 { 03926 VALUE pair_list; 03927 03928 if (!paired_obj) { 03929 rb_hash_aset(list, obj, Qtrue); 03930 } 03931 else if ((pair_list = rb_hash_lookup2(list, obj, Qundef)) == Qundef) { 03932 rb_hash_aset(list, obj, paired_obj); 03933 } 03934 else { 03935 if (TYPE(pair_list) != T_HASH){ 03936 VALUE other_paired_obj = pair_list; 03937 pair_list = rb_hash_new(); 03938 OBJ_UNTRUST(pair_list); 03939 rb_hash_aset(pair_list, other_paired_obj, Qtrue); 03940 rb_hash_aset(list, obj, pair_list); 03941 } 03942 rb_hash_aset(pair_list, paired_obj, Qtrue); 03943 } 03944 } 03945 03946 /* 03947 * Pops obj_id (or the pair <obj_id, paired_obj_id>) from the recursion list. 03948 * For a pair, if list[obj_id] is a hash, then paired_obj_id is 03949 * removed from the hash and no attempt is made to simplify 03950 * list[obj_id] from {only_one_paired_id => true} to only_one_paired_id 03951 * Assumes the recursion list is valid. 03952 */ 03953 03954 static void 03955 recursive_pop(VALUE list, VALUE obj, VALUE paired_obj) 03956 { 03957 if (paired_obj) { 03958 VALUE pair_list = rb_hash_lookup2(list, obj, Qundef); 03959 if (pair_list == Qundef) { 03960 VALUE symname = rb_inspect(ID2SYM(rb_frame_this_func())); 03961 VALUE thrname = rb_inspect(rb_thread_current()); 03962 rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list for %s in %s", 03963 StringValuePtr(symname), StringValuePtr(thrname)); 03964 } 03965 if (TYPE(pair_list) == T_HASH) { 03966 rb_hash_delete(pair_list, paired_obj); 03967 if (!RHASH_EMPTY_P(pair_list)) { 03968 return; /* keep hash until is empty */ 03969 } 03970 } 03971 } 03972 rb_hash_delete(list, obj); 03973 } 03974 03975 struct exec_recursive_params { 03976 VALUE (*func) (VALUE, VALUE, int); 03977 VALUE list; 03978 VALUE obj; 03979 VALUE objid; 03980 VALUE pairid; 03981 VALUE arg; 03982 }; 03983 03984 static VALUE 03985 exec_recursive_i(VALUE tag, struct exec_recursive_params *p) 03986 { 03987 VALUE result = Qundef; 03988 int state; 03989 03990 recursive_push(p->list, p->objid, p->pairid); 03991 PUSH_TAG(); 03992 if ((state = EXEC_TAG()) == 0) { 03993 result = (*p->func)(p->obj, p->arg, FALSE); 03994 } 03995 POP_TAG(); 03996 recursive_pop(p->list, p->objid, p->pairid); 03997 if (state) 03998 JUMP_TAG(state); 03999 return result; 04000 } 04001 04002 /* 04003 * Calls func(obj, arg, recursive), where recursive is non-zero if the 04004 * current method is called recursively on obj, or on the pair <obj, pairid> 04005 * If outer is 0, then the innermost func will be called with recursive set 04006 * to Qtrue, otherwise the outermost func will be called. In the latter case, 04007 * all inner func are short-circuited by throw. 04008 * Implementation details: the value thrown is the recursive list which is 04009 * proper to the current method and unlikely to be catched anywhere else. 04010 * list[recursive_key] is used as a flag for the outermost call. 04011 */ 04012 04013 static VALUE 04014 exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer) 04015 { 04016 VALUE result = Qundef; 04017 struct exec_recursive_params p; 04018 int outermost; 04019 p.list = recursive_list_access(); 04020 p.objid = rb_obj_id(obj); 04021 p.obj = obj; 04022 p.pairid = pairid; 04023 p.arg = arg; 04024 outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0); 04025 04026 if (recursive_check(p.list, p.objid, pairid)) { 04027 if (outer && !outermost) { 04028 rb_throw_obj(p.list, p.list); 04029 } 04030 return (*func)(obj, arg, TRUE); 04031 } 04032 else { 04033 p.func = func; 04034 04035 if (outermost) { 04036 recursive_push(p.list, ID2SYM(recursive_key), 0); 04037 result = rb_catch_obj(p.list, exec_recursive_i, (VALUE)&p); 04038 recursive_pop(p.list, ID2SYM(recursive_key), 0); 04039 if (result == p.list) { 04040 result = (*func)(obj, arg, TRUE); 04041 } 04042 } 04043 else { 04044 result = exec_recursive_i(0, &p); 04045 } 04046 } 04047 *(volatile struct exec_recursive_params *)&p; 04048 return result; 04049 } 04050 04051 /* 04052 * Calls func(obj, arg, recursive), where recursive is non-zero if the 04053 * current method is called recursively on obj 04054 */ 04055 04056 VALUE 04057 rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) 04058 { 04059 return exec_recursive(func, obj, 0, arg, 0); 04060 } 04061 04062 /* 04063 * Calls func(obj, arg, recursive), where recursive is non-zero if the 04064 * current method is called recursively on the ordered pair <obj, paired_obj> 04065 */ 04066 04067 VALUE 04068 rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg) 04069 { 04070 return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 0); 04071 } 04072 04073 /* 04074 * If recursion is detected on the current method and obj, the outermost 04075 * func will be called with (obj, arg, Qtrue). All inner func will be 04076 * short-circuited using throw. 04077 */ 04078 04079 VALUE 04080 rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) 04081 { 04082 return exec_recursive(func, obj, 0, arg, 1); 04083 } 04084 04085 /* tracer */ 04086 #define RUBY_EVENT_REMOVED 0x1000000 04087 04088 enum { 04089 EVENT_RUNNING_NOTHING, 04090 EVENT_RUNNING_TRACE = 1, 04091 EVENT_RUNNING_THREAD = 2, 04092 EVENT_RUNNING_VM = 4, 04093 EVENT_RUNNING_EVENT_MASK = EVENT_RUNNING_VM|EVENT_RUNNING_THREAD 04094 }; 04095 04096 static VALUE thread_suppress_tracing(rb_thread_t *th, int ev, VALUE (*func)(VALUE, int), VALUE arg, int always, int pop_p); 04097 04098 struct event_call_args { 04099 rb_thread_t *th; 04100 VALUE klass; 04101 VALUE self; 04102 VALUE proc; 04103 ID id; 04104 rb_event_flag_t event; 04105 }; 04106 04107 static rb_event_hook_t * 04108 alloc_event_hook(rb_event_hook_func_t func, rb_event_flag_t events, VALUE data) 04109 { 04110 rb_event_hook_t *hook = ALLOC(rb_event_hook_t); 04111 hook->func = func; 04112 hook->flag = events; 04113 hook->data = data; 04114 return hook; 04115 } 04116 04117 static void 04118 thread_reset_event_flags(rb_thread_t *th) 04119 { 04120 rb_event_hook_t *hook = th->event_hooks; 04121 rb_event_flag_t flag = th->event_flags & RUBY_EVENT_VM; 04122 04123 while (hook) { 04124 if (!(flag & RUBY_EVENT_REMOVED)) 04125 flag |= hook->flag; 04126 hook = hook->next; 04127 } 04128 th->event_flags = flag; 04129 } 04130 04131 static void 04132 rb_threadptr_add_event_hook(rb_thread_t *th, 04133 rb_event_hook_func_t func, rb_event_flag_t events, VALUE data) 04134 { 04135 rb_event_hook_t *hook = alloc_event_hook(func, events, data); 04136 hook->next = th->event_hooks; 04137 th->event_hooks = hook; 04138 thread_reset_event_flags(th); 04139 } 04140 04141 static rb_thread_t * 04142 thval2thread_t(VALUE thval) 04143 { 04144 rb_thread_t *th; 04145 GetThreadPtr(thval, th); 04146 return th; 04147 } 04148 04149 void 04150 rb_thread_add_event_hook(VALUE thval, 04151 rb_event_hook_func_t func, rb_event_flag_t events, VALUE data) 04152 { 04153 rb_threadptr_add_event_hook(thval2thread_t(thval), func, events, data); 04154 } 04155 04156 static int 04157 set_threads_event_flags_i(st_data_t key, st_data_t val, st_data_t flag) 04158 { 04159 VALUE thval = key; 04160 rb_thread_t *th; 04161 GetThreadPtr(thval, th); 04162 04163 if (flag) { 04164 th->event_flags |= RUBY_EVENT_VM; 04165 } 04166 else { 04167 th->event_flags &= (~RUBY_EVENT_VM); 04168 } 04169 return ST_CONTINUE; 04170 } 04171 04172 static void 04173 set_threads_event_flags(int flag) 04174 { 04175 st_foreach(GET_VM()->living_threads, set_threads_event_flags_i, (st_data_t) flag); 04176 } 04177 04178 static inline int 04179 exec_event_hooks(const rb_event_hook_t *hook, rb_event_flag_t flag, VALUE self, ID id, VALUE klass) 04180 { 04181 int removed = 0; 04182 for (; hook; hook = hook->next) { 04183 if (hook->flag & RUBY_EVENT_REMOVED) { 04184 removed++; 04185 continue; 04186 } 04187 if (flag & hook->flag) { 04188 (*hook->func)(flag, hook->data, self, id, klass); 04189 } 04190 } 04191 return removed; 04192 } 04193 04194 static int remove_defered_event_hook(rb_event_hook_t **root); 04195 04196 static VALUE 04197 thread_exec_event_hooks(VALUE args, int running) 04198 { 04199 struct event_call_args *argp = (struct event_call_args *)args; 04200 rb_thread_t *th = argp->th; 04201 rb_event_flag_t flag = argp->event; 04202 VALUE self = argp->self; 04203 ID id = argp->id; 04204 VALUE klass = argp->klass; 04205 const rb_event_flag_t wait_event = th->event_flags; 04206 int removed; 04207 04208 if (self == rb_mRubyVMFrozenCore) return 0; 04209 04210 if ((wait_event & flag) && !(running & EVENT_RUNNING_THREAD)) { 04211 th->tracing |= EVENT_RUNNING_THREAD; 04212 removed = exec_event_hooks(th->event_hooks, flag, self, id, klass); 04213 th->tracing &= ~EVENT_RUNNING_THREAD; 04214 if (removed) { 04215 remove_defered_event_hook(&th->event_hooks); 04216 } 04217 } 04218 if (wait_event & RUBY_EVENT_VM) { 04219 if (th->vm->event_hooks == NULL) { 04220 th->event_flags &= (~RUBY_EVENT_VM); 04221 } 04222 else if (!(running & EVENT_RUNNING_VM)) { 04223 th->tracing |= EVENT_RUNNING_VM; 04224 removed = exec_event_hooks(th->vm->event_hooks, flag, self, id, klass); 04225 th->tracing &= ~EVENT_RUNNING_VM; 04226 if (removed) { 04227 remove_defered_event_hook(&th->vm->event_hooks); 04228 } 04229 } 04230 } 04231 return 0; 04232 } 04233 04234 void 04235 rb_threadptr_exec_event_hooks(rb_thread_t *th, rb_event_flag_t flag, VALUE self, ID id, VALUE klass, int pop_p) 04236 { 04237 const VALUE errinfo = th->errinfo; 04238 struct event_call_args args; 04239 args.th = th; 04240 args.event = flag; 04241 args.self = self; 04242 args.id = id; 04243 args.klass = klass; 04244 args.proc = 0; 04245 thread_suppress_tracing(th, EVENT_RUNNING_EVENT_MASK, thread_exec_event_hooks, (VALUE)&args, FALSE, pop_p); 04246 th->errinfo = errinfo; 04247 } 04248 04249 void 04250 rb_add_event_hook(rb_event_hook_func_t func, rb_event_flag_t events, VALUE data) 04251 { 04252 rb_event_hook_t *hook = alloc_event_hook(func, events, data); 04253 rb_vm_t *vm = GET_VM(); 04254 04255 hook->next = vm->event_hooks; 04256 vm->event_hooks = hook; 04257 04258 set_threads_event_flags(1); 04259 } 04260 04261 static int 04262 defer_remove_event_hook(rb_event_hook_t *hook, rb_event_hook_func_t func) 04263 { 04264 while (hook) { 04265 if (func == 0 || hook->func == func) { 04266 hook->flag |= RUBY_EVENT_REMOVED; 04267 } 04268 hook = hook->next; 04269 } 04270 return -1; 04271 } 04272 04273 static int 04274 remove_event_hook(rb_event_hook_t **root, rb_event_hook_func_t func) 04275 { 04276 rb_event_hook_t *hook = *root, *next; 04277 04278 while (hook) { 04279 next = hook->next; 04280 if (func == 0 || hook->func == func || (hook->flag & RUBY_EVENT_REMOVED)) { 04281 *root = next; 04282 xfree(hook); 04283 } 04284 else { 04285 root = &hook->next; 04286 } 04287 hook = next; 04288 } 04289 return -1; 04290 } 04291 04292 static int 04293 remove_defered_event_hook(rb_event_hook_t **root) 04294 { 04295 rb_event_hook_t *hook = *root, *next; 04296 04297 while (hook) { 04298 next = hook->next; 04299 if (hook->flag & RUBY_EVENT_REMOVED) { 04300 *root = next; 04301 xfree(hook); 04302 } 04303 else { 04304 root = &hook->next; 04305 } 04306 hook = next; 04307 } 04308 return -1; 04309 } 04310 04311 static int 04312 rb_threadptr_remove_event_hook(rb_thread_t *th, rb_event_hook_func_t func) 04313 { 04314 int ret; 04315 if (th->tracing & EVENT_RUNNING_THREAD) { 04316 ret = defer_remove_event_hook(th->event_hooks, func); 04317 } 04318 else { 04319 ret = remove_event_hook(&th->event_hooks, func); 04320 } 04321 thread_reset_event_flags(th); 04322 return ret; 04323 } 04324 04325 int 04326 rb_thread_remove_event_hook(VALUE thval, rb_event_hook_func_t func) 04327 { 04328 return rb_threadptr_remove_event_hook(thval2thread_t(thval), func); 04329 } 04330 04331 static rb_event_hook_t * 04332 search_live_hook(rb_event_hook_t *hook) 04333 { 04334 while (hook) { 04335 if (!(hook->flag & RUBY_EVENT_REMOVED)) 04336 return hook; 04337 hook = hook->next; 04338 } 04339 return NULL; 04340 } 04341 04342 static int 04343 running_vm_event_hooks(st_data_t key, st_data_t val, st_data_t data) 04344 { 04345 rb_thread_t *th = thval2thread_t((VALUE)key); 04346 if (!(th->tracing & EVENT_RUNNING_VM)) return ST_CONTINUE; 04347 *(rb_thread_t **)data = th; 04348 return ST_STOP; 04349 } 04350 04351 static rb_thread_t * 04352 vm_event_hooks_running_thread(rb_vm_t *vm) 04353 { 04354 rb_thread_t *found = NULL; 04355 st_foreach(vm->living_threads, running_vm_event_hooks, (st_data_t)&found); 04356 return found; 04357 } 04358 04359 int 04360 rb_remove_event_hook(rb_event_hook_func_t func) 04361 { 04362 rb_vm_t *vm = GET_VM(); 04363 rb_event_hook_t *hook = search_live_hook(vm->event_hooks); 04364 int ret; 04365 04366 if (vm_event_hooks_running_thread(vm)) { 04367 ret = defer_remove_event_hook(vm->event_hooks, func); 04368 } 04369 else { 04370 ret = remove_event_hook(&vm->event_hooks, func); 04371 } 04372 04373 if (hook && !search_live_hook(vm->event_hooks)) { 04374 set_threads_event_flags(0); 04375 } 04376 04377 return ret; 04378 } 04379 04380 static int 04381 clear_trace_func_i(st_data_t key, st_data_t val, st_data_t flag) 04382 { 04383 rb_thread_t *th; 04384 GetThreadPtr((VALUE)key, th); 04385 rb_threadptr_remove_event_hook(th, 0); 04386 return ST_CONTINUE; 04387 } 04388 04389 void 04390 rb_clear_trace_func(void) 04391 { 04392 st_foreach(GET_VM()->living_threads, clear_trace_func_i, (st_data_t) 0); 04393 rb_remove_event_hook(0); 04394 } 04395 04396 static void call_trace_func(rb_event_flag_t, VALUE data, VALUE self, ID id, VALUE klass); 04397 04398 /* 04399 * If recursion is detected on the current method, obj and paired_obj, 04400 * the outermost func will be called with (obj, arg, Qtrue). All inner 04401 * func will be short-circuited using throw. 04402 */ 04403 04404 VALUE 04405 rb_exec_recursive_paired_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg) 04406 { 04407 return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 1); 04408 } 04409 04410 /* 04411 * call-seq: 04412 * set_trace_func(proc) -> proc 04413 * set_trace_func(nil) -> nil 04414 * 04415 * Establishes _proc_ as the handler for tracing, or disables 04416 * tracing if the parameter is +nil+. _proc_ takes up 04417 * to six parameters: an event name, a filename, a line number, an 04418 * object id, a binding, and the name of a class. _proc_ is 04419 * invoked whenever an event occurs. Events are: <code>c-call</code> 04420 * (call a C-language routine), <code>c-return</code> (return from a 04421 * C-language routine), <code>call</code> (call a Ruby method), 04422 * <code>class</code> (start a class or module definition), 04423 * <code>end</code> (finish a class or module definition), 04424 * <code>line</code> (execute code on a new line), <code>raise</code> 04425 * (raise an exception), and <code>return</code> (return from a Ruby 04426 * method). Tracing is disabled within the context of _proc_. 04427 * 04428 * class Test 04429 * def test 04430 * a = 1 04431 * b = 2 04432 * end 04433 * end 04434 * 04435 * set_trace_func proc { |event, file, line, id, binding, classname| 04436 * printf "%8s %s:%-2d %10s %8s\n", event, file, line, id, classname 04437 * } 04438 * t = Test.new 04439 * t.test 04440 * 04441 * line prog.rb:11 false 04442 * c-call prog.rb:11 new Class 04443 * c-call prog.rb:11 initialize Object 04444 * c-return prog.rb:11 initialize Object 04445 * c-return prog.rb:11 new Class 04446 * line prog.rb:12 false 04447 * call prog.rb:2 test Test 04448 * line prog.rb:3 test Test 04449 * line prog.rb:4 test Test 04450 * return prog.rb:4 test Test 04451 */ 04452 04453 static VALUE 04454 set_trace_func(VALUE obj, VALUE trace) 04455 { 04456 rb_remove_event_hook(call_trace_func); 04457 04458 if (NIL_P(trace)) { 04459 GET_THREAD()->tracing = EVENT_RUNNING_NOTHING; 04460 return Qnil; 04461 } 04462 04463 if (!rb_obj_is_proc(trace)) { 04464 rb_raise(rb_eTypeError, "trace_func needs to be Proc"); 04465 } 04466 04467 rb_add_event_hook(call_trace_func, RUBY_EVENT_ALL, trace); 04468 return trace; 04469 } 04470 04471 static void 04472 thread_add_trace_func(rb_thread_t *th, VALUE trace) 04473 { 04474 if (!rb_obj_is_proc(trace)) { 04475 rb_raise(rb_eTypeError, "trace_func needs to be Proc"); 04476 } 04477 04478 rb_threadptr_add_event_hook(th, call_trace_func, RUBY_EVENT_ALL, trace); 04479 } 04480 04481 /* 04482 * call-seq: 04483 * thr.add_trace_func(proc) -> proc 04484 * 04485 * Adds _proc_ as a handler for tracing. 04486 * See <code>Thread#set_trace_func</code> and +set_trace_func+. 04487 */ 04488 04489 static VALUE 04490 thread_add_trace_func_m(VALUE obj, VALUE trace) 04491 { 04492 rb_thread_t *th; 04493 GetThreadPtr(obj, th); 04494 thread_add_trace_func(th, trace); 04495 return trace; 04496 } 04497 04498 /* 04499 * call-seq: 04500 * thr.set_trace_func(proc) -> proc 04501 * thr.set_trace_func(nil) -> nil 04502 * 04503 * Establishes _proc_ on _thr_ as the handler for tracing, or 04504 * disables tracing if the parameter is +nil+. 04505 * See +set_trace_func+. 04506 */ 04507 04508 static VALUE 04509 thread_set_trace_func_m(VALUE obj, VALUE trace) 04510 { 04511 rb_thread_t *th; 04512 GetThreadPtr(obj, th); 04513 rb_threadptr_remove_event_hook(th, call_trace_func); 04514 04515 if (NIL_P(trace)) { 04516 th->tracing = EVENT_RUNNING_NOTHING; 04517 return Qnil; 04518 } 04519 thread_add_trace_func(th, trace); 04520 return trace; 04521 } 04522 04523 static const char * 04524 get_event_name(rb_event_flag_t event) 04525 { 04526 switch (event) { 04527 case RUBY_EVENT_LINE: 04528 return "line"; 04529 case RUBY_EVENT_CLASS: 04530 return "class"; 04531 case RUBY_EVENT_END: 04532 return "end"; 04533 case RUBY_EVENT_CALL: 04534 return "call"; 04535 case RUBY_EVENT_RETURN: 04536 return "return"; 04537 case RUBY_EVENT_C_CALL: 04538 return "c-call"; 04539 case RUBY_EVENT_C_RETURN: 04540 return "c-return"; 04541 case RUBY_EVENT_RAISE: 04542 return "raise"; 04543 default: 04544 return "unknown"; 04545 } 04546 } 04547 04548 static VALUE 04549 call_trace_proc(VALUE args, int tracing) 04550 { 04551 struct event_call_args *p = (struct event_call_args *)args; 04552 const char *srcfile = rb_sourcefile(); 04553 VALUE eventname = rb_str_new2(get_event_name(p->event)); 04554 VALUE filename = srcfile ? rb_str_new2(srcfile) : Qnil; 04555 VALUE argv[6]; 04556 int line = rb_sourceline(); 04557 ID id = 0; 04558 VALUE klass = 0; 04559 04560 if (p->klass != 0) { 04561 id = p->id; 04562 klass = p->klass; 04563 } 04564 else { 04565 rb_thread_method_id_and_class(p->th, &id, &klass); 04566 } 04567 if (id == ID_ALLOCATOR) 04568 return Qnil; 04569 if (klass) { 04570 if (TYPE(klass) == T_ICLASS) { 04571 klass = RBASIC(klass)->klass; 04572 } 04573 else if (FL_TEST(klass, FL_SINGLETON)) { 04574 klass = rb_iv_get(klass, "__attached__"); 04575 } 04576 } 04577 04578 argv[0] = eventname; 04579 argv[1] = filename; 04580 argv[2] = INT2FIX(line); 04581 argv[3] = id ? ID2SYM(id) : Qnil; 04582 argv[4] = (p->self && srcfile) ? rb_binding_new() : Qnil; 04583 argv[5] = klass ? klass : Qnil; 04584 04585 return rb_proc_call_with_block(p->proc, 6, argv, Qnil); 04586 } 04587 04588 static void 04589 call_trace_func(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass) 04590 { 04591 struct event_call_args args; 04592 04593 args.th = GET_THREAD(); 04594 args.event = event; 04595 args.proc = proc; 04596 args.self = self; 04597 args.id = id; 04598 args.klass = klass; 04599 ruby_suppress_tracing(call_trace_proc, (VALUE)&args, FALSE); 04600 } 04601 04602 VALUE 04603 ruby_suppress_tracing(VALUE (*func)(VALUE, int), VALUE arg, int always) 04604 { 04605 rb_thread_t *th = GET_THREAD(); 04606 return thread_suppress_tracing(th, EVENT_RUNNING_TRACE, func, arg, always, 0); 04607 } 04608 04609 static VALUE 04610 thread_suppress_tracing(rb_thread_t *th, int ev, VALUE (*func)(VALUE, int), VALUE arg, int always, int pop_p) 04611 { 04612 int state, tracing = th->tracing, running = tracing & ev; 04613 volatile int raised; 04614 volatile int outer_state; 04615 VALUE result = Qnil; 04616 04617 if (running == ev && !always) { 04618 return Qnil; 04619 } 04620 else { 04621 th->tracing |= ev; 04622 } 04623 04624 raised = rb_threadptr_reset_raised(th); 04625 outer_state = th->state; 04626 th->state = 0; 04627 04628 PUSH_TAG(); 04629 if ((state = EXEC_TAG()) == 0) { 04630 result = (*func)(arg, running); 04631 } 04632 04633 if (raised) { 04634 rb_threadptr_set_raised(th); 04635 } 04636 POP_TAG(); 04637 04638 th->tracing = tracing; 04639 if (state) { 04640 if (pop_p) { 04641 th->cfp = RUBY_VM_PREVIOUS_CONTROL_FRAME(th->cfp); 04642 } 04643 JUMP_TAG(state); 04644 } 04645 th->state = outer_state; 04646 04647 return result; 04648 } 04649 04650 /* 04651 * call-seq: 04652 * thr.backtrace -> array 04653 * 04654 * Returns the current back trace of the _thr_. 04655 */ 04656 04657 static VALUE 04658 rb_thread_backtrace_m(VALUE thval) 04659 { 04660 return rb_thread_backtrace(thval); 04661 } 04662 04663 /* 04664 * Document-class: ThreadError 04665 * 04666 * Raised when an invalid operation is attempted on a thread. 04667 * 04668 * For example, when no other thread has been started: 04669 * 04670 * Thread.stop 04671 * 04672 * <em>raises the exception:</em> 04673 * 04674 * ThreadError: stopping only thread 04675 */ 04676 04677 /* 04678 * +Thread+ encapsulates the behavior of a thread of 04679 * execution, including the main thread of the Ruby script. 04680 * 04681 * In the descriptions of the methods in this class, the parameter _sym_ 04682 * refers to a symbol, which is either a quoted string or a 04683 * +Symbol+ (such as <code>:name</code>). 04684 */ 04685 04686 void 04687 Init_Thread(void) 04688 { 04689 #undef rb_intern 04690 #define rb_intern(str) rb_intern_const(str) 04691 04692 VALUE cThGroup; 04693 rb_thread_t *th = GET_THREAD(); 04694 04695 rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1); 04696 rb_define_singleton_method(rb_cThread, "start", thread_start, -2); 04697 rb_define_singleton_method(rb_cThread, "fork", thread_start, -2); 04698 rb_define_singleton_method(rb_cThread, "main", rb_thread_s_main, 0); 04699 rb_define_singleton_method(rb_cThread, "current", thread_s_current, 0); 04700 rb_define_singleton_method(rb_cThread, "stop", rb_thread_stop, 0); 04701 rb_define_singleton_method(rb_cThread, "kill", rb_thread_s_kill, 1); 04702 rb_define_singleton_method(rb_cThread, "exit", rb_thread_exit, 0); 04703 rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0); 04704 rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0); 04705 rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0); 04706 rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1); 04707 #if THREAD_DEBUG < 0 04708 rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0); 04709 rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1); 04710 #endif 04711 04712 rb_define_method(rb_cThread, "initialize", thread_initialize, -2); 04713 rb_define_method(rb_cThread, "raise", thread_raise_m, -1); 04714 rb_define_method(rb_cThread, "join", thread_join_m, -1); 04715 rb_define_method(rb_cThread, "value", thread_value, 0); 04716 rb_define_method(rb_cThread, "kill", rb_thread_kill, 0); 04717 rb_define_method(rb_cThread, "terminate", rb_thread_kill, 0); 04718 rb_define_method(rb_cThread, "exit", rb_thread_kill, 0); 04719 rb_define_method(rb_cThread, "run", rb_thread_run, 0); 04720 rb_define_method(rb_cThread, "wakeup", rb_thread_wakeup, 0); 04721 rb_define_method(rb_cThread, "[]", rb_thread_aref, 1); 04722 rb_define_method(rb_cThread, "[]=", rb_thread_aset, 2); 04723 rb_define_method(rb_cThread, "key?", rb_thread_key_p, 1); 04724 rb_define_method(rb_cThread, "keys", rb_thread_keys, 0); 04725 rb_define_method(rb_cThread, "priority", rb_thread_priority, 0); 04726 rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1); 04727 rb_define_method(rb_cThread, "status", rb_thread_status, 0); 04728 rb_define_method(rb_cThread, "alive?", rb_thread_alive_p, 0); 04729 rb_define_method(rb_cThread, "stop?", rb_thread_stop_p, 0); 04730 rb_define_method(rb_cThread, "abort_on_exception", rb_thread_abort_exc, 0); 04731 rb_define_method(rb_cThread, "abort_on_exception=", rb_thread_abort_exc_set, 1); 04732 rb_define_method(rb_cThread, "safe_level", rb_thread_safe_level, 0); 04733 rb_define_method(rb_cThread, "group", rb_thread_group, 0); 04734 rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, 0); 04735 04736 rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0); 04737 04738 closed_stream_error = rb_exc_new2(rb_eIOError, "stream closed"); 04739 OBJ_TAINT(closed_stream_error); 04740 OBJ_FREEZE(closed_stream_error); 04741 04742 cThGroup = rb_define_class("ThreadGroup", rb_cObject); 04743 rb_define_alloc_func(cThGroup, thgroup_s_alloc); 04744 rb_define_method(cThGroup, "list", thgroup_list, 0); 04745 rb_define_method(cThGroup, "enclose", thgroup_enclose, 0); 04746 rb_define_method(cThGroup, "enclosed?", thgroup_enclosed_p, 0); 04747 rb_define_method(cThGroup, "add", thgroup_add, 1); 04748 04749 { 04750 th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup); 04751 rb_define_const(cThGroup, "Default", th->thgroup); 04752 } 04753 04754 rb_cMutex = rb_define_class("Mutex", rb_cObject); 04755 rb_define_alloc_func(rb_cMutex, mutex_alloc); 04756 rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0); 04757 rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0); 04758 rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0); 04759 rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0); 04760 rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0); 04761 rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1); 04762 04763 recursive_key = rb_intern("__recursive_key__"); 04764 rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError); 04765 04766 /* trace */ 04767 rb_define_global_function("set_trace_func", set_trace_func, 1); 04768 rb_define_method(rb_cThread, "set_trace_func", thread_set_trace_func_m, 1); 04769 rb_define_method(rb_cThread, "add_trace_func", thread_add_trace_func_m, 1); 04770 04771 /* init thread core */ 04772 { 04773 /* main thread setting */ 04774 { 04775 /* acquire global vm lock */ 04776 gvl_init(th->vm); 04777 gvl_acquire(th->vm, th); 04778 native_mutex_initialize(&th->interrupt_lock); 04779 } 04780 } 04781 04782 rb_thread_create_timer_thread(); 04783 04784 /* suppress warnings on cygwin, mingw and mswin.*/ 04785 (void)native_mutex_trylock; 04786 } 04787 04788 int 04789 ruby_native_thread_p(void) 04790 { 04791 rb_thread_t *th = ruby_thread_from_native(); 04792 04793 return th != 0; 04794 } 04795 04796 static int 04797 check_deadlock_i(st_data_t key, st_data_t val, int *found) 04798 { 04799 VALUE thval = key; 04800 rb_thread_t *th; 04801 GetThreadPtr(thval, th); 04802 04803 if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th)) { 04804 *found = 1; 04805 } 04806 else if (th->locking_mutex) { 04807 rb_mutex_t *mutex; 04808 GetMutexPtr(th->locking_mutex, mutex); 04809 04810 native_mutex_lock(&mutex->lock); 04811 if (mutex->th == th || (!mutex->th && mutex->cond_waiting)) { 04812 *found = 1; 04813 } 04814 native_mutex_unlock(&mutex->lock); 04815 } 04816 04817 return (*found) ? ST_STOP : ST_CONTINUE; 04818 } 04819 04820 #ifdef DEBUG_DEADLOCK_CHECK 04821 static int 04822 debug_i(st_data_t key, st_data_t val, int *found) 04823 { 04824 VALUE thval = key; 04825 rb_thread_t *th; 04826 GetThreadPtr(thval, th); 04827 04828 printf("th:%p %d %d", th, th->status, th->interrupt_flag); 04829 if (th->locking_mutex) { 04830 rb_mutex_t *mutex; 04831 GetMutexPtr(th->locking_mutex, mutex); 04832 04833 native_mutex_lock(&mutex->lock); 04834 printf(" %p %d\n", mutex->th, mutex->cond_waiting); 04835 native_mutex_unlock(&mutex->lock); 04836 } 04837 else 04838 puts(""); 04839 04840 return ST_CONTINUE; 04841 } 04842 #endif 04843 04844 static void 04845 rb_check_deadlock(rb_vm_t *vm) 04846 { 04847 int found = 0; 04848 04849 if (vm_living_thread_num(vm) > vm->sleeper) return; 04850 if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)"); 04851 if (patrol_thread && patrol_thread != GET_THREAD()) return; 04852 04853 st_foreach(vm->living_threads, check_deadlock_i, (st_data_t)&found); 04854 04855 if (!found) { 04856 VALUE argv[2]; 04857 argv[0] = rb_eFatal; 04858 argv[1] = rb_str_new2("deadlock detected"); 04859 #ifdef DEBUG_DEADLOCK_CHECK 04860 printf("%d %d %p %p\n", vm->living_threads->num_entries, vm->sleeper, GET_THREAD(), vm->main_thread); 04861 st_foreach(vm->living_threads, debug_i, (st_data_t)0); 04862 #endif 04863 vm->sleeper--; 04864 rb_threadptr_raise(vm->main_thread, 2, argv); 04865 } 04866 } 04867 04868 static void 04869 update_coverage(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass) 04870 { 04871 VALUE coverage = GET_THREAD()->cfp->iseq->coverage; 04872 if (coverage && RBASIC(coverage)->klass == 0) { 04873 long line = rb_sourceline() - 1; 04874 long count; 04875 if (RARRAY_PTR(coverage)[line] == Qnil) { 04876 return; 04877 } 04878 count = FIX2LONG(RARRAY_PTR(coverage)[line]) + 1; 04879 if (POSFIXABLE(count)) { 04880 RARRAY_PTR(coverage)[line] = LONG2FIX(count); 04881 } 04882 } 04883 } 04884 04885 VALUE 04886 rb_get_coverages(void) 04887 { 04888 return GET_VM()->coverages; 04889 } 04890 04891 void 04892 rb_set_coverages(VALUE coverages) 04893 { 04894 GET_VM()->coverages = coverages; 04895 rb_add_event_hook(update_coverage, RUBY_EVENT_COVERAGE, Qnil); 04896 } 04897 04898 void 04899 rb_reset_coverages(void) 04900 { 04901 GET_VM()->coverages = Qfalse; 04902 rb_remove_event_hook(update_coverage); 04903 } 04904 04905
1.7.6.1