ref: fc145e25fb17b0a94e9012334bd2c70cf3d8dd00
dir: /mq.c/
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>
/* Short names for the structures defined below. */
typedef struct Mq Mq;
typedef struct Rd Rd;
typedef struct Msg Msg;
typedef struct Aux Aux;
/* File kinds, stored in the low two bits of a qid path (see QTYPE). */
enum {
Qroot,	/* the top-level directory */
Qmq,	/* a message queue file */
};
/* Binary size multipliers used when parsing size suffixes. */
enum {
KiB = 1024,
MiB = 1024*KiB,
GiB = 1024*MiB,
};
/* Per-fid state, hung off Fid.aux by mqattach and mqclone. */
struct Aux {
Mq *q;	/* queue this fid has open; nil until mqopen/mqcreate */
int id;	/* reader slot index returned by subscribe() */
int ntail;	/* bytes of existing log to replay; -1 means all of it */
};
/* One written message; shared between the log and reader queues. */
struct Msg {
Ref;	/* reference count; the message is freed when decref() hits 0 */
Msg *next;	/* following message in the chain */
int count;	/* number of data bytes */
char *data;	/* start of the payload; mqwrite points it at buf */
char buf[];	/* payload storage allocated together with the header */
};
/* One reader slot of a queue. */
struct Rd {
int id;	/* slot index, or -1 when the slot is free for reuse */
int off;	/* bytes of the head message already handed to the reader */
Msg *hd;	/* next message to deliver; nil when nothing is queued */
Msg *tl;	/* last queued message */
Req *wait;	/* read request parked until a message arrives, or nil */
};
/* A message queue, served as one file in the root directory. */
struct Mq {
Ref;	/* reference count; bumped by mqopen */
Qid qid;	/* path encodes the queue id; vers is bumped on every write */
int moribund;	/* set by mqremove once the queue is unlinked */
usize logsz;	/* total bytes of the messages currently in the log */
Msg *loghd;	/* oldest logged message */
Msg *logtl;	/* newest logged message */
Msg *hd;	/* not referenced in the code visible here */
Msg *tl;	/* not referenced in the code visible here */
int nrd;	/* number of slots in rd */
Rd *rd;	/* reader slots, grown by subscribe() */
char *name;	/* file name, matched by lookup() */
char *user;	/* not referenced in the code visible here */
char *group;	/* not referenced in the code visible here */
int mode;	/* permission bits reported by qstat() */
};
/* Server-wide state. */
Mq **queues;	/* live queues, in directory order */
int nqueues;	/* number of entries in queues */
vlong maxlog = -1;	/* log size limit in bytes (-l flag); negative disables trimming */
vlong queueid = 0;	/* id given to the next created queue */
int coalesce;	/* -c flag: let one read return several whole messages */
/* Error strings handed to respond(). */
char Ebaduse[] = "invalid use of fd";
char Einuse[] = "fid in use";
char Eexist[] = "file already exists";
char Enoexist[] = "file does not exist";
char Eintr[] = "interrupted";
char Enotdir[] = "not a directory";
char Ebadcmd[] = "unknown command";
char Enomem[] = "out of memory";
/*
 * Qid path layout: the low two bits hold the file kind (Qroot, Qmq),
 * the remaining bits hold the queue id.
 */
#define QTYPE(p) ((int)((p) & 0x3))
#define QIDX(p) ((p)>>2)
#define QPATH(i, t) ((i)<<2 | (t))
/*
 * Allocate n zeroed bytes.  Never returns nil: an allocation
 * failure terminates the program through sysfatal.  The block is
 * tagged with the caller's pc.
 */
void *
emalloc(ulong n)
{
	void *p;

	if((p = mallocz(n, 1)) == nil)
		sysfatal("malloc: %r");
	setmalloctag(p, getcallerpc(&n));
	return p;
}
/*
 * Resize the allocation p to n bytes.  Never returns nil: a
 * failure terminates the program through sysfatal.  The resulting
 * block is tagged with the caller's pc.
 */
