Reimplement max_idle without a separate thread

Remove the KillIdle thread.  Add timeout to struct iop, initialized in
io_open().  Obey it in io_input() by passing it to empth_select().  If
empth_select() times out, report that back through io_input() to
recvclient() and player_login().  If player_login() receives a timeout
indication, print a message and terminate the session.  If
recvclient() receives a timeout indication, flash a message to the
player and initiate a shut down the player's session.

Create WIN32 sys/time.h to define struct timeval.  This creates some
conflicts with WIN32 windows.h definitions.  Including windows.h in
show.c and info.c creates conflicts, so remove that.  Modify service.c
to include sys/socket.h instead of windows.h to remove the conflict
with sys/time.h.
This commit is contained in:
Ron Koenderink 2009-02-01 06:22:26 -06:00 committed by Markus Armbruster
parent a7ee69d112
commit 08b9455682
17 changed files with 181 additions and 133 deletions

View file

@ -33,6 +33,7 @@
#ifndef EMPIO_H
#define EMPIO_H
#include <sys/time.h>
#define IO_READ 0x1
#define IO_WRITE 0x2
@ -46,7 +47,7 @@
#define IO_NOWAIT 0
#define IO_WAIT 1
extern struct iop *io_open(int, int, int);
extern struct iop *io_open(int, int, int, struct timeval);
extern void io_init(void);
extern int io_noblocking(struct iop *, int);
extern void io_close(struct iop *);

View file

