ref: 23227ed96374185ecc9fd4410c7ba3ed6491c0f9
dir: /mq.c/
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>

/*
 * mq: a 9P file server whose root directory holds message queues.
 *
 * Every file created in the root is a queue.  Each write to a queue
 * becomes one message.  Each fid opened for reading is given its own
 * reader slot, which starts at the head of the retained log and then
 * sees every later message.  A read with nothing queued is parked
 * until the next write.
 */

typedef struct Mq Mq;
typedef struct Msg Msg;
typedef struct Aux Aux;

enum {
	Qroot,	/* qid path of the root directory */
};

/* Per-fid state, attached to Fid.aux once a queue file is opened. */
struct Aux {
	Mq *q;	/* queue this fid refers to */
	int id;	/* reader slot in q; only meaningful for fids opened for reading */
};

/*
 * One written message.  All holders (the log and every reader slot)
 * share the same next chain; the reference count says how many of
 * them still point at this message.
 */
struct Msg {
	Ref;
	Msg *next;
	int count;	/* bytes in data */
	char data[];
};

struct Mq {
	Qid qid;
	int count;	/* not used in this file */

	/* retained log: what a new reader starts with */
	usize logsz;	/* total bytes held by the log */
	Msg *loghd;
	Msg *logtl;

	Msg *hd;	/* not used in this file */
	Msg *tl;	/* not used in this file */

	/* reader slots, all arrays have nrd entries */
	int nrd;
	int *rd;	/* -1 when the slot is free, anything else when in use */
	Msg **rhd;	/* next message to hand to the reader */
	Msg **rtl;	/* last message queued for the reader */
	Req **wait;	/* parked read waiting for a message, or nil */

	char *name;
	char *user;	/* not used in this file */
	char *group;	/* not used in this file */
	int mode;	/* permission bits reported by stat */
};

Mq **queues;	/* queues[i] has qid path i+1 */
int nqueues;
vlong maxlog = -1;	/* log size limit in bytes; negative disables trimming */
vlong queueid = Qroot + 1;

char Ebaduse[] = "invalid use of fd";
char Einuse[] = "fid in use";
char Eexist[] = "file already exists";
char Enoexist[] = "file does not exist";
char Enorm[] = "remove prohibited";

/* Zeroing malloc that aborts the program on failure. */
void *
emalloc(ulong n)
{
	void *v;

	v = mallocz(n, 1);
	if(v == nil)
		sysfatal("malloc: %r");
	setmalloctag(v, getcallerpc(&n));
	return v;
}

/* realloc that aborts the program on failure; new space is NOT zeroed. */
void *
erealloc(void *p, ulong n)
{
	void *v;

	v = realloc(p, n);
	if(v == nil)
		sysfatal("realloc: %r");
	setmalloctag(v, getcallerpc(&p));
	return v;
}

/* strdup that aborts the program on failure. */
char*
estrdup(char *s)
{
	s = strdup(s);
	if(s == nil)
		sysfatal("strdup: %r");
	setmalloctag(s, getcallerpc(&s));
	return s;
}

/* Take one more reference to m; returns m for convenience. */
Msg*
msgref(Msg *m)
{
	incref(m);
	return m;
}

/* Drop one reference to m, freeing it when the last one goes. */
void
msgunref(Msg *m)
{
	if(decref(m) == 0)
		free(m);
}

/* Report whether an open mode (flags allowed) lets the fid read. */
int
isreader(int mode)
{
	mode &= OMASK;
	return mode == OREAD || mode == ORDWR || mode == OEXEC;
}

/* Map a qid path to its queue; nil for the root or an unknown path. */
Mq*
qbypath(uvlong path)
{
	if(path == Qroot || path > nqueues)
		return nil;
	return queues[path-1];
}

/*
 * Drop messages from the front of q's log while the log holds
 * maxlog bytes or more.  Does nothing when maxlog is negative.
 */
