ref: 23227ed96374185ecc9fd4410c7ba3ed6491c0f9
dir: /mq.c/
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>
/* Short type names for the three structures defined below. */
typedef struct Mq Mq;
typedef struct Msg Msg;
typedef struct Aux Aux;
/*
 * Qid paths.  Qroot is the root directory; queue files are given
 * paths starting at Qroot+1 (see queueid).
 */
enum {
Qroot,
};
/* Per-fid state kept in Fid.aux once a queue file is opened, created or cloned. */
struct Aux {
/* queue this fid refers to */
Mq *q;
/* reader slot returned by subscribe(); only assigned when the fid subscribes */
int id;
};
/* One written message, linked in write order and shared by reference count. */
struct Msg {
/* reference count (anonymous Ref), adjusted by msgref() and msgunref() */
Ref;
/* next message in write order, nil for the newest */
Msg *next;
/* number of bytes stored in data */
int count;
/* payload, allocated together with the header in mqwrite() */
char data[];
};
/* A named message queue, served as one file below the root directory. */
struct Mq {
/* qid of the queue file; mqwrite() bumps vers on every write */
Qid qid;
/* not referenced by the code visible in this file */
int count;
/* running total of Msg.count for the messages on the log list */
usize logsz;
/* head and tail of the log list that new subscribers start reading from */
Msg *loghd;
Msg *logtl;
/* not referenced by the code visible in this file */
Msg *hd;
Msg *tl;
/* number of reader slots in the rd, rhd, rtl and wait arrays */
int nrd;
/* per-slot marker; destroyfid() stores -1 when the reader goes away */
int *rd;
/* per-slot head and tail of the messages that slot has not read yet */
Msg **rhd;
Msg **rtl;
/* per-slot read request parked by mqread() until the next write, or nil */
Req **wait;
/* file name, matched by lookup() */
char *name;
/* not referenced by the code visible in this file */
char *user;
char *group;
/* mode reported by qstat() */
int mode;
};
/* Table of every queue created so far; mqopen() indexes it with qid.path-1. */
Mq **queues;
/* Number of entries in queues. */
int nqueues;
/* Log size limit consulted by trimlog(); a negative value disables trimming. */
vlong maxlog = -1;
/* Qid path that will be given to the next queue created. */
vlong queueid = Qroot + 1;

/* Error strings returned to 9P clients. */
char Ebaduse[] = "invalid use of fd";
char Einuse[] = "fid in use";
char Eexist[] = "file already exists";
char Enoexist[] = "file does not exist";
/*
 * Allocate n bytes through mallocz() with its clear flag set.
 * Calls sysfatal() when the allocation fails.  The block is tagged
 * with the caller's pc so it can be attributed to the caller.
 */
void *
emalloc(ulong n)
{
	void *p;

	if((p = mallocz(n, 1)) == nil)
		sysfatal("malloc: %r");
	setmalloctag(p, getcallerpc(&n));
	return p;
}
/*
 * Resize the block p to n bytes with realloc().
 * Calls sysfatal() when realloc() returns nil; otherwise the result
 * is tagged with the caller's pc and returned.
 */
void *
erealloc(void *p, ulong n)
{
	void *np;

	if((np = realloc(p, n)) == nil)
		sysfatal("realloc: %r");
	setmalloctag(np, getcallerpc(&p));
	return np;
}
/*
 * Duplicate the string s with strdup().
 * Calls sysfatal() when the copy cannot be allocated; otherwise the
 * copy is tagged with the caller's pc and returned.
 */
char*
estrdup(char *s)
{
	char *dup;

	if((dup = strdup(s)) == nil)
		sysfatal("strdup: %r");
	setmalloctag(dup, getcallerpc(&s));
	return dup;
}
/*
 * Take one more reference on m.
 * Returns m itself so the call can be used inside an assignment.
 */
Msg*
msgref(Msg *m)
{
	incref(m);
	return m;
}
/*
 * Drop one reference on m and free it once decref() reports
 * that no references remain.
 */
void
msgunref(Msg *m)
{
	if(decref(m) != 0)
		return;
	free(m);
}
/*
 * Drop messages from the front of q's log while the logged byte count
 * is at or above maxlog.  Does nothing when maxlog is negative.
 * Each dropped message loses the log's reference; readers that still
 * hold their own reference keep it alive.
 */
void
trimlog(Mq *q)
{
	Msg *m, *next;

	if(maxlog < 0)
		return;
	for(m = q->loghd; m != nil && q->logsz >= maxlog; m = next){
		/* read the link first: msgunref may free m */
		next = m->next;
		q->logsz -= m->count;
		msgunref(m);
	}
	/*
	 * m is the first message that survived (or nil).  It must be used
	 * here rather than the loop's scratch pointer, which is unset when
	 * the loop body never ran and would throw the whole log away.
	 */
	q->loghd = m;
	if(m == nil)
		q->logtl = nil;
}
/*
 * Register a new reader on q and return its slot index.
 *
 * A slot whose rd entry is -1 (released by destroyfid) is reused;
 * otherwise the four per-slot arrays grow by one entry.  The reader
 * starts at the head of the log and takes one reference on every
 * message currently logged.
 */
