Make empth_wakeup() and empth_terminate() wake up empth_sleep(), and

empth_sleep() return whether that happened:
[EMPTH_LWP] (lwpWakeupSleep): New, factored out of lwpSelect().
[EMPTH_LWP] (lwpSelect): Use it.
[EMPTH_LWP] (lwpWakeup): New.  Call lwpWakeupFd() if sleeping in
lwpSleepFd(), lwpWakeupSleep() if sleeping in lwpSleepUntil().
[EMPTH_LWP] (lwpTerminate, empth_wakeup): Use it rather than
lwpWakeupFd().
[EMPTH_LWP] (lwpWakeupFd): Internal linkage.
[EMPTH_LWP] (lwpSleepUntil): Reset member runtime, so that lwpWakeup()
can test it reliably.  Return how sleep woke up.
[EMPTH_LWP] (empth_sleep): Return value of lwpSleepUntil().
[EMPTH_POSIX] (EMPTH_INTR): New.
[EMPTH_POSIX] (empth_wakeup): Set state to it.
[EMPTH_POSIX] (empth_restorectx): Clear state.
[EMPTH_POSIX] (empth_sleep): Don't re-seleep when state is not clear,
i.e. thread was woken up prematurely.  Return how sleep woke up.
[EMPTH_W32] (empth_sleep): Implement by waiting on hThreadEvent with a
timeout rather than a straight Sleep().  Return how sleep woke up.
This commit is contained in:
Markus Armbruster 2007-02-08 11:26:43 +00:00
parent fe2de3d743
commit cea39829af
7 changed files with 80 additions and 45 deletions

View file

