shithub: mq

ref: fc145e25fb17b0a94e9012334bd2c70cf3d8dd00
dir: /mq.c/

View raw version
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>

/*
 * Forward typedefs: the structs below refer to one another
 * (e.g. Aux holds an Mq*) before their bodies are defined.
 */
typedef struct Mq Mq;
typedef struct Rd Rd;
typedef struct Msg Msg;
typedef struct Aux Aux;

/*
 * File kinds, kept in the low two bits of a qid path
 * (see QTYPE and QPATH below).
 */
enum {
	Qroot,	/* the root directory (attached with QTDIR) */
	Qmq,	/* a message queue file (created with QTFILE) */
};

/* Byte multipliers used when parsing the size suffixes of the -l option. */
enum {
	KiB	= 1024,
	MiB	= 1024*KiB,
	GiB	= 1024*MiB,
};

/* Per-fid state, hung off Fid.aux by mqattach and mqclone. */
struct Aux {
	Mq	*q;	/* queue this fid has open; nil until open/create */
	int	id;	/* reader slot index in q->rd, as returned by subscribe */
	int	ntail;	/* byte limit from a "tail:" attach spec; -1 means no limit */
};

/*
 * One written message.  It is reference counted: mqwrite takes one
 * reference for the log and one for each reader it is queued to.
 */
struct Msg {
	Ref;
	Msg	*next;	/* next message in write order */
	int	count;	/* number of bytes at data */
	char	*data;	/* payload; mqwrite points this into buf */
	char	buf[];	/* storage allocated together with the header */
};

/* One reader slot of a queue. */
struct Rd {
	int	id;	/* slot index while in use; -1 marks the slot free */
	int	off;	/* bytes of hd already handed to the reader */
	Msg	*hd;	/* oldest message not yet fully read */
	Msg	*tl;	/* newest message queued for this reader */
	Req	*wait;	/* read request parked until a message arrives */
};

/* A message queue, visible as one file in the root directory. */
struct Mq {
	Ref;
	Qid	qid;	/* path is QPATH(queue id, Qmq); vers is bumped per write */
	int	moribund;	/* set by mqremove once the queue is unlinked */
	usize	logsz;	/* sum of the counts of the messages in the log */
	Msg	*loghd;	/* oldest message retained in the log */
	Msg	*logtl;	/* newest message retained in the log */
	Msg	*hd;	/* not referenced by the code in this file */
	Msg	*tl;	/* not referenced by the code in this file */

	int	nrd;	/* number of slots in rd */
	Rd	*rd;	/* reader slots, grown by subscribe */

	char	*name;	/* file name, matched by lookup */
	char	*user;	/* not referenced by the code in this file */
	char	*group;	/* not referenced by the code in this file */
	int	mode;	/* mode reported by qstat */
};

Mq	**queues;	/* all live queues; compacted by mqremove */
int	nqueues;	/* number of entries in queues */
vlong	maxlog = -1;	/* log size bound set by -l; negative disables trimming */
vlong	queueid = 0;	/* id embedded in the next created queue's qid path */
int	coalesce;	/* set by -c: one read may return several messages */

/* Error strings handed to respond() or returned from walk1. */
char Ebaduse[] = "invalid use of fd";
char Einuse[] = "fid in use";
char Eexist[] = "file already exists";
char Enoexist[] = "file does not exist";
char Eintr[] = "interrupted";
char Enotdir[] = "not a directory";
char Ebadcmd[] = "unknown command";
char Enomem[] = "out of memory";

/*
 * Qid path layout: the low two bits hold the file kind (Qroot/Qmq),
 * the remaining bits hold the queue id.
 */
#define QTYPE(p)	((int)((p) & 0x3))
#define QIDX(p)		((p)>>2)
#define QPATH(i, t)	((i)<<2 | (t))

/*
 * Allocate n zeroed bytes.  Never returns nil: allocation failure
 * ends the program through sysfatal.  The block is tagged with the
 * caller's pc.
 */
void *
emalloc(ulong n)
{
	void *p;

	if((p = mallocz(n, 1)) == nil)
		sysfatal("malloc: %r");
	setmalloctag(p, getcallerpc(&n));
	return p;
}