void
trimlog(Mq *q)
{
	Msg *m, *n;

	if(maxlog < 0)
		return;
	for(m = q->loghd; m != nil && q->logsz >= maxlog; m = n){
		n = m->next;
		q->logsz -= m->count;
		msgunref(m);
	}
	/*
	 * m is the first surviving message (or nil).  Using m rather
	 * than n keeps the head intact when the loop never ran.
	 */
	q->loghd = m;
	if(m == nil)
		q->logtl = nil;
}

/*
 * Allocate a reader slot on q and return its index.  A released
 * slot is reused when there is one; otherwise the slot arrays grow
 * by one.  The new reader starts at the head of the retained log
 * and takes a reference on every message in it.
 */
int
subscribe(Mq *q)
{
	Msg *m;
	int i;

	for(i = 0; i < q->nrd; i++)
		if(q->rd[i] == -1)
			break;
	if(i == q->nrd){
		q->rd = erealloc(q->rd, (q->nrd+1)*sizeof(*q->rd));
		q->wait = erealloc(q->wait, (q->nrd+1)*sizeof(*q->wait));
		q->rhd = erealloc(q->rhd, (q->nrd+1)*sizeof(*q->rhd));
		q->rtl = erealloc(q->rtl, (q->nrd+1)*sizeof(*q->rtl));
		q->nrd++;
	}
	/* erealloc does not zero, so every field of the slot is set here */
	q->rd[i] = i;
	q->wait[i] = nil;
	q->rhd[i] = q->loghd;
	q->rtl[i] = q->logtl;
	for(m = q->loghd; m != nil; m = m->next)
		msgref(m);
	return i;
}

/* Find a queue by file name; nil if there is none. */
Mq*
lookup(char *name)
{
	int i;

	for(i = 0; i < nqueues; i++)
		if(strcmp(queues[i]->name, name) == 0)
			return queues[i];
	return nil;
}

/* Fill d with the directory entry for queue q. */
void
qstat(Dir *d, Mq *q)
{
	d->name = estrdup9p(q->name);
	d->uid = estrdup9p("glenda");
	d->gid = estrdup9p("glenda");
	d->muid = estrdup9p("glenda");
	d->qid = q->qid;
	d->mtime = 0;
	d->atime = 0;
	d->mode = q->mode;
}

/* dirread9p generator for the root: entry i is queue i, -1 past the end. */
int
rootgen(int i, Dir *d, void *)
{
	if(i >= nqueues)
		return -1;
	qstat(d, queues[i]);
	return 0;
}

/*
 * Clone hook: when the old fid carries per-open state, give the new
 * fid its own Aux on the same queue with a fresh reader slot.
 */
char*
mqclone(Fid *old, Fid *new)
{
	Aux *o, *n;

	o = old->aux;
	if(o != nil){
		n = emalloc(sizeof(Aux));
		n->q = o->q;
		n->id = subscribe(o->q);
		new->aux = n;
	}
	return nil;
}

/*
 * Walk one path element.  From the root, ".." stays put and any
 * other name must be an existing queue.  From a queue file only
 * ".." is valid and leads back to the root.
 */
char*
mqwalk1(Fid *f, char *name, Qid *qid)
{
	Mq *q;

	switch(f->qid.path){
	case Qroot:
		if(strcmp(name, "..") == 0){
			*qid = f->qid;
			return nil;
		}
		if((q = lookup(name)) == nil)
			return Enoexist;
		f->qid = q->qid;
		*qid = f->qid;
		return nil;
	default:
		if(strcmp(name, "..") == 0){
			f->qid = (Qid){Qroot, 0, QTDIR};
			*qid = f->qid;
			return nil;
		}
		return "not a dir";
	}
}

/*
 * Tstat handler.  Queue files are found through the fid's qid path
 * rather than Fid.aux, because aux is only set once the fid has
 * been opened and stat is legal on a fid that was merely walked to.
 */