void *
erealloc(void *p, ulong n)
{
	void *np;

	if((np = realloc(p, n)) == nil)
		sysfatal("realloc: %r");
	setmalloctag(np, getcallerpc(&p));
	return np;
}
/*
 * Return a freshly allocated copy of the string s.  Never returns
 * nil: a failure terminates the program through sysfatal.  The
 * copy is tagged with the caller's pc.
 */
char*
estrdup(char *s)
{
	char *copy;

	if((copy = strdup(s)) == nil)
		sysfatal("strdup: %r");
	setmalloctag(copy, getcallerpc(&s));
	return copy;
}
/*
 * Drop messages from the front of q's log until the log is empty
 * or holds fewer than maxlog bytes.  A negative maxlog disables
 * trimming.  A dropped message is freed only when the log held the
 * last reference to it.
 */
void
trimlog(Mq *q)
{
	Msg *old;

	if(maxlog >= 0){
		for(;;){
			old = q->loghd;
			if(old == nil || q->logsz < maxlog)
				break;
			q->loghd = old->next;
			q->logsz -= old->count;
			if(decref(old) == 0)
				free(old);
		}
		/* an emptied log has no tail either */
		if(q->loghd == nil)
			q->logtl = nil;
	}
}
/*
 * Register a new reader on q and return its slot index.
 *
 * A free slot (id == -1) is reused when one exists; otherwise the
 * slot array grows by one.  The reader starts with the part of the
 * log selected by ntail: -1 replays the whole log, any other value
 * skips messages from the front until at most ntail bytes remain.
 * The reader takes one reference on every message it is given.
 */
int
subscribe(Mq *q, vlong ntail)
{
	Msg *m, *last;
	Rd *rd;
	vlong sz;
	int i;

	rd = nil;
	for(i = 0; i < q->nrd; i++){
		if(q->rd[i].id == -1){
			rd = &q->rd[i];
			break;
		}
	}
	if(rd == nil){
		/* no free slot: i equals the old nrd, the index of the new slot */
		q->rd = erealloc(q->rd, (++q->nrd)*sizeof(*q->rd));
		rd = &q->rd[q->nrd - 1];
	}
	rd->id = i;
	rd->wait = nil;
	rd->off = 0;
	rd->hd = nil;
	rd->tl = nil;

	sz = q->logsz;
	m = q->loghd;
	if(ntail != -1)
		for(; m != nil && sz > ntail; m = m->next)
			sz -= m->count;
	rd->hd = m;
	/*
	 * Remember the last message seen so that the tail points at
	 * it; the loop variable itself is nil once the loop ends.
	 */
	last = nil;
	for(; m != nil; m = m->next){
		incref(m);
		last = m;
	}
	rd->tl = last;
	return rd->id;
}
Mq*
lookup(char *name)
{
int i;
for(i = 0; i < nqueues; i++)
if(strcmp(queues[i]->name, name) == 0)
return queues[i];
return nil;
}
/*
 * Fill d with the directory entry for queue q as seen through the
 * fid state a.  The strings are allocated with estrdup9p.
 *
 * The length is the size of the log, capped at the fid's tail
 * limit.  An ntail of -1 means "no limit" (subscribe treats it the
 * same way), so a negative ntail never caps the length; a may be
 * nil, in which case no cap applies either.
 */
void
qstat(Dir *d, Mq *q, Aux *a)
{
	d->name = estrdup9p(q->name);
	d->uid = estrdup9p("glenda");
	d->gid = estrdup9p("glenda");
	d->muid = estrdup9p("glenda");
	d->qid = q->qid;
	d->mtime = 0;
	d->atime = 0;
	d->mode = q->mode;
	d->length = q->logsz;
	if(a != nil && a->ntail >= 0 && a->ntail < d->length)
		d->length = a->ntail;
}
/*
 * Directory generator for the root, used with dirread9p: fill d
 * with the i'th queue's entry and return 0, or return -1 once i
 * is past the last queue.  a is the reading fid's Aux.
 */
int
rootgen(int i, Dir *d, void *a)
{
	if(i < nqueues){
		qstat(d, queues[i], a);
		return 0;
	}
	return -1;
}
/*
 * Give the fid new its own Aux, copied from old.  The tail limit
 * is inherited; when old already refers to a queue, the new fid
 * refers to the same queue and gets a reader slot of its own.
 * Always succeeds (returns nil).
 */
