Move queue flush out of io.c

Player threads may only sleep under certain conditions.  In
particular, they must not sleep while a command is being aborted by
the update or shutdown.

io.c should not know about that.  Yet io_output_all() does, because it
needs to give up when update or shutdown interrupt it.  The function
was introduced in Empire 2, but it didn't give up then.  Fixed in
commit a7fa7dee, v4.2.22.  The fix dragged unwanted knowledge of
command abortion into io.c.

To clean up this mess, io_output_all() has to go.

First user is io_write().  io_write() automatically flushes the queue.
In wait-mode, it calls io_output_all() when the queue is longer than
the bufsize, to attempt flushing the queue completely.  In
no-wait-mode, it calls io_output() every bufsize bytes.  Except the
test for that is screwy, so it actually misses some of the flush
conditions.

The automatic flush makes io_write() differ from io_gets(), which is
ugly.  It wasn't present in BSD Empire 1.1.  Remove it again, dropping
io_write()'s last argument.

Flush the queue in its callers pr_player() and upr_player() instead.
Provide new io_output_if_queue_long() for them.  Requires new struct
iop member last_out to keep track of queue growth.  pr_player() and
upr_player() call repeatedly until it makes no more progress.  This
flushes a bit less eagerly in wait-mode, and a bit more eagerly in
non-wait mode.

Second user is recvclient().  It needs to flush the queue before
potentially sleeping in io_input().  Do that with a simple loop around
io_output().  No functional change there.
This commit is contained in:
Markus Armbruster 2009-04-27 23:23:30 +02:00
parent 7fd5b86990
commit 1b4496253d
4 changed files with 53 additions and 39 deletions

View file

@ -49,10 +49,10 @@ extern int io_input(struct iop *, int);
extern int io_inputwaiting(struct iop *); extern int io_inputwaiting(struct iop *);
extern int io_outputwaiting(struct iop *); extern int io_outputwaiting(struct iop *);
extern int io_output(struct iop *, int); extern int io_output(struct iop *, int);
extern int io_output_if_queue_long(struct iop *, int);
extern int io_peek(struct iop *, char *, int); extern int io_peek(struct iop *, char *, int);
extern int io_read(struct iop *, char *, int); extern int io_read(struct iop *, char *, int);
extern int io_write(struct iop *, char *, int, int); extern int io_write(struct iop *, char *, int);
extern int io_output_all(struct iop *);
extern int io_gets(struct iop *, char *, int); extern int io_gets(struct iop *, char *, int);
extern int io_puts(struct iop *, char *); extern int io_puts(struct iop *, char *);
extern int io_shutdown(struct iop *, int); extern int io_shutdown(struct iop *, int);

View file

