shithub: mq

Download patch

ref: 0f2488ea74810014cded7398098a29cdea01fcd2
parent: e4ac0181296586a41efb642e962281f03c099c2a
author: kvik <kvik@a-b.xyz>
date: Wed Jan 20 11:56:23 EST 2021

mq: naming is hard, try again

--- a/src/mq-cat.c
+++ b/src/mq-cat.c
@@ -15,7 +15,7 @@
 void
 usage(void)
 {
-	fprint(2, "usage: %s mq stream ...\n", argv0);
+	fprint(2, "usage: %s group stream ...\n", argv0);
 	exits("usage");
 }
 
--- a/src/mq.c
+++ b/src/mq.c
@@ -7,17 +7,16 @@
 #include "list.h"
 #include "util.h"
 
-typedef struct Mq Mq;
+typedef struct Group Group;
 typedef struct Stream Stream;
 typedef struct Client Client;
 typedef struct Read Read;
 typedef struct Write Write;
 
-struct Mq {
-	Stream *group;
+struct Group {
+	Stream *streams;
 	Stream *order;
 
-	/* configuration */
 	enum {Message, Coalesce} mode;
 	enum {Replayoff, Replaylast, Replayall} replay;
 };
@@ -25,15 +24,14 @@
 struct Stream {
 	List;
 
-	Mq *mq; /* parent */
-
-	Write *queue; /* stored messages */
-	Read *reads; /* readers queue */
+	Group *parent;
+	Write *wqueue;
+	Read *rqueue;
 };
 
 struct Client {
-	Write *cursor; /* current chunk */
-	vlong offset; /* chunk offset; for coalesce mode */
+	Write *cursor;
+	vlong offset;
 };
 
 struct Read {
@@ -46,7 +44,7 @@
 	List;
 
 	/* Twrite.ifcall */
-	vlong offset; /* ignored */
+	vlong offset;
 	uint count;
 	uchar *data;
 };
@@ -55,7 +53,7 @@
 	/* Dirty trick to help clients tell our
 	 * root from most others, see pin(1). */
 	Qroot = 0xA,
-	Qmq = 0x1,
+	Qgroup = 0x1,
 		Qstream,
 		Qorder,
 		Qctl,
@@ -79,36 +77,36 @@
 }
 
 File*
-mqcreate(File *parent, char *name, char *uid, ulong perm)
+groupcreate(File *parent, char *name, char *uid, ulong perm)
 {
-	Stream *streamalloc(Mq*);
+	Stream *streamalloc(Group*);
 	void *streamclose(Stream*);
 	File *d, *ctl, *order;
-	Mq *mq;
+	Group *group;
 
-	mq = emalloc(sizeof(Mq));
-	mq->group = (Stream*)listalloc();
-	mq->order = (Stream*)streamalloc(mq);
-	mq->mode = Message;
-	mq->replay = Replayoff;
+	group = emalloc(sizeof(Group));
+	group->streams = (Stream*)listalloc();
+	group->order = (Stream*)streamalloc(group);
+	group->mode = Message;
+	group->replay = Replayoff;
 
 	ctl = order = nil;
 	if(strcmp(name, "/") == 0){
 		d = parent;
-		d->aux = mq;
+		d->aux = group;
 	}
 	else
-		d = createfile(parent, name, uid, perm, mq);
+		d = createfile(parent, name, uid, perm, group);
 	if(d == nil)
 		goto err;
-	filesettype(d, Qmq);
+	filesettype(d, Qgroup);
 
-	if((ctl = createfile(d, "ctl", nil, 0664, mq)) == nil)
+	if((ctl = createfile(d, "ctl", nil, 0664, group)) == nil)
 		goto err;
 	filesettype(ctl, Qctl);
 	closefile(ctl);
 
-	if((order = createfile(d, "order", nil, 0444, mq->order)) == nil)
+	if((order = createfile(d, "order", nil, 0444, group->order)) == nil)
 		goto err;
 	filesettype(order, Qorder);
 	closefile(order);
@@ -115,8 +113,8 @@
 
 	return d;
 err:
-	free(mq->group);
-	streamclose(mq->order);
+	free(group->streams);
+	streamclose(group->order);
 	if(d) closefile(d);
 	if(ctl) closefile(ctl);
 	if(order) closefile(order);
@@ -124,22 +122,22 @@
 }
 
 void
-mqclose(File *f)
+groupclose(File *f)
 {
-	Mq *mq = f->aux;
+	Group *group = f->aux;
 
-	free(mq);
+	free(group);
 }
 
 Stream*
