From 08b94556826ec883ab68c736ad8494118511446c Mon Sep 17 00:00:00 2001 From: Ron Koenderink Date: Sun, 1 Feb 2009 06:22:26 -0600 Subject: [PATCH] Reimplement max_idle without a separate thread Remove the KillIdle thread. Add timeout to struct iop, initialized in io_open(). Obey it in io_input() by passing it to empth_select(). If empth_select() times out, report that back through io_input() to recvclient() and player_login(). If player_login() receives a timeout indication, print a message and terminate the session. If recvclient() receives a timeout indication, flash a message to the player and initiate a shut down the player's session. Create WIN32 sys/time.h to define struct timeval. This creates some conflicts with WIN32 windows.h definitions. Including windows.h in show.c and info.c creates conflicts, so remove that. Modify service.c to include sys/socket.h instead of windows.h to remove the conflict with sys/time.h. --- include/empio.h | 3 +- include/empthread.h | 6 +- include/lwp.h | 2 +- src/lib/commands/info.c | 2 - src/lib/empthread/io.c | 42 ++++++++---- src/lib/empthread/lwp.c | 6 +- src/lib/empthread/ntthread.c | 26 +++++++- src/lib/empthread/pthread.c | 79 +++++++++++------------ src/lib/lwp/lwpint.h | 1 + src/lib/lwp/sel.c | 58 +++++++++++++---- src/lib/player/accept.c | 8 ++- src/lib/player/login.c | 11 +++- src/lib/player/recvclient.c | 9 ++- src/lib/subs/show.c | 4 -- src/lib/w32/service.c | 7 +- src/{server/idle.c => lib/w32/sys/time.h} | 45 ++----------- src/server/main.c | 1 - 17 files changed, 179 insertions(+), 131 deletions(-) rename src/{server/idle.c => lib/w32/sys/time.h} (60%) diff --git a/include/empio.h b/include/empio.h index 2ca28badc..1fadde411 100644 --- a/include/empio.h +++ b/include/empio.h @@ -33,6 +33,7 @@ #ifndef EMPIO_H #define EMPIO_H +#include #define IO_READ 0x1 #define IO_WRITE 0x2 @@ -46,7 +47,7 @@ #define IO_NOWAIT 0 #define IO_WAIT 1 -extern struct iop *io_open(int, int, int); +extern struct iop *io_open(int, int, int, struct timeval); extern void io_init(void); extern int io_noblocking(struct iop *, int); extern void io_close(struct iop *); diff --git a/include/empthread.h b/include/empthread.h index dc7fd3b80..75639b0eb 100644 --- a/include/empthread.h +++ b/include/empthread.h @@ -48,6 +48,7 @@ #ifndef EMPTHREAD_H #define EMPTHREAD_H +#include #include #ifdef EMPTH_LWP @@ -168,10 +169,13 @@ void empth_terminate(empth_t *thread); * If FLAGS & EMPTH_FD_READ, wake up if FD is ready for input. * If FLAGS & EMPTH_FD_WRITE, wake up if FD is ready for output. * At most one thread may sleep on the same file descriptor. + * TIMEOUT, if non-null, limits the sleep time. + * Return one when the FD is ready, zero on timeout, -1 on error with + * errno set. * Note: Currently, Empire sleeps only on network I/O, i.e. FD is a * socket. Implementations should not rely on that. */ -void empth_select(int fd, int flags); +int empth_select(int fd, int flags, struct timeval *timeout); /* * Awaken THREAD if it is sleeping in empth_select() or empth_sleep(). diff --git a/include/lwp.h b/include/lwp.h index d63f4f8fc..902243656 100644 --- a/include/lwp.h +++ b/include/lwp.h @@ -45,7 +45,7 @@ struct lwpProc *lwpCreate(int prio, void (*)(void *), int size, void lwpExit(void); void lwpTerminate(struct lwpProc * p); void lwpYield(void); -void lwpSleepFd(int fd, int flags); +int lwpSleepFd(int fd, int flags, struct timeval *timeout); int lwpSleepUntil(time_t until); void lwpWakeup(struct lwpProc *); int lwpSigWait(sigset_t *set, int *sig); diff --git a/src/lib/commands/info.c b/src/lib/commands/info.c index 3a2a43bb5..1e9318365 100644 --- a/src/lib/commands/info.c +++ b/src/lib/commands/info.c @@ -43,8 +43,6 @@ #include #if !defined(_WIN32) #include -#else -#include #endif #include "commands.h" #include "optlist.h" diff --git a/src/lib/empthread/io.c b/src/lib/empthread/io.c index 9b629e244..85cad537d 100644 --- a/src/lib/empthread/io.c +++ b/src/lib/empthread/io.c @@ -61,6 +61,7 @@ struct iop { struct ioqueue *output; int flags; int bufsize; + struct timeval input_timeout; }; void @@ -69,7 +70,7 @@ io_init(void) } struct iop * -io_open(int fd, int flags, int bufsize) +io_open(int fd, int flags, int bufsize, struct timeval timeout) { struct iop *iop; @@ -83,6 +84,7 @@ io_open(int fd, int flags, int bufsize) iop->input = 0; iop->output = 0; iop->flags = 0; + iop->input_timeout = timeout; iop->bufsize = bufsize; if ((flags & IO_READ) && (flags & IO_NEWSOCK) == 0) iop->input = ioq_create(bufsize); @@ -106,31 +108,47 @@ io_close(struct iop *iop) free(iop); } +/* + * Return number of bytes read on success, zero on timeout or EOF, -1 + * on error, with errno set appropriately. In particular, return -1 + * with errno set to EAGAIN or EWOULDBLOCK when no data is available + * for non-blocking input (WAITFORINPUT false). Use io_eof() to + * distinguish timeout from EOF. + */ int io_input(struct iop *iop, int waitforinput) { char buf[IO_BUFSIZE]; int cc; + int res; + struct timeval timeout = iop->input_timeout; /* Not a read IOP */ - if ((iop->flags & IO_READ) == 0) + if ((iop->flags & IO_READ) == 0) { + errno = EBADF; return -1; + } /* IOP is markes as in error. */ - if (iop->flags & IO_ERROR) + if (iop->flags & IO_ERROR) { + errno = EBADF; return -1; + } /* Wait for the file to have input. */ if (waitforinput) { - empth_select(iop->fd, EMPTH_FD_READ); + res = empth_select(iop->fd, EMPTH_FD_READ, &timeout); + if (res < 0) { + iop->flags |= IO_ERROR; + return -1; + } else if (res == 0) + return 0; } + /* Do the actual read. */ cc = read(iop->fd, buf, sizeof(buf)); if (cc < 0) { - /* would block, so nothing to read. */ - if (errno == EAGAIN || errno == EWOULDBLOCK) - return 0; - - /* Some form of file error occurred... */ - iop->flags |= IO_ERROR; + if (errno != EAGAIN && errno != EWOULDBLOCK) + /* Some form of file error occurred... */ + iop->flags |= IO_ERROR; return -1; } @@ -190,7 +208,7 @@ io_output(struct iop *iop, int waitforoutput) if (waitforoutput != IO_NOWAIT) { /* This waits for the file to be ready for writing, */ /* and lets other threads run. */ - empth_select(iop->fd, EMPTH_FD_WRITE); + empth_select(iop->fd, EMPTH_FD_WRITE, NULL); } /* Do the actual write. */ @@ -272,7 +290,7 @@ io_output_all(struct iop *iop) * a malicous player could delay the update indefinitely */ while ((n = io_output(iop, IO_NOWAIT)) > 0 && !play_wrlock_wanted) - empth_select(iop->fd, EMPTH_FD_WRITE); + empth_select(iop->fd, EMPTH_FD_WRITE, NULL); return n; } diff --git a/src/lib/empthread/lwp.c b/src/lib/empthread/lwp.c index 190cc11cc..b55f29bdc 100644 --- a/src/lib/empthread/lwp.c +++ b/src/lib/empthread/lwp.c @@ -104,10 +104,10 @@ empth_terminate(empth_t *a) lwpTerminate(a); } -void -empth_select(int fd, int flags) +int +empth_select(int fd, int flags, struct timeval *timeout) { - lwpSleepFd(fd, flags); + return lwpSleepFd(fd, flags, timeout); } void diff --git a/src/lib/empthread/ntthread.c b/src/lib/empthread/ntthread.c index 68bd3ffb1..1279f732a 100644 --- a/src/lib/empthread/ntthread.c +++ b/src/lib/empthread/ntthread.c @@ -560,12 +560,14 @@ empth_terminate(empth_t *pThread) * * This would be one of the main functions used within gen\io.c */ -void -empth_select(int fd, int flags) +int +empth_select(int fd, int flags, struct timeval *timeout) { int handle; WSAEVENT hEventObject[2]; + DWORD result, msec; empth_t *pThread = TlsGetValue(dwTLSIndex); + int res; loc_debug("%s select on %d", flags == EMPTH_FD_READ ? "read" : "write", fd); @@ -586,13 +588,31 @@ empth_select(int fd, int flags) empth_exit(); } - WSAWaitForMultipleEvents(2, hEventObject, FALSE, WSA_INFINITE, FALSE); + if (timeout) + msec = timeout->tv_sec * 1000L + timeout->tv_usec / 1000L; + else + msec = WSA_INFINITE; + result = WSAWaitForMultipleEvents(2, hEventObject, FALSE, msec, + FALSE); + + switch (result) { + case WSA_WAIT_TIMEOUT: + res = 0; + break; + case WSA_WAIT_FAILED: + errno = WSAGetLastError(); + res = -1; + break; + default: + res = 1; + } WSAEventSelect(handle, hEventObject[0], 0); WSACloseEvent(hEventObject[0]); loc_RunThisThread(NULL); + return res; } /************************ diff --git a/src/lib/empthread/pthread.c b/src/lib/empthread/pthread.c index aa8b56949..57a887ff8 100644 --- a/src/lib/empthread/pthread.c +++ b/src/lib/empthread/pthread.c @@ -289,65 +289,58 @@ empth_terminate(empth_t *a) pthread_kill(a->id, SIGALRM); } -void -empth_select(int fd, int flags) +int +empth_select(int fd, int flags, struct timeval *tv) { - fd_set readmask; fd_set writemask; - struct timeval tv; int n; + int res = 0; pthread_mutex_unlock(&mtx_ctxsw); empth_status("%s select on %d", flags == EMPTH_FD_READ ? "read" : "write", fd); - while (1) { - tv.tv_sec = 1000000; - tv.tv_usec = 0; - FD_ZERO(&readmask); - FD_ZERO(&writemask); - - switch (flags) { - case EMPTH_FD_READ: - FD_SET(fd, &readmask); - break; - case EMPTH_FD_WRITE: - FD_SET(fd, &writemask); - break; - default: - logerror("bad flag %d passed to empth_select", flags); - empth_exit(); - } + FD_ZERO(&readmask); + FD_ZERO(&writemask); + + switch (flags) { + case EMPTH_FD_READ: + FD_SET(fd, &readmask); + break; + case EMPTH_FD_WRITE: + FD_SET(fd, &writemask); + break; + default: + CANT_REACH(); + errno = EINVAL; + res = -1; + goto done; + } - n = select(fd + 1, &readmask, &writemask, (fd_set *) 0, &tv); + n = select(fd + 1, &readmask, &writemask, (fd_set *) 0, tv); - if (n < 0) { - if (errno == EINTR) { - /* go handle the signal */ - empth_status("select broken by signal"); - goto done; - return; - } - /* strange but we dont get EINTR on select broken by signal */ + if (n < 0) { + if (errno == EINTR) /* go handle the signal */ + empth_status("select broken by signal"); + else empth_status("select failed (%s)", strerror(errno)); - goto done; - return; - } - - if (flags == EMPTH_FD_READ && FD_ISSET(fd, &readmask)) { - empth_status("input ready"); - break; - } - if (flags == EMPTH_FD_WRITE && FD_ISSET(fd, &writemask)) { - empth_status("output ready"); - break; - } + res = -1; + } else if (n == 0) { + empth_status("select timed out"); + res = 0; + } else if (flags == EMPTH_FD_READ && FD_ISSET(fd, &readmask)) { + empth_status("input ready"); + res = 1; + } else if (flags == EMPTH_FD_WRITE && FD_ISSET(fd, &writemask)) { + empth_status("output ready"); + res = 1; } - done: +done: pthread_mutex_lock(&mtx_ctxsw); empth_restorectx(); + return res; } static void diff --git a/src/lib/lwp/lwpint.h b/src/lib/lwp/lwpint.h index 5b36077e5..7525fd60e 100644 --- a/src/lib/lwp/lwpint.h +++ b/src/lib/lwp/lwpint.h @@ -52,6 +52,7 @@ struct lwpProc { int pri; /* which scheduling queue we're on */ time_t runtime; /* time at which process is restarted */ int fd; /* fd we're blocking on */ + int fd_ready; /* woken up because fd was ready */ int argc; /* initial arguments */ char **argv; void *ud; /* user data */ diff --git a/src/lib/lwp/sel.c b/src/lib/lwp/sel.c index d6ec8403e..aa1e66a41 100644 --- a/src/lib/lwp/sel.c +++ b/src/lib/lwp/sel.c @@ -55,7 +55,10 @@ static fd_set LwpReadfds, LwpWritefds; /* Map file descriptor to thread sleeping in lwpSleepFd() */ static struct lwpProc **LwpFdwait; -/* Threads sleeping in lwpSleepUntil(), in no particular order */ +/* + * Threads sleeping until a wakeup time, in lwpSleepUntil() or + * lwpSleepFd(), in no particular order + */ static struct lwpQueue LwpDelayq; /* The thread executing lwpSelect() */ @@ -74,23 +77,25 @@ lwpInitSelect(struct lwpProc *proc) LwpSelProc = proc; } -void -lwpSleepFd(int fd, int mask) +int +lwpSleepFd(int fd, int mask, struct timeval *timeout) { lwpStatus(LwpCurrent, "sleeping on fd %d for %d", fd, mask); - if (CANT_HAPPEN(fd > FD_SETSIZE)) - return; + if (CANT_HAPPEN(fd > FD_SETSIZE)) { + errno = EBADF; + return -1; + } if (LwpFdwait[fd] != 0) { lwpStatus(LwpCurrent, "multiple sleeps attempted on file descriptor %d", fd); - return; + errno = EBADF; + return -1; } if (mask & LWP_FD_READ) FD_SET(fd, &LwpReadfds); if (mask & LWP_FD_WRITE) FD_SET(fd, &LwpWritefds); - LwpNfds++; if (LwpMaxfd == 0 && LwpDelayq.head == 0) { @@ -99,20 +104,39 @@ lwpSleepFd(int fd, int mask) lwpReady(LwpSelProc); } lwpStatus(LwpCurrent, "going to wait on fd %d", fd); + + if (timeout) { + LwpCurrent->runtime = time(NULL) + timeout->tv_sec + + (timeout->tv_usec > 0); + lwpAddTail(&LwpDelayq, LwpCurrent); + } else + LwpCurrent->runtime = (time_t)-1; + if (fd > LwpMaxfd) LwpMaxfd = fd; LwpFdwait[fd] = LwpCurrent; LwpCurrent->fd = fd; + LwpCurrent->fd_ready = 0; lwpReschedule(); + return LwpCurrent->fd_ready != 0; } +/* + * Wake up PROC if it is sleeping in lwpSleepFd(). + * Must be followed by lwpWakeupSleep() before the next lwpReschedule(). + */ static void lwpWakeupFd(struct lwpProc *proc) { - if (proc->fd < 0) + if (CANT_HAPPEN(proc->fd < 0 || proc->fd > LwpMaxfd)) return; lwpStatus(proc, "awakening; was sleeping on fd %d", proc->fd); + if (proc->runtime != (time_t)-1) { + /* is in LwpDelayq; leave the job to lwpWakeupSleep() */ + proc->runtime = 0; + return; + } FD_CLR(proc->fd, &LwpReadfds); FD_CLR(proc->fd, &LwpWritefds); LwpNfds--; @@ -121,6 +145,9 @@ lwpWakeupFd(struct lwpProc *proc) lwpReady(proc); } +/* + * Wake up threads in LwpDelayq whose time has come. + */ void lwpWakeupSleep(void) { @@ -134,7 +161,11 @@ lwpWakeupSleep(void) while (NULL != (proc = lwpGetFirst(&LwpDelayq))) { if (now >= proc->runtime) { lwpStatus(proc, "sleep done"); - lwpReady(proc); + proc->runtime = (time_t)-1; + if (proc->fd >= 0) + lwpWakeupFd(proc); + else + lwpReady(proc); } else { lwpAddTail(&save, proc); } @@ -148,10 +179,9 @@ lwpWakeup(struct lwpProc *proc) { if (proc->fd >= 0) lwpWakeupFd(proc); - else if (proc->runtime != (time_t)-1) { + else if (proc->runtime != (time_t)-1) proc->runtime = 0; - lwpWakeupSleep(); - } + lwpWakeupSleep(); } int @@ -230,7 +260,6 @@ lwpSelect(void *arg) continue; } - lwpWakeupSleep(); if (n > 0) { /* file descriptor activity */ for (fd = 0; fd <= LwpMaxfd; fd++) { @@ -238,16 +267,19 @@ lwpSelect(void *arg) continue; if (FD_ISSET(fd, &readmask)) { lwpStatus(LwpFdwait[fd], "input ready"); + LwpFdwait[fd]->fd_ready = 1; lwpWakeupFd(LwpFdwait[fd]); continue; } if (FD_ISSET(fd, &writemask)) { lwpStatus(LwpFdwait[fd], "output ready"); + LwpFdwait[fd]->fd_ready = 1; lwpWakeupFd(LwpFdwait[fd]); continue; } } } + lwpWakeupSleep(); lwpStatus(us, "fd dispatch completed"); lwpReady(LwpCurrent); lwpReschedule(); diff --git a/src/lib/player/accept.c b/src/lib/player/accept.c index 160ca6480..310684946 100644 --- a/src/lib/player/accept.c +++ b/src/lib/player/accept.c @@ -72,14 +72,18 @@ struct player * player_new(int s) { struct player *lp; + struct timeval idle_timeout; lp = malloc(sizeof(struct player)); if (!lp) return NULL; memset(lp, 0, sizeof(struct player)); + idle_timeout.tv_sec = max_idle * 60; + idle_timeout.tv_usec = 0 ; if (s >= 0) { /* real player, not dummy created by update and market update */ - lp->iop = io_open(s, IO_READ | IO_WRITE | IO_NBLOCK, IO_BUFSIZE); + lp->iop = io_open(s, IO_READ | IO_WRITE | IO_NBLOCK, IO_BUFSIZE, + idle_timeout); if (!lp->iop) { free(lp); return NULL; @@ -173,7 +177,7 @@ player_accept(void *unused) sap = malloc(player_addrlen); while (1) { - empth_select(s, EMPTH_FD_READ); + empth_select(s, EMPTH_FD_READ, NULL); len = player_addrlen; ns = accept(s, sap, &len); /* FIXME accept() can block on some systems (RST after select() reports ready) */ diff --git a/src/lib/player/login.c b/src/lib/player/login.c index 63d3c3f17..c2be754d3 100644 --- a/src/lib/player/login.c +++ b/src/lib/player/login.c @@ -79,16 +79,21 @@ player_login(void *ud) char space[128]; int ac; int cmd; + int res; player->proc = empth_self(); pr_id(player, C_INIT, "Empire server ready\n"); - while (!io_eof(player->iop) && !io_error(player->iop) - && player->state != PS_SHUTDOWN) { + while (player->state != PS_SHUTDOWN) { io_output(player->iop, IO_WAIT); if (io_gets(player->iop, buf, sizeof(buf)) < 0) { - io_input(player->iop, IO_WAIT); + res = io_input(player->iop, IO_WAIT); + if (res <= 0) { + if (res == 0 && !io_eof(player->iop)) + pr_id(player, C_DATA, "idle connection terminated\n"); + break; + } continue; } ac = parse(buf, space, player->argp, NULL, NULL, NULL); diff --git a/src/lib/player/recvclient.c b/src/lib/player/recvclient.c index 467deedf0..20e7d3148 100644 --- a/src/lib/player/recvclient.c +++ b/src/lib/player/recvclient.c @@ -84,10 +84,13 @@ recvclient(char *cmd, int size) if (player->aborted) break; - /* Await more input */ - io_input(player->iop, IO_WAIT); - if (io_error(player->iop) || io_eof(player->iop)) + if (io_input(player->iop, IO_WAIT) <= 0) { + if (!io_error(player->iop) && !io_eof(player->iop)) { + pr_flash(player, "idle connection terminated\n"); + player->state = PS_SHUTDOWN; + } player->aborted = player->eof = 1; + } } if (player->aborted) { diff --git a/src/lib/subs/show.c b/src/lib/subs/show.c index ac3125b8e..3c136f77b 100644 --- a/src/lib/subs/show.c +++ b/src/lib/subs/show.c @@ -37,10 +37,6 @@ #include -#if defined(_WIN32) -#include -#endif - #include #include "file.h" #include "game.h" diff --git a/src/lib/w32/service.c b/src/lib/w32/service.c index d0b6ae968..20ec1d5a4 100644 --- a/src/lib/w32/service.c +++ b/src/lib/w32/service.c @@ -33,7 +33,12 @@ #include -#include +/* + * For WIN32 in empthread.h, winsock2.h is included which interfers + * with including windows.h, use sys/socket.h instead to prevent a + * problem + */ +#include #include "service.h" #include "empthread.h" diff --git a/src/server/idle.c b/src/lib/w32/sys/time.h similarity index 60% rename from src/server/idle.c rename to src/lib/w32/sys/time.h index ba0f7672f..8173a5ab4 100644 --- a/src/server/idle.c +++ b/src/lib/w32/sys/time.h @@ -25,47 +25,16 @@ * * --- * - * idle.c: Stamps out idle players. Runs at low priority + * sys/time.h: POSIX emulation for WIN32 * * Known contributors to this file: - * Dave Pare, 1994 + * Ron Koenderink, 2008 */ -#include +#ifndef SYS_TIME_H +#define SYS_TIME_H -#include -#include "empthread.h" -#include "optlist.h" -#include "player.h" -#include "prototypes.h" -#include "server.h" +/* include winsock2.h thru sys/socket.h to get struct timeval */ +#include "sys/socket.h" -/*ARGSUSED*/ -void -player_kill_idle(void *unused) -{ - struct player *p; - time_t now; - - time(&now); - while (1) { - empth_sleep(now + 60); - time(&now); - for (p = player_next(0); p != 0; p = player_next(p)) { - if (p->state == PS_SHUTDOWN) { - /* - * Player thread hung or just aborted by update or - * shutdown, we can't tell. - */ - continue; - } - if (p->curup + max_idle * 60 < now) { - p->state = PS_SHUTDOWN; - p->aborted++; - pr_flash(p, "idle connection terminated\n"); - empth_wakeup(p->proc); - } - } - } - /*NOTREACHED*/ -} +#endif /* SYS_TIME_H */ diff --git a/src/server/main.c b/src/server/main.c index 2bcdd2d9b..1f66f9881 100644 --- a/src/server/main.c +++ b/src/server/main.c @@ -382,7 +382,6 @@ start_server(int flags) exit(1); empth_create(player_accept, 50 * 1024, flags, "AcceptPlayers", 0); - empth_create(player_kill_idle, 50 * 1024, flags, "KillIdle", 0); market_init(); update_init(); -- 2.43.0