shithub: mq

ref: 23227ed96374185ecc9fd4410c7ba3ed6491c0f9
dir: /mq.c/

View raw version
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>

typedef struct Mq Mq;
typedef struct Msg Msg;
typedef struct Aux Aux;

enum {
	Qroot,
};

struct Aux {
	Mq	*q;
	int	id;
};

struct Msg {
	Ref;
	Msg	*next;
	int	count;
	char	data[];
};

struct Mq {
	Qid	qid;
	int	count;
	usize	logsz;
	Msg	*loghd;
	Msg	*logtl;
	Msg	*hd;
	Msg	*tl;

	int	nrd;
	int	*rd;
	Msg	**rhd;
	Msg	**rtl;
	Req	**wait;

	char	*name;
	char	*user;
	char	*group;
	int	mode;
};

Mq	**queues;
int	nqueues;
vlong	maxlog = -1;
vlong	queueid = Qroot + 1;

char Ebaduse[] = "invalid use of fd";
char Einuse[] = "fid in use";
char Eexist[] = "file already exists";
char Enoexist[] = "file does not exists";

void *
emalloc(ulong n)
{
	void *v;
	
	v = mallocz(n, 1);
	if(v == nil)
		sysfatal("malloc: %r");
	setmalloctag(v, getcallerpc(&n));
	return v;
}

void *
erealloc(void *p, ulong n)
{
	void *v;
	
	v = realloc(p, n);
	if(v == nil)
		sysfatal("realloc: %r");
	setmalloctag(v, getcallerpc(&p));
	return v;
}

char*
estrdup(char *s)
{
	s = strdup(s);
	if(s == nil)
		sysfatal("strdup: %r");
	setmalloctag(s, getcallerpc(&s));
	return s;
}

Msg*
msgref(Msg *m)
{
	incref(m);
	return m;
}

void
msgunref(Msg *m)
{
	if(decref(m) == 0)
		free(m);
}

void
trimlog(Mq *q)
{
	Msg *m, *n;

	if(maxlog < 0)
		return;
	n = nil;
	for(m = q->loghd; m != nil && q->logsz >= maxlog; m = n){
		n = m->next;
		q->logsz -= m->count;
		msgunref(m);
	}
	q->loghd = n;
	if(m == nil)
		q->logtl = nil;
}

int
subscribe(Mq *q)
{
	Msg *m;
	int i;

	for(i = 0; i < q->nrd; i++)
		if(q->rd[i] != -1)
			return i;
	q->rd = erealloc(q->rd, (q->nrd+1)*sizeof(*q->rd));
	q->wait = erealloc(q->rhd, (q->nrd+1)*sizeof(*q->wait));
	q->rhd = erealloc(q->rhd, (q->nrd+1)*sizeof(*q->rhd));
	q->rtl = erealloc(q->rtl, (q->nrd+1)*sizeof(*q->rtl));
	q->wait[q->nrd] = nil;
	q->rhd[q->nrd] = q->loghd;
	q->rtl[q->nrd] = q->logtl;
	for(m = q->loghd; m != nil; m = m->next)
		msgref(m);
	return q->nrd++;
}

Mq*
lookup(char *name)
{
	int i;

	for(i = 0; i < nqueues; i++)
		if(strcmp(queues[i]->name, name) == 0)
			return queues[i];
	return nil;
}

void
qstat(Dir *d, Mq *q)
{
	d->name = estrdup9p(q->name);
	d->uid = estrdup9p("glenda");
	d->gid = estrdup9p("glenda");
	d->muid = estrdup9p("glenda");
	d->qid = q->qid;
	d->mtime = 0;
	d->atime = 0;
	d->mode = q->mode;
}

int
rootgen(int i, Dir *d, void *)
{
	if(i >= nqueues)
		return -1;
	qstat(d, queues[i]);
	return 0;
}
	

char*
mqclone(Fid *old, Fid *new)
{
	Aux *o, *n;

	o = old->aux;
	if(o != nil){
		n = emalloc(sizeof(Aux));
		n->q = o->q;
		n->id = subscribe(o->q);
		new->aux = n;
	}
	return nil;
}

char*
mqwalk1(Fid *f, char *name, Qid *qid)
{
	Mq *q;

	switch(f->qid.path){
	case Qroot:
		if(strcmp(name, "..") == 0){
			*qid = f->qid;
			return nil;
		}
		if((q = lookup(name)) == nil)
			return Enoexist;
		f->qid = q->qid;
		*qid = f->qid;
		return nil;
	default:
		if(strcmp(name, "..") == 0){
			f->qid = (Qid){Qroot, 0, QTDIR};
			*qid = f->qid;
			return nil;
		}
		return "not a dir";
	}	
}

void
mqstat(Req *r)
{
	switch(r->fid->qid.path){
	case Qroot:
		r->d.uid = estrdup9p("glenda");
		r->d.gid = estrdup9p("glenda");
		r->d.muid = estrdup9p("glenda");
		r->d.qid = r->fid->qid;
		r->d.mtime = 0;
		r->d.atime = 0;
		r->d.mode = 0755;
		break;
	default:
		qstat(&r->d, ((Aux*)r->fid->aux)->q);
	}
	respond(r, nil);
}

