ref: d8035d7ee34c4f800509bb0680ad678fef612f99
dir: /src/mq.c/
#include <u.h> #include <libc.h> #include <fcall.h> #include <thread.h> #include <9p.h> #include "list.h" #include "util.h" typedef struct Group Group; typedef struct Stream Stream; typedef struct Client Client; typedef struct Read Read; typedef struct Write Write; struct Read { Listelem; Req *req; }; struct Write { Listelem; /* Twrite.ifcall */ vlong offset; uint count; uchar *data; }; struct Stream { Listelem; Group *group; Write *wqueue; Read *rqueue; }; struct Group { Stream *streams; Stream *order; enum {Message, Coalesce} mode; enum {Replayoff, Replaylast, Replayall} replay; }; struct Client { Write *cursor; vlong offset; int blocked; }; enum { /* Dirty trick to help clients tell our * root from most others, see pin(1). */ Qroot = 0xA, Qgroup = 0x1, Qstream, Qorder, Qctl, }; void filesettype(File *f, ushort type) { /* * Use four most-significant bits to store the type. * This depends on the 9pfile(2) library generating * simple incremental qid paths. */ f->qid.path &= ~((uvlong)0xF<<60); f->qid.path |= (uvlong)(type&0xF)<<60; } ushort filetype(File *f) { return (f->qid.path>>60)&0xF; } File* groupcreate(File *dir, char *name, char *uid, ulong perm) { Stream *streamalloc(Group*); void *streamclose(Stream*); File *d, *ctl, *order; Group *group; group = emalloc(sizeof(Group)); group->streams = (Stream*)listinit(emalloc(sizeof(Stream))); group->order = streamalloc(group); group->mode = Message; group->replay = Replayoff; ctl = order = nil; if(strcmp(name, "/") == 0){ d = dir; d->aux = group; } else d = createfile(dir, name, uid, perm, group); if(d == nil) goto err; filesettype(d, Qgroup); if((ctl = createfile(d, "ctl", nil, 0664, group)) == nil) goto err; filesettype(ctl, Qctl); closefile(ctl); if((order = createfile(d, "order", nil, 0444, group->order)) == nil) goto err; filesettype(order, Qorder); closefile(order); return d; err: streamclose(group->order); if(d) closefile(d); if(ctl) closefile(ctl); if(order) closefile(order); return nil; } void groupclose(Group *g) { free(g); } Stream* streamalloc(Group *g) { Stream *s; s = emalloc(sizeof(Stream)); s->group = g; s->wqueue = (Write*)listinit(emalloc(sizeof(Write))); s->rqueue = (Read*)listinit(emalloc(sizeof(Read))); return s; } void streamclose(Stream *s) { Read *r; Write *w; listeach(Read*, s->rqueue, r){ listunlink(r); free(r); } free(s->rqueue); listeach(Write*, s->wqueue, w){ listunlink(w); free(w); } free(s->wqueue); listunlink(s); free(s); } File* streamcreate(File *dir, char *name, char *uid, ulong perm) { File *f; Group *group; Stream *s; group = dir->aux; s = streamalloc(group); if((f = createfile(dir, name, uid, perm, s)) == nil){ streamclose(s); return nil; } filesettype(f, Qstream); listlink(group->streams, s); return f; } void streamopen(Stream *s, Req *req) { Client *c; c = req->fid->aux = emalloc(sizeof(Client)); switch(s->group->replay){ case Replayoff: c->offset = 0; c->blocked = 1; c->cursor = nil; break; case Replayall: c->offset = 0; if(listisempty(s->wqueue)){ c->blocked = 1; c->cursor = nil; }else{ c->blocked = 0; c->cursor = s->wqueue->front; } break; case Replaylast: c->offset = 0; if(listisempty(s->wqueue)){ c->blocked = 1; c->cursor = nil; }else{ c->blocked = 0; c->cursor = s->wqueue->back; } break; } } void streamrespond(Req *req, int mode) { Client *c = req->fid->aux; Stream *s = req->fid->file->aux; Write *w; /* request size, response buffer offset */ vlong rn, ro; /* chunk size and offset, total read */ vlong n, o, t; t = 0; rn = req->ifcall.count; ro = 0; w = c->cursor; o = c->offset; listrange(Write*, s->wqueue, w){ if(mode == Message && w != c->cursor) break; for(; n = w->count - o, n > 0; o += n, ro += n, t += n){ if(t == rn) goto done; if(n > rn - ro) n = rn - ro; memmove(req->ofcall.data+ro, w->data+o, n); } o = 0; } done: req->ofcall.count = t; respond(req, nil); /* Determine the Client state */ if(w == s->wqueue){ c->offset = 0; c->blocked = 1; c->cursor = nil; return; } c->offset = o; c->blocked = 0; c->cursor = w; } void streamread(Req *req) { Client *c = req->fid->aux; Stream *s = req->fid->file->aux; Read *r; if(c->blocked){ r = emalloc(sizeof(Read)); r->req = req; listlink(s->rqueue, r); return; } streamrespond(req, s->group->mode); } Write* writealloc(long n) { Write *w; w = emalloc(sizeof(Write)+n); w->data = (uchar*)&w[1]; return w; } void streamwrite(Req *req) { File *f = req->fid->file; Stream *s = req->fid->file->aux; Group *group = s->group; Write *w, *wq, *o, *oq; Read *r; Client *c; long n; wq = s->wqueue; oq = group->order->wqueue; /* Commit to queue */ w = writealloc(req->ifcall.count); w->count = req->ifcall.count; w->offset = req->ifcall.offset; memmove(w->data, req->ifcall.data, w->count); listlink(wq->back, w); /* Commit to group order queue */ n = strlen(f->name)+1; o = writealloc(n); o->offset = 0; o->count = n; memmove(o->data, f->name, n); listlink(oq->back, o); /* Kick the blocked stream readers */ listeach(Read*, s->rqueue, r){ c = r->req->fid->aux; c->cursor = w; c->offset = 0; c->blocked = 0; streamrespond(r->req, group->mode); listunlink(r); free(r); } /* Kick the blocked order readers */ listeach(Read*, group->order->rqueue, r){ c = r->req->fid->aux; c->cursor = o; c->offset = 0; c->blocked = 0; streamrespond(r->req, Message); listunlink(r); free(r); } req->ofcall.count = req->ifcall.count; respond(req, nil); } void ctlread(Req *req) { Group *group = req->fid->file->aux; char buf[256]; char *mode2str[] = { [Message] "message", [Coalesce] "coalesce", }; char *replay2str[] = { [Replayoff] "off", [Replaylast] "last", [Replayall] "all", }; snprint(buf, sizeof buf, "data %s\nreplay %s\n", mode2str[group->mode], replay2str[group->replay]); readstr(req, buf); respond(req, nil); } enum { Cmddata, Cmdreplay, Cmddebug, Cmddebug9p, }; Cmdtab groupcmd[] = { /* data message|coalesce */ {Cmddata, "data", 2}, /* replay off|last|all */ {Cmdreplay, "replay", 2}, /* debug on|off */ {Cmddebug, "debug", 2}, /* debug9p on|off */ {Cmddebug9p, "debug9p", 2}, }; void ctlwrite(Req *req) { Group *group = req->fid->file->aux; char *e = nil; Cmdbuf *cmd; Cmdtab *t; cmd = parsecmd(req->ifcall.data, req->ifcall.count); t = lookupcmd(cmd, groupcmd, nelem(groupcmd)); if(t == nil){ respondcmderror(req, cmd, "%r"); free(cmd); return; } switch(t->index){ case Cmddata: { if(strncmp(cmd->f[1], "message", 7) == 0) group->mode = Message; else if(strncmp(cmd->f[1], "coalesce", 8) == 0) group->mode = Coalesce; else e = "usage: data message|coalesce"; break; } case Cmdreplay: { if(strncmp(cmd->f[1], "off", 3) == 0) group->replay = Replayoff; else if(strncmp(cmd->f[1], "last", 4) == 0) group->replay = Replaylast; else if(strncmp(cmd->f[1], "all", 3) == 0) group->replay = Replayall; else e = "usage: replay off|last|all"; break; } case Cmddebug: { if(strncmp(cmd->f[1], "on", 2) == 0) DEBUG = 1; else if(strncmp(cmd->f[1], "off", 3) == 0) DEBUG = 0; else e = "usage: debug on|off"; break; } case Cmddebug9p: { if(strncmp(cmd->f[1], "on", 2) == 0) chatty9p = 1; else if(strncmp(cmd->f[1], "off", 3) == 0) chatty9p = 0; else e = "usage: debug9p on|off"; break; }} free(cmd); respond(req, e); } void xcreate(Req *req) { char *name = req->ifcall.name; char *uid = req->fid->uid; ulong perm = req->ifcall.perm; File *group = req->fid->file; File *f = nil; switch(filetype(group)){ case Qroot: case Qgroup: if(perm&DMDIR) f = groupcreate(group, name, uid, perm); else{ f = streamcreate(group, name, uid, perm); req->fid->file = f; req->ofcall.qid = f->qid; streamopen(f->aux, req); } break; } if(f == nil) respond(req, "internal failure"); else respond(req, nil); } void xopen(Req *req) { File *f = req->fid->file; switch(filetype(f)){ case Qstream: case Qorder: streamopen(f->aux, req); break; } respond(req, nil); } void xwrite(Req *req) { File *f = req->fid->file; switch(filetype(f)){ case Qstream: streamwrite(req); break; case Qctl: ctlwrite(req); break; default: respond(req, "forbidden"); return; } } void xread(Req *req) { File *f = req->fid->file; switch(filetype(f)){ case Qstream: case Qorder: streamread(req); break; case Qctl: ctlread(req); break; default: respond(req, "forbidden"); } } void xflush(Req *req) { Req *old = req->oldreq; File *f = old->fid->file; Read *r; switch(filetype(f)){ case Qstream: case Qorder: { Stream *s = f->aux; if(old->ifcall.type != Tread) break; listeach(Read*, s->rqueue, r){ if(r->req == old){ listunlink(r); free(r); break; } } respond(old, "interrupted"); }} respond(req, nil); } void xwstat(Req *req) { File *w, *f = req->fid->file; char *uid = req->fid->uid; /* To change name, must have write permission in group. */ if(req->d.name[0] != '\0' && strcmp(req->d.name, f->name) != 0){ if((w = f->parent) == nil) goto perm; incref(w); if(!hasperm(w, uid, AWRITE)){ closefile(w); goto perm; } if((w = walkfile(w, req->d.name)) != nil){ closefile(w); respond(req, "file already exists"); return; } } /* To change group, must be owner and member of new group, * or leader of current group and leader of new group. * Second case cannot happen, but we check anyway. */ while(req->d.gid[0] != '\0' && strcmp(f->gid, req->d.gid) != 0){ if(strcmp(uid, f->uid) == 0) break; if(strcmp(uid, f->gid) == 0) if(strcmp(uid, req->d.gid) == 0) break; respond(req, "not owner"); return; } /* To change mode, must be owner or group leader. * Because of lack of users file, leader=>group itself. */ if(req->d.mode != ~0 && f->mode != req->d.mode){ if(strcmp(uid, f->uid) != 0) if(strcmp(uid, f->gid) != 0){ respond(req, "not owner"); return; } } if(req->d.name[0] != '\0'){ free(f->name); f->name = estrdup(req->d.name); } if(req->d.uid[0] != '\0'){ free(f->uid); f->uid = estrdup(req->d.uid); } if(req->d.gid[0] != '\0'){ free(f->gid); f->gid = estrdup(req->d.gid); } if(req->d.mode != ~0){ f->mode = req->d.mode; f->qid.type = f->mode >> 24; } respond(req, nil); return; perm: respond(req, "permission denied"); } void xdestroyfid(Fid *fid) { Client *f = fid->aux; free(f); } void xdestroyfile(File *f) { switch(filetype(f)){ case Qgroup: groupclose(f->aux); break; case Qstream: streamclose(f->aux); break; } return; } Srv fs = { .create = xcreate, .open = xopen, .write = xwrite, .read = xread, .flush = xflush, .wstat = xwstat, .destroyfid = xdestroyfid, }; void usage(void) { fprint(2, "usage: %s [-D] [-s name] [-m mtpt]\n", argv0); exits("usage"); } void main(int argc, char *argv[]) { char *name = nil; char *mtpt = nil; ARGBEGIN{ case 's': name = EARGF(usage()); break; case 'm': mtpt = EARGF(usage()); break; case 'D': chatty9p++; break; default: usage(); }ARGEND; fs.tree = alloctree(nil, nil, DMDIR|0775, xdestroyfile); groupcreate(fs.tree->root, "/", nil, 0); filesettype(fs.tree->root, Qroot); if(name || mtpt){ postmountsrv(&fs, name, mtpt, MREPL|MCREATE); exits(nil); } fs.infd = fs.outfd = 0; dup(2, 1); srv(&fs); exits(nil); }