@ -54,7 +54,6 @@
#include "ioqueue.h" #include "ioqueue.h"
#include "misc.h" #include "misc.h"
#include "queue.h" #include "queue.h"
#include "server.h"
struct iop { struct iop {
int fd; int fd;
@ -62,6 +61,7 @@ struct iop {
struct ioqueue *output; struct ioqueue *output;
int flags; int flags;
int bufsize; int bufsize;
int last_out;
struct timeval input_timeout; struct timeval input_timeout;
}; };
@ -94,6 +94,7 @@ io_open(int fd, int flags, int bufsize, struct timeval timeout)
iop->input = NULL; iop->input = NULL;
iop->output = NULL; iop->output = NULL;
iop->flags = flags; iop->flags = flags;
iop->last_out = 0;
iop->bufsize = bufsize; iop->bufsize = bufsize;
iop->input_timeout = timeout; iop->input_timeout = timeout;
if (flags & IO_READ) if (flags & IO_READ)
@ -225,9 +226,32 @@ io_output(struct iop *iop, int wait)
} }
ioq_dequeue(iop->output, cc); ioq_dequeue(iop->output, cc);
iop->last_out = ioq_qsize(iop->output);
return cc; return cc;
} }
/*
* Write output queued in IOP if enough have been enqueued.
* Write if at least one buffer has been filled since the last write.
* If WAIT, writing may put the thread to sleep.
* Return number of bytes written on success, -1 on error.
* In particular, return zero when nothing was written because the
* queue was not long, or the write slept and got woken up (only if
* WAIT), or the write refused to sleep (only if !WAIT).
*/
int
io_output_if_queue_long(struct iop *iop, int wait)
{
int len = ioq_qsize(iop->output);
if (CANT_HAPPEN(iop->last_out > len))
iop->last_out = 0;
if (len - iop->last_out < iop->bufsize)
return 0;
return io_output(iop, wait);
}
int int
io_peek(struct iop *iop, char *buf, int nbytes) io_peek(struct iop *iop, char *buf, int nbytes)
{ {
@ -250,41 +274,14 @@ io_read(struct iop *iop, char *buf, int nbytes)
} }
int int
io_write(struct iop *iop, char *buf, int nbytes, int wait) io_write(struct iop *iop, char *buf, int nbytes)
{ {
int len;
if ((iop->flags & IO_WRITE) == 0) if ((iop->flags & IO_WRITE) == 0)
return -1; return -1;
ioq_append(iop->output, buf, nbytes); ioq_append(iop->output, buf, nbytes);
len = ioq_qsize(iop->output);
if (len > iop->bufsize) {
if (wait) {
io_output_all(iop);
} else {
/* only try a write every BUFSIZE characters */
if (((len - nbytes) % iop->bufsize) < (len % iop->bufsize))
io_output(iop, 0);
}
}
return nbytes; return nbytes;
} }
int
io_output_all(struct iop *iop)
{
int n;
/*
* Mustn't block a player thread while update is pending, or else
* a malicous player could delay the update indefinitely
*/
while ((n = io_output(iop, 0)) > 0 && !play_wrlock_wanted)
empth_select(iop->fd, EMPTH_FD_WRITE, NULL);
return n;
}
int int
io_gets(struct iop *iop, char *buf, int nbytes) io_gets(struct iop *iop, char *buf, int nbytes)
{ {
@ -314,6 +311,7 @@ io_shutdown(struct iop *iop, int flags)
if (flags & IO_WRITE) { if (flags & IO_WRITE) {
shutdown(iop->fd, 1); shutdown(iop->fd, 1);
ioq_drain(iop->output); ioq_drain(iop->output);
iop->last_out = 0;
} }
return 0; return 0;
} }

View file

@ -39,6 +39,7 @@
#include "journal.h" #include "journal.h"
#include "player.h" #include "player.h"
#include "prototypes.h" #include "prototypes.h"
#include "server.h"
/* /*
* Receive a line of input from the current player. * Receive a line of input from the current player.
@ -75,8 +76,11 @@ recvclient(char *cmd, int size)
break; break;
} }
/* Make sure player sees prompt */ /*
io_output_all(player->iop); * Flush all queued output before potentially sleeping in
* io_input(), to make sure player sees the prompt.
*/
while (io_output(player->iop, !play_wrlock_wanted) > 0) ;
/* /*
* If io_output_all() blocked and got unblocked by command * If io_output_all() blocked and got unblocked by command

View file

@ -233,9 +233,7 @@ pr_player(struct player *pl, int id, char *buf)
p = strchr(bp, '\n'); p = strchr(bp, '\n');
if (p != NULL) { if (p != NULL) {
len = (p - bp) + 1; len = (p - bp) + 1;
io_write(pl->iop, bp, len, io_write(pl->iop, bp, len);
player == pl
&& !(pl->command && (pl->command->c_flags & C_MOD)));
bp += len; bp += len;
pl->curid = -1; pl->curid = -1;
} else { } else {
@ -243,6 +241,14 @@ pr_player(struct player *pl, int id, char *buf)
bp += len; bp += len;
} }
} }
if (player == pl) {
while (io_output_if_queue_long(pl->iop,
!play_wrlock_wanted
&& !(pl->command && (pl->command->c_flags & C_MOD)))
> 0)
;
}
} }
/* /*
@ -286,15 +292,21 @@ upr_player(struct player *pl, int id, char *buf)
} }
} }
if (ch == '\n') { if (ch == '\n') {
io_write(pl->iop, &ch, 1, io_write(pl->iop, &ch, 1);
player == pl
&& !(pl->command && (pl->command->c_flags & C_MOD)));
pl->curid = -1; pl->curid = -1;
} else { } else {
printbuf[0] = ch; printbuf[0] = ch;
io_puts(pl->iop, printbuf); io_puts(pl->iop, printbuf);
} }
} }
if (player == pl) {
while (io_output_if_queue_long(pl->iop,
!play_wrlock_wanted
&& !(pl->command && (pl->command->c_flags & C_MOD)))
> 0)
;
}
} }
/* /*