void
mqflush(Req *r)
{
	Req *w;
	Aux *a;

	if((a = r->oldreq->fid->aux) != nil){
		w = a->q->wait[a->id];
		if(w != nil)
			respond(w, "interrupted");
		a->q->wait[a->id] = nil;
	}
	respond(r, nil);
}

void
mqremove(Req *r)
{
	USED(r);
	abort();
}

void
mqwrite(Req *r)
{
	Msg *m;
	Req *rr;
	Aux *a;
	Mq *q;
	int i;

	if((a = r->fid->aux) == nil){
		respond(r, Ebaduse);
		return;
	}
	q = a->q;
	m = emalloc(sizeof(Msg) + r->ifcall.count);
	m->count = r->ifcall.count;
	memcpy(m->data, r->ifcall.data, m->count);
	m->next = nil;
	for(i = 0; i < q->nrd; i++){
		rr = q->wait[i];
		if(rr != nil){
			rr->ofcall.data = r->ifcall.data;
			rr->ofcall.count = r->ifcall.count;
			respond(rr, nil);
			q->wait[i] = nil;
		}else{
			if(q->rhd[i] == nil)
				q->rhd[i] = m;
			if(q->rtl[i] != nil)
				q->rtl[i]->next = m;
			q->rtl[i] = m;
			msgref(m);
		}
	}
	if(q->loghd == nil)
		q->loghd = m;
	if(q->logtl != nil)
		q->logtl->next = m;
	q->logtl = msgref(m);
	q->logsz += m->count;
	q->logtl = m;

	trimlog(q);
	q->qid.vers++;
	r->ofcall.count = r->ifcall.count;
	respond(r, nil);
}

void
mqread(Req *r)
{
	Aux *a;
	Msg *m;
	Mq *q;

	if(r->fid->qid.path == Qroot){
		dirread9p(r, rootgen, nil);
		respond(r, nil);
		return;
	}
	if((a = r->fid->aux) == nil){
		respond(r, Ebaduse);
		return;
	}
	q = a->q;
	if(q->rhd[a->id] != nil){
		m = q->rhd[a->id];
		r->ofcall.data = m->data;
		r->ofcall.count = m->count;
		respond(r, nil);
		q->rhd[a->id] = m->next;
		if(q->rhd[a->id] == nil)
			q->rtl[a->id] = nil;
		msgunref(m);
		return;
	}
	q->wait[a->id] = r;
}

void
mqcreate(Req *r)
{
	Aux *a;
	Mq *q;
	int m;

	if(lookup(r->ifcall.name) != nil){
		respond(r, Eexist);
		return;
	}
	m = r->ifcall.mode & OMASK;
	q = emalloc(sizeof(Mq));
	q->name = estrdup(r->ifcall.name);
	q->mode = r->ifcall.mode;
	q->qid.path = queueid++;
	q->qid.vers = 0;
	q->qid.type = QTFILE;
	a = emalloc(sizeof(Aux));
	a->q = q;
	if(m == OREAD || m == ORDWR || m == OEXEC)
		a->id = subscribe(q);
	r->ofcall.qid = q->qid;
	r->fid->qid = q->qid;
	r->fid->aux = a;
	queues = erealloc(queues, ++nqueues*sizeof(Mq*));
	queues[nqueues-1] = q;

	respond(r, nil);
}

void
mqopen(Req *r)
{
	Aux *a;
	vlong p;
	int m;

	if(r->fid->aux != nil){
		respond(r, Einuse);
		return;
	}
	m = r->ifcall.mode & OMASK;
	p = r->fid->qid.path;
	a = emalloc(sizeof(Aux));
	if(p != Qroot){
		a->q = queues[p-1];
		if(m == OREAD || m == ORDWR || m == OMASK)
			a->id = subscribe(a->q);
		r->fid->aux = a;
	}
	r->ofcall.qid = r->fid->qid;
	respond(r, nil);
}

void
destroyfid(Fid *f)
{
	Aux *a;
	int m;

	a = f->aux;
	m = f->omode & OMASK;
	if(m != OREAD && m != ORDWR && m != OEXEC)
		return;
	if(a != nil)
		a->q->rd[a->id] = -1;
}

void
mqattach(Req *r)
{
	r->ofcall.qid = (Qid){Qroot, 0, QTDIR};
	r->fid->qid = r->ofcall.qid;
	r->fid->aux = nil;
	respond(r, nil);
}

Srv mq  = {
	.attach=mqattach,
	.open=mqopen,
	.create=mqcreate,
	.read=mqread,
	.write=mqwrite,
	.remove=mqremove,
	.flush=mqflush,
	.stat=mqstat,
	.walk1=mqwalk1,
	.clone=mqclone,
	.destroyfid=destroyfid,
};

void
usage(void)
{
	fprint(2, "usage: %s [-s srv] [-m mtpt]\n", argv0);
	exits("usage");
}

void
main(int argc, char **argv)
{
	char *srvname, *mntpt;

	srvname = nil;
	mntpt = "/mnt/mq";
	ARGBEGIN{
	case 'd':
		chatty9p++;
		break;
	case 's':
		srvname=EARGF(usage());
		break;
	case 'm':
		mntpt = EARGF(usage());
		break;
	case 'r':
		maxlog = atoi(EARGF(usage()));
		break;
	default:
		usage();
	}ARGEND;

	postmountsrv(&mq, srvname, mntpt, MCREATE|MREPL);
}