libc: synchronization primitives based on monitor

The libc monitor facility enables the execution of monitor jobs by the
main thread whenever the monitor pool has been charged. In contrast to
the current suspend/resume_all mechanism, the main thread iterates over
all job functions instead of waking up all threads to check their
conditions by themselves. Threads are only woken up if their completion
condition was met.

This commit is the result of a collaboration with Christian Prochaska.
Many thanks for your support, Christian.

Fixes #3550
This commit is contained in:
Christian Helmuth 2020-02-05 15:47:09 +01:00 committed by Norman Feske
parent 6aebd5dd95
commit ff5175ec76
14 changed files with 1118 additions and 512 deletions

View File

@ -1,3 +1,4 @@
___mb_cur_max D 50
___runetype T
___tolower T
___toupper T
@ -7,9 +8,9 @@ __error T
__flt_rounds T
__fpclassifyd T
__fpclassifyf T
__has_sse D 4
__h_errno T
__h_errno_set T
__has_sse D 4
__inet_addr T
__inet_aton T
__inet_nsap_ntoa T
@ -18,7 +19,6 @@ __inet_ntop T
__inet_pton T
__isthreaded B 4
__mb_cur_max D 8
___mb_cur_max D 50
__res_init T
__res_query T
__res_state T
@ -85,8 +85,9 @@ chroot W
clearerr T
clearerr_unlocked T
clock T
clock_gettime W
clock_getres W
clock_gettime W
clock_nanosleep W
close T
closedir T
closelog T
@ -194,6 +195,7 @@ free T
freeaddrinfo T
freebsd7___semctl W
freebsd7_semctl T
freeifaddrs T
freelocale T
freopen T
fscanf T
@ -264,7 +266,6 @@ gethostbyname W
gethostid T
gethostname T
getifaddrs T
freeifaddrs T
getline T
getloadavg T
getlogin T
@ -456,7 +457,6 @@ mrand48 T
msync T
munmap T
nanosleep W
clock_nanosleep W
newlocale T
nextwctype T
nftw T
@ -480,7 +480,6 @@ pclose T
perror T
pipe T
poll W
ppoll W
popen T
posix2time T
posix_fadvise T
@ -506,16 +505,17 @@ posix_spawnattr_setschedpolicy T
posix_spawnattr_setsigdefault T
posix_spawnattr_setsigmask T
posix_spawnp T
ppoll W
pread T
printf T
pselect W
psignal T
pthread_atfork T
pthread_attr_destroy T
pthread_attr_get_np T
pthread_attr_getdetachstate T
pthread_attr_getguardsize T
pthread_attr_getinheritsched T
pthread_attr_get_np T
pthread_attr_getschedparam T
pthread_attr_getschedpolicy T
pthread_attr_getscope T
@ -556,6 +556,7 @@ pthread_main_np T
pthread_mutex_destroy T
pthread_mutex_init T
pthread_mutex_lock T
pthread_mutex_timedlock T
pthread_mutex_trylock T
pthread_mutex_unlock T
pthread_mutexattr_destroy T
@ -630,18 +631,18 @@ seed48 T
seekdir T
select W
sem_close T
semctl T
sem_destroy T
sem_getvalue T
semget W
sem_init T
sem_open T
semop W
sem_post T
sem_timedwait T
sem_trywait T
sem_unlink T
sem_wait T
semctl T
semget W
semop W
send T
sendmsg W
sendto T
@ -849,8 +850,8 @@ unvis T
uselocale T
user_from_uid T
usleep W
utimes W
utime W
utimes W
vasprintf T
vdprintf T
verr T

View File

@ -3,7 +3,7 @@
<requires> <timer/> </requires>
<events>
<timeout meaning="failed" sec="70" />
<timeout meaning="failed" sec="90" />
<log meaning="succeeded">--- returning from main ---</log>
<log meaning="failed">Error: </log>
<log meaning="failed">child "test-pthread" exited</log>

View File