/*
 * Resize the block at p to n bytes.  Never returns nil: failure ends
 * the program through sysfatal.  The result is tagged with the
 * caller's pc.
 */
void *
erealloc(void *p, ulong n)
{
	void *np;

	if((np = realloc(p, n)) == nil)
		sysfatal("realloc: %r");
	setmalloctag(np, getcallerpc(&p));
	return np;
}

/*
 * Duplicate the string s.  Never returns nil: failure ends the
 * program through sysfatal.  The copy is tagged with the caller's pc.
 */
char*
estrdup(char *s)
{
	char *dup;

	dup = strdup(s);
	if(dup == nil)
		sysfatal("strdup: %r");
	setmalloctag(dup, getcallerpc(&s));
	return dup;
}

/*
 * Drop messages from the head of q's log while the log holds at
 * least maxlog bytes.  A negative maxlog disables trimming.
 * Each dropped message loses the log's reference and is freed
 * when that was the last one.
 */
void
trimlog(Mq *q)
{
	Msg *old;

	if(maxlog < 0)
		return;
	for(;;){
		old = q->loghd;
		if(old == nil || q->logsz < maxlog)
			break;
		q->loghd = old->next;
		q->logsz -= old->count;
		if(decref(old) == 0)
			free(old);
	}
	/* an emptied log has no tail either */
	if(q->loghd == nil)
		q->logtl = nil;
}

/*
 * Claim a reader slot on q and preload it with logged messages.
 *
 * A free slot (id == -1) is reused if there is one; otherwise the
 * slot array grows by one.  If ntail is not -1, messages are skipped
 * from the head of the log until at most ntail bytes remain.  Every
 * message handed to the reader gains one reference.
 *
 * Returns the index of the slot in q->rd.
 */
int
subscribe(Mq *q, vlong ntail)
{
	Msg *m, *last;
	Rd *rd;
	vlong sz;
	int i;

	rd = nil;
	for(i = 0; i < q->nrd; i++){
		if(q->rd[i].id == -1){
			rd = &q->rd[i];
			break;
		}
	}
	if(rd == nil){
		/* no free slot: i is the old nrd, i.e. the index of the new slot */
		q->rd = erealloc(q->rd, (++q->nrd)*sizeof(*q->rd));
		rd = &q->rd[q->nrd - 1];
	}
	rd->id = i;
	rd->wait = nil;
	rd->off = 0;

	sz = q->logsz;
	m = q->loghd;
	if(ntail != -1)
		for(; m != nil && sz > ntail; m = m->next)
			sz -= m->count;
	rd->hd = m;

	/*
	 * Remember the last message as the tail.  mqwrite links new
	 * messages through rd->tl; a nil tail on a non-empty backlog
	 * would leave them unreachable once the log has been trimmed.
	 */
	last = nil;
	for(; m != nil; m = m->next){
		incref(m);
		last = m;
	}
	rd->tl = last;
	return rd->id;
}

Mq*
lookup(char *name)
{
	int i;

	for(i = 0; i < nqueues; i++)
		if(strcmp(queues[i]->name, name) == 0)
			return queues[i];
	return nil;
}

/*
 * Fill d with the directory entry for queue q.
 *
 * The strings are allocated with estrdup9p.  The length is the log
 * size, capped at the fid's tail limit when one is set.  a may be
 * nil, and a negative a->ntail (the default -1) means no limit, so
 * the length is left alone in both cases.
 */
void
qstat(Dir *d, Mq *q, Aux *a)
{
	d->name = estrdup9p(q->name);
	d->uid = estrdup9p("glenda");
	d->gid = estrdup9p("glenda");
	d->muid = estrdup9p("glenda");
	d->qid = q->qid;
	d->mtime = 0;
	d->atime = 0;
	d->mode = q->mode;
	d->length = q->logsz;
	if(a != nil && a->ntail >= 0 && a->ntail < d->length)
		d->length = a->ntail;
}

/*
 * Directory generator handed to dirread9p for the root directory:
 * fills d with the i'th queue's entry and returns 0, or returns -1
 * once i is past the last queue.  a is the reading fid's Aux.
 */
