ref: 71cda09d1ec39aa29dc4bcdd332fa64ca7169d59
parent: 0bdfa3699dede75e657a24bd22e0e4aa7eafd5e4
author: cinap_lenrek <cinap_lenrek@felloff.net>
date: Sat Jul 18 23:31:17 EDT 2015
devstream: fast sequential file access with 9p pipelining experiment
--- /dev/null
+++ b/sys/man/3/stream
@@ -1,0 +1,40 @@
+.TH STREAM 3
+.SH NAME
+stream \- fast sequential file access
+.SH SYNOPSIS
+.nf
+.B bind #¶ /fd
+
+.B /fd/0stream
+.B /fd/1stream
+\&...
+.fi
+.SH DESCRIPTION
+The
+.I stream
+device serves a one-level directory containing files of the form
+.IR N stream
+where
+.I N
+is a file descriptor of the current process.
+.PP
+An
+.IR open (2)
+returns a stream file descriptor connected to the original file
+referred to by the file descriptor
+.IR N .
+When a stream is opened for reading, the device will start
+continuously reading the file in the background until it reaches
+the end of the file. A
+.IR read (2)
+on the stream consumes the prefetched data in sequential order.
+.PP
+When a stream is opened for writing, writes to the stream will
+return immediately without waiting for the data to be written
+to the file. A zero-length write can be used to wait for the
+buffered data to drain and return any previous write errors.
+.SH SEE ALSO
+.IR dup (2),
+.IR pipe (3)
+.SH SOURCE
+.B /sys/src/9/port/devstream.c
--- a/sys/src/9/port/devmnt.c
+++ b/sys/src/9/port/devmnt.c
@@ -30,6 +30,8 @@
uint rpclen; /* len of buffer */
Block* b; /* reply blocks */
Mntrpc* flushed; /* message this one flushes */
+ void *iocomarg; /* Rpc completion callback for pipelining */
+ void (*iocomfun)(void*, int);
char done; /* Rpc completed */
};
@@ -789,6 +791,9 @@
lock(m);
r->z = &up->sleep;
r->m = m;
+ r->iocomarg = up->iocomarg;
+ r->iocomfun = up->iocomfun;
+ up->iocomfun = nil;
r->list = m->queue;
m->queue = r;
unlock(m);
@@ -806,6 +811,10 @@
if(devtab[m->c->type]->write(m->c, r->rpc, n, 0) != n)
error(Emountrpc);
+ /* Rpc committed */
+ if(r->iocomfun != nil)
+ (*r->iocomfun)(r->iocomarg, 0);
+
/* Gate readers onto the mount point one at a time */
for(;;) {
lock(m);
@@ -948,6 +957,11 @@
/* look for a reply to a message */
if(q->request.tag == r->reply.tag) {
*l = q->list;
+
+ /* Rpc completed */
+ if(q->iocomfun != nil)
+ (*q->iocomfun)(q->iocomarg, 1);
+
if(q == r) {
q->done = 1;
unlock(m);
--- /dev/null
+++ b/sys/src/9/port/devstream.c
@@ -1,0 +1,580 @@
+#include "u.h"
+#include "../port/lib.h"
+#include "mem.h"
+#include "dat.h"
+#include "fns.h"
+#include "../port/error.h"
+
+typedef struct Stream Stream;
+typedef struct Iocom Iocom;
+
+struct Stream /* state behind one opened Nstream file */
+{
+ Ref; /* reference count; putstream() frees everything at zero */
+ Lock; /* taken in iocom() around the soff update */
+
+ int iounit; /* payload size of each block and of each read request */
+ int noseek; /* -1 until the first reader probes f; then 1 (offsets ignored) or 0 */
+
+ Ref nrp; /* reader kprocs that have registered */
+ Ref nwp; /* writer kprocs that have registered */
+ Ref nwq; /* writers currently between incref and decref around qbread() */
+
+ Proc *rp[4]; /* reader kprocs; slot cleared when the kproc exits */
+ Proc *wp[2]; /* writer kprocs; slot cleared when the kproc exits */
+
+ Block *rlist; /* read blocks issued or finished but not yet moved to rq */
+
+ vlong soff; /* noseek only: offset stamped on the next completed block */
+ vlong roff; /* offset of the next block to hand to rq */
+ vlong woff; /* file offset for the next write */
+
+ QLock rcl; /* reader lock taken in ioq(), released by iocom() */
+ QLock wcl; /* writer lock taken in ioq(), released by iocom() */
+ QLock rql; /* held while readers touch rlist/roff and during reader startup */
+ QLock wql; /* held during writer startup and in streamdrain() */
+
+ Rendez wz; /* streamdrain() sleeps here; writers wake it */
+
+ Queue *rq; /* prefetched data for streamread(); nil until first read */
+ Queue *wq; /* data awaiting the writers; nil until first write */
+ Chan *f; /* underlying file, obtained with fdtochan() in streamopen() */
+};
+
+struct Iocom /* per-request argument handed to iocom() */
+{
+ Proc *p; /* proc that called ioq() */
+ QLock *q; /* lock to release in iocom(); nil once released */
+ Stream *s; /* set by readers so iocom() can stamp b from s->soff; else nil */
+ Block *b; /* block being read into; nil for writers */
+};
+
+static void
+putstream(Stream *s) /* drop one reference; tear the stream down on the last one */
+{
+ if(decref(s))
+ return; /* other holders remain */
+ freeblist(s->rlist);
+ qfree(s->rq);
+ qfree(s->wq);
+ if(s->f != nil) /* streamopen()'s error path can get here before f is set */
+ cclose(s->f);
+ free(s);
+}
+
+#define BOFF(b) (*(vlong*)((b)->rp - sizeof(vlong))) /* vlong slot just before rp: the block's file offset */
+#define BDONE (1<<15) /* block flag: the read for this block has finished */
+#define BERROR (1<<14) /* block flag: the read failed; payload holds the error string */
+
+static Block*
+sblock(Stream *s) /* allocate an empty read block with room for BOFF() in front */
+{
+ Block *b;
+
+ b = allocb(sizeof(vlong)+s->iounit);
+ b->flag &= ~(BDONE|BERROR);
+ b->wp += sizeof(vlong); /* skip the slot that BOFF() addresses */
+ b->rp = b->wp; /* payload starts empty, right after that slot */
+ return b;
+}
+
+static void
+iocom(void *arg, int complete) /* I/O callback armed by ioq(); arg is the Iocom */
+{
+ Iocom *io = arg;
+ Stream *s;
+ QLock *q;
+ Proc *p;
+
+ p = io->p;
+ if(complete && p == up){ /* finished in the issuing proc: disarm the callback */
+ up->iocomfun = nil;
+ up->iocomarg = nil;
+ }
+
+ q = io->q;
+ if(q != nil && p == up){ /* first call from the issuing proc releases the ioq() lock */
+ io->q = nil;
+ qunlock(q);
+ }
+
+ s = io->s;
+ if(complete && s != nil && s->noseek){ /* noseek: stamp the block with the next sequential offset */
+ io->s = nil; /* stamp only once per request */
+ lock(s);
+ BOFF(io->b) = s->soff;
+ s->soff += s->iounit;
+ unlock(s);
+ }
+}
+
+static void
+ioq(Iocom *io, QLock *q) /* take q and arm iocom() as up's I/O callback */
+{
+ eqlock(q); /* unlocked in iocom() above */
+
+ io->p = up;
+ io->q = q;
+ io->s = nil; /* readers fill in s and b after this returns */
+ io->b = nil;
+
+ up->iocomarg = io;
+ up->iocomfun = iocom;
+}
+
+static void
+streamreader(void *arg) /* kproc body: prefetch s->f into s->rq until EOF, error or close */
+{
+ Stream *s = arg;
+ Iocom io;
+ Chan *f;
+ Block *b, *l, **ll;
+ vlong o;
+ int id, n;
+
+ id = incref(&s->nrp) % nelem(s->rp); /* register: claim a slot in s->rp */
+ s->rp[id] = up;
+
+ f = s->f;
+ b = sblock(s); /* b is always the spare block for the next request */
+ qlock(&s->rql);
+ if(waserror()){
+ qhangup(s->rq, up->errstr); /* hang up rq with the error text */
+ goto Done;
+ }
+ if(s->noseek == -1){ /* not probed yet: read once at a huge offset */
+ BOFF(b) = 0;
+ n = devtab[f->type]->read(f, b->wp, s->iounit, 0x7fffffffffffffLL);
+
+ if(n > 0){ /* got data anyway: treat f as ignoring offsets */
+ b->wp += n;
+ b->flag |= BDONE;
+ b->next = nil;
+ s->rlist = b; /* the probe's data becomes the first block, at offset 0 */
+ s->soff = s->iounit;
+ s->roff = 0;
+ s->noseek = 1;
+
+ b = sblock(s);
+ } else {
+ s->noseek = 0;
+ }
+ }
+ while(!qisclosed(s->rq)) {
+ ll = &s->rlist;
+ while((l = *ll) != nil){ /* move finished blocks that sit at roff onto rq */
+ if((l->flag & BDONE) == 0 || BOFF(l) != s->roff){
+ if(s->noseek){
+ ll = &l->next; /* noseek blocks are prepended, not sorted: keep scanning */
+ continue;
+ }
+ break;
+ }
+ if((l->flag & BERROR) != 0)
+ error((char*)l->rp); /* payload holds the saved error string */
+ if(BLEN(l) == 0){ /* zero-length read: end of file */
+ qhangup(s->rq, nil);
+ poperror();
+ goto Done;
+ }
+ s->roff += s->noseek ? s->iounit : BLEN(l);
+ *ll = l->next;
+ l->next = nil;
+ qbwrite(s->rq, l);
+ }
+
+ n = s->iounit; /* default request: one iounit at roff */
+ o = s->roff;
+ l = s->rlist;
+ if(s->noseek) { /* offsets ignored: read at 0 and prepend */
+ o = 0;
+ b->next = l;
+ s->rlist = b;
+ } else if(l == nil) {
+ b->next = nil;
+ s->rlist = b;
+ } else {
+ if(o < BOFF(l)){ /* gap before the first listed block */
+ n = BOFF(l) - o;
+ b->next = l;
+ s->rlist = b;
+ } else {
+ for(;; l = l->next){ /* find the first uncovered range after roff */
+ if((l->flag & BDONE) != 0 && BLEN(l) == 0)
+ goto Done; /* a finished zero-length block is listed: nothing more to request */
+ o = BOFF(l) + ((l->flag & BDONE) == 0 ? s->iounit : BLEN(l));
+ if(l->next == nil)
+ break;
+ if(o < BOFF(l->next)){
+ n = BOFF(l->next) - o; /* fill only up to the next block */
+ break;
+ }
+ }
+ b->next = l->next;
+ l->next = b;
+ }
+ }
+ BOFF(b) = o;
+ qunlock(&s->rql); /* the read itself runs without rql */
+
+ if(waserror()){ /* error outside the read itself, e.g. from ioq() */
+ poperror(); /* NOTE(review): presumably drops the entry handler too - confirm */
+ goto Exit; /* rql is not held here; b stays linked on s->rlist */
+ }
+ ioq(&io, &s->rcl);
+ io.b = b;
+ io.s = s;
+ if(waserror()){ /* read failed: keep the error text in the block */
+ strncpy((char*)b->wp, up->errstr, s->iounit-1);
+ b->wp[s->iounit-1] = 0;
+ n = -1;
+ } else {
+ n = devtab[f->type]->read(f, b->wp, n, o);
+ if(n < 0)
+ error(Eio);
+ poperror();
+ }
+ iocom(&io, 1); /* make sure the callback's completion work has run */
+ poperror();
+
+ l = b;
+ b = sblock(s); /* fresh spare for the next round */
+ qlock(&s->rql);
+ if(n >= 0)
+ l->wp += n;
+ else
+ l->flag |= BERROR;
+ l->flag |= BDONE; /* the scan at the top of the loop may now queue it */
+ }
+ poperror();
+Done:
+ qunlock(&s->rql);
+ freeb(b); /* the unused spare block */
+Exit:
+ s->rp[id] = nil;
+ putstream(s); /* drop the reference streamqueue() took for this kproc */
+ pexit("closed", 1);
+}
+
+static void
+streamwriter(void *arg) /* kproc body: write blocks queued on s->wq to s->f */
+{
+ Stream *s = arg;
+ Iocom io;
+ Block *b;
+ Chan *f;
+ vlong o;
+ int id, n;
+
+ id = incref(&s->nwp) % nelem(s->wp); /* register: claim a slot in s->wp */
+ s->wp[id] = up;
+
+ f = s->f;
+ while(!qisclosed(s->wq)) {
+ if(incref(&s->nwq) == s->nwp.ref && qlen(s->wq) == 0) /* all writers idle and nothing queued */
+ wakeup(&s->wz); /* queue drained */
+ if(waserror()){
+ decref(&s->nwq);
+ break;
+ }
+ ioq(&io, &s->wcl); /* wcl stays held until iocom() runs */
+ b = qbread(s->wq, s->iounit);
+ decref(&s->nwq);
+ if(b == nil){
+ iocom(&io, 1);
+ break;
+ }
+ poperror();
+
+ if(waserror()){
+ qhangup(s->wq, up->errstr); /* hang up wq with the write error */
+ iocom(&io, 1);
+ freeb(b);
+ break;
+ }
+ n = BLEN(b);
+ o = s->woff; /* reserve [woff, woff+n) for this block */
+ s->woff += n;
+ if(devtab[f->type]->write(f, b->rp, n, o) != n)
+ error(Eio);
+ iocom(&io, 1);
+ freeb(b);
+ poperror();
+ }
+
+ s->wp[id] = nil; /* isdrained() looks for cleared slots once wq is closed */
+ wakeup(&s->wz);
+
+ putstream(s); /* drop the reference streamqueue() took for this kproc */
+ pexit("closed", 1);
+}
+
+static int
+streamgen(Chan *c, char *, Dirtab*, int, int s, Dir *dp) /* directory generator: entry s (1-based) describes fd s-1 */
+{
+ static int perm[] = { 0400, 0200, 0600, 0 }; /* indexed by f->mode&3 */
+ Fgrp *fgrp = up->fgrp; /* descriptors of the calling process */
+ Chan *f;
+ Qid q;
+
+ if(s == DEVDOTDOT){
+ devdir(c, c->qid, ".", 0, eve, DMDIR|0555, dp);
+ return 1;
+ }
+ if(s == 0)
+ return 0; /* index 0 has no entry; qid paths are fd+1 */
+ s--;
+ if(s > fgrp->maxfd)
+ return -1; /* past the last descriptor */
+ if((f=fgrp->fd[s]) == nil)
+ return 0; /* unused descriptor slot */
+ sprint(up->genbuf, "%dstream", s);
+ mkqid(&q, s+1, 0, QTFILE); /* streamopen() recovers the fd as path-1 */
+ devdir(c, q, up->genbuf, 0, eve, perm[f->mode&3], dp);
+ return 1;
+}
+
+static Chan*
+streamattach(char *spec) /* attach: delegate to devattach() with this device's character */
+{
+ return devattach(L'¶', spec);
+}
+
+static Walkqid*
+streamwalk(Chan *c, Chan *nc, char **name, int nname) /* walk: devwalk() with no static table, entries from streamgen() */
+{
+ return devwalk(c, nc, name, nname, (Dirtab *)0, 0, streamgen);
+}
+
+static int
+streamstat(Chan *c, uchar *db, int n) /* stat: devstat() with no static table, entries from streamgen() */
+{
+ return devstat(c, db, n, (Dirtab *)0, 0L, streamgen);
+}
+
+static Chan*
+streamopen(Chan *c, int omode) /* open the directory, or bind a new Stream to the fd named by c's qid */
+{
+ Stream *s;
+
+ c->aux = nil;
+ if(c->qid.type & QTDIR){ /* the directory itself: read-only, no Stream */
+ if(omode != 0)
+ error(Eisdir);
+ c->mode = 0;
+ c->flag |= COPEN;
+ c->offset = 0;
+ return c;
+ }
+ s = mallocz(sizeof(*s), 1);
+ if(s == nil)
+ error(Enomem);
+ incref(s); /* reference held through c->aux, dropped in streamclose() */
+ if(waserror()){
+ putstream(s);
+ nexterror();
+ }
+ omode = openmode(omode);
+ s->f = fdtochan(c->qid.path - 1, omode, 0, 1); /* qid path is fd+1, see streamgen() */
+ if(s->f == nil || s->f->qid.type != QTFILE) /* only channels whose qid type is QTFILE */
+ error(Eperm);
+ s->noseek = -1; /* not probed yet; the first reader decides */
+ s->roff = s->f->offset; /* start at the fd's current offset */
+ s->woff = s->f->offset;
+ s->iounit = s->f->iounit;
+ if(s->iounit <= 0 || s->iounit > qiomaxatomic) /* unknown or too large: fall back to qiomaxatomic */
+ s->iounit = qiomaxatomic;
+ c->iounit = s->iounit;
+ c->aux = s;
+ c->mode = omode;
+ c->flag |= COPEN;
+ c->offset = 0;
+ poperror();
+ return c;
+}
+
+static int
+isdrained(void *a) /* sleep() predicate: nonzero when no queued write is outstanding */
+{
+ Stream *s;
+ int i;
+
+ s = a;
+ if(s->wq == nil)
+ return 1; /* no write queue was ever created */
+
+ if(qisclosed(s->wq) == 0)
+ return qlen(s->wq) == 0 && s->nwq.ref == s->nwp.ref; /* queue empty and every writer counted in nwq */
+
+ for(i=0; i<nelem(s->wp); i++) /* queue closed: drained once every writer slot is cleared */
+ if(s->wp[i] != nil)
+ return 0;
+
+ return 1;
+}
+
+static void
+streamdrain(Chan *c) /* block until the writers have flushed everything queued on c */
+{
+ Stream *s;
+
+ if((s = c->aux) == nil)
+ return; /* no Stream attached (directory, or already closed) */
+ eqlock(&s->wql);
+ if(waserror()){
+ qunlock(&s->wql); /* sleep() was interrupted: release wql and re-raise */
+ nexterror();
+ }
+ while(!isdrained(s)) /* recheck with the same predicate sleep() waits on */
+ sleep(&s->wz, isdrained, s);
+ qunlock(&s->wql);
+ poperror();
+}
+
+static void
+streamclose(Chan *c) /* shut down readers and writers, then drop c's Stream reference */
+{
+ Stream *s;
+ int i;
+
+ if((c->flag & COPEN) == 0 || (s = c->aux) == nil)
+ return; /* never opened, or a directory (aux is nil) */
+ if(s->rq != nil){
+ qclose(s->rq); /* readers loop on !qisclosed(s->rq) */
+ for(i=0; i<nelem(s->rp); i++)
+ postnote(s->rp[i], 1, "streamclose", 0); /* NOTE(review): slots may be nil - confirm postnote() accepts that */
+ }
+ if(s->wq != nil){
+ qhangup(s->wq, nil);
+ if(!waserror()){ /* best effort: wait for pending writes, ignore errors */
+ streamdrain(c);
+ poperror();
+ }
+ qclose(s->wq); /* discard the data */
+ for(i=0; i<nelem(s->wp); i++)
+ postnote(s->wp[i], 1, "streamclose", 0);
+ }
+ c->aux = nil;
+ putstream(s); /* drop the reference taken in streamopen() */
+}
+
+static int
+canpipeline(Chan *f, int mode) /* nonzero if several kprocs should work on f at once */
+{
+ USED(mode);
+
+ return devtab[f->type]->dc == 'M'; /* presumably the mount driver, which calls the iocom hooks - confirm */
+}
+
+static Queue*
+streamqueue(Chan *c, int mode) /* return the read or write queue, creating it and its kprocs on first use */
+{
+ Stream *s;
+ int i, n;
+
+ s = c->aux;
+ if(s == nil || c->qid.type != QTFILE)
+ error(Eperm);
+
+ switch(mode){
+ case OREAD:
+ while(s->rq == nil){ /* runs at most once; "while" only so break can leave early */
+ qlock(&s->rql);
+ if(s->rq != nil){ /* someone else set it up while we waited for rql */
+ qunlock(&s->rql);
+ break;
+ }
+ s->rq = qopen(conf.pipeqsize, 0, 0, 0);
+ if(s->rq == nil){
+ qunlock(&s->rql);
+ error(Enomem);
+ }
+ n = canpipeline(s->f, mode) ? nelem(s->rp) : 1; /* several readers only when pipelining is possible */
+ for(i=0; i<n; i++){
+ incref(s); /* reference owned by the new kproc */
+ kproc("streamreader", streamreader, s);
+ }
+ while(s->nrp.ref != n) /* wait until every reader has registered */
+ sched();
+ qunlock(&s->rql); /* readers qlock rql right after registering */
+ break;
+ }
+ return s->rq;
+ case OWRITE:
+ while(s->wq == nil){ /* same one-shot setup as the OREAD case */
+ qlock(&s->wql);
+ if(s->wq != nil){
+ qunlock(&s->wql);
+ break;
+ }
+ s->wq = qopen(conf.pipeqsize, 0, 0, 0);
+ if(s->wq == nil){
+ qunlock(&s->wql);
+ error(Enomem);
+ }
+ n = canpipeline(s->f, mode) ? nelem(s->wp) : 1;
+ for(i=0; i<n; i++){
+ incref(s); /* reference owned by the new kproc */
+ kproc("streamwriter", streamwriter, s);
+ }
+ while(s->nwp.ref != n) /* wait until every writer has registered */
+ sched();
+ qunlock(&s->wql);
+ break;
+ }
+ return s->wq;
+ }
+ error(Egreg); /* mode was neither OREAD nor OWRITE */
+ return nil;
+}
+
+static long
+streamread(Chan *c, void *va, long n, vlong) /* read: directory listing, or prefetched data from rq; offset ignored */
+{
+ if(c->qid.type == QTDIR)
+ return devdirread(c, va, n, (Dirtab *)0, 0L, streamgen);
+ return qread(streamqueue(c, OREAD), va, n); /* first call starts the readers */
+}
+
+static Block*
+streambread(Chan *c, long n, ulong) /* block read from rq; offset ignored */
+{
+ return qbread(streamqueue(c, OREAD), n);
+}
+
+static long
+streamwrite(Chan *c, void *va, long n, vlong) /* write: queue data on wq; offset ignored */
+{
+ if(n == 0)
+ streamdrain(c); /* zero-length write waits for queued data to be written */
+ return qwrite(streamqueue(c, OWRITE), va, n);
+}
+
+static long
+streambwrite(Chan *c, Block *b, ulong) /* block write: queue b on wq; offset ignored */
+{
+ if(BLEN(b) == 0)
+ streamdrain(c); /* empty block waits for queued data to be written */
+ return qbwrite(streamqueue(c, OWRITE), b);
+}
+
+Dev streamdevtab = { /* device table entry for the stream device */
+ L'¶', /* device character; the same one streamattach() passes to devattach() */
+ "stream",
+
+ devreset, /* dev* entries are not defined in this file */
+ devinit,
+ devshutdown,
+ streamattach,
+ streamwalk,
+ streamstat,
+ streamopen,
+ devcreate,
+ streamclose,
+ streamread,
+ streambread,
+ streamwrite,
+ streambwrite,
+ devremove,
+ devwstat,
+};
--- a/sys/src/9/port/portdat.h
+++ b/sys/src/9/port/portdat.h
@@ -755,7 +755,11 @@
* machine specific MMU
*/
PMMU;
+
char *syscalltrace; /* syscall trace */
+
+ void *iocomarg; /* I/O completion callback for pipelining */
+ void (*iocomfun)(void*, int);
};
enum