@ -5,7 +5,7 @@
*/
/*
* Copyright (C) 2016-2017 Genode Labs GmbH
* Copyright (C) 2016-2020 Genode Labs GmbH
*
* This file is part of the Genode OS framework, which is distributed
* under the terms of the GNU Affero General Public License version 3.
@ -30,6 +30,7 @@ namespace Libc {
struct Resume;
struct Suspend;
struct Monitor;
struct Select;
struct Current_time;
struct Clone_connection;
@ -104,10 +105,10 @@ namespace Libc {
void init_socket_fs(Suspend &);
/**
* Allow thread.cc to access the 'Genode::Env' (needed for the
* implementation of condition variables with timeout)
* Pthread/semaphore support
*/
void init_pthread_support(Genode::Env &env, Suspend &, Resume &);
void init_pthread_support(Monitor &, Suspend &, Resume &);
void init_semaphore_support(Monitor &);
struct Config_accessor : Interface
{

View File

@ -7,7 +7,7 @@
*/
/*
* Copyright (C) 2016-2019 Genode Labs GmbH
* Copyright (C) 2016-2020 Genode Labs GmbH
*
* This file is part of the Genode OS framework, which is distributed
* under the terms of the GNU Affero General Public License version 3.
@ -39,6 +39,7 @@
#include <internal/kernel_timer_accessor.h>
#include <internal/watch.h>
#include <internal/signal.h>
#include <internal/monitor.h>
namespace Libc { class Kernel; }
@ -58,6 +59,7 @@ struct Libc::Kernel final : Vfs::Io_response_handler,
Reset_malloc_heap,
Resume,
Suspend,
Monitor,
Select,
Kernel_routine_scheduler,
Current_time,
@ -218,6 +220,16 @@ struct Libc::Kernel final : Vfs::Io_response_handler,
Pthread_pool _pthreads { _timer_accessor };
Monitor::Pool _monitors { };
Reconstructible<Io_signal_handler<Kernel>> _execute_monitors {
_env.ep(), *this, &Kernel::_monitors_handler };
void _monitors_handler()
{
_monitors.execute_monitors();
}
Constructible<Clone_connection> _clone_connection { };
struct Resumer
@ -466,6 +478,71 @@ struct Libc::Kernel final : Vfs::Io_response_handler,
}
}
/**
* Monitor interface
*/
bool _monitor(Genode::Lock &mutex, Function &fn, uint64_t timeout_ms) override
{
if (_main_context()) {
struct Job : Monitor::Job
{
Kernel &_kernel;
uint64_t _timeout_ms;
bool _timeout_valid { _timeout_ms != 0 };
struct Check : Suspend_functor
{
bool const &completed;
Check(bool const &completed) : completed(completed) { }
bool suspend() override
{
return !completed;
}
} check { _completed };
Job(Monitor::Function &fn, Kernel &kernel,
Timer_accessor &timer_accessor, uint64_t timeout_ms)
:
Monitor::Job(fn, timer_accessor, 0 /* timeout handled by suspend */),
_kernel(kernel), _timeout_ms(timeout_ms)
{ }
void wait_for_completion() override
{
do {
_timeout_ms = _kernel._suspend_main(check, _timeout_ms);
_expired = _timeout_valid && !_timeout_ms;
} while (!completed() && !expired());
}
void complete() override
{
_completed = true;
_kernel._resume_main();
}
} job { fn, *this, _timer_accessor, timeout_ms };
_monitors.monitor(mutex, job);
return job.completed();
} else {
Monitor::Job job { fn, _timer_accessor, timeout_ms };
_monitors.monitor(mutex, job);
return job.completed();
}
}
void _charge_monitors() override
{
if (_monitors.charge_monitors())
Signal_transmitter(*_execute_monitors).submit();
}
/**
* Current_time interface
*/

View File

@ -0,0 +1,171 @@
/*
* \brief Monitored execution in main context
* \author Christian Helmuth
* \author Christian Prochaska
* \date 2020-01-09
*/
/*
* Copyright (C) 2020 Genode Labs GmbH
*
* This file is part of the Genode OS framework, which is distributed
* under the terms of the GNU Affero General Public License version 3.
*/
#ifndef _LIBC__INTERNAL__MONITOR_H_
#define _LIBC__INTERNAL__MONITOR_H_
/* Genode includes */
#include <base/registry.h>
/* libc-internal includes */
#include <internal/timer.h>
#include <internal/types.h>
namespace Libc { class Monitor; };
class Libc::Monitor : Interface
{
public:
struct Job;
struct Pool;
protected:
struct Function : Interface { virtual bool execute() = 0; };
virtual bool _monitor(Genode::Lock &, Function &, uint64_t) = 0;
virtual void _charge_monitors() = 0;
public:
/**
* Block until monitored execution succeeds or timeout expires
*
* The mutex must be locked when calling the monitor. It is released
during wait for completion and re-acquired before the function
* returns. This behavior is comparable to condition variables.
*
* Returns true if execution completed, false on timeout.
*/
template <typename FN>
bool monitor(Genode::Lock &mutex, FN const &fn, uint64_t timeout_ms = 0)
{
struct _Function : Function
{
FN const &fn;
bool execute() override { return fn(); }
_Function(FN const &fn) : fn(fn) { }
} function { fn };
return _monitor(mutex, function, timeout_ms);
}
/**
* Charge monitor to execute the monitored function
*/
void charge_monitors() { _charge_monitors(); }
};
struct Libc::Monitor::Job : Timeout_handler
{
private:
Monitor::Function &_fn;
protected:
bool _completed { false };
bool _expired { false };
Lock _blockade { Lock::LOCKED };
Constructible<Timeout> _timeout;
public:
Job(Monitor::Function &fn,
Timer_accessor &timer_accessor, uint64_t timeout_ms)
: _fn(fn)
{
if (timeout_ms) {
_timeout.construct(timer_accessor, *this);
_timeout->start(timeout_ms);
}
}
bool completed() const { return _completed; }
bool expired() const { return _expired; }
bool execute() { return _fn.execute(); }
virtual void wait_for_completion() { _blockade.lock(); }
virtual void complete()
{
_completed = true;
_blockade.unlock();
}
/**
* Timeout_handler interface
*/
void handle_timeout() override
{
_expired = true;
_blockade.unlock();
}
};
struct Libc::Monitor::Pool
{
private:
Registry<Job> _jobs;
Lock _mutex;
bool _execution_pending { false };
public:
void monitor(Genode::Lock &mutex, Job &job)
{
Registry<Job>::Element element { _jobs, job };
mutex.unlock();
job.wait_for_completion();
mutex.lock();
}
bool charge_monitors()
{
Lock::Guard guard { _mutex };
bool const charged = !_execution_pending;
_execution_pending = true;
return charged;
}
void execute_monitors()
{
{
Lock::Guard guard { _mutex };
if (!_execution_pending) return;
_execution_pending = false;
}
_jobs.for_each([&] (Job &job) {
if (!job.completed() && !job.expired() && job.execute()) {
job.complete();
}
});
}
};
#endif /* _LIBC__INTERNAL__MONITOR_H_ */

View File

@ -244,6 +244,4 @@ struct pthread : Libc::Pthread
};
namespace Libc { void init_pthread_support(Env &env); }
#endif /* _LIBC__INTERNAL__PTHREAD_H_ */

View File

@ -0,0 +1,72 @@
/*
* \brief Libc-internal time utilities
* \author Christian Helmuth
* \date 2020-01-29
*/
/*
* Copyright (C) 2020 Genode Labs GmbH
*
* This file is part of the Genode OS framework, which is distributed
* under the terms of the GNU Affero General Public License version 3.
*/
#ifndef _LIBC__INTERNAL__TIME_H_
#define _LIBC__INTERNAL__TIME_H_
/* libc includes */
#include <time.h>
/* libc-internal includes */
#include <internal/types.h>
namespace Libc {
inline uint64_t calculate_relative_timeout_ms(timespec abs_now, timespec abs_timeout);
}
/**
* Calculate relative timeout in milliseconds from 'abs_now' to 'abs_timeout'
*
* Returns 0 if timeout already expired.
*/
Libc::uint64_t Libc::calculate_relative_timeout_ms(timespec abs_now, timespec abs_timeout)
{
enum { S_IN_MS = 1000ULL, S_IN_NS = 1000 * 1000 * 1000ULL };
if (abs_now.tv_nsec >= S_IN_NS) {
abs_now.tv_sec += abs_now.tv_nsec / S_IN_NS;
abs_now.tv_nsec = abs_now.tv_nsec % S_IN_NS;
}
if (abs_timeout.tv_nsec >= S_IN_NS) {
abs_timeout.tv_sec += abs_timeout.tv_nsec / S_IN_NS;
abs_timeout.tv_nsec = abs_timeout.tv_nsec % S_IN_NS;
}
/* check whether absolute timeout is in the past */
if (abs_now.tv_sec > abs_timeout.tv_sec)
return 0;
uint64_t diff_ms = (abs_timeout.tv_sec - abs_now.tv_sec) * S_IN_MS;
uint64_t diff_ns = 0;
if (abs_timeout.tv_nsec >= abs_now.tv_nsec)
diff_ns = abs_timeout.tv_nsec - abs_now.tv_nsec;
else {
/* check whether absolute timeout is in the past */
if (diff_ms == 0)
return 0;
diff_ns = S_IN_NS - abs_now.tv_nsec + abs_timeout.tv_nsec;
diff_ms -= S_IN_MS;
}
diff_ms += diff_ns / 1000 / 1000;
/* if there is any diff then let the timeout be at least 1 MS */
if (diff_ms == 0 && diff_ns != 0)
return 1;
return diff_ms;
}
#endif /* _LIBC__INTERNAL__TIME_H_ */

View File