int
rootgen(int i, Dir *d, void *a)
{
	if(i < nqueues){
		qstat(d, queues[i], a);
		return 0;
	}
	return -1;
}
	

/*
 * Clone hook: give the new fid its own Aux.
 * The tail limit is copied from the old fid; if the old fid already
 * has a queue, the new fid shares it and gets a reader slot of its own.
 * Always returns nil (success).
 */
char*
mqclone(Fid *old, Fid *new)
{
	Aux *src, *dst;

	src = old->aux;
	dst = emalloc(sizeof(*dst));
	dst->ntail = src->ntail;
	if(src->q != nil){
		dst->q = src->q;
		dst->id = subscribe(src->q, src->ntail);
	}
	new->aux = dst;
	return nil;
}

/*
 * Walk f one path element.
 *
 * From the root, ".." stays at the root and any other name must be an
 * existing queue.  From a queue file, only ".." is valid and leads
 * back to the root.  On success both f->qid and *qid hold the new
 * qid and nil is returned; otherwise an error string is returned.
 */
char*
mqwalk1(Fid *f, char *name, Qid *qid)
{
	Mq *q;
	int dotdot;

	dotdot = strcmp(name, "..") == 0;
	if(QTYPE(f->qid.path) != Qroot){
		if(!dotdot)
			return Enotdir;
		f->qid = (Qid){Qroot, 0, QTDIR};
		*qid = f->qid;
		return nil;
	}
	if(dotdot){
		*qid = f->qid;
		return nil;
	}
	q = lookup(name);
	if(q == nil)
		return Enoexist;
	f->qid = q->qid;
	*qid = f->qid;
	return nil;
}

/*
 * Stat handler.
 *
 * The root is reported as a 0755 directory.  A queue file is located
 * by its qid path rather than by using the queue id as an index into
 * queues: mqremove compacts that array, so ids and indices diverge
 * after the first removal.  A queue that has been removed but is
 * still held by this fid is reported from the fid's own pointer.
 * Responds Enoexist if the queue cannot be found.
 */
void
mqstat(Req *r)
{
	vlong p;
	Aux *a;
	Mq *q;
	int i;

	p = r->fid->qid.path;
	if(QTYPE(p) == Qroot){
		r->d.uid = estrdup9p("glenda");
		r->d.gid = estrdup9p("glenda");
		r->d.muid = estrdup9p("glenda");
		r->d.qid = r->fid->qid;
		r->d.mtime = 0;
		r->d.atime = 0;
		r->d.mode = DMDIR|0755;
		respond(r, nil);
		return;
	}

	q = nil;
	for(i = 0; i < nqueues; i++){
		if(queues[i]->qid.path == p){
			q = queues[i];
			break;
		}
	}
	a = r->fid->aux;
	if(q == nil && a != nil && a->q != nil && a->q->qid.path == p)
		q = a->q;
	if(q == nil){
		respond(r, Enoexist);
		return;
	}

	/* qstat fills in every field, including the owner strings */
	incref(q);
	qstat(&r->d, q, a);
	decref(q);
	respond(r, nil);
}

/*
 * Flush handler.
 *
 * If the request being flushed is the read parked on its fid's reader
 * slot, unpark it and answer it with Eintr.  Anything else (a fid
 * without a queue, a fid that never subscribed, or a slot on which a
 * different request is parked) is left untouched.  The flush itself
 * is always answered.
 */
void
mqflush(Req *r)
{
	Req *old;
	Aux *a;
	Rd *rd;

	old = r->oldreq;
	a = nil;
	if(old->fid != nil)
		a = old->fid->aux;
	if(a != nil && a->q != nil && a->id >= 0 && a->id < a->q->nrd){
		rd = &a->q->rd[a->id];
		if(rd->wait == old){
			/* clear the slot first: respond releases the request */
			rd->wait = nil;
			respond(old, Eintr);
		}
	}
	respond(r, nil);
}