void
mqstat(Req *r)
{
	Mq *q;

	switch(r->fid->qid.path){
	case Qroot:
		r->d.name = estrdup9p("/");
		r->d.uid = estrdup9p("glenda");
		r->d.gid = estrdup9p("glenda");
		r->d.muid = estrdup9p("glenda");
		r->d.qid = r->fid->qid;
		r->d.mtime = 0;
		r->d.atime = 0;
		/* the qid says QTDIR, so the mode must carry DMDIR too */
		r->d.mode = DMDIR|0755;
		break;
	default:
		q = qbypath(r->fid->qid.path);
		if(q == nil){
			respond(r, Enoexist);
			return;
		}
		qstat(&r->d, q);
	}
	respond(r, nil);
}

/*
 * Tflush handler: if the flushed request's fid has per-open state,
 * fail the read parked in that fid's reader slot with "interrupted"
 * and clear the slot, then answer the flush itself.
 */
void
mqflush(Req *r)
{
	Req *w;
	Aux *a;

	if((a = r->oldreq->fid->aux) != nil){
		w = a->q->wait[a->id];
		if(w != nil)
			respond(w, "interrupted");
		a->q->wait[a->id] = nil;
	}
	respond(r, nil);
}

/*
 * Tremove handler.  Queues cannot be removed (their qid path doubles
 * as an index into queues), so refuse instead of taking the server
 * down.
 */
void
mqremove(Req *r)
{
	respond(r, Enorm);
}

/*
 * Twrite handler: turn the written bytes into one message.  Readers
 * with a parked read are answered immediately; every other active
 * reader gets the message queued.  The message is then appended to
 * the log, which is trimmed to maxlog.
 */
void
mqwrite(Req *r)
{
	Msg *m;
	Req *rr;
	Aux *a;
	Mq *q;
	int i;

	if((a = r->fid->aux) == nil){
		respond(r, Ebaduse);
		return;
	}
	q = a->q;
	m = emalloc(sizeof(Msg) + r->ifcall.count);
	m->count = r->ifcall.count;
	memcpy(m->data, r->ifcall.data, m->count);
	m->next = nil;

	for(i = 0; i < q->nrd; i++){
		/* released slots have no reader; queueing there would leak */
		if(q->rd[i] == -1)
			continue;
		rr = q->wait[i];
		if(rr != nil){
			/* a parked read implies an empty queue: deliver directly */
			rr->ofcall.data = r->ifcall.data;
			rr->ofcall.count = r->ifcall.count;
			respond(rr, nil);
			q->wait[i] = nil;
		}else{
			if(q->rhd[i] == nil)
				q->rhd[i] = m;
			if(q->rtl[i] != nil)
				q->rtl[i]->next = m;
			q->rtl[i] = m;
			msgref(m);
		}
	}

	if(q->loghd == nil)
		q->loghd = m;
	if(q->logtl != nil)
		q->logtl->next = m;
	q->logtl = msgref(m);
	q->logsz += m->count;
	trimlog(q);

	q->qid.vers++;
	r->ofcall.count = r->ifcall.count;
	respond(r, nil);
}

/*
 * Tread handler.  The root is read as a directory.  On a queue the
 * reader receives the next message queued for its slot, or the
 * request is parked in the slot until a write arrives.
 */
void
mqread(Req *r)
{
	Aux *a;
	Msg *m;
	Mq *q;

	if(r->fid->qid.path == Qroot){
		dirread9p(r, rootgen, nil);
		respond(r, nil);
		return;
	}
	if((a = r->fid->aux) == nil){
		respond(r, Ebaduse);
		return;
	}
	q = a->q;
	if(q->rhd[a->id] != nil){
		m = q->rhd[a->id];
		r->ofcall.data = m->data;
		r->ofcall.count = m->count;
		/* respond before dropping our reference: the reply points into m */
		respond(r, nil);
		q->rhd[a->id] = m->next;
		if(q->rhd[a->id] == nil)
			q->rtl[a->id] = nil;
		msgunref(m);
		return;
	}
	q->wait[a->id] = r;
}

/*
 * Tcreate handler: make a new queue named after the request and
 * leave the fid open on it, subscribed if the open mode can read.
 */