char*
mqclone(Fid *old, Fid *new)
{
	Aux *src, *dst;

	src = old->aux;
	dst = emalloc(sizeof(Aux));
	dst->ntail = src->ntail;
	if(src->q != nil){
		dst->q = src->q;
		dst->id = subscribe(src->q, src->ntail);
	}
	new->aux = dst;
	return nil;
}
/*
 * Walk fid f one step to name, storing the resulting qid in both
 * f->qid and *qid.  Returns nil on success or an error string.
 *
 * ".." leads to the root from anywhere.  Any other name is only
 * valid in the root, where it must be the name of a queue.
 */
char*
mqwalk1(Fid *f, char *name, Qid *qid)
{
	Mq *q;
	int inroot;

	inroot = QTYPE(f->qid.path) == Qroot;
	if(strcmp(name, "..") == 0){
		/* the root is its own parent, so its qid stays as it is */
		if(!inroot)
			f->qid = (Qid){Qroot, 0, QTDIR};
		*qid = f->qid;
		return nil;
	}
	if(!inroot)
		return Enotdir;
	q = lookup(name);
	if(q == nil)
		return Enoexist;
	f->qid = q->qid;
	*qid = f->qid;
	return nil;
}
/*
 * Answer a stat request.
 *
 * The root is described directly.  For a queue, the entry is built
 * by qstat alone, so the owner strings are allocated only once.
 * The queue is taken from the fid when it is open, otherwise found
 * by qid path: the queues array is compacted by mqremove, so the
 * id encoded in the path is not an index into it.  A queue that
 * can no longer be found yields Enoexist.
 */
void
mqstat(Req *r)
{
	vlong p;
	Aux *a;
	Mq *q;
	int i;

	p = r->fid->qid.path;
	if(QTYPE(p) == Qroot){
		r->d.uid = estrdup9p("glenda");
		r->d.gid = estrdup9p("glenda");
		r->d.muid = estrdup9p("glenda");
		r->d.qid = r->fid->qid;
		r->d.mtime = 0;
		r->d.atime = 0;
		r->d.mode = DMDIR|0755;
		respond(r, nil);
		return;
	}

	a = r->fid->aux;
	q = nil;
	if(a != nil && a->q != nil)
		q = a->q;
	else
		for(i = 0; i < nqueues; i++)
			if(queues[i]->qid.path == p){
				q = queues[i];
				break;
			}
	if(q == nil){
		respond(r, Enoexist);
		return;
	}
	qstat(&r->d, q, a);
	respond(r, nil);
}
/*
 * Answer a flush request.
 *
 * If the request being flushed is a read parked in one of the
 * queue's reader slots, it is answered with Eintr and the slot is
 * cleared.  The slots are searched for that exact request rather
 * than indexed by the fid's id: a fid that never subscribed (one
 * opened for writing only) has id 0, which may be another reader's
 * slot, and a fid that was never opened has no queue at all.
 * The flush itself always succeeds.
 */
void
mqflush(Req *r)
{
	Req *old;
	Aux *a;
	Rd *rd;
	int i;

	old = r->oldreq;
	a = nil;
	if(old != nil && old->fid != nil)
		a = old->fid->aux;
	if(a != nil && a->q != nil){
		for(i = 0; i < a->q->nrd; i++){
			rd = &a->q->rd[i];
			if(rd->wait == old){
				rd->wait = nil;
				respond(old, Eintr);
				break;
			}
		}
	}
	respond(r, nil);
}
/*
 * Remove the queue the fid refers to from the directory.
 *
 * The queue is marked moribund and dropped from the queues array,
 * which is compacted in place; the Mq itself is not freed here.
 * The new length is the number of entries kept, so a remove that
 * matches nothing (the queue was already removed through another
 * fid) leaves the array intact and answers Enoexist.  The root
 * cannot be removed.
 */