/*
 * Remove handler: unlink the queue named by the fid's qid path.
 *
 * The queue is marked moribund and squeezed out of queues; the Mq
 * itself is not freed here.  Removing the root is rejected with
 * Ebaduse.  If no queue matches (for example it was already removed
 * through another fid), the table is left as it was and Enoexist
 * is returned.
 */
void
mqremove(Req *r)
{
	vlong path;
	int i, o;

	path = r->fid->qid.path;
	if(QTYPE(path) == Qroot){
		respond(r, Ebaduse);
		return;
	}
	o = 0;
	for(i = 0; i < nqueues; i++){
		if(queues[i]->qid.path == path){
			queues[i]->moribund = 1;
			continue;
		}
		queues[o++] = queues[i];
	}
	if(o == nqueues){
		respond(r, Enoexist);
		return;
	}
	/* o counts the survivors; do not assume exactly one was dropped */
	nqueues = o;
	respond(r, nil);
}

/*
 * Write handler: turn the written bytes into one message.
 *
 * Each active reader either gets the message delivered to its parked
 * read or has it appended to its queue (taking a reference).  A
 * parked read never receives more than it asked for; when it takes
 * only part of the message, the message is queued for that reader
 * with the read offset advanced past the delivered bytes.  The shared
 * message itself is never modified, so other readers and the log
 * still see it whole.
 *
 * Finally the message is appended to the log, the log is trimmed,
 * and the qid version is bumped.
 */
void
mqwrite(Req *r)
{
	Msg *m;
	Req *rr;
	Aux *a;
	Rd *rd;
	Mq *q;
	int i, n;

	if((a = r->fid->aux) == nil || a->q == nil){
		respond(r, Ebaduse);
		return;
	}
	q = a->q;
	m = emalloc(sizeof(Msg) + r->ifcall.count);
	m->data = m->buf;
	m->count = r->ifcall.count;
	memmove(m->data, r->ifcall.data, m->count);
	m->next = nil;
	for(i = 0; i < q->nrd; i++){
		rd = &q->rd[i];
		if(rd->id == -1)
			continue;
		rr = rd->wait;
		rd->wait = nil;
		if(rr != nil){
			/* a read is only parked while the reader's queue is empty */
			n = m->count;
			if(rr->ifcall.count < m->count)
				n = rr->ifcall.count;
			rr->ofcall.data = m->data;
			rr->ofcall.count = n;
			respond(rr, nil);
			/* rr must not be touched after respond */
			if(n == m->count)
				continue;
			rd->off = n;
		}
		if(rd->hd == nil)
			rd->hd = m;
		if(rd->tl != nil)
			rd->tl->next = m;
		rd->tl = m;
		incref(m);
	}

	/* append to the log; the log holds its own reference */
	if(q->loghd == nil)
		q->loghd = m;
	if(q->logtl != nil)
		q->logtl->next = m;
	q->logtl = m;
	incref(m);
	q->logsz += m->count;

	trimlog(q);
	q->qid.vers++;
	r->ofcall.count = r->ifcall.count;
	respond(r, nil);
}

/*
 * Read handler.
 *
 * The root directory is read through dirread9p/rootgen.  For a queue
 * file, if the reader has nothing queued the request is parked in the
 * reader slot (replacing whatever was parked there) and answered
 * later by mqwrite or mqflush.  Otherwise queued data is copied out:
 * one message, or part of one, per read.  With the coalesce flag set,
 * copying continues into following messages for as long as each
 * iteration copied a whole message's count and the buffer has room.
 *
 * A fully consumed message loses the reader's reference and is freed
 * when that was the last one.
 */