-streamalloc(Mq *mq)
+streamalloc(Group *group)
 {
 	Stream *s;
 	
 	s = emalloc(sizeof(Stream));
-	s->mq = mq;
-	s->queue = (Write*)listalloc();
-	s->reads = (Read*)listalloc();
+	s->parent = group;
+	s->wqueue = (Write*)listalloc();
+	s->rqueue = (Read*)listalloc();
 	return s;
 }
 
@@ -150,8 +148,8 @@
 	Write *w;
 
 	listunlink(s);
-	if(s->reads)
-	foreach(Read*, s->reads){
+	if(s->rqueue)
+	foreach(Read*, s->rqueue){
 		/* eof these? */
 		r = ptr;
 		ptr = (Read*)r->tail;
@@ -158,15 +156,15 @@
 		listunlink(r);
 		free(r);
 	}
-	free(s->reads);
-	if(s->queue)
-	foreach(Write*, s->queue){
+	free(s->rqueue);
+	if(s->wqueue)
+	foreach(Write*, s->wqueue){
 		w = ptr;
 		ptr = (Write*)w->tail;
 		listunlink(w);
 		free(w);
 	}
-	free(s->queue);
+	free(s->wqueue);
 	free(s);
 }
 
@@ -174,16 +172,16 @@
 streamcreate(File *parent, char *name, char *uid, ulong perm)
 {
 	File *f;
-	Mq *mq;
+	Group *group;
 	Stream *s;
 
-	mq = parent->aux;
-	s = streamalloc(mq);
+	group = parent->aux;
+	s = streamalloc(group);
 	if((f = createfile(parent, name, uid, perm, s)) == nil){
 		streamclose(s);
 		return nil;
 	}
-	listlink(mq->group, s);
+	listlink(group->streams, s);
 	filesettype(f, Qstream);
 	return f;
 }
@@ -194,13 +192,13 @@
 	Client *c;
 	
 	c = r->fid->aux = emalloc(sizeof(Client));
-	switch(s->mq->replay){
+	switch(s->parent->replay){
 	case Replayoff:
-		c->cursor = (Write*)s->queue->tail; break;
+		c->cursor = (Write*)s->wqueue->tail; break;
 	case Replaylast:
-		c->cursor = (Write*)s->queue->tail->tail; break;
+		c->cursor = (Write*)s->wqueue->tail->tail; break;
 	case Replayall:
-		c->cursor = (Write*)s->queue; break;
+		c->cursor = (Write*)s->wqueue; break;
 	}
 }
 
@@ -258,17 +256,17 @@
 	Client *c = r->fid->aux;
 	Read *rd;
 
-	/* Delay the response if the queue is empty
+	/* Delay the response if the wqueue is empty
 	 * or if we've already caught up, respond otherwise. */
-	switch(s->mq->mode){
+	switch(s->parent->mode){
 	case Message:
-		if(listisempty(s->queue) || listislast(c->cursor))
+		if(listisempty(s->wqueue) || listislast(c->cursor))
 			break;
 		c->cursor = (Write*)c->cursor->link;
 		respondmessage(r);
 		return;
 	case Coalesce:
-		if(listisempty(s->queue)
+		if(listisempty(s->wqueue)
 		|| (listislast(c->cursor) && c->offset == c->cursor->count))
 			break;
 		respondcoalesce(r);
@@ -276,7 +274,7 @@
 	}
 	rd = emalloc(sizeof(Read));
 	rd->r = r;
-	listlink(s->reads, rd);
+	listlink(s->rqueue, rd);
 }
 
 Write*
@@ -294,16 +292,16 @@
 {
 	File *f = r->fid->file;
 	Stream *s = f->aux;
-	Mq *mq = s->mq;
+	Group *group = s->parent;
 	Write *w, *o;
 	long n;
 
-	/* Commit to queue */
+	/* Commit to wqueue */
 	w = writealloc(r->ifcall.count);
 	w->count = r->ifcall.count;
 	w->offset = r->ifcall.offset;
 	memmove(w->data, r->ifcall.data, w->count);
-	listlink(s->queue->tail, w);
+	listlink(s->wqueue->tail, w);
 
 	/* Commit to order */
 	n = strlen(f->name)+1;
@@ -311,15 +309,15 @@
 	o->offset = 0;
 	o->count = n;
 	memmove(o->data, f->name, n);
-	listlink(mq->order->queue->tail, o);
+	listlink(group->order->wqueue->tail, o);
 
 	/* Kick the blocked stream readers */
-	foreach(Read*, s->reads){
+	foreach(Read*, s->rqueue){
 		Client *c = ptr->r->fid->aux;
 
 		c->cursor = w;
 		c->offset = 0;
-		switch(mq->mode){
+		switch(group->mode){
 		case Message:
 			respondmessage(ptr->r); break;
 		case Coalesce:
@@ -330,7 +328,7 @@
 	}
 
 	/* Kick the blocked order readers */
-	foreach(Read*, mq->order->reads){
+	foreach(Read*, group->order->rqueue){
 		Client *c = ptr->r->fid->aux;
 
 		c->cursor = o;
@@ -347,7 +345,7 @@
 ctlread(Req *r)
 {
 	File *f = r->fid->file;
-	Mq *mq = f->aux;
+	Group *group = f->aux;
 	char buf[256];
 
 	char *mode2str[] = {
@@ -360,7 +358,7 @@
 		[Replayall] "all",
 	};
 	snprint(buf, sizeof buf, "data %s\nreplay %s\n",
-		mode2str[mq->mode], replay2str[mq->replay]);
+		mode2str[group->mode], replay2str[group->replay]);
 	readstr(r, buf);
 	respond(r, nil);
 }
@@ -370,7 +368,7 @@
 	Cmdreplay,
 	Cmddebug, Cmddebug9p,
 };
-Cmdtab mqcmd[] = {
+Cmdtab groupcmd[] = {
 	/* data message|coalesce */
 	{Cmddata, "data", 2},
 	/* replay off|last|all */
@@ -386,13 +384,13 @@
 ctlwrite(Req *r)
 {
 	File *f = r->fid->file;
-	Mq *mq = f->aux;
+	Group *group = f->aux;
 	char *e = nil;
 	Cmdbuf *cmd;
 	Cmdtab *t;
 
 	cmd = parsecmd(r->ifcall.data, r->ifcall.count);
-	t = lookupcmd(cmd, mqcmd, nelem(mqcmd));
+	t = lookupcmd(cmd, groupcmd, nelem(groupcmd));
 	if(t == nil){
 		respondcmderror(r, cmd, "%r");
 		free(cmd);
@@ -401,10 +399,10 @@
 	switch(t->index){
 	case Cmddata: {
 		if(strncmp(cmd->f[1], "message", 7) == 0)
-			mq->mode = Message;
+			group->mode = Message;
 		else
 		if(strncmp(cmd->f[1], "coalesce", 8) == 0)
-			mq->mode = Coalesce;
+			group->mode = Coalesce;
 		else
 			e = "usage: data message|coalesce";
 		break;
@@ -411,13 +409,13 @@
 	}
 	case Cmdreplay: {
 		if(strncmp(cmd->f[1], "off", 3) == 0)
-			mq->replay = Replayoff;
+			group->replay = Replayoff;
 		else
 		if(strncmp(cmd->f[1], "last", 4) == 0)
-			mq->replay = Replaylast;
+			group->replay = Replaylast;
 		else
 		if(strncmp(cmd->f[1], "all", 3) == 0)
-			mq->replay = Replayall;
+			group->replay = Replayall;
 		else
 			e = "usage: replay off|last|all";
 		break;
@@ -457,9 +455,9 @@
 
 	switch(filetype(parent)){
 	case Qroot:
-	case Qmq:
+	case Qgroup:
 		if(perm&DMDIR)
-			f = mqcreate(parent, name, uid, perm);
+			f = groupcreate(parent, name, uid, perm);
 		else{
 			f = streamcreate(parent, name, uid, perm);
 			r->fid->file = f;
@@ -537,7 +535,7 @@
 
 		if(old->ifcall.type != Tread)
 			break;
-		foreach(Read*, s->reads){
+		foreach(Read*, s->rqueue){
 			if(ptr->r == old){
 				free(listunlink(ptr));
 				break;
@@ -628,8 +626,8 @@
 xdestroyfile(File *f)
 {
 	switch(filetype(f)){
-	case Qmq:
-		mqclose(f);
+	case Qgroup:
+		groupclose(f);
 		break;
 	case Qstream:
 		streamclose(f->aux);
@@ -669,7 +667,7 @@
 	}ARGEND;
 
 	fs.tree = alloctree(nil, nil, DMDIR|0777, xdestroyfile);
-	mqcreate(fs.tree->root, "/", nil, 0);
+	groupcreate(fs.tree->root, "/", nil, 0);
 	filesettype(fs.tree->root, Qroot);
 
 	if(name || mtpt){