void
mqcreate(Req *r)
{
	Aux *a;
	Mq *q;

	if(lookup(r->ifcall.name) != nil){
		respond(r, Eexist);
		return;
	}
	q = emalloc(sizeof(Mq));
	q->name = estrdup(r->ifcall.name);
	/* perm holds the permissions; ifcall.mode is only the open mode */
	q->mode = r->ifcall.perm & 0777;
	q->qid.path = queueid++;
	q->qid.vers = 0;
	q->qid.type = QTFILE;

	a = emalloc(sizeof(Aux));
	a->q = q;
	if(isreader(r->ifcall.mode))
		a->id = subscribe(q);
	r->ofcall.qid = q->qid;
	r->fid->qid = q->qid;
	r->fid->aux = a;

	queues = erealloc(queues, ++nqueues*sizeof(Mq*));
	queues[nqueues-1] = q;
	respond(r, nil);
}

/*
 * Topen handler.  Opening the root needs no state.  Opening a queue
 * attaches an Aux to the fid and, for modes that can read, claims a
 * reader slot.
 */
void
mqopen(Req *r)
{
	Aux *a;
	Mq *q;
	uvlong p;

	if(r->fid->aux != nil){
		respond(r, Einuse);
		return;
	}
	p = r->fid->qid.path;
	if(p != Qroot){
		q = qbypath(p);
		if(q == nil){
			respond(r, Enoexist);
			return;
		}
		a = emalloc(sizeof(Aux));
		a->q = q;
		if(isreader(r->ifcall.mode))
			a->id = subscribe(q);
		r->fid->aux = a;
	}
	r->ofcall.qid = r->fid->qid;
	respond(r, nil);
}

/*
 * Fid destructor.  A fid that held a reader slot gives up its
 * references to the messages still queued for it and marks the slot
 * free for reuse.  The Aux itself is freed in every case.
 */
void
destroyfid(Fid *f)
{
	Aux *a;
	Msg *m, *n;
	Mq *q;
	int id;

	a = f->aux;
	if(a == nil)
		return;
	q = a->q;
	id = a->id;
	if(isreader(f->omode) && id >= 0 && id < q->nrd){
		for(m = q->rhd[id]; m != nil; m = n){
			n = m->next;
			msgunref(m);
		}
		q->rhd[id] = nil;
		q->rtl[id] = nil;
		q->wait[id] = nil;
		q->rd[id] = -1;
	}
	f->aux = nil;
	free(a);
}

/* Tattach handler: every attach lands on the root directory. */
void
mqattach(Req *r)
{
	r->ofcall.qid = (Qid){Qroot, 0, QTDIR};
	r->fid->qid = r->ofcall.qid;
	r->fid->aux = nil;
	respond(r, nil);
}

Srv mq = {
	.attach=mqattach,
	.open=mqopen,
	.create=mqcreate,
	.read=mqread,
	.write=mqwrite,
	.remove=mqremove,
	.flush=mqflush,
	.stat=mqstat,
	.walk1=mqwalk1,
	.clone=mqclone,
	.destroyfid=destroyfid,
};

void
usage(void)
{
	fprint(2, "usage: %s [-d] [-r maxlog] [-s srv] [-m mtpt]\n", argv0);
	exits("usage");
}

/*
 * Options:
 *	-d		increase 9P chatter (chatty9p)
 *	-s srv		service name handed to postmountsrv
 *	-m mtpt		mount point, /mnt/mq by default
 *	-r maxlog	log size limit in bytes used by trimlog
 */
void
main(int argc, char **argv)
{
	char *srvname, *mntpt;

	srvname = nil;
	mntpt = "/mnt/mq";
	ARGBEGIN{
	case 'd':
		chatty9p++;
		break;
	case 's':
		srvname = EARGF(usage());
		break;
	case 'm':
		mntpt = EARGF(usage());
		break;
	case 'r':
		maxlog = atoi(EARGF(usage()));
		break;
	default:
		usage();
	}ARGEND;

	postmountsrv(&mq, srvname, mntpt, MCREATE|MREPL);
	/* returning from main would report exit status "main" */
	exits(nil);
}