int
subscribe(Mq *q)
{
	Msg *m, *next;
	int i;

	for(i = 0; i < q->nrd; i++)
		if(q->rd[i] == -1)
			break;
	if(i == q->nrd){
		/* no free slot: grow every per-slot array from its own pointer */
		q->rd = erealloc(q->rd, (q->nrd+1)*sizeof(*q->rd));
		q->wait = erealloc(q->wait, (q->nrd+1)*sizeof(*q->wait));
		q->rhd = erealloc(q->rhd, (q->nrd+1)*sizeof(*q->rhd));
		q->rtl = erealloc(q->rtl, (q->nrd+1)*sizeof(*q->rtl));
		q->nrd++;
	}else{
		/* release what the previous owner of the slot left unread */
		for(m = q->rhd[i]; m != nil; m = next){
			next = m->next;
			msgunref(m);
		}
	}
	/* any value other than -1 marks the slot as in use */
	q->rd[i] = i;
	q->wait[i] = nil;
	q->rhd[i] = q->loghd;
	q->rtl[i] = q->logtl;
	for(m = q->loghd; m != nil; m = m->next)
		msgref(m);
	return i;
}
Mq*
lookup(char *name)
{
int i;
for(i = 0; i < nqueues; i++)
if(strcmp(queues[i]->name, name) == 0)
return queues[i];
return nil;
}
/*
 * Fill the directory entry d for queue q.
 * The strings are fresh estrdup9p() copies; uid, gid and muid are
 * always "glenda" and both times are zero.
 */
void
qstat(Dir *d, Mq *q)
{
	char *owner;

	owner = "glenda";
	d->name = estrdup9p(q->name);
	d->uid = estrdup9p(owner);
	d->gid = estrdup9p(owner);
	d->muid = estrdup9p(owner);
	d->qid = q->qid;
	d->mode = q->mode;
	d->mtime = 0;
	d->atime = 0;
}
/*
 * Directory generator passed to dirread9p() for the root.
 * Fills d for the i'th queue and returns 0, or returns -1 once i is
 * past the last queue.  The third argument is unused.
 */
int
rootgen(int i, Dir *d, void *)
{
	if(i < nqueues){
		qstat(d, queues[i]);
		return 0;
	}
	return -1;
}
/*
 * Clone hook: when the old fid carries queue state, give the new fid
 * its own Aux on the same queue with a freshly subscribed reader slot.
 * Always returns nil (no error).
 */
char*
mqclone(Fid *old, Fid *new)
{
	Aux *src, *dst;

	src = old->aux;
	if(src == nil)
		return nil;
	dst = emalloc(sizeof(Aux));
	dst->q = src->q;
	dst->id = subscribe(src->q);
	new->aux = dst;
	return nil;
}
/*
 * Walk one path element from f.
 *
 * From the root, ".." stays on the root and any other name must match
 * an existing queue.  From a queue file only ".." is accepted and
 * leads back to the root.  On success both f->qid and *qid hold the
 * destination qid and nil is returned; otherwise an error string is
 * returned.
 */
char*
mqwalk1(Fid *f, char *name, Qid *qid)
{
	Mq *q;
	int up;

	up = strcmp(name, "..") == 0;
	if(f->qid.path != Qroot){
		if(!up)
			return "not a dir";
		f->qid = (Qid){Qroot, 0, QTDIR};
		*qid = f->qid;
		return nil;
	}
	if(up){
		*qid = f->qid;
		return nil;
	}
	q = lookup(name);
	if(q == nil)
		return Enoexist;
	f->qid = q->qid;
	*qid = f->qid;
	return nil;
}
/*
 * Answer a stat request.
 *
 * The root is reported as a directory named "/".  A queue file is
 * found through its qid path (queues[path-1], the same mapping mqopen
 * uses) rather than through fid->aux, because aux is nil for a fid
 * that has been walked to but not opened.
 */
void
mqstat(Req *r)
{
	uvlong p;

	p = r->fid->qid.path;
	if(p == Qroot){
		r->d.name = estrdup9p("/");
		r->d.uid = estrdup9p("glenda");
		r->d.gid = estrdup9p("glenda");
		r->d.muid = estrdup9p("glenda");
		r->d.qid = r->fid->qid;
		r->d.mtime = 0;
		r->d.atime = 0;
		/* the mode must carry DMDIR to agree with the QTDIR qid */
		r->d.mode = DMDIR|0755;
		respond(r, nil);
		return;
	}
	if(p > (uvlong)nqueues){
		respond(r, Enoexist);
		return;
	}
	qstat(&r->d, queues[p-1]);
	respond(r, nil);
}
/*
 * Flush hook.  If the fid of the flushed request has queue state, any
 * read parked in its reader slot is answered with "interrupted" and
 * the slot's wait entry is cleared.  The flush itself always succeeds.
 */