void
mqread(Req *r)
{
	char *p, *e, *b, *d;
	int n, avail, mcount;
	Aux *a;
	Msg *m;
	Rd *rd;
	Mq *q;

	if(QTYPE(r->fid->qid.path) == Qroot){
		dirread9p(r, rootgen, r->fid->aux);
		respond(r, nil);
		return;
	}
	if((a = r->fid->aux) == nil){
		respond(r, Ebaduse);
		return;
	}
	q = a->q;
	rd = &q->rd[a->id];

	/* no messages: enqueue until next one comes */
	if(rd->hd == nil){
		rd->wait = r;
		return;
	}

	/* queued messages: pop data off */
	d = emalloc(r->ifcall.count);
	p = d;
	e = d + r->ifcall.count;
	r->ofcall.data = p;
	while(rd->hd != nil && p != e){
		m = rd->hd;
		assert(rd->off >= 0 && rd->off <= m->count);
		b = m->data + rd->off;
		n = e - p;
		mcount = m->count;
		avail = m->count - rd->off;
		if(n >= avail){
			/* the rest of the message fits: consume it */
			n = avail;
			memcpy(p, b, n);
			rd->hd = m->next;
			rd->off = 0;
			if(rd->hd == nil)
				rd->tl = nil;
			if(decref(m) == 0)
				free(m);
		}else{
			/* buffer full: remember how far we got */
			memcpy(p, b, n);
			rd->off += n;
		}
		p += n;
		if(!coalesce || n < mcount)
			break;
	}
	r->ofcall.count = p - d;
	/* check the count actually being returned, not the stale field */
	assert(r->ofcall.count <= r->ifcall.count);
	respond(r, nil);
	free(d);
}

/*
 * Create handler: make a new, empty queue and leave the fid open on it.
 *
 * Responds Eexist if a queue with that name already exists.  The
 * queue's permission bits come from the request's perm field; the
 * mode field is the open mode and only decides whether the fid also
 * becomes a reader.
 */
void
mqcreate(Req *r)
{
	Aux *a;
	Mq *q;
	int m;

	if(lookup(r->ifcall.name) != nil){
		respond(r, Eexist);
		return;
	}
	m = r->ifcall.mode & OMASK;
	q = emalloc(sizeof(Mq));
	q->name = estrdup(r->ifcall.name);
	/* perm carries the permissions; ifcall.mode is the open mode */
	q->mode = r->ifcall.perm & 0777;
	q->qid.path = QPATH(queueid, Qmq);
	q->qid.vers = 0;
	q->qid.type = QTFILE;
	queueid++;

	a = r->fid->aux;
	assert(a->q == nil);
	a->q = q;
	if(m == OREAD || m == ORDWR || m == OEXEC)
		a->id = subscribe(q, a->ntail);
	r->ofcall.qid = q->qid;
	r->fid->qid = q->qid;
	queues = erealloc(queues, ++nqueues*sizeof(Mq*));
	queues[nqueues-1] = q;

	respond(r, nil);
}

/*
 * Open handler.
 *
 * Opening the root needs no state.  Opening a queue file binds the
 * fid to the queue and, for readable modes, claims a reader slot.
 * The queue is located by its qid path rather than by using the
 * queue id as an index into queues, because mqremove compacts that
 * array.  Responds Einuse if the fid already has a queue and
 * Enoexist if the queue is gone.
 */
void
mqopen(Req *r)
{
	Aux *a;
	Mq *q;
	vlong p;
	int i, m;

	a = r->fid->aux;
	if(a->q != nil){
		respond(r, Einuse);
		return;
	}
	m = r->ifcall.mode & OMASK;
	p = r->fid->qid.path;
	if(QTYPE(p) != Qroot){
		q = nil;
		for(i = 0; i < nqueues; i++){
			if(queues[i]->qid.path == p){
				q = queues[i];
				break;
			}
		}
		if(q == nil){
			respond(r, Enoexist);
			return;
		}
		incref(q);
		a->q = q;
		if(m == OREAD || m == ORDWR || m == OEXEC)
			a->id = subscribe(q, a->ntail);
	}
	r->ofcall.qid = r->fid->qid;
	respond(r, nil);
}

/*
 * Fid teardown.
 *
 * When the fid's mode marks it as a reader and it has a queue, its
 * reader slot is released: the references it holds on still-queued
 * messages are dropped (freeing messages nobody else holds) and the
 * slot is marked free for subscribe to reuse.  The Aux is freed for
 * every fid, not only for readers.
 */
