ref: 0f2488ea74810014cded7398098a29cdea01fcd2
parent: e4ac0181296586a41efb642e962281f03c099c2a
author: kvik <kvik@a-b.xyz>
date: Wed Jan 20 11:56:23 EST 2021
mq: naming is hard, try again
--- a/src/mq-cat.c
+++ b/src/mq-cat.c
@@ -15,7 +15,7 @@
void
usage(void)
{
- fprint(2, "usage: %s mq stream ...\n", argv0);
+ fprint(2, "usage: %s group stream ...\n", argv0);
exits("usage");
}
--- a/src/mq.c
+++ b/src/mq.c
@@ -7,17 +7,16 @@
#include "list.h"
#include "util.h"
-typedef struct Mq Mq;
+typedef struct Group Group;
typedef struct Stream Stream;
typedef struct Client Client;
typedef struct Read Read;
typedef struct Write Write;
-struct Mq {
- Stream *group;
+struct Group {
+ Stream *streams;
Stream *order;
- /* configuration */
enum {Message, Coalesce} mode;
enum {Replayoff, Replaylast, Replayall} replay;
};
@@ -25,15 +24,14 @@
struct Stream {
List;
- Mq *mq; /* parent */
-
- Write *queue; /* stored messages */
- Read *reads; /* readers queue */
+ Group *parent;
+ Write *wqueue;
+ Read *rqueue;
};
struct Client {
- Write *cursor; /* current chunk */
- vlong offset; /* chunk offset; for coalesce mode */
+ Write *cursor;
+ vlong offset;
};
struct Read {
@@ -46,7 +44,7 @@
List;
/* Twrite.ifcall */
- vlong offset; /* ignored */
+ vlong offset;
uint count;
uchar *data;
};
@@ -55,7 +53,7 @@
/* Dirty trick to help clients tell our
* root from most others, see pin(1). */
Qroot = 0xA,
- Qmq = 0x1,
+ Qgroup = 0x1,
Qstream,
Qorder,
Qctl,
@@ -79,36 +77,36 @@
}
File*
-mqcreate(File *parent, char *name, char *uid, ulong perm)
+groupcreate(File *parent, char *name, char *uid, ulong perm)
{
- Stream *streamalloc(Mq*);
+ Stream *streamalloc(Group*);
void *streamclose(Stream*);
File *d, *ctl, *order;
- Mq *mq;
+ Group *group;
- mq = emalloc(sizeof(Mq));
- mq->group = (Stream*)listalloc();
- mq->order = (Stream*)streamalloc(mq);
- mq->mode = Message;
- mq->replay = Replayoff;
+ group = emalloc(sizeof(Group));
+ group->streams = (Stream*)listalloc();
+ group->order = (Stream*)streamalloc(group);
+ group->mode = Message;
+ group->replay = Replayoff;
ctl = order = nil;
if(strcmp(name, "/") == 0){
d = parent;
- d->aux = mq;
+ d->aux = group;
}
else
- d = createfile(parent, name, uid, perm, mq);
+ d = createfile(parent, name, uid, perm, group);
if(d == nil)
goto err;
- filesettype(d, Qmq);
+ filesettype(d, Qgroup);
- if((ctl = createfile(d, "ctl", nil, 0664, mq)) == nil)
+ if((ctl = createfile(d, "ctl", nil, 0664, group)) == nil)
goto err;
filesettype(ctl, Qctl);
closefile(ctl);
- if((order = createfile(d, "order", nil, 0444, mq->order)) == nil)
+ if((order = createfile(d, "order", nil, 0444, group->order)) == nil)
goto err;
filesettype(order, Qorder);
closefile(order);
@@ -115,8 +113,8 @@
return d;
err:
- free(mq->group);
- streamclose(mq->order);
+ free(group->streams);
+ streamclose(group->order);
if(d) closefile(d);
if(ctl) closefile(ctl);
if(order) closefile(order);
@@ -124,22 +122,22 @@
}
void
-mqclose(File *f)
+groupclose(File *f)
{
- Mq *mq = f->aux;
+ Group *group = f->aux;
- free(mq);
+ free(group);
}
Stream*
-streamalloc(Mq *mq)
+streamalloc(Group *group)
{
Stream *s;
s = emalloc(sizeof(Stream));
- s->mq = mq;
- s->queue = (Write*)listalloc();
- s->reads = (Read*)listalloc();
+ s->parent = group;
+ s->wqueue = (Write*)listalloc();
+ s->rqueue = (Read*)listalloc();
return s;
}
@@ -150,8 +148,8 @@
Write *w;
listunlink(s);
- if(s->reads)
- foreach(Read*, s->reads){
+ if(s->rqueue)
+ foreach(Read*, s->rqueue){
/* eof these? */
r = ptr;
ptr = (Read*)r->tail;
@@ -158,15 +156,15 @@
listunlink(r);
free(r);
}
- free(s->reads);
- if(s->queue)
- foreach(Write*, s->queue){
+ free(s->rqueue);
+ if(s->wqueue)
+ foreach(Write*, s->wqueue){
w = ptr;
ptr = (Write*)w->tail;
listunlink(w);
free(w);
}
- free(s->queue);
+ free(s->wqueue);
free(s);
}
@@ -174,16 +172,16 @@
streamcreate(File *parent, char *name, char *uid, ulong perm)
{
File *f;
- Mq *mq;
+ Group *group;
Stream *s;
- mq = parent->aux;
- s = streamalloc(mq);
+ group = parent->aux;
+ s = streamalloc(group);
if((f = createfile(parent, name, uid, perm, s)) == nil){
streamclose(s);
return nil;
}
- listlink(mq->group, s);
+ listlink(group->streams, s);
filesettype(f, Qstream);
return f;
}
@@ -194,13 +192,13 @@
Client *c;
c = r->fid->aux = emalloc(sizeof(Client));
- switch(s->mq->replay){
+ switch(s->parent->replay){
case Replayoff:
- c->cursor = (Write*)s->queue->tail; break;
+ c->cursor = (Write*)s->wqueue->tail; break;
case Replaylast:
- c->cursor = (Write*)s->queue->tail->tail; break;
+ c->cursor = (Write*)s->wqueue->tail->tail; break;
case Replayall:
- c->cursor = (Write*)s->queue; break;
+ c->cursor = (Write*)s->wqueue; break;
}
}
@@ -258,17 +256,17 @@
Client *c = r->fid->aux;
Read *rd;
- /* Delay the response if the queue is empty
+ /* Delay the response if the wqueue is empty
* or if we've already caught up, respond otherwise. */
- switch(s->mq->mode){
+ switch(s->parent->mode){
case Message:
- if(listisempty(s->queue) || listislast(c->cursor))
+ if(listisempty(s->wqueue) || listislast(c->cursor))
break;
c->cursor = (Write*)c->cursor->link;
respondmessage(r);
return;
case Coalesce:
- if(listisempty(s->queue)
+ if(listisempty(s->wqueue)
|| (listislast(c->cursor) && c->offset == c->cursor->count))
break;
respondcoalesce(r);
@@ -276,7 +274,7 @@
}
rd = emalloc(sizeof(Read));
rd->r = r;
- listlink(s->reads, rd);
+ listlink(s->rqueue, rd);
}
Write*
@@ -294,16 +292,16 @@
{
File *f = r->fid->file;
Stream *s = f->aux;
- Mq *mq = s->mq;
+ Group *group = s->parent;
Write *w, *o;
long n;
- /* Commit to queue */
+ /* Commit to wqueue */
w = writealloc(r->ifcall.count);
w->count = r->ifcall.count;
w->offset = r->ifcall.offset;
memmove(w->data, r->ifcall.data, w->count);
- listlink(s->queue->tail, w);
+ listlink(s->wqueue->tail, w);
/* Commit to order */
n = strlen(f->name)+1;
@@ -311,15 +309,15 @@
o->offset = 0;
o->count = n;
memmove(o->data, f->name, n);
- listlink(mq->order->queue->tail, o);
+ listlink(group->order->wqueue->tail, o);
/* Kick the blocked stream readers */
- foreach(Read*, s->reads){
+ foreach(Read*, s->rqueue){
Client *c = ptr->r->fid->aux;
c->cursor = w;
c->offset = 0;
- switch(mq->mode){
+ switch(group->mode){
case Message:
respondmessage(ptr->r); break;
case Coalesce:
@@ -330,7 +328,7 @@
}
/* Kick the blocked order readers */
- foreach(Read*, mq->order->reads){
+ foreach(Read*, group->order->rqueue){
Client *c = ptr->r->fid->aux;
c->cursor = o;
@@ -347,7 +345,7 @@
ctlread(Req *r)
{
File *f = r->fid->file;
- Mq *mq = f->aux;
+ Group *group = f->aux;
char buf[256];
char *mode2str[] = {
@@ -360,7 +358,7 @@
[Replayall] "all",
};
snprint(buf, sizeof buf, "data %s\nreplay %s\n",
- mode2str[mq->mode], replay2str[mq->replay]);
+ mode2str[group->mode], replay2str[group->replay]);
readstr(r, buf);
respond(r, nil);
}
@@ -370,7 +368,7 @@
Cmdreplay,
Cmddebug, Cmddebug9p,
};
-Cmdtab mqcmd[] = {
+Cmdtab groupcmd[] = {
/* data message|coalesce */
{Cmddata, "data", 2},
/* replay off|last|all */
@@ -386,13 +384,13 @@
ctlwrite(Req *r)
{
File *f = r->fid->file;
- Mq *mq = f->aux;
+ Group *group = f->aux;
char *e = nil;
Cmdbuf *cmd;
Cmdtab *t;
cmd = parsecmd(r->ifcall.data, r->ifcall.count);
- t = lookupcmd(cmd, mqcmd, nelem(mqcmd));
+ t = lookupcmd(cmd, groupcmd, nelem(groupcmd));
if(t == nil){
respondcmderror(r, cmd, "%r");
free(cmd);
@@ -401,10 +399,10 @@
switch(t->index){
case Cmddata: {
if(strncmp(cmd->f[1], "message", 7) == 0)
- mq->mode = Message;
+ group->mode = Message;
else
if(strncmp(cmd->f[1], "coalesce", 8) == 0)
- mq->mode = Coalesce;
+ group->mode = Coalesce;
else
e = "usage: data message|coalesce";
break;
@@ -411,13 +409,13 @@
}
case Cmdreplay: {
if(strncmp(cmd->f[1], "off", 3) == 0)
- mq->replay = Replayoff;
+ group->replay = Replayoff;
else
if(strncmp(cmd->f[1], "last", 4) == 0)
- mq->replay = Replaylast;
+ group->replay = Replaylast;
else
if(strncmp(cmd->f[1], "all", 3) == 0)
- mq->replay = Replayall;
+ group->replay = Replayall;
else
e = "usage: replay off|last|all";
break;
@@ -457,9 +455,9 @@
switch(filetype(parent)){
case Qroot:
- case Qmq:
+ case Qgroup:
if(perm&DMDIR)
- f = mqcreate(parent, name, uid, perm);
+ f = groupcreate(parent, name, uid, perm);
else{
f = streamcreate(parent, name, uid, perm);
r->fid->file = f;
@@ -537,7 +535,7 @@
if(old->ifcall.type != Tread)
break;
- foreach(Read*, s->reads){
+ foreach(Read*, s->rqueue){
if(ptr->r == old){
free(listunlink(ptr));
break;
@@ -628,8 +626,8 @@
xdestroyfile(File *f)
{
switch(filetype(f)){
- case Qmq:
- mqclose(f);
+ case Qgroup:
+ groupclose(f);
break;
case Qstream:
streamclose(f->aux);
@@ -669,7 +667,7 @@
}ARGEND;
fs.tree = alloctree(nil, nil, DMDIR|0777, xdestroyfile);
- mqcreate(fs.tree->root, "/", nil, 0);
+ groupcreate(fs.tree->root, "/", nil, 0);
filesettype(fs.tree->root, Qroot);
if(name || mtpt){