@ -48,6 +48,7 @@
#ifndef EMPTHREAD_H
#define EMPTHREAD_H
#include <sys/time.h>
#include <time.h>
#ifdef EMPTH_LWP
@ -168,10 +169,13 @@ void empth_terminate(empth_t *thread);
* If FLAGS & EMPTH_FD_READ, wake up if FD is ready for input.
* If FLAGS & EMPTH_FD_WRITE, wake up if FD is ready for output.
* At most one thread may sleep on the same file descriptor.
* TIMEOUT, if non-null, limits the sleep time.
* Return one when the FD is ready, zero on timeout, -1 on error with
* errno set.
* Note: Currently, Empire sleeps only on network I/O, i.e. FD is a
* socket. Implementations should not rely on that.
*/
void empth_select(int fd, int flags);
int empth_select(int fd, int flags, struct timeval *timeout);
/*
* Awaken THREAD if it is sleeping in empth_select() or empth_sleep().

View file

@ -45,7 +45,7 @@ struct lwpProc *lwpCreate(int prio, void (*)(void *), int size,
void lwpExit(void);
void lwpTerminate(struct lwpProc * p);
void lwpYield(void);
void lwpSleepFd(int fd, int flags);
int lwpSleepFd(int fd, int flags, struct timeval *timeout);
int lwpSleepUntil(time_t until);
void lwpWakeup(struct lwpProc *);
int lwpSigWait(sigset_t *set, int *sig);

View file

@ -43,8 +43,6 @@
#include <stdio.h>
#if !defined(_WIN32)
#include <dirent.h>
#else
#include <windows.h>
#endif
#include "commands.h"
#include "optlist.h"

View file

@ -61,6 +61,7 @@ struct iop {
struct ioqueue *output;
int flags;
int bufsize;
struct timeval input_timeout;
};
void
@ -69,7 +70,7 @@ io_init(void)
}
struct iop *
io_open(int fd, int flags, int bufsize)
io_open(int fd, int flags, int bufsize, struct timeval timeout)
{
struct iop *iop;
@ -83,6 +84,7 @@ io_open(int fd, int flags, int bufsize)
iop->input = 0;
iop->output = 0;
iop->flags = 0;
iop->input_timeout = timeout;
iop->bufsize = bufsize;
if ((flags & IO_READ) && (flags & IO_NEWSOCK) == 0)
iop->input = ioq_create(bufsize);
@ -106,29 +108,45 @@ io_close(struct iop *iop)
free(iop);
}
/*
* Return number of bytes read on success, zero on timeout or EOF, -1
* on error, with errno set appropriately. In particular, return -1
* with errno set to EAGAIN or EWOULDBLOCK when no data is available
* for non-blocking input (WAITFORINPUT false). Use io_eof() to
* distinguish timeout from EOF.
*/
int
io_input(struct iop *iop, int waitforinput)
{
char buf[IO_BUFSIZE];
int cc;
int res;
struct timeval timeout = iop->input_timeout;
/* Not a read IOP */
if ((iop->flags & IO_READ) == 0)
if ((iop->flags & IO_READ) == 0) {
errno = EBADF;
return -1;
}
/* IOP is markes as in error. */
if (iop->flags & IO_ERROR)
if (iop->flags & IO_ERROR) {
errno = EBADF;
return -1;
}
/* Wait for the file to have input. */
if (waitforinput) {
empth_select(iop->fd, EMPTH_FD_READ);
res = empth_select(iop->fd, EMPTH_FD_READ, &timeout);
if (res < 0) {
iop->flags |= IO_ERROR;
return -1;
} else if (res == 0)
return 0;
}
/* Do the actual read. */
cc = read(iop->fd, buf, sizeof(buf));
if (cc < 0) {
/* would block, so nothing to read. */
if (errno == EAGAIN || errno == EWOULDBLOCK)
return 0;
if (errno != EAGAIN && errno != EWOULDBLOCK)
/* Some form of file error occurred... */
iop->flags |= IO_ERROR;
return -1;
@ -190,7 +208,7 @@ io_output(struct iop *iop, int waitforoutput)
if (waitforoutput != IO_NOWAIT) {
/* This waits for the file to be ready for writing, */
/* and lets other threads run. */
empth_select(iop->fd, EMPTH_FD_WRITE);
empth_select(iop->fd, EMPTH_FD_WRITE, NULL);
}
/* Do the actual write. */
@ -272,7 +290,7 @@ io_output_all(struct iop *iop)
* a malicous player could delay the update indefinitely
*/
while ((n = io_output(iop, IO_NOWAIT)) > 0 && !play_wrlock_wanted)
empth_select(iop->fd, EMPTH_FD_WRITE);
empth_select(iop->fd, EMPTH_FD_WRITE, NULL);
return n;
}

View file

@ -104,10 +104,10 @@ empth_terminate(empth_t *a)
lwpTerminate(a);
}
void
empth_select(int fd, int flags)
int
empth_select(int fd, int flags, struct timeval *timeout)
{
lwpSleepFd(fd, flags);
return lwpSleepFd(fd, flags, timeout);
}
void

View file

