ref: ad9748422e740128174f51090d7ca776681569ac
parent: 0f9c172712e834b8f45630a9665f2e271161692c
author: cinap_lenrek <cinap_lenrek@felloff.net>
date: Sun Mar 24 16:37:01 EDT 2024
kernel: Fix qio flow control There is a pathological case with qio that triggers a deadlock for single-threaded servers and multiple requesters that can be reproduced like this: int pfd[2]; void main(int argc, char *argv[]) { char buf[0x10000]; int i, n; ARGBEGIN { } ARGEND; if(pipe(pfd) < 0) sysfatal("pipe: %r"); if(fork() == 0){ while((n = read(pfd[0], buf, sizeof(buf))) > 0){ sleep(10); write(pfd[0], buf, n); } exits(nil); } for(i = 0; i < PROCS; i++){ if(fork() == 0){ buf[0] = i; for(;;){ write(pfd[1], buf, sizeof(buf)); if(read(pfd[1], buf, sizeof(buf)) <= 0) break; print("%d %d\n", i, buf[0]); } exits(nil); } } waitpid(); } The problem is how the reader decides to wake up the writer, which was based only on the global queue length, but it should really depend on the local queuing position of the writers and their distance to the reader position. Otherwise, a writer can be blocked even though its message has already been consumed by the reader. When the reader tries to reply, it can itself get blocked on writing the reply. The new qio code basically makes sure that writers get unblocked in order, avoiding the issue. The qio block statistics and qwindow() are gone now as they were mostly unused.
--- a/sys/lib/acid/kernel
+++ b/sys/lib/acid/kernel
@@ -55,15 +55,6 @@
}
}
-defn qiostats() {
- print ("padblockcnt=", *padblockcnt\D, "\n");
- print ("concatblockcnt=", *concatblockcnt\D, "\n");
- print ("pullupblockcnt=", *pullupblockcnt\D, "\n");
- print ("copyblockcnt=", *copyblockcnt\D, "\n");
- print ("consumecnt=", *consumecnt\D, "\n");
- print ("producecnt=", *producecnt\D, "\n");
-}
-
// dump channels
defn chan(c) {
local d, q;
--- a/sys/src/9/port/devpipe.c
+++ b/sys/src/9/port/devpipe.c
@@ -370,8 +370,6 @@
n = convM2D(dp, n, &d, nil);
if(n == 0)
error(Eshortstat);
- if(d.length < 1 || d.length > conf.pipeqsize)
- error(Ebadarg);
p = c->aux;
switch(NETTYPE(c->qid.path)){
@@ -379,6 +377,8 @@
error(Eperm);
case Qdata0:
case Qdata1:
+ if((uvlong)d.length > conf.pipeqsize)
+ error(Ebadarg);
qsetlimit(p->q[0], d.length);
qsetlimit(p->q[1], d.length);
break;
--- a/sys/src/9/port/portfns.h
+++ b/sys/src/9/port/portfns.h
@@ -301,7 +301,6 @@
void qreopen(Queue*);
void qsetlimit(Queue*, int);
void qunlock(QLock*);
-int qwindow(Queue*);
int qwrite(Queue*, void*, int);
void qnoblock(Queue*, int);
void randominit(void);
--- a/sys/src/9/port/qio.c
+++ b/sys/src/9/port/qio.c
@@ -5,41 +5,34 @@
#include "fns.h"
#include "../port/error.h"
-static ulong padblockcnt;
-static ulong concatblockcnt;
-static ulong pullupblockcnt;
-static ulong copyblockcnt;
-static ulong consumecnt;
-static ulong producecnt;
+#define QDEBUG if(1)
-#define QDEBUG if(0)
-
/*
* IO queues
*/
typedef struct Queue Queue;
-
struct Queue
{
Lock;
+ int state;
+ int dlen; /* data length in bytes */
+ uint rp, wp; /* read/write position (counting BALLOC() bytes) */
+ int limit; /* max BALLOC() bytes in queue */
+ int inilim; /* initial limit */
+ uchar noblock; /* true if writes return immediately when q full */
+ uchar eof; /* number of eofs read by user */
+
Block* bfirst; /* buffer */
Block* blast;
- int len; /* bytes allocated to queue */
- int dlen; /* data bytes in queue */
- int limit; /* max bytes in queue */
- int inilim; /* initial limit */
- int state;
- int noblock; /* true if writes return immediately when q full */
- int eof; /* number of eofs read by user */
-
+ void* arg; /* argument to kick and bypass */
void (*kick)(void*); /* restart output */
void (*bypass)(void*, Block*); /* bypass queue altogether */
- void* arg; /* argument to kick */
QLock rlock; /* mutex for reading processes */
Rendez rr; /* process waiting to read */
+
QLock wlock; /* mutex for writing processes */
Rendez wr; /* process waiting to write */
@@ -69,44 +62,6 @@
}
/*
- * pad a block to the front (or the back if size is negative)
- */
-Block*
-padblock(Block *bp, int size)
-{
- int n;
- Block *nbp;
-
- QDEBUG checkb(bp, "padblock 0");
- if(size >= 0){
- if(bp->rp - bp->base >= size){
- bp->rp -= size;
- return bp;
- }
- n = BLEN(bp);
- nbp = allocb(size+n);
- nbp->rp += size;
- nbp->wp = nbp->rp;
- memmove(nbp->wp, bp->rp, n);
- nbp->wp += n;
- nbp->rp -= size;
- } else {
- size = -size;
- if(bp->lim - bp->wp >= size)
- return bp;
- n = BLEN(bp);
- nbp = allocb(n+size);
- memmove(nbp->wp, bp->rp, n);
- nbp->wp += n;
- }
- nbp->next = bp->next;
- freeb(bp);
- padblockcnt++;
- QDEBUG checkb(nbp, "padblock 1");
- return nbp;
-}
-
-/*
* return count of bytes in a string of blocks
*/
int
@@ -123,19 +78,33 @@
}
/*
- * return count of space in blocks
+ * copy the contents of a string of blocks into
+ * memory from an offset. blocklist kept unchanged.
+ * return number of copied bytes.
*/
-int
-blockalloclen(Block *bp)
+long
+readblist(Block *b, uchar *p, long n, ulong o)
{
- int len;
+ ulong m, r;
- len = 0;
- while(bp != nil) {
- len += BALLOC(bp);
- bp = bp->next;
+ r = 0;
+ while(n > 0 && b != nil){
+ m = BLEN(b);
+ if(o >= m)
+ o -= m;
+ else {
+ m -= o;
+ if(n < m)
+ m = n;
+ memmove(p, b->rp + o, m);
+ p += m;
+ r += m;
+ n -= m;
+ o = 0;
+ }
+ b = b->next;
}
- return len;
+ return r;
}
/*
@@ -150,7 +119,6 @@
if(bp->next == nil)
return bp;
len = blocklen(bp);
- concatblockcnt += len;
return pullupblock(bp, len);
}
@@ -163,6 +131,8 @@
Block *nbp;
int i;
+ assert(n >= 0);
+
/*
* this should almost always be true, it's
* just to avoid every caller checking.
@@ -185,7 +155,6 @@
*/
n -= BLEN(bp);
while((nbp = bp->next) != nil){
- pullupblockcnt++;
i = BLEN(nbp);
if(i > n) {
memmove(bp->wp, nbp->rp, n);
@@ -224,6 +193,8 @@
{
Block *b;
+ assert(n >= 0);
+
if(BLEN(q->bfirst) >= n)
return q->bfirst;
q->bfirst = pullupblock(q->bfirst, n);
@@ -242,6 +213,9 @@
ulong l;
Block *nb, *startb;
+ assert(len >= 0);
+ assert(offset >= 0);
+
QDEBUG checkb(bp, "trimblock 1");
l = blocklen(bp);
if(offset == 0 && len == l)
@@ -278,6 +252,43 @@
}
/*
+ * pad a block to the front (or the back if size is negative)
+ */
+Block*
+padblock(Block *bp, int size)
+{
+ int n;
+ Block *nbp;
+
+ QDEBUG checkb(bp, "padblock 0");
+ if(size >= 0){
+ if(bp->rp - bp->base >= size){
+ bp->rp -= size;
+ return bp;
+ }
+ n = BLEN(bp);
+ nbp = allocb(size+n);
+ nbp->rp += size;
+ nbp->wp = nbp->rp;
+ memmove(nbp->wp, bp->rp, n);
+ nbp->wp += n;
+ nbp->rp -= size;
+ } else {
+ size = -size;
+ if(bp->lim - bp->wp >= size)
+ return bp;
+ n = BLEN(bp);
+ nbp = allocb(n+size);
+ memmove(nbp->wp, bp->rp, n);
+ nbp->wp += n;
+ }
+ nbp->next = bp->next;
+ freeb(bp);
+ QDEBUG checkb(nbp, "padblock 1");
+ return nbp;
+}
+
+/*
* copy 'count' bytes into a new block
*/
Block*
@@ -286,6 +297,8 @@
int l;
Block *nbp;
+ assert(count >= 0);
+
QDEBUG checkb(bp, "copyblock 0");
nbp = allocb(count);
for(; count > 0 && bp != nil; bp = bp->next){
@@ -300,7 +313,6 @@
memset(nbp->wp, 0, count);
nbp->wp += count;
}
- copyblockcnt++;
QDEBUG checkb(nbp, "copyblock 1");
return nbp;
@@ -334,7 +346,30 @@
return bp;
}
+/*
+ * if the allocated space is way out of line with the used
+ * space, reallocate to a smaller block
+ */
+Block*
+packblock(Block *bp)
+{
+ Block **l, *nbp;
+ int n;
+ for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
+ n = BLEN(nbp);
+ if((n<<2) < BALLOC(nbp)){
+ *l = allocb(n);
+ memmove((*l)->wp, nbp->rp, n);
+ (*l)->wp += n;
+ (*l)->next = nbp->next;
+ freeb(nbp);
+ }
+ }
+
+ return bp;
+}
+
/*
* throw away up to count bytes from a
* list of blocks. Return count of bytes
@@ -350,8 +385,8 @@
if(bph == nil)
return 0;
- while(*bph != nil && count != 0) {
- bp = *bph;
+ while((bp = *bph) != nil && count > 0) {
+ QDEBUG checkb(bp, "pullblock ");
n = BLEN(bp);
if(count < n)
n = count;
@@ -358,7 +393,6 @@
bytes += n;
count -= n;
bp->rp += n;
- QDEBUG checkb(bp, "pullblock ");
if(BLEN(bp) == 0) {
*bph = bp->next;
bp->next = nil;
@@ -369,100 +403,146 @@
}
/*
- * get next block from a queue, return null if nothing there
+ * remove a block from the front of the queue
*/
Block*
-qget(Queue *q)
+qremove(Queue *q)
{
- int dowakeup;
Block *b;
- /* sync with qwrite */
- ilock(q);
-
b = q->bfirst;
- if(b == nil){
- q->state |= Qstarve;
- iunlock(q);
+ if(b == nil)
return nil;
- }
- QDEBUG checkb(b, "qget");
+ QDEBUG checkb(b, "qremove");
q->bfirst = b->next;
b->next = nil;
- q->len -= BALLOC(b);
q->dlen -= BLEN(b);
+ q->rp += BALLOC(b);
+ return b;
+}
- /* if writer flow controlled, restart */
- if((q->state & Qflow) && q->len < q->limit/2){
- q->state &= ~Qflow;
- dowakeup = 1;
- } else
- dowakeup = 0;
+/*
+ * put a block back to the front of the queue
+ */
+void
+qputback(Queue *q, Block *b)
+{
+ QDEBUG checkb(b, "qputback");
+ b->next = q->bfirst;
+ if(q->bfirst == nil)
+ q->blast = b;
+ q->bfirst = b;
+ q->dlen += BLEN(b);
+ q->rp -= BALLOC(b);
+}
+/*
+ * after removing data from the queue,
+ * unlock queue and wakeup blocked writer.
+ * called at interrupt level.
+ */
+static int
+iunlock_consumer(Queue *q)
+{
+ int s = q->state;
+
+ /* stop flow control when back at or below the limit */
+ if((int)(q->wp - q->rp) <= q->limit)
+ q->state = s & ~Qflow;
+
iunlock(q);
- if(dowakeup)
+ if(s & Qflow){
+ /*
+ * wakeup flow controlled writers.
+ * note that this is done even when q->state
+ * still has Qflow set, as the unblocking
+ * condition depends on the writers local queuing
+ * position, not on the global queue length.
+ */
wakeup(&q->wr);
+ }
+ return s;
+}
- return b;
+/*
+ * after removing data from the queue,
+ * unlock queue and wakeup blocked writer.
+ * get output going again when it was blocked.
+ * called at process level.
+ */
+static int
+iunlock_reader(Queue *q)
+{
+ int s = iunlock_consumer(q);
+
+ if(q->kick != nil && s & Qflow)
+ (*q->kick)(q->arg);
+
+ return s;
}
/*
- * throw away the next 'len' bytes in the queue
+ * after inserting into queue,
+ * unlock queue and wakeup starved reader.
+ * called at interrupt level.
*/
-int
-qdiscard(Queue *q, int len)
+static int
+iunlock_producer(Queue *q)
{
- Block *b, *tofree = nil;
- int dowakeup, n, sofar;
+ int s = q->state;
- ilock(q);
- for(sofar = 0; sofar < len; sofar += n){
- b = q->bfirst;
- if(b == nil)
- break;
- QDEBUG checkb(b, "qdiscard");
- n = BLEN(b);
- if(n <= len - sofar){
- q->bfirst = b->next;
- q->len -= BALLOC(b);
- q->dlen -= BLEN(b);
+ /* start flow control when above the limit */
+ if((int)(q->wp - q->rp) > q->limit)
+ s |= Qflow;
- /* remember to free this */
- b->next = tofree;
- tofree = b;
- } else {
- n = len - sofar;
- b->rp += n;
- q->dlen -= n;
- }
+ q->state = s & ~Qstarve;
+ iunlock(q);
+
+ if(s & Qstarve){
+ Proc *p = wakeup(&q->rr);
+
+ /* if we just wokeup a higher priority process, let it run */
+ if(p != nil && up != nil && p->priority > up->priority && islo())
+ sched();
}
+ return s;
+}
- /*
- * if writer flow controlled, restart
- *
- * This used to be
- * q->len < q->limit/2
- * but it slows down tcp too much for certain write sizes.
- * I really don't understand it completely. It may be
- * due to the queue draining so fast that the transmission
- * stalls waiting for the app to produce more data. - presotto
- */
- if((q->state & Qflow) && q->len < q->limit){
- q->state &= ~Qflow;
- dowakeup = 1;
- } else
- dowakeup = 0;
+/*
+ * unlock queue and wakeup starved reader.
+ * get output going again when it was starved.
+ * called at process level.
+ */
+static int
+iunlock_writer(Queue *q)
+{
+ int s = iunlock_producer(q);
- iunlock(q);
+ if(q->kick != nil && s & (Qstarve|Qkick))
+ (*q->kick)(q->arg);
- if(dowakeup)
- wakeup(&q->wr);
+ return s;
+}
- if(tofree != nil)
- freeblist(tofree);
+/*
+ * get next block from a queue, return null if nothing there
+ * called at interrupt level.
+ */
+Block*
+qget(Queue *q)
+{
+ Block *b;
- return sofar;
+ ilock(q);
+ if((b = qremove(q)) == nil){
+ q->state |= Qstarve;
+ iunlock(q);
+ return nil;
+ }
+ iunlock_consumer(q);
+
+ return b;
}
/*
@@ -472,12 +552,11 @@
qconsume(Queue *q, void *vp, int len)
{
Block *b, *tofree = nil;
- int n, dowakeup;
- uchar *p = vp;
+ int n;
- /* sync with qwrite */
- ilock(q);
+ assert(len >= 0);
+ ilock(q);
for(;;) {
b = q->bfirst;
if(b == nil){
@@ -490,8 +569,10 @@
n = BLEN(b);
if(n > 0)
break;
+
+ /* get rid of zero-length blocks */
q->bfirst = b->next;
- q->len -= BALLOC(b);
+ q->rp += BALLOC(b);
/* remember to free this */
b->next = tofree;
@@ -498,10 +579,9 @@
tofree = b;
};
- consumecnt += n;
if(n < len)
len = n;
- memmove(p, b->rp, len);
+ memmove(vp, b->rp, len);
b->rp += len;
q->dlen -= len;
@@ -508,7 +588,7 @@
/* discard the block if we're done with it */
if((q->state & Qmsg) || len == n){
q->bfirst = b->next;
- q->len -= BALLOC(b);
+ q->rp += BALLOC(b);
q->dlen -= BLEN(b);
/* remember to free this */
@@ -515,23 +595,40 @@
b->next = tofree;
tofree = b;
}
-
out:
- /* if writer flow controlled, restart */
- if((q->state & Qflow) && q->len < q->limit/2){
- q->state &= ~Qflow;
- dowakeup = 1;
- } else
- dowakeup = 0;
+ iunlock_consumer(q);
- iunlock(q);
+ freeblist(tofree);
- if(dowakeup)
- wakeup(&q->wr);
+ return len;
+}
- if(tofree != nil)
- freeblist(tofree);
+/*
+ * add a block list to a queue, return bytes added
+ */
+int
+qaddlist(Queue *q, Block *b)
+{
+ int len;
+ QDEBUG checkb(b, "qaddlist 1");
+
+ /* queue the block */
+ if(q->bfirst != nil)
+ q->blast->next = b;
+ else
+ q->bfirst = b;
+
+ len = BLEN(b);
+ q->wp += BALLOC(b);
+ while(b->next != nil){
+ b = b->next;
+ QDEBUG checkb(b, "qaddlist 2");
+ len += BLEN(b);
+ q->wp += BALLOC(b);
+ }
+ q->dlen += len;
+ q->blast = b;
return len;
}
@@ -538,36 +635,22 @@
int
qpass(Queue *q, Block *b)
{
- int len, dowakeup;
+ int len;
- /* sync with qread */
- dowakeup = 0;
ilock(q);
- if(q->len >= q->limit){
+ if(q->state & Qclosed){
iunlock(q);
freeblist(b);
- return -1;
+ return 0;
}
- if(q->state & Qclosed){
+ if(q->state & Qflow){
iunlock(q);
freeblist(b);
- return 0;
+ return -1;
}
-
len = qaddlist(q, b);
+ iunlock_producer(q);
- if(q->len >= q->limit/2)
- q->state |= Qflow;
-
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(q);
-
- if(dowakeup)
- wakeup(&q->rr);
-
return len;
}
@@ -574,101 +657,36 @@
int
qpassnolim(Queue *q, Block *b)
{
- int len, dowakeup;
+ int len;
- /* sync with qread */
- dowakeup = 0;
ilock(q);
-
if(q->state & Qclosed){
iunlock(q);
freeblist(b);
return 0;
}
-
len = qaddlist(q, b);
+ iunlock_producer(q);
- if(q->len >= q->limit/2)
- q->state |= Qflow;
-
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(q);
-
- if(dowakeup)
- wakeup(&q->rr);
-
return len;
}
-/*
- * if the allocated space is way out of line with the used
- * space, reallocate to a smaller block
- */
-Block*
-packblock(Block *bp)
-{
- Block **l, *nbp;
- int n;
-
- for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
- n = BLEN(nbp);
- if((n<<2) < BALLOC(nbp)){
- *l = allocb(n);
- memmove((*l)->wp, nbp->rp, n);
- (*l)->wp += n;
- (*l)->next = nbp->next;
- freeb(nbp);
- }
- }
-
- return bp;
-}
-
int
qproduce(Queue *q, void *vp, int len)
{
Block *b;
- int dowakeup;
- uchar *p = vp;
+ assert(len >= 0);
+
b = iallocb(len);
if(b == nil)
return 0;
- /* sync with qread */
- dowakeup = 0;
- ilock(q);
-
- /* no waiting receivers, room in buffer? */
- if(q->len >= q->limit){
- q->state |= Qflow;
- iunlock(q);
- freeb(b);
- return -1;
- }
- producecnt += len;
-
/* save in buffer */
- memmove(b->wp, p, len);
+ memmove(b->wp, vp, len);
b->wp += len;
- qaddlist(q, b);
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
-
- if(q->len >= q->limit)
- q->state |= Qflow;
- iunlock(q);
-
- if(dowakeup)
- wakeup(&q->rr);
-
- return len;
+ return qpass(q, b);
}
/*
@@ -679,6 +697,8 @@
{
Block *b;
+ assert(len >= 0);
+
b = allocb(len);
ilock(q);
b->wp += readblist(q->bfirst, b->wp, len, offset);
@@ -694,16 +714,18 @@
{
Queue *q;
+ assert(limit >= 0);
+
q = malloc(sizeof(Queue));
if(q == nil)
return nil;
+ q->dlen = 0;
+ q->wp = q->rp = 0;
q->limit = q->inilim = limit;
q->kick = kick;
q->arg = arg;
- q->state = msg;
-
- q->state |= Qstarve;
+ q->state = msg | Qstarve;
q->eof = 0;
q->noblock = 0;
@@ -720,10 +742,14 @@
if(q == nil)
return nil;
+ q->dlen = 0;
+ q->wp = q->rp = 0;
q->limit = 0;
q->arg = arg;
q->bypass = bypass;
q->state = 0;
+ q->eof = 0;
+ q->noblock = 0;
return q;
}
@@ -733,7 +759,7 @@
{
Queue *q = a;
- return (q->state & Qclosed) || q->bfirst != nil;
+ return q->bfirst != nil || (q->state & Qclosed);
}
/*
@@ -749,10 +775,9 @@
break;
if(q->state & Qclosed){
- if(++q->eof > 3)
+ if(q->eof >= 3 || *q->err && strcmp(q->err, Ehungup) != 0)
return -1;
- if(*q->err && strcmp(q->err, Ehungup) != 0)
- return -1;
+ q->eof++;
return 0;
}
@@ -765,101 +790,6 @@
}
/*
- * add a block list to a queue, return bytes added
- */
-int
-qaddlist(Queue *q, Block *b)
-{
- int len, dlen;
-
- QDEBUG checkb(b, "qaddlist 1");
-
- /* queue the block */
- if(q->bfirst != nil)
- q->blast->next = b;
- else
- q->bfirst = b;
-
- len = BALLOC(b);
- dlen = BLEN(b);
- while(b->next != nil){
- b = b->next;
- QDEBUG checkb(b, "qaddlist 2");
-
- len += BALLOC(b);
- dlen += BLEN(b);
- }
- q->blast = b;
- q->len += len;
- q->dlen += dlen;
- return dlen;
-}
-
-/*
- * called with q ilocked
- */
-Block*
-qremove(Queue *q)
-{
- Block *b;
-
- b = q->bfirst;
- if(b == nil)
- return nil;
- QDEBUG checkb(b, "qremove");
- q->bfirst = b->next;
- b->next = nil;
- q->dlen -= BLEN(b);
- q->len -= BALLOC(b);
- return b;
-}
-
-/*
- * copy the contents of a string of blocks into
- * memory from an offset. blocklist kept unchanged.
- * return number of copied bytes.
- */
-long
-readblist(Block *b, uchar *p, long n, ulong o)
-{
- ulong m, r;
-
- r = 0;
- while(n > 0 && b != nil){
- m = BLEN(b);
- if(o >= m)
- o -= m;
- else {
- m -= o;
- if(n < m)
- m = n;
- memmove(p, b->rp + o, m);
- p += m;
- r += m;
- n -= m;
- o = 0;
- }
- b = b->next;
- }
- return r;
-}
-
-/*
- * put a block back to the front of the queue
- * called with q ilocked
- */
-void
-qputback(Queue *q, Block *b)
-{
- b->next = q->bfirst;
- if(q->bfirst == nil)
- q->blast = b;
- q->bfirst = b;
- q->len += BALLOC(b);
- q->dlen += BLEN(b);
-}
-
-/*
* cut off n bytes from the end of *h. return a new
* block with the tail and change *h to refer to the
* head.
@@ -889,31 +819,6 @@
}
/*
- * flow control, get producer going again
- * called with q ilocked
- */
-static void
-qwakeup_iunlock(Queue *q)
-{
- int dowakeup = 0;
-
- /* if writer flow controlled, restart */
- if((q->state & Qflow) && q->len < q->limit/2){
- q->state &= ~Qflow;
- dowakeup = 1;
- }
-
- iunlock(q);
-
- /* wakeup flow controlled writers */
- if(dowakeup){
- if(q->kick != nil)
- q->kick(q->arg);
- wakeup(&q->wr);
- }
-}
-
-/*
* get next block from a queue (up to a limit)
*/
Block*
@@ -922,6 +827,8 @@
Block *b;
int n;
+ assert(len >= 0);
+
eqlock(&q->rlock);
if(waserror()){
qunlock(&q->rlock);
@@ -954,10 +861,8 @@
else
b->wp -= n;
}
+ iunlock_reader(q);
- /* restart producer */
- qwakeup_iunlock(q);
-
qunlock(&q->rlock);
poperror();
@@ -974,6 +879,8 @@
Block *b, *first, **last;
int m, n;
+ assert(len >= 0);
+
eqlock(&q->rlock);
if(waserror()){
qunlock(&q->rlock);
@@ -1005,8 +912,8 @@
freeb(qremove(q));
goto again;
}
-
- /* grab the first block plus as many
+ /*
+ * grab the first block plus as many
* following blocks as will partially
* fit in the read.
*/
@@ -1029,8 +936,7 @@
if(n > len && (q->state & Qmsg) == 0)
qputback(q, splitblock(last, n - len));
- /* restart producer */
- qwakeup_iunlock(q);
+ iunlock_reader(q);
qunlock(&q->rlock);
poperror();
@@ -1046,34 +952,39 @@
return n;
}
+/*
+ * a Flow represents a flow controlled
+ * writer on queue q with position p.
+ */
+typedef struct {
+ Queue* q;
+ uint p;
+} Flow;
+
static int
-qnotfull(void *a)
+unblocked(void *a)
{
- Queue *q = a;
+ Flow *f = a;
+ Queue *q = f->q;
- return q->len < q->limit || (q->state & Qclosed);
+ return q->noblock || (int)(f->p - q->rp) <= q->limit || (q->state & Qclosed);
}
/*
- * flow control, wait for queue to get below the limit
+ * flow control, wait for queue to drain back to the limit
*/
static void
-qflow(Queue *q)
+qflow(Flow *f)
{
- for(;;){
- if(q->noblock || qnotfull(q))
- break;
+ Queue *q = f->q;
- ilock(q);
- q->state |= Qflow;
- iunlock(q);
-
+ while(!unblocked(f)){
eqlock(&q->wlock);
if(waserror()){
qunlock(&q->wlock);
nexterror();
}
- sleep(&q->wr, qnotfull, q);
+ sleep(&q->wr, unblocked, f);
qunlock(&q->wlock);
poperror();
}
@@ -1085,8 +996,8 @@
long
qbwrite(Queue *q, Block *b)
{
- int len, dowakeup;
- Proc *p;
+ Flow flow;
+ int len;
if(q->bypass != nil){
len = blocklen(b);
@@ -1094,7 +1005,6 @@
return len;
}
- dowakeup = 0;
if(waserror()){
freeblist(b);
nexterror();
@@ -1106,9 +1016,11 @@
iunlock(q);
error(q->err);
}
-
- /* don't queue over the limit */
- if(q->len >= q->limit && q->noblock){
+ /*
+ * if the queue is full,
+ * silently discard when non-blocking
+ */
+ if(q->state & Qflow && q->noblock){
iunlock(q);
poperror();
len = blocklen(b);
@@ -1115,43 +1027,48 @@
freeblist(b);
return len;
}
-
len = qaddlist(q, b);
-
- /* make sure other end gets awakened */
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
- iunlock(q);
poperror();
- /* get output going again */
- if(q->kick != nil && (dowakeup || (q->state&Qkick)))
- q->kick(q->arg);
-
- /* wakeup anyone consuming at the other end */
- if(dowakeup){
- p = wakeup(&q->rr);
-
- /* if we just wokeup a higher priority process, let it run */
- if(p != nil && p->priority > up->priority)
- sched();
- }
-
/*
- * flow control, before allowing the process to continue and
- * queue more. We do this here so that postnote can only
- * interrupt us after the data has been queued. This means that
- * things like 9p flushes and ssl messages will not be disrupted
- * by software interrupts.
+ * save our current position in queue
+ * for flow control below.
*/
- qflow(q);
+ flow.q = q;
+ flow.p = q->wp;
+ if(iunlock_writer(q) & Qflow){
+ /*
+ * flow control, before allowing the process to continue and
+ * queue more. We do this here so that postnote can only
+ * interrupt us after the data has been queued. This means that
+ * things like 9p flushes and ssl messages will not be disrupted
+ * by software interrupts.
+ */
+ qflow(&flow);
+ }
return len;
}
/*
+ * block here uninterruptibly until queue drains.
+ */
+static void
+qbloated(Queue *q)
+{
+ Flow flow;
+
+ flow.q = q;
+ flow.p = q->wp;
+ while(waserror()){
+ if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig)
+ error(Egreg);
+ }
+ qflow(&flow);
+ poperror();
+}
+
+/*
* write to a queue. only Maxatomic bytes at a time is atomic.
*/
int
@@ -1161,8 +1078,11 @@
Block *b;
uchar *p = vp;
+ assert(len >= 0);
+
QDEBUG if(!islo())
print("qwrite hi %#p\n", getcallerpc(&q));
+
/*
* when the queue length grew over twice the limit,
* block here before allocating more blocks.
@@ -1170,14 +1090,8 @@
* interrupted by notes, preventing effective
* flow control.
*/
- if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){
- while(waserror()){
- if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig)
- error(Egreg);
- }
- qflow(q);
- poperror();
- }
+ if(q->state & Qflow && (int)(q->wp - q->rp)/2 > q->limit)
+ qbloated(q);
sofar = 0;
do {
@@ -1207,11 +1121,11 @@
int
qiwrite(Queue *q, void *vp, int len)
{
- int n, sofar, dowakeup;
+ int n, sofar;
Block *b;
uchar *p = vp;
- dowakeup = 0;
+ assert(len >= 0);
sofar = 0;
do {
@@ -1226,43 +1140,72 @@
b->wp += n;
ilock(q);
-
- if((q->state & Qclosed) != 0 || q->len >= q->limit){
+ if(q->state & (Qflow|Qclosed)){
iunlock(q);
freeb(b);
break;
}
+ sofar += qaddlist(q, b);
+ iunlock_writer(q);
+ } while(sofar < len && (q->state & Qmsg) == 0);
- qaddlist(q, b);
+ return sofar;
+}
- if(q->state & Qstarve){
- q->state &= ~Qstarve;
- dowakeup = 1;
- }
+/*
+ * throw away the next 'len' bytes in the queue
+ */
+int
+qdiscard(Queue *q, int len)
+{
+ Block *b, *tofree = nil;
+ int n, sofar;
- iunlock(q);
+ assert(len >= 0);
- if(dowakeup){
- if(q->kick != nil)
- q->kick(q->arg);
- wakeup(&q->rr);
+ ilock(q);
+ for(sofar = 0; sofar < len; sofar += n){
+ b = q->bfirst;
+ if(b == nil)
+ break;
+ QDEBUG checkb(b, "qdiscard");
+ n = BLEN(b);
+ if(n <= len - sofar){
+ q->bfirst = b->next;
+ q->rp += BALLOC(b);
+
+ /* remember to free this */
+ b->next = tofree;
+ tofree = b;
+ } else {
+ n = len - sofar;
+ b->rp += n;
}
+ q->dlen -= n;
+ }
+ iunlock_reader(q);
- sofar += n;
- } while(sofar < len && (q->state & Qmsg) == 0);
+ freeblist(tofree);
return sofar;
}
/*
- * be extremely careful when calling this,
- * as there is no reference accounting
+ * flush the output queue
*/
void
-qfree(Queue *q)
+qflush(Queue *q)
{
- qclose(q);
- free(q);
+ Block *tofree;
+
+ ilock(q);
+ tofree = q->bfirst;
+ q->bfirst = nil;
+ q->rp = q->wp;
+ q->dlen = 0;
+ iunlock_reader(q);
+
+ freeblist(tofree);
}
/*
@@ -1272,32 +1215,42 @@
void
qclose(Queue *q)
{
- Block *bfirst;
+ Block *tofree;
if(q == nil)
return;
- /* mark it */
ilock(q);
q->state |= Qclosed;
q->state &= ~(Qflow|Qstarve);
kstrcpy(q->err, Ehungup, ERRMAX);
- bfirst = q->bfirst;
+ tofree = q->bfirst;
q->bfirst = nil;
- q->len = 0;
+ q->rp = q->wp;
q->dlen = 0;
q->noblock = 0;
iunlock(q);
- /* free queued blocks */
- freeblist(bfirst);
-
/* wake up readers/writers */
wakeup(&q->rr);
wakeup(&q->wr);
+
+ /* free queued blocks */
+ freeblist(tofree);
}
/*
+ * be extremely careful when calling this,
+ * as there is no reference accounting
+ */
+void
+qfree(Queue *q)
+{
+ qclose(q);
+ free(q);
+}
+
+/*
* Mark a queue as closed. Wakeup any readers. Don't remove queued
* blocks.
*/
@@ -1304,7 +1257,6 @@
void
qhangup(Queue *q, char *msg)
{
- /* mark it */
ilock(q);
q->state |= Qclosed;
if(msg == nil || *msg == '\0')
@@ -1350,26 +1302,21 @@
}
/*
- * return space remaining before flow control
+ * return true if we can read without blocking
*/
int
-qwindow(Queue *q)
+qcanread(Queue *q)
{
- int l;
-
- l = q->limit - q->len;
- if(l < 0)
- l = 0;
- return l;
+ return q->bfirst != nil;
}
/*
- * return true if we can read without blocking
+ * return non-zero when the queue is full
*/
int
-qcanread(Queue *q)
+qfull(Queue *q)
{
- return q->bfirst != nil;
+ return q->state & Qflow;
}
/*
@@ -1378,7 +1325,11 @@
void
qsetlimit(Queue *q, int limit)
{
+ assert(limit >= 0);
+
+ ilock(q);
q->limit = limit;
+ iunlock_consumer(q);
}
/*
@@ -1387,34 +1338,7 @@
void
qnoblock(Queue *q, int onoff)
{
- q->noblock = onoff;
-}
-
-/*
- * flush the output queue
- */
-void
-qflush(Queue *q)
-{
- Block *bfirst;
-
- /* mark it */
ilock(q);
- bfirst = q->bfirst;
- q->bfirst = nil;
- q->len = 0;
- q->dlen = 0;
- iunlock(q);
-
- /* free queued blocks */
- freeblist(bfirst);
-
- /* wake up writers */
- wakeup(&q->wr);
-}
-
-int
-qfull(Queue *q)
-{
- return q->state & Qflow;
+ q->noblock = onoff;
+ iunlock_consumer(q);
}