/* Map file descriptor to thread sleeping in lwpSleepFd() */
static struct lwpProc **LwpFdwait;
-/* Threads sleeping in lwpSleepUntil(), in no particular order */
+/*
+ * Threads sleeping until a wakeup time, in lwpSleepUntil() or
+ * lwpSleepFd(), in no particular order
+ */
static struct lwpQueue LwpDelayq;
/* The thread executing lwpSelect() */
LwpSelProc = proc;
}
-void
-lwpSleepFd(int fd, int mask)
+int
+lwpSleepFd(int fd, int mask, struct timeval *timeout)
{
lwpStatus(LwpCurrent, "sleeping on fd %d for %d", fd, mask);
- if (CANT_HAPPEN(fd > FD_SETSIZE))
- return;
+ if (CANT_HAPPEN(fd > FD_SETSIZE)) {
+ errno = EBADF;
+ return -1;
+ }
if (LwpFdwait[fd] != 0) {
lwpStatus(LwpCurrent,
"multiple sleeps attempted on file descriptor %d", fd);
- return;
+ errno = EBADF;
+ return -1;
}
if (mask & LWP_FD_READ)
FD_SET(fd, &LwpReadfds);
if (mask & LWP_FD_WRITE)
FD_SET(fd, &LwpWritefds);
-
LwpNfds++;
if (LwpMaxfd == 0 && LwpDelayq.head == 0) {
lwpReady(LwpSelProc);
}
lwpStatus(LwpCurrent, "going to wait on fd %d", fd);
+
+ if (timeout) {
+ LwpCurrent->runtime = time(NULL) + timeout->tv_sec +
+ (timeout->tv_usec > 0);
+ lwpAddTail(&LwpDelayq, LwpCurrent);
+ } else
+ LwpCurrent->runtime = (time_t)-1;
+
if (fd > LwpMaxfd)
LwpMaxfd = fd;
LwpFdwait[fd] = LwpCurrent;
LwpCurrent->fd = fd;
+ LwpCurrent->fd_ready = 0;
lwpReschedule();
+ return LwpCurrent->fd_ready != 0;
}
+/*
+ * Wake up PROC if it is sleeping in lwpSleepFd().
+ * Must be followed by lwpWakeupSleep() before the next lwpReschedule().
+ */
static void
lwpWakeupFd(struct lwpProc *proc)
{
- if (proc->fd < 0)
+ if (CANT_HAPPEN(proc->fd < 0 || proc->fd > LwpMaxfd))
return;
lwpStatus(proc, "awakening; was sleeping on fd %d", proc->fd);
+ if (proc->runtime != (time_t)-1) {
+ /* is in LwpDelayq; leave the job to lwpWakeupSleep() */
+ proc->runtime = 0;
+ return;
+ }
FD_CLR(proc->fd, &LwpReadfds);
FD_CLR(proc->fd, &LwpWritefds);
LwpNfds--;
lwpReady(proc);
}
+/*
+ * Wake up threads in LwpDelayq whose time has come.
+ */
void
lwpWakeupSleep(void)
{
while (NULL != (proc = lwpGetFirst(&LwpDelayq))) {
if (now >= proc->runtime) {
lwpStatus(proc, "sleep done");
- lwpReady(proc);
+ proc->runtime = (time_t)-1;
+ if (proc->fd >= 0)
+ lwpWakeupFd(proc);
+ else
+ lwpReady(proc);
} else {
lwpAddTail(&save, proc);
}
{
if (proc->fd >= 0)
lwpWakeupFd(proc);
- else if (proc->runtime != (time_t)-1) {
+ else if (proc->runtime != (time_t)-1)
proc->runtime = 0;
- lwpWakeupSleep();
- }
+ lwpWakeupSleep();
}
int
continue;
}
- lwpWakeupSleep();
if (n > 0) {
/* file descriptor activity */
for (fd = 0; fd <= LwpMaxfd; fd++) {
continue;
if (FD_ISSET(fd, &readmask)) {
lwpStatus(LwpFdwait[fd], "input ready");
+ LwpFdwait[fd]->fd_ready = 1;
lwpWakeupFd(LwpFdwait[fd]);
continue;
}
if (FD_ISSET(fd, &writemask)) {
lwpStatus(LwpFdwait[fd], "output ready");
+ LwpFdwait[fd]->fd_ready = 1;
lwpWakeupFd(LwpFdwait[fd]);
continue;
}
}
}
+ lwpWakeupSleep();
lwpStatus(us, "fd dispatch completed");
lwpReady(LwpCurrent);
lwpReschedule();