void
mqremove(Req *r)
{
	vlong path;
	int i, o;

	path = r->fid->qid.path;
	if(QTYPE(path) == Qroot){
		respond(r, Ebaduse);
		return;
	}
	o = 0;
	for(i = 0; i < nqueues; i++){
		if(queues[i]->qid.path == path){
			queues[i]->moribund = 1;
			continue;
		}
		queues[o++] = queues[i];
	}
	if(o == nqueues){
		/* nothing matched */
		respond(r, Enoexist);
		return;
	}
	nqueues = o;
	respond(r, nil);
}
/*
 * Handle a write: turn the data into one message, hand it to every
 * reader of the queue and append it to the log.
 *
 * A reader with a parked read is answered immediately.  The reply
 * carries at most as many bytes as that read asked for; when the
 * message is longer, it is also queued for the reader with its
 * offset set past the delivered bytes, so the next read continues
 * where this one stopped.  The message itself is shared with the
 * other readers and the log and is never modified after it has
 * been filled in.  Every queue a message sits on, and the log,
 * holds one reference to it.
 */
void
mqwrite(Req *r)
{
	Msg *m;
	Req *rr;
	Aux *a;
	Rd *rd;
	Mq *q;
	int i, n;

	a = r->fid->aux;
	if(a == nil || a->q == nil){
		respond(r, Ebaduse);
		return;
	}
	q = a->q;
	m = emalloc(sizeof(Msg) + r->ifcall.count);
	m->data = m->buf;
	m->count = r->ifcall.count;
	memmove(m->data, r->ifcall.data, m->count);
	m->next = nil;

	for(i = 0; i < q->nrd; i++){
		rd = &q->rd[i];
		if(rd->id == -1)
			continue;
		rr = rd->wait;
		rd->wait = nil;
		if(rr != nil){
			/* never return more than the parked read asked for */
			n = m->count;
			if(rr->ifcall.count < n)
				n = rr->ifcall.count;
			rr->ofcall.data = m->data;
			rr->ofcall.count = n;
			respond(rr, nil);	/* rr must not be touched after this */
			if(n == m->count)
				continue;	/* delivered in full: nothing to queue */
			/*
			 * Reads are parked only on an empty queue (see mqread),
			 * so m becomes the head and off applies to it.
			 */
			rd->off = n;
		}
		if(rd->hd == nil)
			rd->hd = m;
		if(rd->tl != nil)
			rd->tl->next = m;
		rd->tl = m;
		incref(m);
	}

	/* append to the log, then enforce the size limit */
	if(q->loghd == nil)
		q->loghd = m;
	if(q->logtl != nil)
		q->logtl->next = m;
	q->logtl = m;
	incref(m);
	q->logsz += m->count;
	trimlog(q);	/* may free m when nothing else refers to it */

	q->qid.vers++;
	r->ofcall.count = r->ifcall.count;
	respond(r, nil);
}
/*
 * Handle a read.
 *
 * Reading the root lists the queues.  Reading a queue returns data
 * from the head of the fid's reader queue; when nothing is queued
 * the request is parked in the reader slot and answered later by
 * mqwrite (or mqflush).
 *
 * Without coalescing, a read returns data from a single message.
 * With coalescing it keeps going while whole messages fit.  A
 * message that does not fit is consumed partially: rd->off records
 * how far the reader got.  A fully consumed message loses this
 * reader's reference and is freed when that was the last one.
 *
 * NOTE(review): a second read on the same fid while one is parked
 * overwrites rd->wait; confirm whether clients can issue that.
 */