@ -1,262 +0,0 @@
/*
* \brief Semaphore implementation with timeout facility
* \author Stefan Kalkowski
* \date 2010-03-05
*
* This semaphore implementation allows to block on a semaphore for a
* given time instead of blocking indefinetely.
*
* For the timeout functionality the alarm framework is used.
*/
/*
* Copyright (C) 2010-2017 Genode Labs GmbH
*
* This file is part of the Genode OS framework, which is distributed
* under the terms of the GNU Affero General Public License version 3.
*/
#ifndef _LIBC__INTERNAL__TIMED_SEMAPHORE_H_
#define _LIBC__INTERNAL__TIMED_SEMAPHORE_H_
#include <base/thread.h>
#include <base/semaphore.h>
#include <base/alarm.h>
#include <timer_session/connection.h>
namespace Libc {
using namespace Genode;
class Timeout_entrypoint;
class Timed_semaphore;
/**
* Exception types
*/
class Timeout_exception : public Exception { };
class Nonblocking_exception : public Exception { };
}
/**
* Alarm thread, which counts jiffies and triggers timeout events.
*/
class Libc::Timeout_entrypoint : private Entrypoint
{
private:
enum { JIFFIES_STEP_MS = 10 };
Alarm_scheduler _alarm_scheduler { };
Timer::Connection _timer;
Signal_handler<Timeout_entrypoint> _timer_handler;
void _handle_timer() { _alarm_scheduler.handle(_timer.elapsed_ms()); }
static size_t constexpr STACK_SIZE = 2048*sizeof(long);
public:
Timeout_entrypoint(Genode::Env &env)
:
Entrypoint(env, STACK_SIZE, "alarm-timer", Affinity::Location()),
_timer(env),
_timer_handler(*this, *this, &Timeout_entrypoint::_handle_timer)
{
_timer.sigh(_timer_handler);
_timer.trigger_periodic(JIFFIES_STEP_MS*1000);
}
Alarm::Time time(void) { return _timer.elapsed_ms(); }
void schedule_absolute(Alarm &alarm, Alarm::Time timeout)
{
_alarm_scheduler.schedule_absolute(&alarm, timeout);
}
void discard(Alarm &alarm) { _alarm_scheduler.discard(&alarm); }
};
/**
* Semaphore with timeout on down operation.
*/
class Libc::Timed_semaphore : public Semaphore
{
private:
typedef Semaphore::Element Element;
Timeout_entrypoint &_timeout_ep;
/**
* Aborts blocking on the semaphore, raised when a timeout occured.
*
* \param element the waiting-queue element associated with a timeout.
* \return true if a thread was aborted/woken up
*/
bool _abort(Element &element)
{
Lock::Guard lock_guard(Semaphore::_meta_lock);
/* potentially, the queue is empty */
if (++Semaphore::_cnt <= 0) {
/*
* Iterate through the queue and find the thread,
* with the corresponding timeout.
*/
Element *first = nullptr;
Semaphore::_queue.dequeue([&first] (Element &e) {
first = &e; });
Element *e = first;
while (e) {
/*
* Wakeup the thread.
*/
if (&element == e) {
e->wake_up();
return true;
}
/*
* Noninvolved threads are enqueued again.
*/
Semaphore::_queue.enqueue(*e);
e = nullptr;
Semaphore::_queue.dequeue([&e] (Element &next) {
e = &next; });
/*
* Maybe, the alarm was triggered just after the corresponding
* thread was already dequeued, that's why we have to track
* whether we processed the whole queue.
*/
if (e == first)
break;
}
}
/* The right element was not found, so decrease counter again */
--Semaphore::_cnt;
return false;
}
/**
* Represents a timeout associated with the blocking
* operation on a semaphore.
*/
class Timeout : public Alarm
{
private:
Timed_semaphore &_sem; /* semaphore we block on */
Element &_element; /* queue element timeout belongs to */
bool _triggered { false };
Time const _start;
public:
Timeout(Time start, Timed_semaphore &s, Element &e)
: _sem(s), _element(e), _triggered(false), _start(start)
{ }
bool triggered(void) { return _triggered; }
Time start() { return _start; }
protected:
bool on_alarm(uint64_t) override
{
_triggered = _sem._abort(_element);
return false;
}
};
public:
/**
* Constructor
*
* \param n initial counter value of the semphore
*/
Timed_semaphore(Timeout_entrypoint &timeout_ep, int n = 0)
: Semaphore(n), _timeout_ep(timeout_ep) { }
/**
* Decrements semaphore and blocks when it's already zero.
*
* \param t after t milliseconds of blocking a Timeout_exception is thrown.
* if t is zero do not block, instead raise an
* Nonblocking_exception.
* \return milliseconds the caller was blocked
*/
Alarm::Time down(Alarm::Time t)
{
Semaphore::_meta_lock.lock();
if (--Semaphore::_cnt < 0) {
/* If t==0 we shall not block */
if (t == 0) {
++_cnt;
Semaphore::_meta_lock.unlock();
throw Nonblocking_exception();
}
/*
* Create semaphore queue element representing the thread
* in the wait queue.
*/
Element queue_element;
Semaphore::_queue.enqueue(queue_element);
Semaphore::_meta_lock.unlock();
/* Create the timeout */
Alarm::Time const curr_time = _timeout_ep.time();
Timeout timeout(curr_time, *this, queue_element);
_timeout_ep.schedule_absolute(timeout, curr_time + t);
/*
* The thread is going to block on a local lock now,
* waiting for getting waked from another thread
* calling 'up()'
* */
queue_element.block();
/* Deactivate timeout */
_timeout_ep.discard(timeout);
/*
* When we were only woken up, because of a timeout,
* throw an exception.
*/
if (timeout.triggered())
throw Timeout_exception();
/* return blocking time */
return _timeout_ep.time() - timeout.start();
} else {
Semaphore::_meta_lock.unlock();
}
return 0;
}
/********************************
** Base class implementations **
********************************/
void down() { Semaphore::down(); }
void up() { Semaphore::up(); }
};
#endif /* _LIBC__INTERNAL__TIMED_SEMAPHORE_H_ */

View File