@ -560,12 +560,14 @@ empth_terminate(empth_t *pThread)
*
* This would be one of the main functions used within gen\io.c
*/
void
empth_select(int fd, int flags)
int
empth_select(int fd, int flags, struct timeval *timeout)
{
int handle;
WSAEVENT hEventObject[2];
DWORD result, msec;
empth_t *pThread = TlsGetValue(dwTLSIndex);
int res;
loc_debug("%s select on %d",
flags == EMPTH_FD_READ ? "read" : "write", fd);
@ -586,13 +588,31 @@ empth_select(int fd, int flags)
empth_exit();
}
WSAWaitForMultipleEvents(2, hEventObject, FALSE, WSA_INFINITE, FALSE);
if (timeout)
msec = timeout->tv_sec * 1000L + timeout->tv_usec / 1000L;
else
msec = WSA_INFINITE;
result = WSAWaitForMultipleEvents(2, hEventObject, FALSE, msec,
FALSE);
switch (result) {
case WSA_WAIT_TIMEOUT:
res = 0;
break;
case WSA_WAIT_FAILED:
errno = WSAGetLastError();
res = -1;
break;
default:
res = 1;
}
WSAEventSelect(handle, hEventObject[0], 0);
WSACloseEvent(hEventObject[0]);
loc_RunThisThread(NULL);
return res;
}
/************************

View file

@ -289,21 +289,17 @@ empth_terminate(empth_t *a)
pthread_kill(a->id, SIGALRM);
}
void
empth_select(int fd, int flags)
int
empth_select(int fd, int flags, struct timeval *tv)
{
fd_set readmask;
fd_set writemask;
struct timeval tv;
int n;
int res = 0;
pthread_mutex_unlock(&mtx_ctxsw);
empth_status("%s select on %d",
flags == EMPTH_FD_READ ? "read" : "write", fd);
while (1) {
tv.tv_sec = 1000000;
tv.tv_usec = 0;
FD_ZERO(&readmask);
FD_ZERO(&writemask);
@ -316,38 +312,35 @@ empth_select(int fd, int flags)
FD_SET(fd, &writemask);
break;
default:
logerror("bad flag %d passed to empth_select", flags);
empth_exit();
CANT_REACH();
errno = EINVAL;
res = -1;
goto done;
}
n = select(fd + 1, &readmask, &writemask, (fd_set *) 0, &tv);
n = select(fd + 1, &readmask, &writemask, (fd_set *) 0, tv);
if (n < 0) {
if (errno == EINTR) {
/* go handle the signal */
if (errno == EINTR) /* go handle the signal */
empth_status("select broken by signal");
goto done;
return;
}
/* strange but we dont get EINTR on select broken by signal */
else
empth_status("select failed (%s)", strerror(errno));
goto done;
return;
}
if (flags == EMPTH_FD_READ && FD_ISSET(fd, &readmask)) {
res = -1;
} else if (n == 0) {
empth_status("select timed out");
res = 0;
} else if (flags == EMPTH_FD_READ && FD_ISSET(fd, &readmask)) {
empth_status("input ready");
break;
}
if (flags == EMPTH_FD_WRITE && FD_ISSET(fd, &writemask)) {
res = 1;
} else if (flags == EMPTH_FD_WRITE && FD_ISSET(fd, &writemask)) {
empth_status("output ready");
break;
}
res = 1;
}
done:
pthread_mutex_lock(&mtx_ctxsw);
empth_restorectx();
return res;
}
static void

View file