void
mqflush(Req *r)
{
	Req **slot;
	Aux *a;

	a = r->oldreq->fid->aux;
	if(a != nil){
		slot = &a->q->wait[a->id];
		if(*slot != nil)
			respond(*slot, "interrupted");
		*slot = nil;
	}
	respond(r, nil);
}
/*
 * Remove hook.  Removing queues is not implemented, so the request is
 * refused with an error instead of aborting the whole server, which
 * would let any client take the service down with a single remove.
 */
void
mqremove(Req *r)
{
	respond(r, "remove not supported");
}
/*
 * Write hook: publish the written bytes as one message on the queue.
 *
 * A reader slot with a parked read is answered straight away with the
 * data; the reply is cut to the size that read asked for, since a 9P
 * read reply must not carry more than was requested.  Every other
 * slot gets the message appended to its unread list with a reference
 * of its own.  The message is then added to the log, the log is
 * trimmed, and the writer is told the full count was written.
 */
void
mqwrite(Req *r)
{
	Msg *m;
	Req *rr;
	Aux *a;
	Mq *q;
	int i;

	if((a = r->fid->aux) == nil){
		respond(r, Ebaduse);
		return;
	}
	q = a->q;
	m = emalloc(sizeof(Msg) + r->ifcall.count);
	m->count = r->ifcall.count;
	memcpy(m->data, r->ifcall.data, m->count);
	m->next = nil;
	for(i = 0; i < q->nrd; i++){
		rr = q->wait[i];
		if(rr != nil){
			/* hand the data directly to the parked read */
			rr->ofcall.data = r->ifcall.data;
			rr->ofcall.count = r->ifcall.count;
			if(rr->ofcall.count > rr->ifcall.count)
				rr->ofcall.count = rr->ifcall.count;
			respond(rr, nil);
			q->wait[i] = nil;
			continue;
		}
		/* queue it for a reader that is not currently waiting */
		if(q->rhd[i] == nil)
			q->rhd[i] = m;
		if(q->rtl[i] != nil)
			q->rtl[i]->next = m;
		q->rtl[i] = m;
		msgref(m);
	}
	/* append to the log, which holds a reference of its own */
	if(q->loghd == nil)
		q->loghd = m;
	if(q->logtl != nil)
		q->logtl->next = m;
	q->logtl = msgref(m);
	q->logsz += m->count;
	trimlog(q);
	q->qid.vers++;
	r->ofcall.count = r->ifcall.count;
	respond(r, nil);
}
/*
 * Read hook.
 *
 * The root is read as a directory listing of the queues.  On a queue
 * file the reader gets the oldest message it has not read yet; the
 * reply is cut to the requested size, since a 9P read reply must not
 * carry more than was asked for, and the rest of that message is
 * dropped.  With nothing unread, the request is parked in the slot's
 * wait entry and is answered by the next write (or by a flush).
 */
void
mqread(Req *r)
{
	Aux *a;
	Msg *m;
	Mq *q;

	if(r->fid->qid.path == Qroot){
		dirread9p(r, rootgen, nil);
		respond(r, nil);
		return;
	}
	if((a = r->fid->aux) == nil){
		respond(r, Ebaduse);
		return;
	}
	q = a->q;
	m = q->rhd[a->id];
	if(m == nil){
		/* nothing queued: leave the request pending */
		q->wait[a->id] = r;
		return;
	}
	r->ofcall.data = m->data;
	r->ofcall.count = m->count;
	if(r->ofcall.count > r->ifcall.count)
		r->ofcall.count = r->ifcall.count;
	/* respond before the reference is dropped: the reply points into m */
	respond(r, nil);
	q->rhd[a->id] = m->next;
	if(q->rhd[a->id] == nil)
		q->rtl[a->id] = nil;
	msgunref(m);
}
/*
 * Create hook: make a new queue named by the request and leave the
 * fid open on it.
 *
 * Fails with Eexist if a queue of that name already exists.  The
 * queue's permissions come from the request's perm field (the mode
 * field is only the open mode); DMDIR is masked off because queues
 * are always plain files.  A fid opened for reading is subscribed.
 */
