16 #ifdef THREAD_SIMPLE_IMPL 23 #ifndef WIN32_LEAN_AND_MEAN 24 #define WIN32_LEAN_AND_MEAN 1 29 bool ThreadSimpleManager::_pointers_initialized;
30 ThreadSimpleManager *ThreadSimpleManager::_global_ptr;
36 ThreadSimpleManager() :
37 _simple_thread_epoch_timeslice
38 (
"simple-thread-epoch-timeslice", 0.05,
39 PRC_DESC(
"When SIMPLE_THREADS is defined, this defines the amount of time, " 40 "in seconds, that should be considered the " 41 "typical timeslice for one epoch (to run all threads once).")),
42 _simple_thread_volunteer_delay
43 (
"simple-thread-volunteer-delay", 0.0,
44 PRC_DESC(
"When SIMPLE_THREADS is defined, this defines the amount of time, " 45 "in seconds, for which a task that voluntarily yields should " 47 _simple_thread_yield_sleep
48 (
"simple-thread-yield-sleep", 0.001,
49 PRC_DESC(
"When SIMPLE_THREADS is defined, this defines the amount of time, " 50 "in seconds, for which the process should be put to sleep when " 51 "yielding the timeslice to the system.")),
53 (
"simple-thread-window", 1.0,
54 PRC_DESC(
"When SIMPLE_THREADS is defined, this defines the amount of time, " 55 "in seconds, over which to average all the threads' runtimes, " 56 "for the purpose of scheduling threads.")),
57 _simple_thread_low_weight
58 (
"simple-thread-low-weight", 0.2,
59 PRC_DESC(
"When SIMPLE_THREADS is defined, this determines the relative " 60 "amount of time that is given to threads with priority TP_low.")),
61 _simple_thread_normal_weight
62 (
"simple-thread-normal-weight", 1.0,
63 PRC_DESC(
"When SIMPLE_THREADS is defined, this determines the relative " 64 "amount of time that is given to threads with priority TP_normal.")),
65 _simple_thread_high_weight
66 (
"simple-thread-high-weight", 5.0,
67 PRC_DESC(
"When SIMPLE_THREADS is defined, this determines the relative " 68 "amount of time that is given to threads with priority TP_high.")),
69 _simple_thread_urgent_weight
70 (
"simple-thread-urgent-weight", 10.0,
71 PRC_DESC(
"When SIMPLE_THREADS is defined, this determines the relative " 72 "amount of time that is given to threads with priority TP_urgent."))
74 _tick_scale = 1000000.0;
76 _current_thread =
nullptr;
78 _waiting_for_exit =
nullptr;
95 void ThreadSimpleManager::
96 enqueue_ready(ThreadSimpleImpl *thread,
bool volunteer) {
100 _next_ready.push_back(thread);
106 double now = get_current_time();
107 thread->_wake_time = now + _simple_thread_volunteer_delay;
108 _volunteers.push_back(thread);
109 push_heap(_volunteers.begin(), _volunteers.end(), CompareStartTime());
118 void ThreadSimpleManager::
119 enqueue_sleep(ThreadSimpleImpl *thread,
double seconds) {
120 if (thread_cat->is_debug()) {
122 << *_current_thread->_parent_obj <<
" sleeping for " 123 << seconds <<
" seconds\n";
126 double now = get_current_time();
127 thread->_wake_time = now + seconds;
128 _sleeping.push_back(thread);
129 push_heap(_sleeping.begin(), _sleeping.end(), CompareStartTime());
137 void ThreadSimpleManager::
138 enqueue_block(ThreadSimpleImpl *thread, BlockerSimple *blocker) {
139 _blocked[blocker].push_back(thread);
140 blocker->_flags |= BlockerSimple::F_has_waiters;
147 bool ThreadSimpleManager::
148 unblock_one(BlockerSimple *blocker) {
149 Blocked::iterator bi = _blocked.find(blocker);
150 if (bi != _blocked.end()) {
151 nassertr(blocker->_flags & BlockerSimple::F_has_waiters,
false);
153 FifoThreads &threads = (*bi).second;
154 nassertr(!threads.empty(),
false);
155 ThreadSimpleImpl *thread = threads.front();
157 _ready.push_back(thread);
158 if (threads.empty()) {
159 blocker->_flags &= ~BlockerSimple::F_has_waiters;
173 bool ThreadSimpleManager::
174 unblock_all(BlockerSimple *blocker) {
175 Blocked::iterator bi = _blocked.find(blocker);
176 if (bi != _blocked.end()) {
177 nassertr(blocker->_flags & BlockerSimple::F_has_waiters,
false);
179 FifoThreads &threads = (*bi).second;
180 nassertr(!threads.empty(),
false);
181 while (!threads.empty()) {
182 ThreadSimpleImpl *thread = threads.front();
184 _ready.push_back(thread);
186 blocker->_flags &= ~BlockerSimple::F_has_waiters;
199 void ThreadSimpleManager::
200 enqueue_finished(ThreadSimpleImpl *thread) {
201 _finished.push_back(thread);
208 void ThreadSimpleManager::
209 preempt(ThreadSimpleImpl *thread) {
210 FifoThreads::iterator ti;
211 ti = find(_ready.begin(), _ready.end(), thread);
212 if (ti != _ready.end()) {
214 _ready.push_front(thread);
226 void ThreadSimpleManager::
230 while (!_finished.empty() && _finished.front() != _current_thread) {
231 ThreadSimpleImpl *finished_thread = _finished.front();
232 _finished.pop_front();
240 _current_thread->_python_state = thread_state_swap(
nullptr);
241 #endif // HAVE_PYTHON 245 if (pstats_callback !=
nullptr) {
250 save_thread_context(_current_thread->_context, st_choose_next_context,
this);
255 if (pstats_callback !=
nullptr) {
256 pstats_callback->
activate_hook(_current_thread->_parent_obj);
261 thread_state_swap(_current_thread->_python_state);
262 #endif // HAVE_PYTHON 270 void ThreadSimpleManager::
272 if (!_current_thread->_parent_obj->is_exact_type(MainThread::get_class_type())) {
273 if (thread_cat->is_debug()) {
275 <<
"Ignoring prepare_for_exit called from " 276 << *(_current_thread->_parent_obj) <<
"\n";
281 if (thread_cat->is_debug()) {
283 <<
"prepare_for_exit\n";
286 nassertv(_waiting_for_exit ==
nullptr);
287 _waiting_for_exit = _current_thread;
291 kill_non_joinable(_ready);
293 Blocked::iterator bi = _blocked.begin();
294 while (bi != _blocked.end()) {
295 Blocked::iterator bnext = bi;
297 BlockerSimple *blocker = (*bi).first;
298 FifoThreads &threads = (*bi).second;
299 kill_non_joinable(threads);
300 if (threads.empty()) {
301 blocker->_flags &= ~BlockerSimple::F_has_waiters;
307 kill_non_joinable(_sleeping);
308 kill_non_joinable(_volunteers);
313 while (!_finished.empty() && _finished.front() != _current_thread) {
314 ThreadSimpleImpl *finished_thread = _finished.front();
315 _finished.pop_front();
324 void ThreadSimpleManager::
325 set_current_thread(ThreadSimpleImpl *current_thread) {
326 nassertv(_current_thread ==
nullptr);
327 _current_thread = current_thread;
334 void ThreadSimpleManager::
335 remove_thread(ThreadSimpleImpl *thread) {
336 TickRecords new_records;
337 TickRecords::iterator ri;
338 for (ri = _tick_records.begin(); ri != _tick_records.end(); ++ri) {
339 if ((*ri)._thread != thread) {
341 new_records.push_back(*ri);
344 nassertv(_total_ticks >= (*ri)._tick_count);
345 _total_ticks -= (*ri)._tick_count;
349 _tick_records.swap(new_records);
356 void ThreadSimpleManager::
357 system_sleep(
double seconds) {
359 Sleep((
int)(seconds * 1000 + 0.5));
372 tv.tv_sec = time_t(seconds);
373 tv.tv_usec = long((seconds - (
double)tv.tv_sec) * 1000000.0 + 0.5);
374 select(0,
nullptr,
nullptr,
nullptr, &tv);
381 void ThreadSimpleManager::
382 write_status(std::ostream &out)
const {
383 out <<
"Currently running: " << *_current_thread->_parent_obj <<
"\n";
386 FifoThreads::const_iterator ti;
387 Sleeping::const_iterator si;
388 for (ti = _ready.begin(); ti != _ready.end(); ++ti) {
389 out <<
" " << *(*ti)->_parent_obj;
391 for (ti = _next_ready.begin(); ti != _next_ready.end(); ++ti) {
392 out <<
" " << *(*ti)->_parent_obj;
394 for (si = _volunteers.begin(); si != _volunteers.end(); ++si) {
395 out <<
" " << *(*si)->_parent_obj;
399 double now = get_current_time();
403 Sleeping s2 = _sleeping;
404 sort(s2.begin(), s2.end(), CompareStartTime());
405 for (si = s2.begin(); si != s2.end(); ++si) {
406 out <<
" " << *(*si)->_parent_obj <<
"(" << (*si)->_wake_time - now
411 Blocked::const_iterator bi;
412 for (bi = _blocked.begin(); bi != _blocked.end(); ++bi) {
413 BlockerSimple *blocker = (*bi).first;
414 const FifoThreads &threads = (*bi).second;
415 out <<
"On blocker " << blocker <<
":\n";
416 FifoThreads::const_iterator ti;
417 for (ti = threads.begin(); ti != threads.end(); ++ti) {
418 ThreadSimpleImpl *thread = (*ti);
419 out <<
" " << *thread->_parent_obj;
422 thread->_parent_obj->output_blocker(out);
424 #endif // DEBUG_THREADS 434 void ThreadSimpleManager::
436 if (!_pointers_initialized) {
441 if (thread_cat->is_debug()) {
452 system_sleep(_global_ptr->_simple_thread_yield_sleep);
459 double ThreadSimpleManager::
460 get_current_time()
const {
461 return _clock->get_short_raw_time();
467 void ThreadSimpleManager::
469 if (!_pointers_initialized) {
470 _pointers_initialized =
true;
471 _global_ptr =
new ThreadSimpleManager;
479 void ThreadSimpleManager::
480 st_choose_next_context(
struct ThreadContext *from_context,
void *data) {
481 ThreadSimpleManager *
self = (ThreadSimpleManager *)data;
482 self->choose_next_context(from_context);
488 void ThreadSimpleManager::
489 choose_next_context(
struct ThreadContext *from_context) {
490 double now = get_current_time();
492 do_timeslice_accounting(_current_thread, now);
493 _current_thread =
nullptr;
495 if (!_sleeping.empty() || !_volunteers.empty()) {
496 if (_ready.empty() && _next_ready.empty()) {
499 wake_all_sleepers(_volunteers);
504 now = get_current_time();
506 wake_sleepers(_sleeping, now);
507 wake_sleepers(_volunteers, now);
510 bool new_epoch = !_ready.empty() && _next_ready.empty();
515 while (_ready.empty()) {
516 if (!_next_ready.empty()) {
518 _ready.swap(_next_ready);
520 if (new_epoch && !_tick_records.empty()) {
524 if (thread_cat->is_debug()) {
526 <<
"All threads exceeded budget.\n";
528 TickRecord &record = _tick_records.front();
529 _total_ticks -= record._tick_count;
531 if (record._thread->_run_ticks >= record._tick_count) {
533 record._thread->_run_ticks -= record._tick_count;
537 record._thread->_run_ticks = 0;
539 _tick_records.pop_front();
543 }
else if (!_volunteers.empty()) {
546 if (thread_cat->is_debug()) {
548 <<
"Waking volunteers.\n";
553 now = get_current_time();
554 wake_all_sleepers(_volunteers);
555 wake_sleepers(_sleeping, now);
557 }
else if (!_sleeping.empty()) {
559 double wait = _sleeping.front()->_wake_time - now;
561 if (thread_cat->is_debug()) {
563 <<
"Sleeping all threads " << wait <<
" seconds\n";
567 now = get_current_time();
568 wake_sleepers(_sleeping, now);
569 wake_sleepers(_volunteers, now);
573 if (_waiting_for_exit !=
nullptr) {
577 _ready.push_back(_waiting_for_exit);
578 _waiting_for_exit =
nullptr;
584 if (!_blocked.empty()) {
586 <<
"Deadlock! All threads blocked.\n";
595 <<
"All threads disappeared!\n";
600 ThreadSimpleImpl *chosen_thread = _ready.front();
603 double timeslice = determine_timeslice(chosen_thread);
604 if (timeslice > 0.0) {
606 chosen_thread->_start_time = now;
607 chosen_thread->_stop_time = now + timeslice;
608 _current_thread = chosen_thread;
614 _next_ready.push_back(chosen_thread);
618 if (thread_cat->is_debug()) {
619 size_t blocked_count = 0;
620 Blocked::const_iterator bi;
621 for (bi = _blocked.begin(); bi != _blocked.end(); ++bi) {
622 const FifoThreads &threads = (*bi).second;
623 blocked_count += threads.size();
626 double timeslice = _current_thread->_stop_time - _current_thread->_start_time;
628 <<
"Switching to " << *_current_thread->_parent_obj
629 <<
" for " << timeslice <<
" s (" 630 << _ready.size() <<
" + " << _next_ready.size()
631 <<
" + " << _volunteers.size()
632 <<
" other threads ready, " << blocked_count
633 <<
" blocked, " << _sleeping.size() <<
" sleeping)\n";
636 switch_to_thread_context(from_context, _current_thread->_context);
647 void ThreadSimpleManager::
648 do_timeslice_accounting(ThreadSimpleImpl *thread,
double now) {
649 double elapsed = now - thread->_start_time;
650 if (thread_cat.is_debug()) {
652 << *thread->_parent_obj <<
" ran for " << elapsed <<
" s of " 653 << thread->_stop_time - thread->_start_time <<
" requested.\n";
658 elapsed = std::max(elapsed, 0.0);
660 unsigned int ticks = (
unsigned int)(elapsed * _tick_scale + 0.5);
661 thread->_run_ticks += ticks;
664 unsigned int ticks_window = (
unsigned int)(_simple_thread_window * _tick_scale + 0.5);
665 while (_total_ticks > ticks_window) {
666 nassertv(!_tick_records.empty());
667 TickRecord &record = _tick_records.front();
668 _total_ticks -= record._tick_count;
669 if (record._thread->_run_ticks >= record._tick_count) {
671 record._thread->_run_ticks -= record._tick_count;
675 record._thread->_run_ticks = 0;
677 _tick_records.pop_front();
682 record._tick_count = ticks;
683 record._thread = thread;
684 _tick_records.push_back(record);
685 _total_ticks += ticks;
693 void ThreadSimpleManager::
694 wake_sleepers(ThreadSimpleManager::Sleeping &sleepers,
double now) {
695 while (!sleepers.empty() && sleepers.front()->_wake_time <= now) {
696 ThreadSimpleImpl *thread = sleepers.front();
697 pop_heap(sleepers.begin(), sleepers.end(), CompareStartTime());
699 _ready.push_back(thread);
707 void ThreadSimpleManager::
708 wake_all_sleepers(ThreadSimpleManager::Sleeping &sleepers) {
709 while (!sleepers.empty()) {
710 ThreadSimpleImpl *thread = sleepers.front();
711 pop_heap(sleepers.begin(), sleepers.end(), CompareStartTime());
713 _ready.push_back(thread);
720 void ThreadSimpleManager::
722 Blocked::const_iterator bi;
723 for (bi = _blocked.begin(); bi != _blocked.end(); ++bi) {
724 BlockerSimple *blocker = (*bi).first;
725 const FifoThreads &threads = (*bi).second;
727 <<
"On blocker " << blocker <<
":\n";
728 FifoThreads::const_iterator ti;
729 for (ti = threads.begin(); ti != threads.end(); ++ti) {
730 ThreadSimpleImpl *thread = (*ti);
732 <<
" " << *thread->_parent_obj;
734 thread_cat.info(
false) <<
" (";
735 thread->_parent_obj->output_blocker(thread_cat.info(
false));
736 thread_cat.info(
false) <<
")";
737 #endif // DEBUG_THREADS 738 thread_cat.info(
false) <<
"\n";
748 double ThreadSimpleManager::
749 determine_timeslice(ThreadSimpleImpl *chosen_thread) {
750 if (_ready.empty() && _next_ready.empty()) {
752 return _simple_thread_epoch_timeslice;
756 unsigned int total_ticks = chosen_thread->_run_ticks;
757 double total_weight = chosen_thread->_priority_weight;
759 FifoThreads::const_iterator ti;
760 for (ti = _ready.begin(); ti != _ready.end(); ++ti) {
761 total_ticks += (*ti)->_run_ticks;
762 total_weight += (*ti)->_priority_weight;
764 for (ti = _next_ready.begin(); ti != _next_ready.end(); ++ti) {
765 total_ticks += (*ti)->_run_ticks;
766 total_weight += (*ti)->_priority_weight;
769 nassertr(total_weight != 0.0, 0.0);
770 double budget_ratio = chosen_thread->_priority_weight / total_weight;
772 if (total_ticks == 0) {
774 return budget_ratio * _simple_thread_epoch_timeslice;
777 double run_ratio = (double)chosen_thread->_run_ticks / (
double)total_ticks;
778 double remaining_ratio = budget_ratio - run_ratio;
780 if (thread_cat->is_debug()) {
782 << *chosen_thread->_parent_obj <<
" accrued " 783 << chosen_thread->_run_ticks / _tick_scale <<
" s of " 784 << total_ticks / _tick_scale <<
"; budget is " 785 << budget_ratio * total_ticks / _tick_scale <<
".\n";
786 if (remaining_ratio <= 0.0) {
788 <<
"Exceeded budget.\n";
792 return remaining_ratio * _simple_thread_epoch_timeslice;
799 void ThreadSimpleManager::
800 kill_non_joinable(ThreadSimpleManager::FifoThreads &threads) {
801 FifoThreads new_threads;
802 FifoThreads::iterator ti;
803 for (ti = threads.begin(); ti != threads.end(); ++ti) {
804 ThreadSimpleImpl *thread = (*ti);
805 if (thread->_joinable) {
806 new_threads.push_back(thread);
808 if (thread_cat->is_debug()) {
810 <<
"Killing " << *thread->_parent_obj <<
"\n";
812 thread->_status = ThreadSimpleImpl::TS_killed;
813 enqueue_finished(thread);
817 threads.swap(new_threads);
824 void ThreadSimpleManager::
825 kill_non_joinable(ThreadSimpleManager::Sleeping &threads) {
826 Sleeping new_threads;
827 Sleeping::iterator ti;
828 for (ti = threads.begin(); ti != threads.end(); ++ti) {
829 ThreadSimpleImpl *thread = (*ti);
830 if (thread->_joinable) {
831 new_threads.push_back(thread);
833 if (thread_cat->is_debug()) {
835 <<
"Killing " << *thread->_parent_obj <<
"\n";
837 thread->_status = ThreadSimpleImpl::TS_killed;
838 enqueue_finished(thread);
841 make_heap(new_threads.begin(), new_threads.end(), CompareStartTime());
842 threads.swap(new_threads);
845 #endif // THREAD_SIMPLE_IMPL static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
static void force_yield()
Suspends the current thread for the rest of the current epoch.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
virtual void activate_hook(Thread *thread)
Called when the thread is activated (resumes execution).
get_main_thread
Returns a pointer to the "main" Thread object–this is the Thread that started the whole process.
virtual void deactivate_hook(Thread *thread)
Called when the thread is deactivated (swapped for another running thread).
void unref_delete(RefCountType *ptr)
This global helper function will unref the given ReferenceCount object, and if the reference count re...
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.