void
mqread(Req *r)
{
	char *p, *e, *d;
	int n, avail, mcount;
	Aux *a;
	Msg *m;
	Rd *rd;

	if(QTYPE(r->fid->qid.path) == Qroot){
		dirread9p(r, rootgen, r->fid->aux);
		respond(r, nil);
		return;
	}
	a = r->fid->aux;
	if(a == nil || a->q == nil){
		respond(r, Ebaduse);
		return;
	}
	rd = &a->q->rd[a->id];

	/* no messages: park the request until the next one comes */
	if(rd->hd == nil){
		rd->wait = r;
		return;
	}

	/* queued messages: pop data off */
	d = emalloc(r->ifcall.count);
	p = d;
	e = d + r->ifcall.count;
	r->ofcall.data = d;
	while(rd->hd != nil && p != e){
		m = rd->hd;
		mcount = m->count;	/* saved: m may be freed below */
		assert(rd->off >= 0 && rd->off <= mcount);
		avail = mcount - rd->off;
		n = e - p;
		if(n >= avail){
			/* the rest of the message fits: consume it */
			n = avail;
			memcpy(p, m->data + rd->off, n);
			rd->hd = m->next;
			rd->off = 0;
			if(rd->hd == nil)
				rd->tl = nil;
			if(decref(m) == 0)
				free(m);
		}else{
			/* buffer full: remember how far we got */
			memcpy(p, m->data + rd->off, n);
			rd->off += n;
		}
		p += n;
		if(!coalesce || n < mcount)
			break;
	}
	r->ofcall.count = p - d;
	/* checked after the assignment so that it tests the real reply size */
	assert(r->ofcall.count <= r->ifcall.count);
	respond(r, nil);
	free(d);
}
/*
 * Handle a create: make a new, empty queue called ifcall.name,
 * add it to the directory and leave the fid open on it.
 *
 * Fails with Eexist when a queue with that name exists.  The
 * queue's permissions come from ifcall.perm; ifcall.mode is the
 * open mode and only decides whether the fid becomes a reader.
 * Queues are plain files, so only the permission bits of perm are
 * kept.
 */
void
mqcreate(Req *r)
{
	Aux *a;
	Mq *q;
	int m;

	if(lookup(r->ifcall.name) != nil){
		respond(r, Eexist);
		return;
	}
	m = r->ifcall.mode & OMASK;
	q = emalloc(sizeof(Mq));
	q->name = estrdup(r->ifcall.name);
	q->mode = r->ifcall.perm & 0777;
	q->qid.path = QPATH(queueid, Qmq);
	q->qid.vers = 0;
	q->qid.type = QTFILE;
	queueid++;

	a = r->fid->aux;
	assert(a->q == nil);
	a->q = q;
	if(m == OREAD || m == ORDWR || m == OEXEC)
		a->id = subscribe(q, a->ntail);
	r->ofcall.qid = q->qid;
	r->fid->qid = q->qid;

	queues = erealloc(queues, ++nqueues*sizeof(Mq*));
	queues[nqueues-1] = q;
	respond(r, nil);
}
/*
 * Handle an open.
 *
 * Opening the root needs no state.  Opening a queue binds the fid
 * to it and, for the readable modes (OREAD, ORDWR, OEXEC, the same
 * set mqcreate and destroyfid use), gives the fid a reader slot.
 * The queue is found by qid path: the queues array is compacted by
 * mqremove, so the id encoded in the path is not an index into it.
 * Fails with Einuse when the fid is already bound to a queue and
 * with Enoexist when the queue has been removed.
 */
void
mqopen(Req *r)
{
	Aux *a;
	Mq *q;
	vlong p;
	int i, m;

	a = r->fid->aux;
	if(a->q != nil){
		respond(r, Einuse);
		return;
	}
	m = r->ifcall.mode & OMASK;
	p = r->fid->qid.path;
	if(QTYPE(p) != Qroot){
		q = nil;
		for(i = 0; i < nqueues; i++)
			if(queues[i]->qid.path == p){
				q = queues[i];
				break;
			}
		if(q == nil){
			respond(r, Enoexist);
			return;
		}
		incref(q);
		a->q = q;
		if(m == OREAD || m == ORDWR || m == OEXEC)
			a->id = subscribe(q, a->ntail);
	}
	r->ofcall.qid = r->fid->qid;
	respond(r, nil);
}
/*
 * Release the state attached to a fid that is going away.
 *
 * The Aux is always freed, whatever mode the fid was opened in.
 * Only a fid that holds a reader slot gives one back: that is a
 * fid bound to a queue whose open mode is one of the readable
 * modes for which mqopen/mqcreate subscribe.  (A fid opened for
 * writing only has id 0 without owning slot 0, so it must not
 * touch the slots.)  The reader's references on the messages still
 * queued for it are dropped before the slot is marked free.
 */