void
mqcreate(Req *r)
{
	Aux *a;
	Mq *q;
	int m;

	if(lookup(r->ifcall.name) != nil){
		respond(r, Eexist);
		return;
	}
	q = emalloc(sizeof(Mq));
	q->name = estrdup(r->ifcall.name);
	q->mode = r->ifcall.perm & ~DMDIR;
	q->qid.path = queueid++;
	q->qid.vers = 0;
	q->qid.type = QTFILE;

	a = emalloc(sizeof(Aux));
	a->q = q;
	m = r->ifcall.mode & OMASK;
	if(m == OREAD || m == ORDWR || m == OEXEC)
		a->id = subscribe(q);
	r->ofcall.qid = q->qid;
	r->fid->qid = q->qid;
	r->fid->aux = a;

	/* queues[path-1] is how mqopen finds the queue again */
	queues = erealloc(queues, (nqueues+1)*sizeof(Mq*));
	queues[nqueues++] = q;
	respond(r, nil);
}
/*
 * Open hook.
 *
 * Fails with Einuse if the fid already carries queue state.  Opening
 * the root needs no state, so nothing is allocated for it.  Opening a
 * queue file attaches an Aux for queues[path-1] and, for the read
 * modes (OREAD, ORDWR, OEXEC), subscribes a reader slot.
 */
void
mqopen(Req *r)
{
	Aux *a;
	vlong p;
	int m;

	if(r->fid->aux != nil){
		respond(r, Einuse);
		return;
	}
	p = r->fid->qid.path;
	if(p != Qroot){
		a = emalloc(sizeof(Aux));
		a->q = queues[p-1];
		m = r->ifcall.mode & OMASK;
		if(m == OREAD || m == ORDWR || m == OEXEC)
			a->id = subscribe(a->q);
		r->fid->aux = a;
	}
	r->ofcall.qid = r->fid->qid;
	respond(r, nil);
}
/*
 * Fid teardown: release everything the fid held on its queue.
 *
 * For a fid whose open mode is one of the read modes the reader slot
 * is given up: references on messages it never read are dropped, the
 * slot's list is emptied and its rd entry is set to -1.  The Aux is
 * freed in every case.
 */
void
destroyfid(Fid *f)
{
	Aux *a;
	Mq *q;
	Msg *m, *next;
	int mode;

	a = f->aux;
	if(a == nil)
		return;
	f->aux = nil;
	mode = f->omode & OMASK;
	if(mode == OREAD || mode == ORDWR || mode == OEXEC){
		q = a->q;
		for(m = q->rhd[a->id]; m != nil; m = next){
			/* read the link first: msgunref may free m */
			next = m->next;
			msgunref(m);
		}
		q->rhd[a->id] = nil;
		q->rtl[a->id] = nil;
		q->rd[a->id] = -1;
	}
	free(a);
}
/*
 * Attach hook: point the new fid at the root directory, with no
 * queue state attached, and reply with the root qid.
 */
void
mqattach(Req *r)
{
	Qid root;

	root = (Qid){Qroot, 0, QTDIR};
	r->fid->aux = nil;
	r->fid->qid = root;
	r->ofcall.qid = root;
	respond(r, nil);
}
/* Table of 9P request handlers; main passes it to postmountsrv(). */
Srv mq = {
	/* session and name space */
	.attach		= mqattach,
	.walk1		= mqwalk1,
	.clone		= mqclone,
	.stat		= mqstat,

	/* file operations */
	.open		= mqopen,
	.create		= mqcreate,
	.read		= mqread,
	.write		= mqwrite,
	.remove		= mqremove,

	/* request and fid life cycle */
	.flush		= mqflush,
	.destroyfid	= destroyfid,
};
/*
 * Print the command synopsis on standard error and exit with status
 * "usage".  The synopsis lists every option main() accepts.
 */
void
usage(void)
{
	fprint(2, "usage: %s [-d] [-s srv] [-m mtpt] [-r maxlog]\n", argv0);
	exits("usage");
}
/*
 * Parse the command line and start the file server.
 *
 *	-d		increase 9P chatter (chatty9p)
 *	-s srv		service name passed to postmountsrv
 *	-m mtpt		mount point (default /mnt/mq)
 *	-r maxlog	log size limit used by trimlog
 *
 * The program ends with an explicit exits(nil): on Plan 9, falling off
 * the end of main reports the exit status "main", which callers see as
 * a failure.
 */
void
main(int argc, char **argv)
{
	char *srvname, *mntpt;

	srvname = nil;
	mntpt = "/mnt/mq";
	ARGBEGIN{
	case 'd':
		chatty9p++;
		break;
	case 's':
		srvname = EARGF(usage());
		break;
	case 'm':
		mntpt = EARGF(usage());
		break;
	case 'r':
		/* maxlog is a vlong, so parse the full range */
		maxlog = atoll(EARGF(usage()));
		break;
	default:
		usage();
	}ARGEND;
	postmountsrv(&mq, srvname, mntpt, MCREATE|MREPL);
	exits(nil);
}