void
destroyfid(Fid *f)
{
	Aux *a;
	Msg *m, *next;
	Rd *rd;
	int mode;

	a = f->aux;
	if(a == nil)
		return;
	mode = f->omode & OMASK;
	if(a->q != nil && (mode == OREAD || mode == ORDWR || mode == OEXEC)){
		rd = &a->q->rd[a->id];
		for(m = rd->hd; m != nil; m = next){
			next = m->next;	/* m may be freed below */
			if(decref(m) == 0)
				free(m);
		}
		rd->hd = nil;
		rd->tl = nil;
		rd->off = 0;
		rd->id = -1;
	}
	f->aux = nil;
	free(a);
}

/*
 * Attach handler: set up the root fid and parse the attach spec.
 *
 * A spec of the form "tail:N" followed by any number of k/m/g
 * suffixes (either case, as for the -l option) sets the fid's tail
 * limit to N scaled by the suffixes.  Without such a spec the limit
 * is -1, meaning none.  The value is computed in a vlong and range
 * checked before every scaling step so that it always fits the int
 * field it is stored in.
 *
 * Responds "bad scale" for an unknown suffix and "tail size out of
 * range" for a value below -1 or above the largest int.
 */
void
mqattach(Req *r)
{
	Aux *a;
	char *n, *e;
	vlong v;

	n = r->ifcall.aname;
	a = emalloc(sizeof(Aux));
	r->ofcall.qid = (Qid){Qroot, 0, QTDIR};
	r->fid->qid = r->ofcall.qid;
	r->fid->aux = a;
	a->ntail = -1;
	if(n != nil && strncmp(n, "tail:", 5) == 0){
		v = strtoll(n+5, &e, 0);
		for(;;){
			/* checked before each multiply, so v*GiB cannot overflow a vlong */
			if(v < -1 || v > 0x7fffffff){
				respond(r, "tail size out of range");
				return;
			}
			if(*e == 0)
				break;
			switch(*e++){
			case 'g':
			case 'G':
				v *= GiB;
				break;
			case 'm':
			case 'M':
				v *= MiB;
				break;
			case 'k':
			case 'K':
				v *= KiB;
				break;
			default:
				respond(r, "bad scale");
				return;
			}
		}
		a->ntail = v;
	}
	respond(r, nil);
}

/* Handler table passed to postmountsrv in main. */
Srv mq  = {
	.attach=mqattach,
	.open=mqopen,
	.create=mqcreate,
	.read=mqread,
	.write=mqwrite,
	.remove=mqremove,
	.flush=mqflush,
	.stat=mqstat,
	.walk1=mqwalk1,
	.clone=mqclone,
	.destroyfid=destroyfid,
};

/*
 * Print the command synopsis, covering every option main accepts,
 * on standard error and exit with status "usage".
 */
void
usage(void)
{
	fprint(2, "usage: %s [-dc] [-l maxlog] [-s srv] [-m mtpt]\n", argv0);
	exits("usage");
}

/*
 * Parse the command line and post/mount the file server.
 *
 *	-d	increase 9P chatter
 *	-s srv	service name to post (default "mq")
 *	-m mtpt	mount point (default "/mnt/mq")
 *	-l size	log size bound, with optional k/m/g suffixes in either case
 *	-c	let one read return several messages
 */
void
main(int argc, char **argv)
{
	char *srvname, *mntpt, *s, *e;

	srvname = "mq";
	mntpt = "/mnt/mq";
	ARGBEGIN{
	case 'd':
		chatty9p++;
		break;
	case 's':
		srvname = EARGF(usage());
		break;
	case 'm':
		mntpt = EARGF(usage());
		break;
	case 'l':
		s = EARGF(usage());
		maxlog = strtoll(s, &e, 0);
		while(*e){
			switch(*e++){
			case 'k':
			case 'K':
				maxlog *= KiB;
				break;
			case 'm':
			case 'M':
				maxlog *= MiB;
				break;
			case 'g':
			case 'G':
				maxlog *= GiB;
				break;
			default:
				/* e already points past the offending character */
				sysfatal("unknown suffix %c", e[-1]);
			}
		}
		break;
	case 'c':
		coalesce = 1;
		break;
	default:
		usage();
	}ARGEND;

	postmountsrv(&mq, srvname, mntpt, MCREATE|MREPL);
	exits(nil);
}