@ -181,7 +181,7 @@ void empth_terminate(empth_t *thread);
void empth_select(int fd, int flags); void empth_select(int fd, int flags);
/* /*
* Awaken THREAD if it is sleeping in empth_select(). * Awaken THREAD if it is sleeping in empth_select() or empth_sleep().
* Note: This must not awaken threads sleeping in other functions. * Note: This must not awaken threads sleeping in other functions.
* Does not yield the processor. * Does not yield the processor.
*/ */
@ -189,9 +189,10 @@ void empth_wakeup(empth_t *thread);
/* /*
* Put current thread to sleep until the time is UNTIL. * Put current thread to sleep until the time is UNTIL.
* May sleep somehwat longer, but never shorter. * Return 0 if it slept until that time.
* Return -1 if woken up early, by empth_wakeup().
*/ */
void empth_sleep(time_t until); int empth_sleep(time_t until);
/* /*
* Wait for signal, return the signal number. * Wait for signal, return the signal number.

View file

@ -47,8 +47,8 @@ void lwpExit(void);
void lwpTerminate(struct lwpProc * p); void lwpTerminate(struct lwpProc * p);
void lwpYield(void); void lwpYield(void);
void lwpSleepFd(int fd, int flags); void lwpSleepFd(int fd, int flags);
void lwpSleepUntil(time_t until); int lwpSleepUntil(time_t until);
void lwpWakeupFd(struct lwpProc * p); void lwpWakeup(struct lwpProc *);
int lwpSigWait(sigset_t *set, int *sig); int lwpSigWait(sigset_t *set, int *sig);
void *lwpGetUD(struct lwpProc * p); void *lwpGetUD(struct lwpProc * p);
void lwpSetUD(struct lwpProc * p, char *ud); void lwpSetUD(struct lwpProc * p, char *ud);

View file

@ -101,13 +101,13 @@ empth_select(int fd, int flags)
void void
empth_wakeup(empth_t *a) empth_wakeup(empth_t *a)
{ {
lwpWakeupFd(a); lwpWakeup(a);
} }
void int
empth_sleep(time_t until) empth_sleep(time_t until)
{ {
lwpSleepUntil(until); return lwpSleepUntil(until);
} }
int int

View file

@ -599,21 +599,25 @@ empth_wakeup(empth_t *pThread)
* *
* Put the given thread to sleep... * Put the given thread to sleep...
*/ */
void int
empth_sleep(time_t until) empth_sleep(time_t until)
{ {
long lSec; long lSec;
empth_t *pThread = TlsGetValue(dwTLSIndex);
int iReturn = 0;
loc_BlockThisThread(); if ((lSec = until - time(0)) > 0) {
loc_BlockThisThread();
loc_debug("going to sleep %ld sec", lSec);
while ((lSec = until - time(0)) > 0) { if (WaitForSingleObject(pThread->hThreadEvent, lSec * 1000L) !=
loc_debug("going to sleep %ld sec", lSec); WAIT_TIMEOUT)
Sleep(lSec * 1000L); iReturn = -1;
loc_debug("sleep done. Waiting to run.");
loc_RunThisThread(NULL);
} }
return iReturn;
loc_debug("sleep done. Waiting to run.");
loc_RunThisThread(NULL);
} }
/************************ /************************

View file

@ -58,6 +58,7 @@
#include "prototypes.h" #include "prototypes.h"
#define EMPTH_KILLED 1 #define EMPTH_KILLED 1
#define EMPTH_INTR 2
struct empth_t { struct empth_t {
char *name; /* thread name */ char *name; /* thread name */
@ -252,6 +253,7 @@ empth_restorectx(void)
empth_status("i am dead"); empth_status("i am dead");
empth_exit(); empth_exit();
} }
ctx_ptr->state = 0;
empth_status("context restored"); empth_status("context restored");
} }
@ -355,33 +357,37 @@ empth_alarm(int sig)
{ {
/* /*
* Nothing to do --- we handle this signal just to let * Nothing to do --- we handle this signal just to let
* empth_wakeup() interrupt system calls. * empth_wakeup() and empth_terminate() interrupt system calls.
*/ */
empth_status("got alarm signal");
} }
void void
empth_wakeup(empth_t *a) empth_wakeup(empth_t *a)
{ {
empth_status("waking up thread %s", a->name); empth_status("waking up thread %s", a->name);
if (a->state == 0)
a->state = EMPTH_INTR;
pthread_kill(a->id, SIGALRM); pthread_kill(a->id, SIGALRM);
} }
void int
empth_sleep(time_t until) empth_sleep(time_t until)
{ {
empth_t *ctx = pthread_getspecific(ctx_key);
struct timeval tv; struct timeval tv;
int res;
empth_status("going to sleep %ld sec", until - time(0)); empth_status("going to sleep %ld sec", until - time(0));
pthread_mutex_unlock(&mtx_ctxsw); pthread_mutex_unlock(&mtx_ctxsw);
tv.tv_sec = until - time(NULL);
tv.tv_usec = 0;
do { do {
select(0, NULL, NULL, NULL, &tv); tv.tv_sec = until - time(NULL);
} while ((tv.tv_sec = until - time(NULL)) > 0); tv.tv_usec = 0;
res = select(0, NULL, NULL, NULL, &tv);
} while (res < 0 && ctx->state == 0);
empth_status("sleep done. Waiting for lock"); empth_status("sleep done. Waiting for lock");
pthread_mutex_lock(&mtx_ctxsw); pthread_mutex_lock(&mtx_ctxsw);
empth_restorectx(); empth_restorectx();
return res;
} }
int int

View file

