ref: a2be120ea93ae67447315da268fa336650cd5149
parent: 5aaa7240a20f34d319bee496005e2136040b8f5c
author: cinap_lenrek <cinap_lenrek@felloff.net>
date: Thu Mar 17 13:48:19 EDT 2016
abandon streaming experiment. for queue-like non-seekable files, it is impossible to implement an exportfs because one has to run the kernel's devtab read() and write() in separate processes, and that makes it impossible to maintain 9p message order as the scheduler can come in and randomly schedule one process before another. so as soon as we have a transition from 9p -> syscalls, we're screwed. i currently see just two possibilities: - introduce a special file type like QTSEQ with strictly ordered i/o semantics - fix all fileservers and exportfs to only do one outstanding i/o to QTSEQ files, which means maintaining a queue per fid. this doesn't propagate, so exporting a slow 9p mount again will be limited again by the latency of the inner mount. other option: - return offset in Rread, so the client can bring responses back into order. this requires changing all fileservers and drivers to maintain such a per-fid offset and changing the protocol to include it in the response, and also passing it to userspace (new syscalls or pass it in TOS). this only works for read pipelining, write is still screwed. both options suck. -- cinap
--- a/sys/src/9/port/devmnt.c
+++ b/sys/src/9/port/devmnt.c
@@ -30,8 +30,6 @@
uint rpclen; /* len of buffer */
Block* b; /* reply blocks */
Mntrpc* flushed; /* message this one flushes */
- void *iocomarg; /* Rpc completion callback for pipelining */
- void (*iocomfun)(void*, int);
char done; /* Rpc completed */
};
@@ -1017,9 +1015,6 @@
lock(m);
r->z = &up->sleep;
r->m = m;
- r->iocomarg = up->iocomarg;
- r->iocomfun = up->iocomfun;
- up->iocomfun = nil;
r->list = m->queue;
m->queue = r;
unlock(m);
@@ -1044,10 +1039,6 @@
if(devtab[m->c->type]->write(m->c, r->rpc, n, 0) != n)
error(Emountrpc);
- /* Rpc commited */
- if(r->iocomfun != nil)
- (*r->iocomfun)(r->iocomarg, 0);
-
/* Gate readers onto the mount point one at a time */
for(;;) {
lock(m);
@@ -1190,11 +1181,6 @@
/* look for a reply to a message */
if(q->request.tag == r->reply.tag) {
*l = q->list;
-
- /* Rpc completed */
- if(q->iocomfun != nil)
- (*q->iocomfun)(q->iocomarg, 1);
-
if(q == r) {
q->done = 1;
unlock(m);
--- a/sys/src/9/port/devstream.c
+++ /dev/null
@@ -1,580 +1,0 @@
-#include "u.h"
-#include "../port/lib.h"
-#include "mem.h"
-#include "dat.h"
-#include "fns.h"
-#include "../port/error.h"
-
-typedef struct Stream Stream;
-typedef struct Iocom Iocom;
-
-struct Stream
-{
- Ref;
- Lock;
-
- int iounit;
- int noseek;
-
- Ref nrp;
- Ref nwp;
- Ref nwq;
-
- Proc *rp[4];
- Proc *wp[2];
-
- Block *rlist;
-
- vlong soff;
- vlong roff;
- vlong woff;
-
- QLock rcl;
- QLock wcl;
- QLock rql;
- QLock wql;
-
- Rendez wz;
-
- Queue *rq;
- Queue *wq;
- Chan *f;
-};
-
-struct Iocom
-{
- Proc *p;
- QLock *q;
- Stream *s;
- Block *b;
-};
-
-static void
-putstream(Stream *s)
-{
- if(decref(s))
- return;
- freeblist(s->rlist);
- qfree(s->rq);
- qfree(s->wq);
- if(s->f != nil)
- cclose(s->f);
- free(s);
-}
-
-#define BOFF(b) (*(vlong*)((b)->rp - sizeof(vlong)))
-#define BDONE (1<<15)
-#define BERROR (1<<14)
-
-static Block*
-sblock(Stream *s)
-{
- Block *b;
-
- b = allocb(sizeof(vlong)+s->iounit);
- b->flag &= ~(BDONE|BERROR);
- b->wp += sizeof(vlong);
- b->rp = b->wp;
- return b;
-}
-
-static void
-iocom(void *arg, int complete)
-{
- Iocom *io = arg;
- Stream *s;
- QLock *q;
- Proc *p;
-
- p = io->p;
- if(complete && p == up){
- up->iocomfun = nil;
- up->iocomarg = nil;
- }
-
- q = io->q;
- if(q != nil && p == up){
- io->q = nil;
- qunlock(q);
- }
-
- s = io->s;
- if(complete && s != nil && s->noseek){
- io->s = nil;
- lock(s);
- BOFF(io->b) = s->soff;
- s->soff += s->iounit;
- unlock(s);
- }
-}
-
-static void
-ioq(Iocom *io, QLock *q)
-{
- eqlock(q); /* unlocked in iocom() above */
-
- io->p = up;
- io->q = q;
- io->s = nil;
- io->b = nil;
-
- up->iocomarg = io;
- up->iocomfun = iocom;
-}
-
-static void
-streamreader(void *arg)
-{
- Stream *s = arg;
- Iocom io;
- Chan *f;
- Block *b, *l, **ll;
- vlong o;
- int id, n;
-
- id = incref(&s->nrp) % nelem(s->rp);
- s->rp[id] = up;
-
- f = s->f;
- b = sblock(s);
- qlock(&s->rql);
- if(waserror()){
- qhangup(s->rq, up->errstr);
- goto Done;
- }
- if(s->noseek == -1){
- BOFF(b) = 0;
- n = devtab[f->type]->read(f, b->wp, s->iounit, 0x7fffffffffffffLL);
-
- if(n > 0){
- b->wp += n;
- b->flag |= BDONE;
- b->next = nil;
- s->rlist = b;
- s->soff = s->iounit;
- s->roff = 0;
- s->noseek = 1;
-
- b = sblock(s);
- } else {
- s->noseek = 0;
- }
- }
- while(!qisclosed(s->rq)) {
- ll = &s->rlist;
- while((l = *ll) != nil){
- if((l->flag & BDONE) == 0 || BOFF(l) != s->roff){
- if(s->noseek){
- ll = &l->next;
- continue;
- }
- break;
- }
- if((l->flag & BERROR) != 0)
- error((char*)l->rp);
- if(BLEN(l) == 0){
- qhangup(s->rq, nil);
- poperror();
- goto Done;
- }
- s->roff += s->noseek ? s->iounit : BLEN(l);
- *ll = l->next;
- l->next = nil;
- qbwrite(s->rq, l);
- }
-
- n = s->iounit;
- o = s->roff;
- l = s->rlist;
- if(s->noseek) {
- o = 0;
- b->next = l;
- s->rlist = b;
- } else if(l == nil) {
- b->next = nil;
- s->rlist = b;
- } else {
- if(o < BOFF(l)){
- n = BOFF(l) - o;
- b->next = l;
- s->rlist = b;
- } else {
- for(;; l = l->next){
- if((l->flag & BDONE) != 0 && BLEN(l) == 0)
- goto Done;
- o = BOFF(l) + ((l->flag & BDONE) == 0 ? s->iounit : BLEN(l));
- if(l->next == nil)
- break;
- if(o < BOFF(l->next)){
- n = BOFF(l->next) - o;
- break;
- }
- }
- b->next = l->next;
- l->next = b;
- }
- }
- BOFF(b) = o;
- qunlock(&s->rql);
-
- if(waserror()){
- poperror();
- goto Exit;
- }
- ioq(&io, &s->rcl);
- io.b = b;
- io.s = s;
- if(waserror()){
- strncpy((char*)b->wp, up->errstr, s->iounit-1);
- b->wp[s->iounit-1] = 0;
- n = -1;
- } else {
- n = devtab[f->type]->read(f, b->wp, n, o);
- if(n < 0)
- error(Eio);
- poperror();
- }
- iocom(&io, 1);
- poperror();
-
- l = b;
- b = sblock(s);
- qlock(&s->rql);
- if(n >= 0)
- l->wp += n;
- else
- l->flag |= BERROR;
- l->flag |= BDONE;
- }
- poperror();
-Done:
- qunlock(&s->rql);
- freeb(b);
-Exit:
- s->rp[id] = nil;
- putstream(s);
- pexit("closed", 1);
-}
-
-static void
-streamwriter(void *arg)
-{
- Stream *s = arg;
- Iocom io;
- Block *b;
- Chan *f;
- vlong o;
- int id, n;
-
- id = incref(&s->nwp) % nelem(s->wp);
- s->wp[id] = up;
-
- f = s->f;
- while(!qisclosed(s->wq)) {
- if(incref(&s->nwq) == s->nwp.ref && qlen(s->wq) == 0)
- wakeup(&s->wz); /* queue drained */
- if(waserror()){
- decref(&s->nwq);
- break;
- }
- ioq(&io, &s->wcl);
- b = qbread(s->wq, s->iounit);
- decref(&s->nwq);
- if(b == nil){
- iocom(&io, 1);
- break;
- }
- poperror();
-
- if(waserror()){
- qhangup(s->wq, up->errstr);
- iocom(&io, 1);
- freeb(b);
- break;
- }
- n = BLEN(b);
- o = s->woff;
- s->woff += n;
- if(devtab[f->type]->write(f, b->rp, n, o) != n)
- error(Eio);
- iocom(&io, 1);
- freeb(b);
- poperror();
- }
-
- s->wp[id] = nil;
- wakeup(&s->wz);
-
- putstream(s);
- pexit("closed", 1);
-}
-
-static int
-streamgen(Chan *c, char *, Dirtab*, int, int s, Dir *dp)
-{
- static int perm[] = { 0400, 0200, 0600, 0 };
- Fgrp *fgrp = up->fgrp;
- Chan *f;
- Qid q;
-
- if(s == DEVDOTDOT){
- devdir(c, c->qid, ".", 0, eve, DMDIR|0555, dp);
- return 1;
- }
- if(s == 0)
- return 0;
- s--;
- if(s > fgrp->maxfd)
- return -1;
- if((f=fgrp->fd[s]) == nil)
- return 0;
- sprint(up->genbuf, "%dstream", s);
- mkqid(&q, s+1, 0, QTFILE);
- devdir(c, q, up->genbuf, 0, eve, perm[f->mode&3], dp);
- return 1;
-}
-
-static Chan*
-streamattach(char *spec)
-{
- return devattach(L'¶', spec);
-}
-
-static Walkqid*
-streamwalk(Chan *c, Chan *nc, char **name, int nname)
-{
- return devwalk(c, nc, name, nname, (Dirtab *)0, 0, streamgen);
-}
-
-static int
-streamstat(Chan *c, uchar *db, int n)
-{
- return devstat(c, db, n, (Dirtab *)0, 0L, streamgen);
-}
-
-static Chan*
-streamopen(Chan *c, int omode)
-{
- Stream *s;
-
- c->aux = nil;
- if(c->qid.type & QTDIR){
- if(omode != 0)
- error(Eisdir);
- c->mode = 0;
- c->flag |= COPEN;
- c->offset = 0;
- return c;
- }
- s = mallocz(sizeof(*s), 1);
- if(s == nil)
- error(Enomem);
- incref(s);
- if(waserror()){
- putstream(s);
- nexterror();
- }
- omode = openmode(omode);
- s->f = fdtochan(c->qid.path - 1, omode, 0, 1);
- if(s->f == nil || s->f->qid.type != QTFILE)
- error(Eperm);
- s->noseek = -1;
- s->roff = s->f->offset;
- s->woff = s->f->offset;
- s->iounit = s->f->iounit;
- if(s->iounit <= 0 || s->iounit > qiomaxatomic)
- s->iounit = qiomaxatomic;
- c->iounit = s->iounit;
- c->aux = s;
- c->mode = omode;
- c->flag |= COPEN;
- c->offset = 0;
- poperror();
- return c;
-}
-
-static int
-isdrained(void *a)
-{
- Stream *s;
- int i;
-
- s = a;
- if(s->wq == nil)
- return 1;
-
- if(qisclosed(s->wq) == 0)
- return qlen(s->wq) == 0 && s->nwq.ref == s->nwp.ref;
-
- for(i=0; i<nelem(s->wp); i++)
- if(s->wp[i] != nil)
- return 0;
-
- return 1;
-}
-
-static void
-streamdrain(Chan *c)
-{
- Stream *s;
-
- if((s = c->aux) == nil)
- return;
- eqlock(&s->wql);
- if(waserror()){
- qunlock(&s->wql);
- nexterror();
- }
- while(!isdrained(s))
- sleep(&s->wz, isdrained, s);
- qunlock(&s->wql);
- poperror();
-}
-
-static void
-streamclose(Chan *c)
-{
- Stream *s;
- int i;
-
- if((c->flag & COPEN) == 0 || (s = c->aux) == nil)
- return;
- if(s->rq != nil){
- qclose(s->rq);
- for(i=0; i<nelem(s->rp); i++)
- postnote(s->rp[i], 1, "streamclose", 0);
- }
- if(s->wq != nil){
- qhangup(s->wq, nil);
- if(!waserror()){
- streamdrain(c);
- poperror();
- }
- qclose(s->wq); /* discard the data */
- for(i=0; i<nelem(s->wp); i++)
- postnote(s->wp[i], 1, "streamclose", 0);
- }
- c->aux = nil;
- putstream(s);
-}
-
-static int
-canpipeline(Chan *f, int mode)
-{
- USED(mode);
-
- return devtab[f->type]->dc == 'M';
-}
-
-static Queue*
-streamqueue(Chan *c, int mode)
-{
- Stream *s;
- int i, n;
-
- s = c->aux;
- if(s == nil || c->qid.type != QTFILE)
- error(Eperm);
-
- switch(mode){
- case OREAD:
- while(s->rq == nil){
- qlock(&s->rql);
- if(s->rq != nil){
- qunlock(&s->rql);
- break;
- }
- s->rq = qopen(conf.pipeqsize, 0, 0, 0);
- if(s->rq == nil){
- qunlock(&s->rql);
- error(Enomem);
- }
- n = canpipeline(s->f, mode) ? nelem(s->rp) : 1;
- for(i=0; i<n; i++){
- incref(s);
- kproc("streamreader", streamreader, s);
- }
- while(s->nrp.ref != n)
- sched();
- qunlock(&s->rql);
- break;
- }
- return s->rq;
- case OWRITE:
- while(s->wq == nil){
- qlock(&s->wql);
- if(s->wq != nil){
- qunlock(&s->wql);
- break;
- }
- s->wq = qopen(conf.pipeqsize, 0, 0, 0);
- if(s->wq == nil){
- qunlock(&s->wql);
- error(Enomem);
- }
- n = canpipeline(s->f, mode) ? nelem(s->wp) : 1;
- for(i=0; i<n; i++){
- incref(s);
- kproc("streamwriter", streamwriter, s);
- }
- while(s->nwp.ref != n)
- sched();
- qunlock(&s->wql);
- break;
- }
- return s->wq;
- }
- error(Egreg);
- return nil;
-}
-
-static long
-streamread(Chan *c, void *va, long n, vlong)
-{
- if(c->qid.type == QTDIR)
- return devdirread(c, va, n, (Dirtab *)0, 0L, streamgen);
- return qread(streamqueue(c, OREAD), va, n);
-}
-
-static Block*
-streambread(Chan *c, long n, ulong)
-{
- return qbread(streamqueue(c, OREAD), n);
-}
-
-static long
-streamwrite(Chan *c, void *va, long n, vlong)
-{
- if(n == 0)
- streamdrain(c);
- return qwrite(streamqueue(c, OWRITE), va, n);
-}
-
-static long
-streambwrite(Chan *c, Block *b, ulong)
-{
- if(BLEN(b) == 0)
- streamdrain(c);
- return qbwrite(streamqueue(c, OWRITE), b);
-}
-
-Dev streamdevtab = {
- L'¶',
- "stream",
-
- devreset,
- devinit,
- devshutdown,
- streamattach,
- streamwalk,
- streamstat,
- streamopen,
- devcreate,
- streamclose,
- streamread,
- streambread,
- streamwrite,
- streambwrite,
- devremove,
- devwstat,
-};
--- a/sys/src/9/port/portdat.h
+++ b/sys/src/9/port/portdat.h
@@ -777,9 +777,6 @@
PMMU;
char *syscalltrace; /* syscall trace */
-
- void *iocomarg; /* I/O completion callback for pipelining */
- void (*iocomfun)(void*, int);
};
enum
--- a/sys/src/cmd/cp.c
+++ b/sys/src/cmd/cp.c
@@ -127,19 +127,6 @@
if(buflen <= 0)
buflen = DEFB;
- if(dirb->length/2 > buflen){
- char nam[32];
-
- snprint(nam, sizeof nam, "/fd/%dstream", fdf);
- fds = open(nam, OREAD);
- if(fds >= 0){
- close(fdf);
- fdf = fds;
- }
- snprint(nam, sizeof nam, "/fd/%dstream", fdt);
- fds = open(nam, OWRITE);
- }
-
if(copy1(fdf, fds < 0 ? fdt : fds, from, to)==0){
if(fds >= 0 && write(fds, "", 0) < 0){
fprint(2, "cp: error writing %s: %r\n", to);