void
destroyfid(Fid *f)
{
	Aux *a;
	Msg *m, *next;
	Rd *rd;
	int mode;

	a = f->aux;
	if(a == nil)
		return;
	mode = f->omode & OMASK;
	if(a->q != nil && a->id >= 0 && a->id < a->q->nrd
	&& (mode == OREAD || mode == ORDWR || mode == OEXEC)){
		rd = &a->q->rd[a->id];
		/* the reader holds one reference on each message from hd on */
		for(m = rd->hd; m != nil; m = next){
			next = m->next;
			if(decref(m) == 0)
				free(m);
		}
		rd->hd = nil;
		rd->tl = nil;
		rd->off = 0;
		rd->wait = nil;
		rd->id = -1;
	}
	f->aux = nil;
	free(a);
}
/*
 * Handle an attach: hand out the root and create the fid's Aux.
 *
 * An attach name of the form "tail:N", with N optionally followed
 * by k, m or g (either case) scale suffixes, limits how much of the
 * existing log a reader gets to N bytes.  Without it the limit is
 * -1, meaning the whole log.  The value is computed in a vlong and
 * rejected once it leaves the range of the int it is stored in, so
 * that large values fail cleanly instead of overflowing.  An
 * unknown suffix is rejected with "bad scale".
 */
void
mqattach(Req *r)
{
	Aux *a;
	char *n, *e;
	vlong tail;

	n = r->ifcall.aname;
	a = emalloc(sizeof(Aux));
	r->ofcall.qid = (Qid){Qroot, 0, QTDIR};
	r->fid->qid = r->ofcall.qid;
	r->fid->aux = a;
	a->ntail = -1;
	if(n != nil && strncmp(n, "tail:", 5) == 0){
		tail = strtoll(n+5, &e, 0);
		for(;;){
			/* checked after every step: one more scale cannot overflow a vlong */
			if(tail > 0x7fffffffLL || tail < -0x7fffffffLL){
				respond(r, "tail too large");
				return;
			}
			if(*e == '\0')
				break;
			switch(*e++){
			case 'g':
			case 'G':
				tail *= GiB;
				break;
			case 'm':
			case 'M':
				tail *= MiB;
				break;
			case 'k':
			case 'K':
				tail *= KiB;
				break;
			default:
				respond(r, "bad scale");
				return;
			}
		}
		a->ntail = tail;
	}
	respond(r, nil);
}
/* The 9P request handlers of the message queue server. */
Srv mq = {
	/* session and fid lifetime */
	.attach		= mqattach,
	.walk1		= mqwalk1,
	.clone		= mqclone,
	.destroyfid	= destroyfid,

	/* files */
	.open		= mqopen,
	.create		= mqcreate,
	.remove		= mqremove,
	.stat		= mqstat,

	/* data */
	.read		= mqread,
	.write		= mqwrite,
	.flush		= mqflush,
};
/*
 * Print the command line synopsis and exit with status "usage".
 * The synopsis lists every flag that main accepts.
 */
void
usage(void)
{
	fprint(2, "usage: %s [-dc] [-l maxlog] [-s srv] [-m mtpt]\n", argv0);
	exits("usage");
}
/*
 * Parse the command line, then post and mount the server.
 *
 *	-d		increase 9P debugging output
 *	-s srv		name to post the service under (default "mq")
 *	-m mtpt		mount point (default "/mnt/mq")
 *	-l size		cap each queue's log at size bytes; size may carry
 *			k, m or g suffixes in either case
 *	-c		let one read return several messages
 */
void
main(int argc, char **argv)
{
	char *srvname, *mntpt, *s, *e;

	srvname = "mq";
	mntpt = "/mnt/mq";
	ARGBEGIN{
	case 'd':
		chatty9p++;
		break;
	case 's':
		srvname = EARGF(usage());
		break;
	case 'm':
		mntpt = EARGF(usage());
		break;
	case 'l':
		s = EARGF(usage());
		maxlog = strtoll(s, &e, 0);
		while(*e){
			switch(*e++){
			case 'k':
			case 'K':
				maxlog *= KiB;
				break;
			case 'm':
			case 'M':
				maxlog *= MiB;
				break;
			case 'g':
			case 'G':
				maxlog *= GiB;
				break;
			default:
				/* e already points past the offending character */
				sysfatal("unknown suffix %c", e[-1]);
			}
		}
		break;
	case 'c':
		coalesce = 1;
		break;
	default:
		usage();
	}ARGEND;
	postmountsrv(&mq, srvname, mntpt, MCREATE|MREPL);
	exits(nil);
}