@ -17,6 +17,9 @@
/* Genode includes */
#include <timer_session/connection.h>
/* libc-internal includes */
#include <internal/types.h>
namespace Libc {
class Timer;
class Timer_accessor;

View File

@ -14,6 +14,7 @@
#ifndef _LIBC__INTERNAL__TYPES_H_
#define _LIBC__INTERNAL__TYPES_H_
/* Genode includes */
#include <base/log.h>
#include <util/string.h>

View File

@ -7,7 +7,7 @@
*/
/*
* Copyright (C) 2016-2019 Genode Labs GmbH
* Copyright (C) 2016-2020 Genode Labs GmbH
*
* This file is part of the Genode OS framework, which is distributed
* under the terms of the GNU Affero General Public License version 3.
@ -376,7 +376,8 @@ Libc::Kernel::Kernel(Genode::Env &env, Genode::Allocator &heap)
{
atexit(close_file_descriptors_on_exit);
init_pthread_support(env, *this, *this);
init_semaphore_support(*this);
init_pthread_support(*this, *this, *this);
_env.ep().register_io_progress_handler(*this);

View File

@ -1,12 +1,13 @@
/*
* \brief POSIX thread implementation
* \author Christian Prochaska
* \author Christian Helmuth
* \date 2012-03-12
*
*/
/*
* Copyright (C) 2012-2017 Genode Labs GmbH
* Copyright (C) 2012-2020 Genode Labs GmbH
*
* This file is part of the Genode OS framework, which is distributed
* under the terms of the GNU Affero General Public License version 3.
@ -22,47 +23,35 @@
/* libc includes */
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h> /* malloc, free */
/* libc-internal includes */
#include <internal/pthread.h>
#include <internal/timed_semaphore.h>
#include <internal/init.h>
#include <internal/suspend.h>
#include <internal/resume.h>
#include <internal/monitor.h>
#include <internal/time.h>
using namespace Libc;
static Genode::Env *_env_ptr; /* solely needed to spawn the timeout thread for the
timed semaphore */
static Thread *_main_thread_ptr;
static Suspend *_suspend_ptr;
static Thread *_main_thread_ptr;
static Resume *_resume_ptr;
static Suspend *_suspend_ptr;
static Monitor *_monitor_ptr;
void Libc::init_pthread_support(Genode::Env &env, Suspend &suspend, Resume &resume)
void Libc::init_pthread_support(Monitor &monitor, Suspend &suspend, Resume &resume)
{
_env_ptr = &env;
_main_thread_ptr = Thread::myself();
_monitor_ptr = &monitor;
_suspend_ptr = &suspend;
_resume_ptr = &resume;
}
static Libc::Timeout_entrypoint &_global_timeout_ep()
{
class Missing_call_of_init_pthread_support { };
if (!_env_ptr)
throw Missing_call_of_init_pthread_support();
static Timeout_entrypoint timeout_ep { *_env_ptr };
return timeout_ep;
}
/*************
** Pthread **
*************/
@ -199,23 +188,35 @@ struct pthread_mutex_attr { pthread_mutextype type; };
*/
struct pthread_mutex
{
pthread_t _owner { nullptr };
pthread_t _owner { nullptr };
unsigned _applicants { 0 };
Lock _data_mutex;
Lock _monitor_mutex;
struct Missing_call_of_init_pthread_support : Exception { };
void _suspend(Suspend_functor &func)
struct Applicant
{
if (!_suspend_ptr)
throw Missing_call_of_init_pthread_support();
_suspend_ptr->suspend(func);
}
pthread_mutex &m;
void _resume_all()
Applicant(pthread_mutex &m) : m(m)
{
Lock::Guard lock_guard(m._data_mutex);
++m._applicants;
}
~Applicant()
{
Lock::Guard lock_guard(m._data_mutex);
--m._applicants;
}
};
Monitor & _monitor()
{
if (!_resume_ptr)
if (!_monitor_ptr)
throw Missing_call_of_init_pthread_support();
_resume_ptr->resume_all();
return *_monitor_ptr;
}
pthread_mutex() { }
@ -227,57 +228,95 @@ struct pthread_mutex
* described IEEE Std 1003.1 POSIX.1-2017
* https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_lock.html
*/
virtual int lock() = 0;
virtual int trylock() = 0;
virtual int unlock() = 0;
virtual int lock() = 0;
virtual int timedlock(timespec const &) = 0;
virtual int trylock() = 0;
virtual int unlock() = 0;
};
struct Libc::Pthread_mutex_normal : pthread_mutex
{
int lock() override final
{
struct Try_lock : Suspend_functor
{
bool retry { false }; /* have to try after resume */
Pthread_mutex_normal &_mutex;
Try_lock(Pthread_mutex_normal &mutex) : _mutex(mutex) { }
bool suspend() override
{
retry = _mutex.trylock() == EBUSY;
return retry;
}
} try_lock(*this);
do { _suspend(try_lock); } while (try_lock.retry);
return 0;
}
int trylock() override final
int _try_lock(pthread_t thread)
{
Lock::Guard lock_guard(_data_mutex);
if (!_owner) {
_owner = pthread_self();
_owner = thread;
return 0;
}
return EBUSY;
}
int lock() override final
{
Lock::Guard monitor_guard(_monitor_mutex);
pthread_t const myself = pthread_self();
/* fast path without lock contention */
if (_try_lock(myself) == 0)
return 0;
{
Applicant guard { *this };
_monitor().monitor(_monitor_mutex,
[&] { return _try_lock(myself) == 0; });
}
return 0;
}
int timedlock(timespec const &abs_timeout) override final
{
Lock::Guard monitor_guard(_monitor_mutex);
pthread_t const myself = pthread_self();
/* fast path without lock contention - does not check abstimeout according to spec */
if (_try_lock(myself) == 0)
return 0;
timespec abs_now;
clock_gettime(CLOCK_REALTIME, &abs_now);
uint64_t const timeout_ms = calculate_relative_timeout_ms(abs_now, abs_timeout);
if (!timeout_ms)
return ETIMEDOUT;
{
Applicant guard { *this };
auto fn = [&] { return _try_lock(myself) == 0; };
if (_monitor().monitor(_monitor_mutex, fn, timeout_ms))
return 0;
else
return ETIMEDOUT;
}
return 0;
}
int trylock() override final
{
return _try_lock(pthread_self());
}
int unlock() override final
{
Lock::Guard monitor_guard(_monitor_mutex);
Lock::Guard lock_guard(_data_mutex);
if (_owner != pthread_self())
return EPERM;
_owner = nullptr;
_resume_all();
if (_applicants)
_monitor().charge_monitors();
return 0;
}
@ -286,52 +325,56 @@ struct Libc::Pthread_mutex_normal : pthread_mutex
struct Libc::Pthread_mutex_errorcheck : pthread_mutex
{
int lock() override final
{
/*
* We can't use trylock() as it returns EBUSY also for the
* EDEADLK case.
*/
struct Try_lock : Suspend_functor
{
bool retry { false }; /* have to try after resume */
int result { 0 };
enum Try_lock_result { SUCCESS, BUSY, DEADLOCK };
Pthread_mutex_errorcheck &_mutex;
Try_lock(Pthread_mutex_errorcheck &mutex) : _mutex(mutex) { }
bool suspend() override
{
Lock::Guard lock_guard(_mutex._data_mutex);
if (!_mutex._owner) {
_mutex._owner = pthread_self();
retry = false;
result = 0;
} else if (_mutex._owner == pthread_self()) {
retry = false;
result = EDEADLK;
} else {
retry = true;
}
return retry;
}
} try_lock(*this);
do { _suspend(try_lock); } while (try_lock.retry);
return try_lock.result;
}
int trylock() override final
Try_lock_result _try_lock(pthread_t thread)
{
Lock::Guard lock_guard(_data_mutex);
if (!_owner) {
_owner = pthread_self();
return 0;
_owner = thread;
return SUCCESS;
}
return _owner == thread ? DEADLOCK : BUSY;
}
int lock() override final
{
Lock::Guard monitor_guard(_monitor_mutex);
pthread_t const myself = pthread_self();
/* fast path without lock contention */
switch (_try_lock(myself)) {
case SUCCESS: return 0;
case DEADLOCK: return EDEADLK;
case BUSY: [[fallthrough]];
}
{
Applicant guard { *this };
_monitor().monitor(_monitor_mutex, [&] {
/* DEADLOCK already handled above - just check for SUCCESS */
return _try_lock(myself) == SUCCESS;
});
}
return 0;
}
int timedlock(timespec const &) override final
{
return ENOSYS;
}
int trylock() override final
{
switch (_try_lock(pthread_self())) {
case SUCCESS: return 0;
case DEADLOCK: return EDEADLK;
case BUSY: return EBUSY;
}
return EBUSY;
@ -339,13 +382,16 @@ struct Libc::Pthread_mutex_errorcheck : pthread_mutex
int unlock() override final
{
Lock::Guard monitor_guard(_monitor_mutex);
Lock::Guard lock_guard(_data_mutex);
if (_owner != pthread_self())
return EPERM;
_owner = nullptr;
_resume_all();
if (_applicants)
_monitor().charge_monitors();
return 0;
}
@ -356,37 +402,15 @@ struct Libc::Pthread_mutex_recursive : pthread_mutex
{
unsigned _nesting_level { 0 };
int lock() override final
{
struct Try_lock : Suspend_functor
{
bool retry { false }; /* have to try after resume */
Pthread_mutex_recursive &_mutex;
Try_lock(Pthread_mutex_recursive &mutex) : _mutex(mutex) { }
bool suspend() override
{
retry = _mutex.trylock() == EBUSY;
return retry;
}
} try_lock(*this);
do { _suspend(try_lock); } while (try_lock.retry);
return 0;
}
int trylock() override final
int _try_lock(pthread_t thread)
{
Lock::Guard lock_guard(_data_mutex);
if (!_owner) {
_owner = pthread_self();
_owner = thread;
_nesting_level = 1;
return 0;
} else if (_owner == pthread_self()) {
} else if (_owner == thread) {
++_nesting_level;
return 0;
}
@ -394,8 +418,39 @@ struct Libc::Pthread_mutex_recursive : pthread_mutex
return EBUSY;
}
int lock() override final
{
Lock::Guard monitor_guard(_monitor_mutex);
pthread_t const myself = pthread_self();
/* fast path without lock contention */
if (_try_lock(myself) == 0)
return 0;
{
Applicant guard { *this };
_monitor().monitor(_monitor_mutex,
[&] { return _try_lock(myself) == 0; });
}
return 0;
}
int timedlock(timespec const &) override final
{
return ENOSYS;
}
int trylock() override final
{
return _try_lock(pthread_self());
}
int unlock() override final
{
Lock::Guard monitor_guard(_monitor_mutex);
Lock::Guard lock_guard(_data_mutex);
if (_owner != pthread_self())
@ -404,7 +459,8 @@ struct Libc::Pthread_mutex_recursive : pthread_mutex
--_nesting_level;
if (_nesting_level == 0) {
_owner = nullptr;
_resume_all();
if (_applicants)
_monitor().charge_monitors();
}
return 0;
@ -680,6 +736,20 @@ extern "C" {
}
int pthread_mutex_timedlock(pthread_mutex_t *mutex,
struct timespec const *abstimeout)
{
if (!mutex)
return EINVAL;
if (*mutex == PTHREAD_MUTEX_INITIALIZER)
pthread_mutex_init(mutex, nullptr);
/* abstime must be non-null according to the spec */
return (*mutex)->timedlock(*abstimeout);
}
int pthread_mutex_unlock(pthread_mutex_t *mutex)
{
if (!mutex)
@ -702,13 +772,25 @@ extern "C" {
struct pthread_cond
{
int num_waiters;
int num_signallers;
Lock counter_lock;
Timed_semaphore signal_sem { _global_timeout_ep() };
Semaphore handshake_sem;
int num_waiters;
int num_signallers;
pthread_mutex_t counter_mutex;
sem_t signal_sem;
sem_t handshake_sem;
pthread_cond() : num_waiters(0), num_signallers(0) { }
pthread_cond() : num_waiters(0), num_signallers(0)
{
pthread_mutex_init(&counter_mutex, nullptr);
sem_init(&signal_sem, 0, 0);
sem_init(&handshake_sem, 0, 0);
}
~pthread_cond()
{
sem_destroy(&handshake_sem);
sem_destroy(&signal_sem);
pthread_mutex_destroy(&counter_mutex);
}
};
@ -783,47 +865,6 @@ extern "C" {
}
static uint64_t timeout_ms(struct timespec currtime,
struct timespec abstimeout)
{
enum { S_IN_MS = 1000, S_IN_NS = 1000 * 1000 * 1000 };
if (currtime.tv_nsec >= S_IN_NS) {
currtime.tv_sec += currtime.tv_nsec / S_IN_NS;
currtime.tv_nsec = currtime.tv_nsec % S_IN_NS;
}
if (abstimeout.tv_nsec >= S_IN_NS) {
abstimeout.tv_sec += abstimeout.tv_nsec / S_IN_NS;
abstimeout.tv_nsec = abstimeout.tv_nsec % S_IN_NS;
}
/* check whether absolute timeout is in the past */
if (currtime.tv_sec > abstimeout.tv_sec)
return 0;
uint64_t diff_ms = (abstimeout.tv_sec - currtime.tv_sec) * S_IN_MS;
uint64_t diff_ns = 0;
if (abstimeout.tv_nsec >= currtime.tv_nsec)
diff_ns = abstimeout.tv_nsec - currtime.tv_nsec;
else {
/* check whether absolute timeout is in the past */
if (diff_ms == 0)
return 0;
diff_ns = S_IN_NS - currtime.tv_nsec + abstimeout.tv_nsec;
diff_ms -= S_IN_MS;
}
diff_ms += diff_ns / 1000 / 1000;
/* if there is any diff then let the timeout be at least 1 MS */
if (diff_ms == 0 && diff_ns != 0)
return 1;
return diff_ms;
}
int pthread_cond_timedwait(pthread_cond_t *__restrict cond,
pthread_mutex_t *__restrict mutex,
const struct timespec *__restrict abstime)
@ -838,39 +879,29 @@ extern "C" {
pthread_cond *c = *cond;
c->counter_lock.lock();
pthread_mutex_lock(&c->counter_mutex);
c->num_waiters++;
c->counter_lock.unlock();
pthread_mutex_unlock(&c->counter_mutex);
pthread_mutex_unlock(mutex);
if (!abstime)
c->signal_sem.down();
else {
struct timespec currtime;
clock_gettime(CLOCK_REALTIME, &currtime);
Alarm::Time timeout = timeout_ms(currtime, *abstime);
try {
c->signal_sem.down(timeout);
} catch (Timeout_exception) {
result = ETIMEDOUT;
} catch (Nonblocking_exception) {
errno = ETIMEDOUT;
result = ETIMEDOUT;
}
if (!abstime) {
if (sem_wait(&c->signal_sem) == -1)
result = errno;
} else {
if (sem_timedwait(&c->signal_sem, abstime) == -1)
result = errno;
}
c->counter_lock.lock();
pthread_mutex_lock(&c->counter_mutex);
if (c->num_signallers > 0) {
if (result == ETIMEDOUT) /* timeout occured */
c->signal_sem.down();
c->handshake_sem.up();
sem_wait(&c->signal_sem);
sem_post(&c->handshake_sem);
--c->num_signallers;
}
c->num_waiters--;
c->counter_lock.unlock();
pthread_mutex_unlock(&c->counter_mutex);
pthread_mutex_lock(mutex);
@ -881,7 +912,7 @@ extern "C" {
int pthread_cond_wait(pthread_cond_t *__restrict cond,
pthread_mutex_t *__restrict mutex)
{
return pthread_cond_timedwait(cond, mutex, 0);
return pthread_cond_timedwait(cond, mutex, nullptr);
}
@ -892,16 +923,16 @@ extern "C" {
pthread_cond *c = *cond;
c->counter_lock.lock();
pthread_mutex_lock(&c->counter_mutex);
if (c->num_waiters > c->num_signallers) {
++c->num_signallers;
c->signal_sem.up();
c->counter_lock.unlock();
c->handshake_sem.down();
++c->num_signallers;
sem_post(&c->signal_sem);
pthread_mutex_unlock(&c->counter_mutex);
sem_wait(&c->handshake_sem);
} else
c->counter_lock.unlock();
pthread_mutex_unlock(&c->counter_mutex);
return 0;
return 0;
}
@ -912,17 +943,17 @@ extern "C" {
pthread_cond *c = *cond;
c->counter_lock.lock();
pthread_mutex_lock(&c->counter_mutex);
if (c->num_waiters > c->num_signallers) {
int still_waiting = c->num_waiters - c->num_signallers;
c->num_signallers = c->num_waiters;
for (int i = 0; i < still_waiting; i++)
c->signal_sem.up();
c->counter_lock.unlock();
sem_post(&c->signal_sem);
pthread_mutex_unlock(&c->counter_mutex);
for (int i = 0; i < still_waiting; i++)
c->handshake_sem.down();
sem_wait(&c->handshake_sem);
} else
c->counter_lock.unlock();
pthread_mutex_unlock(&c->counter_mutex);
return 0;
}

View File

@ -1,12 +1,13 @@
/*
* \brief POSIX semaphore implementation
* \author Christian Prochaska
* \author Christian Helmuth
* \date 2012-03-12
*
*/
/*
* Copyright (C) 2012-2017 Genode Labs GmbH
* Copyright (C) 2012-2020 Genode Labs GmbH
*
* This file is part of the Genode OS framework, which is distributed
* under the terms of the GNU Affero General Public License version 3.
@ -16,32 +17,154 @@
#include <base/log.h>
#include <base/semaphore.h>
#include <semaphore.h>
/* libc includes */
#include <libc/allocator.h>
/* libc includes */
#include <errno.h>
#include <time.h>
/* libc-internal includes */
#include <internal/monitor.h>
#include <internal/errno.h>
#include <internal/types.h>
#include <internal/time.h>
#include <internal/init.h>
using namespace Libc;
static Monitor *_monitor_ptr;
void Libc::init_semaphore_support(Monitor &monitor)
{
_monitor_ptr = &monitor;
}
extern "C" {
/*
* This class is named 'struct sem' because the 'sem_t' type is
* defined as 'struct sem*' in 'semaphore.h'
*/
struct sem : Genode::Semaphore
struct sem
{
sem(int value) : Semaphore(value) { }
int _count;
unsigned _applicants { 0 };
Lock _data_mutex;
Lock _monitor_mutex;
struct Missing_call_of_init_pthread_support : Exception { };
struct Applicant
{
sem &s;
Applicant(sem &s) : s(s)
{
Lock::Guard lock_guard(s._data_mutex);
++s._applicants;
}
~Applicant()
{
Lock::Guard lock_guard(s._data_mutex);
--s._applicants;
}
};
Monitor & _monitor()
{
if (!_monitor_ptr)
throw Missing_call_of_init_pthread_support();
return *_monitor_ptr;
}
sem(int value) : _count(value) { }
int trydown()
{
Lock::Guard lock_guard(_data_mutex);
if (_count > 0) {
_count--;
return 0;
}
return EBUSY;
}
int down()
{
Lock::Guard monitor_guard(_monitor_mutex);
/* fast path without contention */
if (trydown() == 0)
return 0;
{
Applicant guard { *this };
auto fn = [&] { return trydown() == 0; };
(void)_monitor().monitor(_monitor_mutex, fn);
}
return 0;
}
int down_timed(timespec const &abs_timeout)
{
Lock::Guard monitor_guard(_monitor_mutex);
/* fast path without wait - does not check abstimeout according to spec */
if (trydown() == 0)
return 0;
timespec abs_now;
clock_gettime(CLOCK_REALTIME, &abs_now);
uint64_t const timeout_ms = calculate_relative_timeout_ms(abs_now, abs_timeout);
if (!timeout_ms)
return ETIMEDOUT;
{
Applicant guard { *this };
auto fn = [&] { return trydown() == 0; };
if (_monitor().monitor(_monitor_mutex, fn, timeout_ms))
return 0;
else
return ETIMEDOUT;
}
}
int up()
{
Lock::Guard monitor_guard(_monitor_mutex);
Lock::Guard lock_guard(_data_mutex);
_count++;
if (_applicants)
_monitor().charge_monitors();
return 0;
}
int count()
{
return _count;
}
};
int sem_close(sem_t *)
{
warning(__func__, " not implemented");
return -1;
return Errno(ENOSYS);
}
@ -55,7 +178,7 @@ extern "C" {
int sem_getvalue(sem_t * __restrict sem, int * __restrict sval)
{
*sval = (*sem)->cnt();
*sval = (*sem)->count();
return 0;
}
@ -77,35 +200,44 @@ extern "C" {
int sem_post(sem_t *sem)
{
(*sem)->up();
if (int res = (*sem)->up())
return Errno(res);
return 0;
}
int sem_timedwait(sem_t * __restrict, const struct timespec * __restrict)
int sem_timedwait(sem_t * __restrict sem, const struct timespec * __restrict abstime)
{
warning(__func__, " not implemented");
return -1;
/* abstime must be non-null according to the spec */
if (int res = (*sem)->down_timed(*abstime))
return Errno(res);
return 0;
}
int sem_trywait(sem_t *)
int sem_trywait(sem_t *sem)
{
warning(__func__, " not implemented");
return -1;
if (int res = (*sem)->trydown())
return Errno(res);
return 0;
}
int sem_unlink(const char *)
{
warning(__func__, " not implemented");
return -1;
return Errno(ENOSYS);
}
int sem_wait(sem_t *sem)
{
(*sem)->down();
if (int res = (*sem)->down())
return Errno(res);
return 0;
}

View File

@ -6,7 +6,7 @@
*/
/*
* Copyright (C) 2012-2019 Genode Labs GmbH
* Copyright (C) 2012-2020 Genode Labs GmbH
*
* This file is part of the Genode OS framework, which is distributed
* under the terms of the GNU Affero General Public License version 3.
@ -19,9 +19,12 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
/* Genode includes */
#include <base/log.h>
#include <base/sleep.h>
#include <base/thread.h>
@ -49,10 +52,14 @@ static void *thread_func(void *arg)
sem_post(&thread_args->thread_finished_sem);
/* sleep forever */
sem_t sleep_sem;
sem_init(&sleep_sem, 0, 0);
sem_wait(&sleep_sem);
/*
* sleep forever
*
* The thread is going to be cancelled, but cancellation points are
* not implemented yet in most blocking libc functions, so
* Genode::sleep_forever() is called here for now.
*/
Genode::sleep_forever();
return 0;
}
@ -115,10 +122,29 @@ static inline void compare_semaphore_values(int reported_value, int expected_val
}
/*
 * Sum of two timespec values with the nanosecond part normalized to
 * [0, 1'000'000'000). Assumes both inputs carry non-negative fields.
 */
static timespec add_timespec(timespec const &a, timespec const &b)
{
	enum { NSEC_PER_SEC = 1'000'000'000ull };

	long sum_sec  = a.tv_sec  + b.tv_sec;
	long sum_nsec = a.tv_nsec + b.tv_nsec;

	/* carry whole seconds out of the nanosecond part */
	for (; sum_nsec >= NSEC_PER_SEC; sum_nsec -= NSEC_PER_SEC)
		++sum_sec;

	return timespec { sum_sec, sum_nsec };
}
/*
 * Return 'a' advanced by 'msec' milliseconds.
 *
 * NOTE(review): the product msec*1'000'000 is computed in 'long' -
 * fine for the sub-second timeouts used in this test, to be confirmed
 * for larger values on 32-bit targets.
 */
static timespec add_timespec_ms(timespec const &a, long msec)
{
	timespec const delta { 0, msec*1'000'000 };
	return add_timespec(a, delta);
}
struct Test_mutex_data
{
sem_t main_thread_ready_sem;
sem_t test_thread_ready_sem;
pthread_mutex_t normal_mutex;
pthread_mutex_t recursive_mutex;
pthread_mutex_t errorcheck_mutex;
@ -127,6 +153,8 @@ struct Test_mutex_data
sem_init(&main_thread_ready_sem, 0, 0);
sem_init(&test_thread_ready_sem, 0, 0);
pthread_mutex_init(&normal_mutex, nullptr);
pthread_mutexattr_t recursive_mutex_attr;
pthread_mutexattr_init(&recursive_mutex_attr);
pthread_mutexattr_settype(&recursive_mutex_attr, PTHREAD_MUTEX_RECURSIVE);
@ -144,6 +172,7 @@ struct Test_mutex_data
{
pthread_mutex_destroy(&errorcheck_mutex);
pthread_mutex_destroy(&recursive_mutex);
pthread_mutex_destroy(&normal_mutex);
}
};
@ -271,6 +300,54 @@ static void *thread_mutex_func(void *arg)
/* wake up main thread */
sem_post(&test_mutex_data->test_thread_ready_sem);
/* wait for main thread */
sem_wait(&test_mutex_data->main_thread_ready_sem);
/************************************
** test normal mutex with timeout **
************************************/
timespec abstimeout { 0, 0 };
/* lock normal mutex */
if (pthread_mutex_lock(&test_mutex_data->normal_mutex) != 0) {
printf("Error: could not lock normal mutex\n");
exit(-1);
}
/* wake up main thread */
sem_post(&test_mutex_data->test_thread_ready_sem);
/* wait for main thread */
sem_wait(&test_mutex_data->main_thread_ready_sem);
/* unlock normal mutex */
if (pthread_mutex_unlock(&test_mutex_data->normal_mutex) != 0) {
printf("Error: could not lock normal mutex\n");
exit(-1);
}
/* wake up main thread */
sem_post(&test_mutex_data->test_thread_ready_sem);
/* wait for main thread */
sem_wait(&test_mutex_data->main_thread_ready_sem);
/* try to lock locked mutex with timeout */
clock_gettime(CLOCK_REALTIME, &abstimeout);
abstimeout = add_timespec_ms(abstimeout, 500);
if (pthread_mutex_timedlock(&test_mutex_data->normal_mutex, &abstimeout) != ETIMEDOUT) {
printf("Error: locking of normal mutex did not time out in test thread\n");
exit(-1);
}
/* wake up main thread */
sem_post(&test_mutex_data->test_thread_ready_sem);
return nullptr;
}
@ -336,6 +413,47 @@ static void test_mutex()
exit(-1);
}
/************************************
** test normal mutex with timeout **
************************************/
timespec abstimeout { 0, 0 };
/* wake up test thread */
sem_post(&test_mutex_data.main_thread_ready_sem);
/* wait for test thread - normal mutex should still be locked */
sem_wait(&test_mutex_data.test_thread_ready_sem);
/* try to lock locked mutex with timeout */
clock_gettime(CLOCK_REALTIME, &abstimeout);
abstimeout = add_timespec_ms(abstimeout, 500);
if (pthread_mutex_timedlock(&test_mutex_data.normal_mutex, &abstimeout) != ETIMEDOUT) {
printf("Error: locking of normal mutex did not time out in main thread\n");
exit(-1);
}
/* wake up test thread */
sem_post(&test_mutex_data.main_thread_ready_sem);
/* wait for test thread - normal mutex should still be locked */
sem_wait(&test_mutex_data.test_thread_ready_sem);
/* lock normal mutex */
if (pthread_mutex_lock(&test_mutex_data.normal_mutex) != 0) {
printf("Error: could not lock normal mutex\n");
exit(-1);
}
/* wake up test thread */
sem_post(&test_mutex_data.main_thread_ready_sem);
/* wait for test thread - normal mutex should still be locked */
sem_wait(&test_mutex_data.test_thread_ready_sem);
pthread_join(t, NULL);
}
@ -542,6 +660,267 @@ static void test_lock_and_sleep()
}
/*
 * RAII wrapper around a POSIX condition variable: initialized with
 * default attributes on construction, destroyed on destruction.
 */
struct Cond
{
	pthread_cond_t _cond;

	Cond() { pthread_cond_init(&_cond, nullptr); }

	~Cond() { pthread_cond_destroy(&_cond); }

	/* pointer accessor for use with the pthread_cond_* API */
	pthread_cond_t * cond() { return &_cond; }
};
/*
 * Ping-pong test between one signaller thread and three waiters (two
 * pthreads plus the main thread) on a condition variable without
 * timeouts. The whole scenario runs in the constructor; the test is
 * complete when construction returns.
 */
struct Test_cond
{
	/* file-local mutex wrapper; mutex() presumably yields a pthread_mutex_t* - see its definition elsewhere in this file */
	Mutex<PTHREAD_MUTEX_NORMAL> _mutex;
	Cond _cond;

	/* protocol state, protected by _mutex */
	enum class State {
		PING, PONG, SHUTDOWN, END
	} _shared_state { State::PING };

	/* pthread entry point, trampolines into the member function */
	static void *signaller_fn(void *arg)
	{
		((Test_cond *)arg)->signaller();
		return nullptr;
	}

	/*
	 * Toggle the shared state between PING and PONG, signalling the
	 * condition variable on each change, until a waiter requests
	 * SHUTDOWN. The final transition to END is broadcast so all
	 * waiters wake up and terminate.
	 */
	void signaller()
	{
		printf("signaller: started\n");

		unsigned num_events = 0;
		bool test_done = false;

		while (!test_done) {
			pthread_mutex_lock(_mutex.mutex());

			switch (_shared_state) {
			case State::PING:
				_shared_state = State::PONG;
				++num_events;
				pthread_cond_signal(_cond.cond());
				break;
			case State::PONG:
				_shared_state = State::PING;
				++num_events;
				pthread_cond_signal(_cond.cond());
				break;
			case State::SHUTDOWN:
				printf("signaller: shutting down\n");
				_shared_state = State::END;
				++num_events;
				/* wake all waiters for the final state */
				pthread_cond_broadcast(_cond.cond());
				test_done = true;
				break;
			case State::END:
				break;
			}

			pthread_mutex_unlock(_mutex.mutex());

			/* give waiters a chance to observe the state change */
			usleep(1000);
		}

		printf("signaller: finished after %u state changes\n", num_events);
	}

	/* pthread entry point, trampolines into the member function */
	static void *waiter_fn(void *arg)
	{
		((Test_cond *)arg)->waiter();
		return nullptr;
	}

	/*
	 * Count PING/PONG transitions observed by this thread, blocking on
	 * the condition variable while no new event is pending. After 2000
	 * observed events, request shutdown of the signaller; terminate
	 * once the END state is reached.
	 */
	void waiter(bool main_thread = false)
	{
		char const * const note = main_thread ? "(main thread)" : "";

		printf("waiter%s: started\n", note);

		unsigned pings = 0, pongs = 0;
		unsigned long iterations = 0;
		bool test_done = false;

		while (!test_done) {
			pthread_mutex_lock(_mutex.mutex());

			auto handle_state = [&] {
				unsigned const num_events = pings + pongs;
				if (num_events == 2000) {
					printf("waiter%s: request shutdown\n", note);
					_shared_state = State::SHUTDOWN;
				} else if (num_events % 2 == 0) {
					/* no new event yet - wait for the next state toggle */
					pthread_cond_wait(_cond.cond(), _mutex.mutex());
				}
			};

			switch (_shared_state) {
			case State::PING:
				++pings;
				handle_state();
				break;
			case State::PONG:
				++pongs;
				handle_state();
				break;
			case State::SHUTDOWN:
				/* another waiter requested shutdown - wait for END */
				pthread_cond_wait(_cond.cond(), _mutex.mutex());
				break;
			case State::END:
				test_done = true;
				break;
			}

			pthread_mutex_unlock(_mutex.mutex());

			usleep(3000);

			++iterations;
		}

		printf("waiter%s: finished (pings=%u, pongs=%u, iterations=%lu)\n",
		       note, pings, pongs, iterations);
	}

	/* construct = run: spawn signaller and two waiters, join in as third waiter */
	Test_cond()
	{
		printf("main thread: test without timeouts\n");

		pthread_t signaller_id;
		if (pthread_create(&signaller_id, 0, signaller_fn, this) != 0) {
			printf("error: pthread_create() failed\n");
			exit(-1);
		}
		pthread_t waiter1_id;
		if (pthread_create(&waiter1_id, 0, waiter_fn, this) != 0) {
			printf("error: pthread_create() failed\n");
			exit(-1);
		}
		pthread_t waiter2_id;
		if (pthread_create(&waiter2_id, 0, waiter_fn, this) != 0) {
			printf("error: pthread_create() failed\n");
			exit(-1);
		}

		waiter(true);

		pthread_join(signaller_id, nullptr);
		pthread_join(waiter1_id, nullptr);
		pthread_join(waiter2_id, nullptr);
	}
};
/*
 * Timed variant of the condition-variable test: one signaller
 * broadcasts roughly every 249 ms for ROUNDS rounds while three
 * waiters (two pthreads plus the main thread) block in
 * pthread_cond_timedwait() with 250-ms deadlines until the END state
 * is observed. The whole scenario runs in the constructor.
 */
struct Test_cond_timed
{
	/* file-local mutex wrapper; mutex() presumably yields a pthread_mutex_t* - see its definition elsewhere in this file */
	Mutex<PTHREAD_MUTEX_NORMAL> _mutex;
	Cond _cond;

	/* protocol state, protected by _mutex */
	enum class State { RUN, END } _shared_state { State::RUN };

	enum { ROUNDS = 10 };

	/* pthread entry point, trampolines into the member function */
	static void *signaller_fn(void *arg)
	{
		((Test_cond_timed *)arg)->signaller();
		return nullptr;
	}

	/* broadcast once per round; flip the state to END in the last round */
	void signaller()
	{
		printf("signaller: started\n");

		bool loop = true;
		for (unsigned i = 1; loop; ++i) {
			/* slightly below the waiters' 250-ms timeout */
			usleep(249*1000);

			pthread_mutex_lock(_mutex.mutex());
			if (i == ROUNDS) {
				_shared_state = State::END;
				loop = false;
			}
			pthread_cond_broadcast(_cond.cond());
			pthread_mutex_unlock(_mutex.mutex());
		}

		printf("signaller: finished\n");
	}

	/* pthread entry point, trampolines into the member function */
	static void *waiter_fn(void *arg)
	{
		((Test_cond_timed *)arg)->waiter();
		return nullptr;
	}

	/*
	 * Repeatedly wait for the signaller's broadcast with an absolute
	 * deadline 250 ms in the future, logging (via strerror) whenever
	 * the wait times out, and leave once END was observed.
	 */
	void waiter(bool main_thread = false)
	{
		char const * const note = main_thread ? "(main thread)" : "";

		printf("waiter%s: started\n", note);

		for (bool loop = true; loop; ) {
			pthread_mutex_lock(_mutex.mutex());

			timespec ts { 0, 0 };
			clock_gettime(CLOCK_REALTIME, &ts);

			int ret;
			do {
				if (_shared_state == State::END) {
					loop = false;
					break;
				}
				/* extend the absolute deadline by another 250 ms */
				ts = add_timespec_ms(ts, 250);
				ret = pthread_cond_timedwait(_cond.cond(), _mutex.mutex(), &ts);
				if (ret)
					printf("waiter%s: pthread_cond_timedwait: %s\n", note, strerror(ret));
			} while (ret != 0);

			pthread_mutex_unlock(_mutex.mutex());
		}

		printf("waiter%s: finished\n", note);
	}

	/* construct = run: spawn signaller and two waiters, join in as third waiter */
	Test_cond_timed()
	{
		printf("main thread: test with timeouts\n");

		pthread_t signaller_id;
		if (pthread_create(&signaller_id, 0, signaller_fn, this) != 0) {
			printf("error: pthread_create() failed\n");
			exit(-1);
		}
		pthread_t waiter1_id;
		if (pthread_create(&waiter1_id, 0, waiter_fn, this) != 0) {
			printf("error: pthread_create() failed\n");
			exit(-1);
		}
		pthread_t waiter2_id;
		if (pthread_create(&waiter2_id, 0, waiter_fn, this) != 0) {
			printf("error: pthread_create() failed\n");
			exit(-1);
		}

		waiter(true);

		pthread_join(signaller_id, nullptr);
		pthread_join(waiter1_id, nullptr);
		pthread_join(waiter2_id, nullptr);
	}
};
/*
 * Exercise POSIX condition variables: each test object runs its whole
 * scenario in its constructor and is destroyed again before the next
 * one starts.
 */
static void test_cond()
{
	printf("main thread: test condition variables\n");

	/* plain pthread_cond_wait/signal ping-pong */
	{
		Test_cond test;
	}

	/* pthread_cond_timedwait variant */
	{
		Test_cond_timed test;
	}
}
static void test_interplay()
{
enum { NUM_THREADS = 2 };
@ -635,6 +1014,7 @@ int main(int argc, char **argv)
test_mutex();
test_mutex_stress();
test_lock_and_sleep();
test_cond();
printf("--- returning from main ---\n");
return 0;