ref: 5d36e30a4110e8c39daec37ddaa1a55e19ab8e02
author: kvik <kvik@a-b.xyz>
date: Mon Aug 31 08:44:24 EDT 2020
init
--- /dev/null
+++ b/LICENSE
@@ -1,0 +1,19 @@
+Copyright (c) 2020 kvik@a-b.xyz
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
--- /dev/null
+++ b/mkfile
@@ -1,0 +1,61 @@
+</$objtype/mkfile
+
+HFILES=`{test -d src && walk -f src | grep '\.h$'}
+CFILES=`{test -d src && walk -f src | grep '\.c$'}
+CMAIN=`{grep -l '^(thread)?main\(' $CFILES /dev/null}
+CCOM=`{grep -L '^(thread)?main\(' $CFILES /dev/null | sed '/^\/dev\/null/d'}
+OCOM=${CCOM:src/%.c=obj/$objtype/%.o}
+
+BINTARG=${CMAIN:src/%.c=bin/$objtype/%}
+RCFILES=`{test -d rc && walk -f rc}
+MANFILES=`{test -d man && walk -n 2,2 -f man}
+
+BIN=/$objtype/bin
+RC=/rc/bin
+MAN=/sys/man
+
+DIRS=bin obj bin/$objtype obj/$objtype
+
+BININST=${BINTARG:bin/$objtype/%=$BIN/%}
+RCINST=${RCFILES:rc/%=$RC/%}
+MANINST=${MANFILES:man/%=$MAN/%}
+INST=$BININST $RCINST $MANINST
+
+none:V: all
+
+$DIRS:
+ mkdir -p $target
+
+obj/$objtype/%.o: obj/$objtype $HFILES
+
+obj/$objtype/%.o: src/%.c
+ $CC $CFLAGS -o $target src/$stem.c
+
+bin/$objtype/%: bin/$objtype obj/$objtype/%.o $OCOM
+ $LD $LDFLAGS -o $target obj/$objtype/$stem.o $OCOM
+
+$BIN/%: bin/$objtype/%
+ cp $prereq $target
+
+$RC/%: rc/%
+ cp -x $prereq $target
+
+/sys/man/%: man/%
+ cp $prereq $target
+
+man:V: $MANINST
+
+%.cpus:V:
+ for(objtype in $CPUS) mk $MKFLAGS $stem
+
+all:V: $BINTARG
+
+install:V: $INST
+
+installall:V: install.cpus
+
+uninstall:V:
+ rm -f $INST
+
+clean:V:
+ rm -rf bin obj
--- /dev/null
+++ b/rc/attach
@@ -1,0 +1,9 @@
+#!/bin/rc -e
+rfork e
+flagfmt = ''; args = 'name'
+eval `''{aux/getflags $*} || exec aux/usage
+if(! ~ $#* 1) exec aux/usage
+
+mq = $1
+mq-cat $mq &
+exec cat >>$mq/0
--- /dev/null
+++ b/rc/detach
@@ -1,0 +1,14 @@
+#!/bin/rc -e
+rfork e
+flagfmt = 'c:clean'; args = 'name'
+eval `''{aux/getflags $*} || exec aux/usage
+if(! ~ $#* 1) exec aux/usage
+
+mq = $1
+mkdir $mq
+touch $mq/^(0 1 2)
+echo replay on >$mq/ctl
+
+prompt=($mq'% ' ' ')
+rc -i <$mq/0 >>$mq/1 >>[2]$mq/2 &
+exec attach $mq
--- /dev/null
+++ b/src/list.c
@@ -1,0 +1,47 @@
+#include <u.h>
+#include <libc.h>
+
+#include "list.h"
+#include "util.h"
+
+List*
+listalloc(void)
+{
+ List *n;
+
+ n = emalloc(sizeof(List));
+ n->tag = Listlead;
+ n->link = n;
+ n->tail = n;
+ return n;
+}
+
+List*
+listlink(List *p, List *n)
+{
+ n->link = p->link;
+ p->link = n;
+ n->tail = p;
+ n->link->tail = n;
+ return n;
+}
+
+List*
+listunlink(List *p)
+{
+ p->link->tail = p->tail;
+ p->tail->link = p->link;
+ return p;
+}
+
+int
+listend(List *p)
+{
+ return p->link->tag == Listlead;
+}
+
+int
+listempty(List *p)
+{
+ return p->link == p;
+}
--- /dev/null
+++ b/src/list.h
@@ -1,0 +1,20 @@
+enum { Listlead = 0xAA };
+
+typedef struct List List;
+
+/* Must be embedded at the top of struct */
+struct List {
+ uchar tag;
+ List *link;
+ List *tail;
+};
+
+/* What. */
+#define foreach(type, list) \
+ for(type ptr = (type)(list)->link; ptr->tag != Listlead; ptr = (type)ptr->link)
+
+List* listalloc(void);
+List* listlink(List*, List*);
+List* listunlink(List*);
+int listempty(List*);
+int listend(List*);
--- /dev/null
+++ b/src/mq-cat.c
@@ -1,0 +1,111 @@
+#include <u.h>
+#include <libc.h>
+
+#include "util.h"
+
+typedef struct Pipe Pipe;
+
+struct Pipe {
+ char *name;
+ int fd;
+};
+
+int npipes;
+Pipe *pipes;
+
+char buf[8192];
+
+void
+usage(void)
+{
+ fprint(2, "usage: %s mq\n", argv0);
+ exits("usage");
+}
+
+int
+eopen(char *s, int m)
+{
+ int fd;
+
+ if((fd = open(s, m)) < 0)
+ sysfatal("open: %r");
+ return fd;
+}
+
+int
+openmq(char *name)
+{
+ int mqfd, n, ismq;
+ Dir *dirs, *d;
+ Pipe *p;
+
+ mqfd = eopen(name, OREAD);
+ if((n = dirreadall(mqfd, &dirs)) == -1)
+ sysfatal("dirread: %r");
+ if(n == 0)
+ return -1;
+ close(mqfd);
+
+ ismq = 0;
+ npipes = n - 2;
+ pipes = p = emalloc(npipes*sizeof(Pipe));
+ for(d = dirs; n--; d++){
+ if(strncmp(d->name, "ctl", 3) == 0
+ || strncmp(d->name, "order", 5) == 0){
+ ismq++;
+ continue;
+ }
+ p->name = estrdup(d->name);
+ p->fd = eopen(d->name, OREAD);
+ p++;
+ }
+ free(dirs);
+ if(ismq != 2)
+ return -1;
+ return eopen("order", OREAD);
+}
+
+long
+rdwr(int fd0, int fd1)
+{
+ long n;
+
+ if((n = read(fd0, buf, sizeof buf)) == -1)
+ sysfatal("read: %r");
+ if(n == 0)
+ return 0;
+ if(write(fd1, buf, n) != n)
+ sysfatal("write: %r");
+ return n;
+}
+
+void
+main(int argc, char *argv[])
+{
+ int orderfd, n, i;
+ char name[512+1];
+ Pipe *p;
+
+ ARGBEGIN{
+ default: usage();
+ }ARGEND;
+ if(argc != 1) usage();
+
+ if(chdir(argv[0]) == -1)
+ sysfatal("chdir: %r");
+ if((orderfd = openmq(".")) == -1)
+ sysfatal("not mq");
+ for(;;){
+ if((n = read(orderfd, name, sizeof(name)-1)) == 0)
+ break;
+ buf[n] = 0;
+ for(i = 0, p = pipes; i < npipes; i++, p++)
+ if(strcmp(p->name, name) == 0)
+ if(p->fd != -1){
+ if(rdwr(p->fd, 1) == 0)
+ p->fd = -1;
+ break;
+ }
+ }
+ exits(nil);
+}
--- /dev/null
+++ b/src/mq.c
@@ -1,0 +1,499 @@
+#include <u.h>
+#include <libc.h>
+#include <fcall.h>
+#include <thread.h>
+#include <9p.h>
+
+#include "list.h"
+#include "util.h"
+
+typedef struct Client Client;
+typedef struct Mq Mq;
+typedef struct Pipe Pipe;
+typedef struct Write Write;
+typedef struct Read Read;
+typedef struct Order Order;
+
+struct Client {
+ Write *cursor; /* reader position */
+};
+
+struct Mq {
+ Pipe *pipes;
+ Pipe *order;
+
+ /* configuration */
+ int replay;
+};
+
+struct Pipe {
+ List;
+
+ Mq *group; /* membership */
+
+ Write *history; /* stored messages */
+ Read *reads; /* readers queue */
+};
+
+struct Write {
+ List;
+
+ /* Twrite.ifcall */
+ vlong offset; /* ignored */
+ uint count;
+ uchar *data;
+};
+
+struct Read {
+ List;
+
+ Req *r;
+};
+
+enum {
+ Qroot,
+ Qmq,
+ Qpipe,
+ Qorder,
+ Qctl,
+};
+void
+filesettype(File *f, ushort type)
+{
+ /*
+ * Use four most-significant bits to store the type.
+ * This depends on the 9pfile(2) library generating
+ * simple incremental qid paths.
+ */
+ f->qid.path |= (uvlong)(type&0xF)<<60;
+}
+
+ushort
+filetype(File *f)
+{
+ return (f->qid.path>>60)&0xF;
+}
+
+File*
+mqcreate(File *parent, char *name, char *uid, ulong perm)
+{
+ Pipe *pipealloc(Mq*);
+ void *pipeclose(Pipe*);
+ File *d, *ctl, *order;
+ Mq *mq;
+
+ mq = emalloc(sizeof(Mq));
+ mq->pipes = (Pipe*)listalloc();
+ mq->order = (Pipe*)pipealloc(mq);
+ mq->replay = 0;
+
+ ctl = order = nil;
+ if((d = createfile(parent, name, uid, perm, mq)) == nil)
+ goto err;
+ filesettype(d, Qmq);
+
+ if((ctl = createfile(d, "ctl", nil, 0220, mq)) == nil)
+ goto err;
+ filesettype(ctl, Qctl);
+ closefile(ctl);
+
+ if((order = createfile(d, "order", nil, 0444, mq->order)) == nil)
+ goto err;
+ filesettype(order, Qorder);
+ closefile(order);
+
+ return d;
+err:
+ free(mq->pipes);
+ pipeclose(mq->order);
+ if(ctl) closefile(ctl);
+ if(order) closefile(order);
+ return nil;
+}
+
+void
+mqclose(File *f)
+{
+ Mq *mq = f->aux;
+
+ free(mq);
+}
+
+Pipe*
+pipealloc(Mq *mq)
+{
+ Pipe *p;
+
+ p = emalloc(sizeof(Pipe));
+ p->group = mq;
+ p->history = (Write*)listalloc();
+ p->reads = (Read*)listalloc();
+ return p;
+}
+
+void
+pipeclose(Pipe *p)
+{
+ Read *r;
+ Write *w;
+
+ print("pipeclose\n");
+
+ listunlink(p);
+ if(p->reads)
+ foreach(Read*, p->reads){
+ /* eof these? */
+ r = ptr;
+ ptr = (Read*)r->tail;
+ listunlink(r);
+ free(r);
+ }
+ free(p->reads);
+ if(p->history)
+ foreach(Write*, p->history){
+ w = ptr;
+ ptr = (Write*)w->tail;
+ listunlink(w);
+ free(w);
+ }
+ free(p->history);
+ free(p);
+}
+
+File*
+pipecreate(File *parent, char *name, char *uid, ulong perm)
+{
+ File *f;
+ Mq *mq;
+ Pipe *p;
+
+ mq = parent->aux;
+ p = pipealloc(mq);
+ listlink(mq->pipes, p);
+ if((f = createfile(parent, name, uid, perm, p)) == nil){
+ pipeclose(p);
+ return nil;
+ }
+ filesettype(f, Qpipe);
+ return f;
+}
+
+void
+xcreate(Req *r)
+{
+ char *name = r->ifcall.name;
+ char *uid = r->fid->uid;
+ ulong perm = r->ifcall.perm;
+ File *parent = r->fid->file;
+ File *f = nil;
+
+ switch(filetype(parent)){
+ case Qroot:
+ if(!(perm&DMDIR)){
+ respond(r, "forbidden");
+ return;
+ }
+ /* fallthrough */
+ case Qmq:
+ if(perm&DMDIR)
+ f = mqcreate(parent, name, uid, perm);
+ else
+ f = pipecreate(parent, name, uid, perm);
+ break;
+ }
+ if(f == nil)
+ responderror(r);
+ else
+ respond(r, nil);
+}
+
+void
+xopen(Req *r)
+{
+ File *f = r->fid->file;
+
+ switch(filetype(f)){
+ case Qpipe:
+ case Qorder: {
+ Pipe *p = f->aux;
+ Client *c;
+
+ c = r->fid->aux = emalloc(sizeof(Client));
+ if(p->group->replay)
+ c->cursor = (Write*)p->history;
+ else
+ c->cursor = (Write*)p->history->tail;
+ break;
+ }}
+ respond(r, nil);
+}
+
+void
+respondread(Req *r, Write *w)
+{
+ r->ofcall.count = w->count;
+ memmove(r->ofcall.data, w->data, w->count);
+ respond(r, nil);
+}
+
+void
+piperead(Req *r)
+{
+ File *f = r->fid->file;
+ Pipe *p = f->aux;
+ Client *c = r->fid->aux;
+ Read *rd;
+
+ /* Delay the response if there's no history
+ * or if we've already caught up. */
+ if(listempty(p->history) || listend(c->cursor)){
+ rd = emalloc(sizeof(Read));
+ rd->r = r;
+ listlink(p->reads, rd);
+ return;
+ }
+ c->cursor = (Write*)c->cursor->link;
+ respondread(r, c->cursor);
+}
+
+Write*
+writealloc(long n)
+{
+ Write *w;
+
+ w = emalloc(sizeof(Write)+n);
+ w->data = (uchar*)&w[1];
+ return w;
+}
+
+void
+pipewrite(Req *r)
+{
+ File *f = r->fid->file;
+ Pipe *p = f->aux;
+ Mq *mq = p->group;
+ Write *w, *o;
+ long n;
+
+ /* Commit to history */
+ w = writealloc(r->ifcall.count);
+ w->count = r->ifcall.count;
+ w->offset = r->ifcall.offset;
+ memmove(w->data, r->ifcall.data, w->count);
+ listlink(p->history->tail, w);
+
+ /* Commit to order */
+ n = strlen(f->name)+1;
+ o = writealloc(n);
+ o->offset = 0;
+ o->count = n;
+ memmove(o->data, f->name, n);
+ listlink(mq->order->history->tail, o);
+
+ /* Kick the blocked pipe readers */
+ foreach(Read*, p->reads){
+ Client *c = ptr->r->fid->aux;
+
+ respondread(ptr->r, w);
+ c->cursor = w;
+ listunlink(ptr);
+ }
+
+ /* Kick the blocked order readers */
+ foreach(Read*, mq->order->reads){
+ Client *c = ptr->r->fid->aux;
+
+ respondread(ptr->r, o);
+ c->cursor = o;
+ listunlink(ptr);
+ }
+
+ r->ofcall.count = r->ifcall.count;
+ respond(r, nil);
+}
+
+enum {
+ Cmdreplay,
+ Cmddebug, Cmddebug9p,
+};
+Cmdtab mqcmd[] = {
+ /* replay on|off*/
+ {Cmdreplay, "replay", 2},
+
+ /* debug on|off */
+ {Cmddebug, "debug", 2},
+ /* debug9p on|off */
+ {Cmddebug9p, "debug9p", 2},
+};
+
+void
+ctlwrite(Req *r)
+{
+ File *f = r->fid->file;
+ Mq *mq = f->aux;
+ char *e = nil;
+ Cmdbuf *cmd;
+ Cmdtab *t;
+
+ cmd = parsecmd(r->ifcall.data, r->ifcall.count);
+ t = lookupcmd(cmd, mqcmd, nelem(mqcmd));
+ if(t == nil){
+ free(cmd);
+ respondcmderror(r, cmd, "%r");
+ return;
+ }
+ switch(t->index){
+ case Cmdreplay: {
+ if(strncmp(cmd->f[1], "on", 2) == 0)
+ mq->replay = 1;
+ else
+ if(strncmp(cmd->f[1], "off", 3) == 0)
+ mq->replay = 0;
+ else
+ e = "usage: replay on|off";
+ break;
+ }
+ case Cmddebug: {
+ if(strncmp(cmd->f[1], "on", 2) == 0)
+ DEBUG = 1;
+ else
+ if(strncmp(cmd->f[1], "off", 3) == 0)
+ DEBUG = 0;
+ else
+ e = "usage: debug on|off";
+ break;
+ }
+ case Cmddebug9p: {
+ if(strncmp(cmd->f[1], "on", 2) == 0)
+ chatty9p = 1;
+ else
+ if(strncmp(cmd->f[1], "off", 3) == 0)
+ chatty9p = 0;
+ else
+ e = "usage: debug9p on|off";
+ break;
+ }}
+ free(cmd);
+ respond(r, e);
+}
+
+void
+xwrite(Req *r)
+{
+ File *f = r->fid->file;
+
+ switch(filetype(f)){
+ case Qpipe:
+ pipewrite(r);
+ break;
+ case Qctl:
+ ctlwrite(r);
+ break;
+ default:
+ respond(r, "forbidden");
+ return;
+ }
+}
+
+void
+xread(Req *r)
+{
+ File *f = r->fid->file;
+
+ switch(filetype(f)){
+ case Qpipe:
+ case Qorder:
+ piperead(r);
+ break;
+ default:
+ respond(r, "forbidden");
+ }
+}
+
+void
+xflush(Req *r)
+{
+ Req *old = r->oldreq;
+ File *f = old->fid->file;
+
+ switch(filetype(f)){
+ case Qpipe:
+ case Qorder: {
+ Pipe *p = f->aux;
+
+ if(old->ifcall.type != Tread)
+ break;
+ foreach(Read*, p->reads){
+ if(ptr->r == old){
+ free(listunlink(ptr));
+ break;
+ }
+ }
+ respond(old, "interrupted");
+ }}
+ respond(r, nil);
+}
+
+void
+xdestroyfid(Fid *fid)
+{
+ Client *f = fid->aux;
+
+ free(f);
+}
+
+void
+xdestroyfile(File *f)
+{
+ switch(filetype(f)){
+ case Qmq:
+ mqclose(f);
+ break;
+ case Qpipe:
+ pipeclose(f->aux);
+ break;
+ }
+ return;
+}
+
+Srv fs = {
+ .create = xcreate,
+ .open = xopen,
+ .read = xread,
+ .write = xwrite,
+ .flush = xflush,
+ .destroyfid = xdestroyfid,
+};
+
+void
+usage(void)
+{
+ fprint(2, "usage: %s [-D] [-s name] [-m mtpt]\n", argv0);
+ exits("usage");
+}
+
+void
+main(int argc, char *argv[])
+{
+ char *name = nil;
+ char *mtpt = nil;
+
+ ARGBEGIN{
+ case 's': name = EARGF(usage()); break;
+ case 'm': mtpt = EARGF(usage()); break;
+ case 'D': chatty9p++; break;
+ default: usage();
+ }ARGEND;
+
+ fs.tree = alloctree(nil, nil, DMDIR|0777, xdestroyfile);
+ filesettype(fs.tree->root, Qroot);
+
+ if(name || mtpt){
+ postmountsrv(&fs, name, mtpt, MREPL|MCREATE);
+ exits(nil);
+ }
+ fs.infd = fs.outfd = 0;
+ dup(2, 1);
+ srv(&fs);
+ exits(nil);
+}
--- /dev/null
+++ b/src/util.c
@@ -1,0 +1,39 @@
+#include <u.h>
+#include <libc.h>
+
+#include "util.h"
+
+int DEBUG = 0;
+
+void
+dprint(int force, char *fmt, ...)
+{
+ va_list v;
+
+ va_start(v, fmt);
+ if(DEBUG == 0 && force == 0)
+ return;
+ fprint(2, "debug: ");
+ vfprint(2, fmt, v);
+ va_end(v);
+}
+
+void*
+emalloc(ulong sz)
+{
+ void *v = malloc(sz);
+ if(v == nil)
+ sysfatal("malloc: %r");
+ setmalloctag(v, getcallerpc(&sz));
+ memset(v, 0, sz);
+ return v;
+}
+
+char*
+estrdup(char *s)
+{
+ if((s = strdup(s)) == nil)
+ sysfatal("strdup: %r");
+ setmalloctag(s, getcallerpc(&s));
+ return s;
+}
--- /dev/null
+++ b/src/util.h
@@ -1,0 +1,9 @@
+extern int DEBUG;
+
+#define D(force, code) \
+ if((DEBUG) || (force)) do{code}while(0);
+
+void dprint(int force, char *fmt, ...);
+
+void* emalloc(ulong sz);
+char* estrdup(char *s);