ref: 4e663b30ce677bb6ab868d83d5ac34ab094c7e3b
parent: a43e93c246dbb5743927d132cb726a61ffb9f35f
author: kvik <kvik@a-b.xyz>
date: Thu Sep 3 09:08:55 EDT 2020
rename Pipe -> Stream; rename fields (Mq.pipes -> group, Pipe.group -> mq, history -> queue); reorder struct definitions
--- a/src/mq-cat.c
+++ b/src/mq-cat.c
@@ -3,15 +3,15 @@
#include "util.h"
-typedef struct Pipe Pipe;
+typedef struct Stream Stream;
-struct Pipe {
+struct Stream {
char *name;
int fd;
};
-int npipes;
-Pipe *pipes;
+int nstreams;
+Stream *streams;
char buf[8192];
@@ -37,7 +37,7 @@
{
int mqfd, n, ismq;
Dir *dirs, *d;
- Pipe *p;
+ Stream *s;
mqfd = eopen(name, OREAD);
if((n = dirreadall(mqfd, &dirs)) == -1)
@@ -47,8 +47,8 @@
close(mqfd);
ismq = 0;
- npipes = n - 2;
- pipes = p = emalloc(npipes*sizeof(Pipe));
+ nstreams = n - 2;
+ streams = s = emalloc(nstreams*sizeof(Stream));
for(d = dirs; n--; d++){
if(strncmp(d->name, "ctl", 3) == 0
|| strncmp(d->name, "order", 5) == 0){
@@ -55,9 +55,9 @@
ismq++;
continue;
}
- p->name = estrdup(d->name);
- p->fd = eopen(d->name, OREAD);
- p++;
+ s->name = estrdup(d->name);
+ s->fd = eopen(d->name, OREAD);
+ s++;
}
free(dirs);
if(ismq != 2)
@@ -84,7 +84,7 @@
{
int orderfd, n, i;
char name[512+1];
- Pipe *p;
+ Stream *s;
ARGBEGIN{
default: usage();
@@ -99,11 +99,11 @@
if((n = read(orderfd, name, sizeof(name)-1)) == 0)
break;
buf[n] = 0;
- for(i = 0, p = pipes; i < npipes; i++, p++)
- if(strcmp(p->name, name) == 0)
- if(p->fd != -1){
- if(rdwr(p->fd, 1) == 0)
- p->fd = -1;
+ for(i = 0, s = streams; i < nstreams; i++, s++)
+ if(strcmp(s->name, name) == 0)
+ if(s->fd != -1){
+ if(rdwr(s->fd, 1) == 0)
+ s->fd = -1;
break;
}
}
--- a/src/mq.c
+++ b/src/mq.c
@@ -7,33 +7,39 @@
#include "list.h"
#include "util.h"
-typedef struct Client Client;
typedef struct Mq Mq;
-typedef struct Pipe Pipe;
-typedef struct Write Write;
+typedef struct Stream Stream;
+typedef struct Client Client;
typedef struct Read Read;
+typedef struct Write Write;
-struct Client {
- Write *cursor; /* reader position */
-};
-
struct Mq {
- Pipe *pipes;
- Pipe *order;
+ Stream *group;
+ Stream *order;
/* configuration */
int replay;
};
-struct Pipe {
+struct Stream {
List;
- Mq *group; /* membership */
+ Mq *mq; /* parent */
- Write *history; /* stored messages */
+ Write *queue; /* stored messages */
Read *reads; /* readers queue */
};
+struct Client {
+ Write *cursor; /* reader position */
+};
+
+struct Read {
+ List;
+
+ Req *r;
+};
+
struct Write {
List;
@@ -43,16 +49,10 @@
uchar *data;
};
-struct Read {
- List;
-
- Req *r;
-};
-
enum {
Qroot,
Qmq,
- Qpipe,
+ Qstream,
Qorder,
Qctl,
};
@@ -76,14 +76,14 @@
File*
mqcreate(File *parent, char *name, char *uid, ulong perm)
{
- Pipe *pipealloc(Mq*);
- void *pipeclose(Pipe*);
+ Stream *streamalloc(Mq*);
+ void *streamclose(Stream*);
File *d, *ctl, *order;
Mq *mq;
mq = emalloc(sizeof(Mq));
- mq->pipes = (Pipe*)listalloc();
- mq->order = (Pipe*)pipealloc(mq);
+ mq->group = (Stream*)listalloc();
+ mq->order = (Stream*)streamalloc(mq);
mq->replay = 0;
ctl = order = nil;
@@ -103,8 +103,8 @@
return d;
err:
- free(mq->pipes);
- pipeclose(mq->order);
+ free(mq->group);
+ streamclose(mq->order);
if(d) closefile(d);
if(ctl) closefile(ctl);
if(order) closefile(order);
@@ -119,27 +119,27 @@
free(mq);
}
-Pipe*
-pipealloc(Mq *mq)
+Stream*
+streamalloc(Mq *mq)
{
- Pipe *p;
+ Stream *s;
- p = emalloc(sizeof(Pipe));
- p->group = mq;
- p->history = (Write*)listalloc();
- p->reads = (Read*)listalloc();
- return p;
+ s = emalloc(sizeof(Stream));
+ s->mq = mq;
+ s->queue = (Write*)listalloc();
+ s->reads = (Read*)listalloc();
+ return s;
}
void
-pipeclose(Pipe *p)
+streamclose(Stream *s)
{
Read *r;
Write *w;
- listunlink(p);
- if(p->reads)
- foreach(Read*, p->reads){
+ listunlink(s);
+ if(s->reads)
+ foreach(Read*, s->reads){
/* eof these? */
r = ptr;
ptr = (Read*)r->tail;
@@ -146,37 +146,36 @@
listunlink(r);
free(r);
}
- free(p->reads);
- if(p->history)
- foreach(Write*, p->history){
+ free(s->reads);
+ if(s->queue)
+ foreach(Write*, s->queue){
w = ptr;
ptr = (Write*)w->tail;
listunlink(w);
free(w);
}
- free(p->history);
- free(p);
+ free(s->queue);
+ free(s);
}
File*
-pipecreate(File *parent, char *name, char *uid, ulong perm)
+streamcreate(File *parent, char *name, char *uid, ulong perm)
{
File *f;
Mq *mq;
- Pipe *p;
+ Stream *s;
mq = parent->aux;
- p = pipealloc(mq);
- if((f = createfile(parent, name, uid, perm, p)) == nil){
- pipeclose(p);
+ s = streamalloc(mq);
+ if((f = createfile(parent, name, uid, perm, s)) == nil){
+ streamclose(s);
return nil;
}
- listlink(mq->pipes, p);
- filesettype(f, Qpipe);
+ listlink(mq->group, s);
+ filesettype(f, Qstream);
return f;
}
-
void
respondread(Req *r, Write *w)
{
@@ -186,19 +185,19 @@
}
void
-piperead(Req *r)
+streamread(Req *r)
{
File *f = r->fid->file;
- Pipe *p = f->aux;
+ Stream *s = f->aux;
Client *c = r->fid->aux;
Read *rd;
- /* Delay the response if there's no history
+ /* Delay the response if the queue is empty
* or if we've already caught up. */
- if(listempty(p->history) || listend(c->cursor)){
+ if(listempty(s->queue) || listend(c->cursor)){
rd = emalloc(sizeof(Read));
rd->r = r;
- listlink(p->reads, rd);
+ listlink(s->reads, rd);
return;
}
c->cursor = (Write*)c->cursor->link;
@@ -219,17 +218,17 @@
pipewrite(Req *r)
{
File *f = r->fid->file;
- Pipe *p = f->aux;
- Mq *mq = p->group;
+ Stream *s = f->aux;
+ Mq *mq = s->mq;
Write *w, *o;
long n;
- /* Commit to history */
+ /* Commit to queue */
w = writealloc(r->ifcall.count);
w->count = r->ifcall.count;
w->offset = r->ifcall.offset;
memmove(w->data, r->ifcall.data, w->count);
- listlink(p->history->tail, w);
+ listlink(s->queue->tail, w);
/* Commit to order */
n = strlen(f->name)+1;
@@ -237,10 +236,10 @@
o->offset = 0;
o->count = n;
memmove(o->data, f->name, n);
- listlink(mq->order->history->tail, o);
+ listlink(mq->order->queue->tail, o);
- /* Kick the blocked pipe readers */
- foreach(Read*, p->reads){
+ /* Kick the blocked stream readers */
+ foreach(Read*, s->reads){
Client *c = ptr->r->fid->aux;
respondread(ptr->r, w);
@@ -346,7 +345,7 @@
if(perm&DMDIR)
f = mqcreate(parent, name, uid, perm);
else
- f = pipecreate(parent, name, uid, perm);
+ f = streamcreate(parent, name, uid, perm);
break;
}
if(f == nil)
@@ -361,16 +360,16 @@
File *f = r->fid->file;
switch(filetype(f)){
- case Qpipe:
+ case Qstream:
case Qorder: {
- Pipe *p = f->aux;
+ Stream *s = f->aux;
Client *c;
c = r->fid->aux = emalloc(sizeof(Client));
- if(p->group->replay)
- c->cursor = (Write*)p->history;
+ if(s->mq->replay)
+ c->cursor = (Write*)s->queue;
else
- c->cursor = (Write*)p->history->tail;
+ c->cursor = (Write*)s->queue->tail;
break;
}}
respond(r, nil);
@@ -382,7 +381,7 @@
File *f = r->fid->file;
switch(filetype(f)){
- case Qpipe:
+ case Qstream:
pipewrite(r);
break;
case Qctl:
@@ -400,9 +399,9 @@
File *f = r->fid->file;
switch(filetype(f)){
- case Qpipe:
+ case Qstream:
case Qorder:
- piperead(r);
+ streamread(r);
break;
default:
respond(r, "forbidden");
@@ -416,13 +415,13 @@
File *f = old->fid->file;
switch(filetype(f)){
- case Qpipe:
+ case Qstream:
case Qorder: {
- Pipe *p = f->aux;
+ Stream *s = f->aux;
if(old->ifcall.type != Tread)
break;
- foreach(Read*, p->reads){
+ foreach(Read*, s->reads){
if(ptr->r == old){
free(listunlink(ptr));
break;
@@ -448,8 +447,8 @@
case Qmq:
mqclose(f);
break;
- case Qpipe:
- pipeclose(f->aux);
+ case Qstream:
+ streamclose(f->aux);
break;
}
return;