@ -29,7 +29,7 @@
* lwp.c: lightweight process creation, destruction and manipulation * lwp.c: lightweight process creation, destruction and manipulation
* *
* Known contributors to this file: * Known contributors to this file:
* Markus Armbruster, 2004-2006 * Markus Armbruster, 2004-2007
*/ */
#include <config.h> #include <config.h>
@ -145,7 +145,7 @@ lwpCreate(int priority, void (*entry)(void *), int stacksz,
newp->argv = argv; newp->argv = argv;
newp->ud = ud; newp->ud = ud;
newp->dead = 0; newp->dead = 0;
newp->runtime = -1; newp->runtime = (time_t)-1;
newp->fd = -1; newp->fd = -1;
if (LWP_MAX_PRIO <= priority) if (LWP_MAX_PRIO <= priority)
priority = LWP_MAX_PRIO - 1; priority = LWP_MAX_PRIO - 1;
@ -166,7 +166,7 @@ lwpCreate(int priority, void (*entry)(void *), int stacksz,
return newp; return newp;
} }
void static void
lwpDestroy(struct lwpProc *proc) lwpDestroy(struct lwpProc *proc)
{ {
if (proc->flags & LWP_STACKCHECK) { if (proc->flags & LWP_STACKCHECK) {
@ -244,8 +244,7 @@ lwpTerminate(struct lwpProc *p)
{ {
lwpStatus(p, "terminating process"); lwpStatus(p, "terminating process");
p->dead = 1; p->dead = 1;
if (p->fd >= 0) lwpWakeup(p);
lwpWakeupFd(p);
} }
/* /*

View file

@ -30,6 +30,7 @@
* *
* Known contributors to this file: * Known contributors to this file:
* Dave Pare, 1994 * Dave Pare, 1994
* Markus Armbruster, 2007
*/ */
#include <config.h> #include <config.h>
@ -107,7 +108,7 @@ lwpSleepFd(int fd, int mask)
lwpReschedule(); lwpReschedule();
} }
void static void
lwpWakeupFd(struct lwpProc *proc) lwpWakeupFd(struct lwpProc *proc)
{ {
if (proc->fd < 0) if (proc->fd < 0)
@ -122,9 +123,44 @@ lwpWakeupFd(struct lwpProc *proc)
lwpReady(proc); lwpReady(proc);
} }
static void
lwpWakeupSleep(void)
{
time_t now;
struct lwpQueue save;
struct lwpProc *proc;
if (LwpDelayq.head) {
now = time(NULL);
save.tail = save.head = 0;
while (NULL != (proc = lwpGetFirst(&LwpDelayq))) {
if (now >= proc->runtime) {
lwpStatus(proc, "sleep done");
lwpReady(proc);
} else {
lwpAddTail(&save, proc);
}
}
LwpDelayq = save;
}
}
void void
lwpWakeup(struct lwpProc *proc)
{
if (proc->fd >= 0)
lwpWakeupFd(proc);
else if (proc->runtime != (time_t)-1) {
proc->runtime = 0;
lwpWakeupSleep();
}
}
int
lwpSleepUntil(time_t until) lwpSleepUntil(time_t until)
{ {
int res;
lwpStatus(LwpCurrent, "sleeping for %ld sec", lwpStatus(LwpCurrent, "sleeping for %ld sec",
(long)(until - time(NULL))); (long)(until - time(NULL)));
LwpCurrent->runtime = until; LwpCurrent->runtime = until;
@ -134,6 +170,9 @@ lwpSleepUntil(time_t until)
} }
lwpAddTail(&LwpDelayq, LwpCurrent); lwpAddTail(&LwpDelayq, LwpCurrent);
lwpReschedule(); lwpReschedule();
res = LwpCurrent->runtime ? 0 : -1;
LwpCurrent->runtime = (time_t)-1;
return res;
} }
/*ARGSUSED*/ /*ARGSUSED*/
@ -149,7 +188,6 @@ lwpSelect(void *arg)
time_t delta; time_t delta;
struct lwpProc *proc; struct lwpProc *proc;
struct timeval tv; struct timeval tv;
struct lwpQueue save;
lwpStatus(us, "starting select loop"); lwpStatus(us, "starting select loop");
FD_ZERO(&readmask); FD_ZERO(&readmask);
@ -194,20 +232,7 @@ lwpSelect(void *arg)
continue; continue;
} }
if (LwpDelayq.head) { lwpWakeupSleep();
/* sleeping proecss activity */
time(&now);
save.tail = save.head = 0;
while (NULL != (proc = lwpGetFirst(&LwpDelayq))) {
if (now >= proc->runtime) {
lwpStatus(proc, "sleep done");
lwpReady(proc);
} else {
lwpAddTail(&save, proc);
}
}
LwpDelayq = save;
}
if (n > 0) { if (n > 0) {
/* file descriptor activity */ /* file descriptor activity */
for (fd = 0; fd <= LwpMaxfd; fd++) { for (fd = 0; fd <= LwpMaxfd; fd++) {