@ -52,6 +52,7 @@ struct lwpProc {
int pri; /* which scheduling queue we're on */
time_t runtime; /* time at which process is restarted */
int fd; /* fd we're blocking on */
int fd_ready; /* woken up because fd was ready */
int argc; /* initial arguments */
char **argv;
void *ud; /* user data */

View file

@ -55,7 +55,10 @@ static fd_set LwpReadfds, LwpWritefds;
/* Map file descriptor to thread sleeping in lwpSleepFd() */
static struct lwpProc **LwpFdwait;
/* Threads sleeping in lwpSleepUntil(), in no particular order */
/*
* Threads sleeping until a wakeup time, in lwpSleepUntil() or
* lwpSleepFd(), in no particular order
*/
static struct lwpQueue LwpDelayq;
/* The thread executing lwpSelect() */
@ -74,23 +77,25 @@ lwpInitSelect(struct lwpProc *proc)
LwpSelProc = proc;
}
void
lwpSleepFd(int fd, int mask)
int
lwpSleepFd(int fd, int mask, struct timeval *timeout)
{
lwpStatus(LwpCurrent, "sleeping on fd %d for %d", fd, mask);
if (CANT_HAPPEN(fd > FD_SETSIZE))
return;
if (CANT_HAPPEN(fd > FD_SETSIZE)) {
errno = EBADF;
return -1;
}
if (LwpFdwait[fd] != 0) {
lwpStatus(LwpCurrent,
"multiple sleeps attempted on file descriptor %d", fd);
return;
errno = EBADF;
return -1;
}
if (mask & LWP_FD_READ)
FD_SET(fd, &LwpReadfds);
if (mask & LWP_FD_WRITE)
FD_SET(fd, &LwpWritefds);
LwpNfds++;
if (LwpMaxfd == 0 && LwpDelayq.head == 0) {
@ -99,20 +104,39 @@ lwpSleepFd(int fd, int mask)
lwpReady(LwpSelProc);
}
lwpStatus(LwpCurrent, "going to wait on fd %d", fd);
if (timeout) {
LwpCurrent->runtime = time(NULL) + timeout->tv_sec +
(timeout->tv_usec > 0);
lwpAddTail(&LwpDelayq, LwpCurrent);
} else
LwpCurrent->runtime = (time_t)-1;
if (fd > LwpMaxfd)
LwpMaxfd = fd;
LwpFdwait[fd] = LwpCurrent;
LwpCurrent->fd = fd;
LwpCurrent->fd_ready = 0;
lwpReschedule();
return LwpCurrent->fd_ready != 0;
}
/*
* Wake up PROC if it is sleeping in lwpSleepFd().
* Must be followed by lwpWakeupSleep() before the next lwpReschedule().
*/
static void
lwpWakeupFd(struct lwpProc *proc)
{
if (proc->fd < 0)
if (CANT_HAPPEN(proc->fd < 0 || proc->fd > LwpMaxfd))
return;
lwpStatus(proc, "awakening; was sleeping on fd %d", proc->fd);
if (proc->runtime != (time_t)-1) {
/* is in LwpDelayq; leave the job to lwpWakeupSleep() */
proc->runtime = 0;
return;
}
FD_CLR(proc->fd, &LwpReadfds);
FD_CLR(proc->fd, &LwpWritefds);
LwpNfds--;
@ -121,6 +145,9 @@ lwpWakeupFd(struct lwpProc *proc)
lwpReady(proc);
}
/*
* Wake up threads in LwpDelayq whose time has come.
*/
void
lwpWakeupSleep(void)
{
@ -134,6 +161,10 @@ lwpWakeupSleep(void)
while (NULL != (proc = lwpGetFirst(&LwpDelayq))) {
if (now >= proc->runtime) {
lwpStatus(proc, "sleep done");
proc->runtime = (time_t)-1;
if (proc->fd >= 0)
lwpWakeupFd(proc);
else
lwpReady(proc);
} else {
lwpAddTail(&save, proc);
@ -148,11 +179,10 @@ lwpWakeup(struct lwpProc *proc)
{
if (proc->fd >= 0)
lwpWakeupFd(proc);
else if (proc->runtime != (time_t)-1) {
else if (proc->runtime != (time_t)-1)
proc->runtime = 0;
lwpWakeupSleep();
}
}
int
lwpSleepUntil(time_t until)
@ -230,7 +260,6 @@ lwpSelect(void *arg)
continue;
}
lwpWakeupSleep();
if (n > 0) {
/* file descriptor activity */
for (fd = 0; fd <= LwpMaxfd; fd++) {
@ -238,16 +267,19 @@ lwpSelect(void *arg)
continue;
if (FD_ISSET(fd, &readmask)) {
lwpStatus(LwpFdwait[fd], "input ready");
LwpFdwait[fd]->fd_ready = 1;
lwpWakeupFd(LwpFdwait[fd]);
continue;
}
if (FD_ISSET(fd, &writemask)) {
lwpStatus(LwpFdwait[fd], "output ready");
LwpFdwait[fd]->fd_ready = 1;
lwpWakeupFd(LwpFdwait[fd]);
continue;
}
}
}
lwpWakeupSleep();
lwpStatus(us, "fd dispatch completed");
lwpReady(LwpCurrent);
lwpReschedule();

View file

@ -72,14 +72,18 @@ struct player *
player_new(int s)
{
struct player *lp;
struct timeval idle_timeout;
lp = malloc(sizeof(struct player));
if (!lp)
return NULL;
memset(lp, 0, sizeof(struct player));
idle_timeout.tv_sec = max_idle * 60;
idle_timeout.tv_usec = 0 ;
if (s >= 0) {
/* real player, not dummy created by update and market update */
lp->iop = io_open(s, IO_READ | IO_WRITE | IO_NBLOCK, IO_BUFSIZE);
lp->iop = io_open(s, IO_READ | IO_WRITE | IO_NBLOCK, IO_BUFSIZE,
idle_timeout);
if (!lp->iop) {
free(lp);
return NULL;
@ -173,7 +177,7 @@ player_accept(void *unused)
sap = malloc(player_addrlen);
while (1) {
empth_select(s, EMPTH_FD_READ);
empth_select(s, EMPTH_FD_READ, NULL);
len = player_addrlen;
ns = accept(s, sap, &len);
/* FIXME accept() can block on some systems (RST after select() reports ready) */

View file

@ -79,16 +79,21 @@ player_login(void *ud)
char space[128];
int ac;
int cmd;
int res;
player->proc = empth_self();
pr_id(player, C_INIT, "Empire server ready\n");
while (!io_eof(player->iop) && !io_error(player->iop)
&& player->state != PS_SHUTDOWN) {
while (player->state != PS_SHUTDOWN) {
io_output(player->iop, IO_WAIT);
if (io_gets(player->iop, buf, sizeof(buf)) < 0) {
io_input(player->iop, IO_WAIT);
res = io_input(player->iop, IO_WAIT);
if (res <= 0) {
if (res == 0 && !io_eof(player->iop))
pr_id(player, C_DATA, "idle connection terminated\n");
break;
}
continue;
}
ac = parse(buf, space, player->argp, NULL, NULL, NULL);

View file

@ -84,11 +84,14 @@ recvclient(char *cmd, int size)
if (player->aborted)
break;
/* Await more input */
io_input(player->iop, IO_WAIT);
if (io_error(player->iop) || io_eof(player->iop))
if (io_input(player->iop, IO_WAIT) <= 0) {
if (!io_error(player->iop) && !io_eof(player->iop)) {
pr_flash(player, "idle connection terminated\n");
player->state = PS_SHUTDOWN;
}
player->aborted = player->eof = 1;
}
}
if (player->aborted) {
player->recvfail++;

View file

@ -37,10 +37,6 @@
#include <config.h>
#if defined(_WIN32)
#include <Windows.h>
#endif
#include <math.h>
#include "file.h"
#include "game.h"

View file

@ -33,7 +33,12 @@
#include <config.h>
#include <windows.h>
/*
* For WIN32 in empthread.h, winsock2.h is included which interfers
* with including windows.h, use sys/socket.h instead to prevent a
* problem
*/
#include <sys/socket.h>
#include "service.h"
#include "empthread.h"

View file

@ -25,47 +25,16 @@
*
* ---
*
* idle.c: Stamps out idle players. Runs at low priority
* sys/time.h: POSIX emulation for WIN32
*
* Known contributors to this file:
* Dave Pare, 1994
* Ron Koenderink, 2008
*/
#include <config.h>
#ifndef SYS_TIME_H
#define SYS_TIME_H
#include <time.h>
#include "empthread.h"
#include "optlist.h"
#include "player.h"
#include "prototypes.h"
#include "server.h"
/* include winsock2.h thru sys/socket.h to get struct timeval */
#include "sys/socket.h"
/*ARGSUSED*/
void
player_kill_idle(void *unused)
{
struct player *p;
time_t now;
time(&now);
while (1) {
empth_sleep(now + 60);
time(&now);
for (p = player_next(0); p != 0; p = player_next(p)) {
if (p->state == PS_SHUTDOWN) {
/*
* Player thread hung or just aborted by update or
* shutdown, we can't tell.
*/
continue;
}
if (p->curup + max_idle * 60 < now) {
p->state = PS_SHUTDOWN;
p->aborted++;
pr_flash(p, "idle connection terminated\n");
empth_wakeup(p->proc);
}
}
}
/*NOTREACHED*/
}
#endif /* SYS_TIME_H */

View file

@ -382,7 +382,6 @@ start_server(int flags)
exit(1);
empth_create(player_accept, 50 * 1024, flags, "AcceptPlayers", 0);
empth_create(player_kill_idle, 50 * 1024, flags, "KillIdle", 0);
market_init();
update_init();