shithub: mq

Download patch

ref: 5d36e30a4110e8c39daec37ddaa1a55e19ab8e02
author: kvik <kvik@a-b.xyz>
date: Mon Aug 31 08:44:24 EDT 2020

init

--- /dev/null
+++ b/LICENSE
@@ -1,0 +1,19 @@
+Copyright (c) 2020 kvik@a-b.xyz
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
--- /dev/null
+++ b/mkfile
@@ -1,0 +1,61 @@
+</$objtype/mkfile
+
+# Sources are discovered by walking src/.  Files that define main() or
+# threadmain() at the start of a line (CMAIN) each become a program; the
+# remaining C files (CCOM) are compiled once and linked into every program.
+HFILES=`{test -d src && walk -f src | grep '\.h$'}
+CFILES=`{test -d src && walk -f src | grep '\.c$'}
+CMAIN=`{grep -l '^(thread)?main\(' $CFILES /dev/null}
+CCOM=`{grep -L '^(thread)?main\(' $CFILES /dev/null | sed '/^\/dev\/null/d'}
+OCOM=${CCOM:src/%.c=obj/$objtype/%.o}
+
+BINTARG=${CMAIN:src/%.c=bin/$objtype/%}
+RCFILES=`{test -d rc && walk -f rc}
+MANFILES=`{test -d man && walk -n 2,2 -f man}
+
+# Installation roots
+BIN=/$objtype/bin
+RC=/rc/bin
+MAN=/sys/man
+
+DIRS=bin obj bin/$objtype obj/$objtype
+
+# Installed names, derived from the roots above
+BININST=${BINTARG:bin/$objtype/%=$BIN/%}
+RCINST=${RCFILES:rc/%=$RC/%}
+MANINST=${MANFILES:man/%=$MAN/%}
+INST=$BININST $RCINST $MANINST
+
+none:V: all
+
+$DIRS:
+	mkdir -p $target
+
+# Every object depends on the object directory and on all headers
+obj/$objtype/%.o: obj/$objtype $HFILES
+
+obj/$objtype/%.o: src/%.c
+	$CC $CFLAGS -o $target src/$stem.c
+
+bin/$objtype/%: bin/$objtype obj/$objtype/%.o $OCOM
+	$LD $LDFLAGS -o $target obj/$objtype/$stem.o $OCOM
+
+$BIN/%: bin/$objtype/%
+	cp $prereq $target
+
+$RC/%: rc/%
+	cp -x $prereq $target
+
+# Uses $MAN (not a literal /sys/man) so the rule keeps matching the
+# $MANINST targets if MAN is overridden, like the $BIN and $RC rules.
+$MAN/%: man/%
+	cp $prereq $target
+
+man:V: $MANINST
+
+# "mk foo.cpus" runs "mk foo" once per architecture listed in $CPUS
+%.cpus:V:
+	for(objtype in $CPUS) mk $MKFLAGS $stem
+
+all:V: $BINTARG
+
+install:V: $INST
+
+installall:V: install.cpus
+
+uninstall:V:
+	rm -f $INST
+
+clean:V:
+	rm -rf bin obj
--- /dev/null
+++ b/rc/attach
@@ -1,0 +1,9 @@
+#!/bin/rc -e
+# attach name
+# Connect standard input and output to the mq directory 'name'.
+rfork e
+flagfmt = ''; args = 'name'
+eval `''{aux/getflags $*} || exec aux/usage
+# exactly one argument, the mq directory, is required
+if(! ~ $#* 1) exec aux/usage
+
+mq = $1
+# mq-cat runs in the background, copying what is written to the
+# mq's pipes onto standard output ...
+mq-cat $mq &
+# ... while standard input is appended to pipe 0 of the mq.
+exec cat >>$mq/0
--- /dev/null
+++ b/rc/detach
@@ -1,0 +1,14 @@
+#!/bin/rc -e
+# detach name
+# Create the mq directory 'name' with pipes 0, 1 and 2, start an
+# interactive rc wired to those pipes, then attach to it.
+rfork e
+# NOTE(review): the 'c:clean' flag is declared here but nothing below
+# reads it - confirm whether it is meant to do something.
+flagfmt = 'c:clean'; args = 'name'
+eval `''{aux/getflags $*} || exec aux/usage
+if(! ~ $#* 1) exec aux/usage
+
+mq = $1
+mkdir $mq
+touch $mq/^(0 1 2)
+# enable replay on this mq so that new readers start from the
+# beginning of each pipe's history
+echo replay on >$mq/ctl
+
+# prompt is set before the interactive shell below is started
+prompt=($mq'% ' '	')
+# the shell reads pipe 0 and appends its output and errors to pipes 1 and 2
+rc -i <$mq/0 >>$mq/1 >>[2]$mq/2 &
+exec attach $mq
--- /dev/null
+++ b/src/list.c
@@ -1,0 +1,47 @@
+#include <u.h>
+#include <libc.h>
+
+#include "list.h"
+#include "util.h"
+
+/*
+ * Allocate a list head: a node tagged Listlead whose link and tail
+ * both point back at itself, which is how an empty list looks.
+ */
+List*
+listalloc(void)
+{
+	List *head;
+
+	head = emalloc(sizeof *head);
+	head->link = head->tail = head;
+	head->tag = Listlead;
+	return head;
+}
+
+/*
+ * Insert node n right after node p and return n.
+ * The node that used to follow p ends up following n.
+ */
+List*
+listlink(List *p, List *n)
+{
+	n->tail = p;
+	n->link = p->link;
+	p->link = n;
+	/* the old successor of p must now point back at n */
+	n->link->tail = n;
+	return n;
+}
+
+/*
+ * Take node p out of its list by joining its neighbours together.
+ * p's own link and tail are left untouched.  Returns p.
+ */
+List*
+listunlink(List *p)
+{
+	List *prev, *next;
+
+	prev = p->tail;
+	next = p->link;
+	next->tail = prev;
+	prev->link = next;
+	return p;
+}
+
+/*
+ * Report whether p is the last node, that is, whether the node
+ * following it is the list head (tagged Listlead).
+ */
+int
+listend(List *p)
+{
+	List *next;
+
+	next = p->link;
+	if(next->tag == Listlead)
+		return 1;
+	return 0;
+}
+
+/*
+ * Report whether the node links to itself, which for a list
+ * head means the list has no elements.
+ */
+int
+listempty(List *p)
+{
+	if(p->link != p)
+		return 0;
+	return 1;
+}
--- /dev/null
+++ b/src/list.h
@@ -1,0 +1,20 @@
+/* Tag value stored in a list head by listalloc(); marks where a list starts and ends. */
+enum { Listlead = 0xAA };
+
+typedef struct List List;
+
+/*
+ * Node of a circular doubly-linked list.
+ * Must be embedded at the top of struct: users cast between
+ * List* and pointers to the structures that embed it.
+ */
+struct List {
+	uchar tag;	/* Listlead for a list head */
+	List *link;	/* next node */
+	List *tail;	/* previous node */
+};
+
+/*
+ * Iterate over the elements of a list, starting after the head `list'
+ * and stopping when a node tagged Listlead is reached.  `type' is the
+ * element pointer type; inside the loop body the current element is
+ * available through the variable named `ptr', which the macro declares.
+ * The next element is fetched from ptr->link after each iteration.
+ */
+#define foreach(type, list) \
+	for(type ptr = (type)(list)->link; ptr->tag != Listlead; ptr = (type)ptr->link)
+
+/* allocate an empty list head */
+List* listalloc(void);
+/* insert the second node right after the first; returns the inserted node */
+List* listlink(List*, List*);
+/* detach a node from its neighbours; returns the node */
+List* listunlink(List*);
+/* true if the node links to itself */
+int listempty(List*);
+/* true if the node is followed by the list head */
+int listend(List*);
--- /dev/null
+++ b/src/mq-cat.c
@@ -1,0 +1,111 @@
+#include <u.h>
+#include <libc.h>
+
+#include "util.h"
+
+typedef struct Pipe Pipe;
+
+/* One data file of the mq directory, as opened by openmq(). */
+struct Pipe {
+	char *name;	/* file name within the mq directory */
+	int fd;		/* open for reading; main() sets it to -1 after end of file */
+};
+
+/* Table of data pipes filled in by openmq() */
+int npipes;
+Pipe *pipes;
+
+/* Transfer buffer used by rdwr() */
+char buf[8192];
+
+void
+usage(void)
+{
+	fprint(2, "usage: %s mq\n", argv0);
+	exits("usage");
+}
+
+/*
+ * open(2) wrapper that does not return on failure:
+ * the program exits through sysfatal instead.
+ */
+int
+eopen(char *s, int m)
+{
+	int fd;
+
+	fd = open(s, m);
+	if(fd < 0)
+		sysfatal("open: %r");
+	return fd;
+}
+
+/* True for the two control files every mq directory contains. */
+static int
+mqspecial(char *name)
+{
+	return strcmp(name, "ctl") == 0 || strcmp(name, "order") == 0;
+}
+
+/*
+ * Read the directory `name', open every data pipe in it for reading
+ * and record them in the global pipes/npipes table.
+ *
+ * Returns a file descriptor open on the "order" file, or -1 if the
+ * directory is empty or does not contain both "ctl" and "order".
+ * Open and read failures are fatal.
+ *
+ * The entries are opened by their bare names, so the caller must have
+ * made `name' the current directory (main() does chdir and passes ".").
+ */
+int
+openmq(char *name)
+{
+	int mqfd, n, i, ismq;
+	Dir *dirs, *d;
+	Pipe *p;
+
+	mqfd = eopen(name, OREAD);
+	if((n = dirreadall(mqfd, &dirs)) == -1)
+		sysfatal("dirread: %r");
+	close(mqfd);
+	if(n == 0){
+		free(dirs);
+		return -1;
+	}
+
+	/*
+	 * Verify this is an mq before sizing the table: the table has
+	 * n-2 slots, which is only right when both special files exist.
+	 * Names are compared exactly so that e.g. "ctlfoo" is a pipe.
+	 */
+	ismq = 0;
+	for(i = 0; i < n; i++)
+		if(mqspecial(dirs[i].name))
+			ismq++;
+	if(ismq != 2){
+		free(dirs);
+		return -1;
+	}
+
+	npipes = n - 2;
+	pipes = nil;
+	if(npipes > 0)
+		pipes = emalloc(npipes*sizeof(Pipe));
+	p = pipes;
+	for(d = dirs; d < dirs+n; d++){
+		if(mqspecial(d->name))
+			continue;
+		p->name = estrdup(d->name);
+		p->fd = eopen(d->name, OREAD);
+		p++;
+	}
+	free(dirs);
+	return eopen("order", OREAD);
+}
+
+/*
+ * Move one read's worth of data from fd0 to fd1 through the global
+ * buffer.  Returns the number of bytes moved, 0 at end of file.
+ * Read errors and short writes are fatal.
+ */
+long
+rdwr(int fd0, int fd1)
+{
+	long n;
+
+	n = read(fd0, buf, sizeof buf);
+	if(n == -1)
+		sysfatal("read: %r");
+	if(n != 0 && write(fd1, buf, n) != n)
+		sysfatal("write: %r");
+	return n;
+}
+
+/*
+ * mq-cat mq
+ *
+ * Follow the mq's "order" file: every read from it yields the name of
+ * the pipe that was written to next; one message is then copied from
+ * that pipe to standard output.  Runs until "order" reaches end of file.
+ */
+void
+main(int argc, char *argv[])
+{
+	int orderfd, i;
+	long n;
+	char name[512+1];
+	Pipe *p;
+
+	ARGBEGIN{
+	default: usage();
+	}ARGEND;
+	if(argc != 1) usage();
+
+	if(chdir(argv[0]) == -1)
+		sysfatal("chdir: %r");
+	if((orderfd = openmq(".")) == -1)
+		sysfatal("not mq");
+	for(;;){
+		n = read(orderfd, name, sizeof(name)-1);
+		if(n < 0)
+			sysfatal("read: %r");
+		if(n == 0)
+			break;
+		/* terminate the name just read (not the data buffer) */
+		name[n] = 0;
+		for(i = 0, p = pipes; i < npipes; i++, p++){
+			if(strcmp(p->name, name) != 0 || p->fd == -1)
+				continue;
+			if(rdwr(p->fd, 1) == 0){
+				/* end of file: retire the pipe */
+				close(p->fd);
+				p->fd = -1;
+			}
+			break;
+		}
+	}
+	exits(nil);
+}
--- /dev/null
+++ b/src/mq.c
@@ -1,0 +1,499 @@
+#include <u.h>
+#include <libc.h>
+#include <fcall.h>
+#include <thread.h>
+#include <9p.h>
+
+#include "list.h"
+#include "util.h"
+
+/* NOTE(review): no struct Order is defined in the visible source; the typedef looks unused. */
+typedef struct Client Client;
+typedef struct Mq Mq;
+typedef struct Pipe Pipe;
+typedef struct Write Write;
+typedef struct Read Read;
+typedef struct Order Order;
+
+/* Per-fid state, allocated in xopen() and released in xdestroyfid(). */
+struct Client {
+	Write *cursor; /* reader position */
+};
+
+/* A message queue directory: a group of pipes plus its "order" pipe. */
+struct Mq {
+	Pipe *pipes;	/* head of the list of member pipes */
+	Pipe *order;	/* records the name of the written pipe for each write */
+
+	/* configuration */
+	int replay;	/* if set, newly opened fids start reading from the start of history */
+};
+
+/* One pipe file; also used for the per-group "order" file. */
+struct Pipe {
+	List;	/* links the pipe into Mq.pipes */
+
+	Mq *group; /* membership */
+
+	Write *history; /* stored messages */
+	Read *reads; /* readers queue */
+};
+
+/* One stored message; writealloc() places the data right after the struct. */
+struct Write {
+	List;
+
+	/* Twrite.ifcall */
+	vlong offset; /* ignored */
+	uint count;	/* number of bytes at data */
+	uchar *data;
+};
+
+/* A read request waiting on a pipe for the next write. */
+struct Read {
+	List;
+
+	Req *r;	/* the pending Tread */
+};
+
+/*
+ * File types, stored in the top four bits of the qid path by
+ * filesettype().  The indentation mirrors the tree layout:
+ * root / mq directory / files inside it.
+ */
+enum {
+	Qroot,
+		Qmq,
+			Qpipe,
+			Qorder,
+			Qctl,
+};
+/*
+ * Record a file's type in the four most-significant bits of its
+ * qid path.  This depends on the 9pfile(2) library generating
+ * simple incremental qid paths, so that those bits are free.
+ */
+void
+filesettype(File *f, ushort type)
+{
+	uvlong bits;
+
+	bits = (uvlong)(type & 0xF) << 60;
+	f->qid.path = f->qid.path | bits;
+}
+
+/* Extract the type stored by filesettype() from a file's qid path. */
+ushort
+filetype(File *f)
+{
+	uvlong path;
+
+	path = f->qid.path;
+	return (path >> 60) & 0xF;
+}
+
+/*
+ * Create an mq directory `name' under parent, together with its
+ * "ctl" and "order" files.  Returns the directory (with the reference
+ * obtained from createfile) or nil with the error string set.
+ *
+ * On failure everything built so far is undone.  The order pipe is
+ * released by hand rather than through pipeclose(), because it was
+ * never linked into a pipe list and pipeclose() unlinks its argument.
+ */
+File*
+mqcreate(File *parent, char *name, char *uid, ulong perm)
+{
+	Pipe *pipealloc(Mq*);
+	File *d, *ctl, *order;
+	char err[ERRMAX];
+	Mq *mq;
+
+	mq = emalloc(sizeof(Mq));
+	mq->pipes = (Pipe*)listalloc();
+	mq->order = pipealloc(mq);
+	mq->replay = 0;
+
+	ctl = nil;
+	if((d = createfile(parent, name, uid, perm, mq)) == nil)
+		goto err;
+	filesettype(d, Qmq);
+
+	if((ctl = createfile(d, "ctl", nil, 0220, mq)) == nil)
+		goto err;
+	filesettype(ctl, Qctl);
+
+	if((order = createfile(d, "order", nil, 0444, mq->order)) == nil)
+		goto err;
+	filesettype(order, Qorder);
+
+	/* drop our references only once nothing can fail any more */
+	closefile(ctl);
+	closefile(order);
+	return d;
+err:
+	/* keep the reason for the failure across the cleanup */
+	rerrstr(err, sizeof err);
+
+	free(mq->order->history);
+	free(mq->order->reads);
+	free(mq->order);
+	free(mq->pipes);
+	mq->order = nil;
+	mq->pipes = nil;
+
+	/* removefile also gives up the reference we hold */
+	if(ctl != nil)
+		removefile(ctl);
+	if(d != nil)
+		removefile(d);	/* mq is d->aux; it is freed by mqclose() when d is destroyed */
+	else
+		free(mq);
+
+	werrstr("%s", err);
+	return nil;
+}
+
+/*
+ * Release the Mq attached to a destroyed mq directory.
+ * NOTE(review): only the Mq itself is freed here; its pipe list head
+ * and order pipe are not - confirm whether they are released elsewhere.
+ */
+void
+mqclose(File *f)
+{
+	free(f->aux);
+}
+
+/*
+ * Allocate a pipe belonging to group mq, with an empty message
+ * history and an empty queue of waiting readers.  The pipe is
+ * not linked into any list.
+ */
+Pipe*
+pipealloc(Mq *mq)
+{
+	Pipe *pipe;
+
+	pipe = emalloc(sizeof *pipe);
+	pipe->reads = (Read*)listalloc();
+	pipe->history = (Write*)listalloc();
+	pipe->group = mq;
+	return pipe;
+}
+
+/*
+ * Release a pipe: take it off its group's pipe list (if it is on
+ * one), then free the queue of waiting readers, the stored messages
+ * and the pipe itself.
+ */
+void
+pipeclose(Pipe *p)
+{
+	Read *r;
+	Write *w;
+
+	dprint(0, "pipeclose\n");
+
+	/*
+	 * A pipe fresh from pipealloc() (such as a group's order pipe)
+	 * has nil links; unlinking it would dereference nil.
+	 */
+	if(p->link != nil && p->tail != nil)
+		listunlink(p);
+
+	if(p->reads != nil){
+		/* NOTE(review): waiting requests are dropped without a response (eof these?) */
+		while(!listempty(p->reads)){
+			r = (Read*)p->reads->link;
+			listunlink(r);
+			free(r);
+		}
+		free(p->reads);
+	}
+	if(p->history != nil){
+		/* message data lives in the same allocation as the Write */
+		while(!listempty(p->history)){
+			w = (Write*)p->history->link;
+			listunlink(w);
+			free(w);
+		}
+		free(p->history);
+	}
+	free(p);
+}
+
+/*
+ * Create pipe file `name' inside the mq directory parent and add
+ * its Pipe to the group's list.  Returns the new file, or nil if
+ * createfile fails (the Pipe is released again in that case).
+ */
+File*
+pipecreate(File *parent, char *name, char *uid, ulong perm)
+{
+	Mq *group;
+	Pipe *pipe;
+	File *file;
+
+	group = parent->aux;
+	pipe = pipealloc(group);
+	listlink(group->pipes, pipe);
+
+	file = createfile(parent, name, uid, perm, pipe);
+	if(file != nil){
+		filesettype(file, Qpipe);
+		return file;
+	}
+	pipeclose(pipe);
+	return nil;
+}
+
+/*
+ * Tcreate handler.  Directories created in the root or in an mq
+ * become mq groups; plain files created in an mq become pipes.
+ * Plain files are refused in the root.
+ *
+ * After a successful create the fid must refer to the new file, so
+ * the fid's file and the reply qid are updated, and a pipe gets the
+ * same per-fid reader state that xopen() sets up.
+ */
+void
+xcreate(Req *r)
+{
+	char *name = r->ifcall.name;
+	char *uid = r->fid->uid;
+	ulong perm = r->ifcall.perm;
+	File *parent = r->fid->file;
+	File *f = nil;
+	Client *c;
+	Pipe *p;
+
+	switch(filetype(parent)){
+	case Qroot:
+		if(!(perm&DMDIR)){
+			respond(r, "forbidden");
+			return;
+		}
+		/* fallthrough */
+	case Qmq:
+		if(perm&DMDIR)
+			f = mqcreate(parent, name, uid, perm);
+		else
+			f = pipecreate(parent, name, uid, perm);
+		break;
+	}
+	if(f == nil){
+		responderror(r);
+		return;
+	}
+
+	if(filetype(f) == Qpipe){
+		/* a new pipe has no history: the cursor starts at the list head */
+		p = f->aux;
+		c = emalloc(sizeof(Client));
+		c->cursor = p->history;
+		r->fid->aux = c;
+	}
+	/*
+	 * NOTE(review): the fid's reference to parent is not released
+	 * here, as in the 9p(2) ramfs example - confirm whether a
+	 * closefile(parent) is wanted.
+	 */
+	r->fid->file = f;
+	r->ofcall.qid = f->qid;
+	respond(r, nil);
+}
+
+/*
+ * Topen handler.  Opening a pipe or an order file gives the fid
+ * a Client whose cursor is the head of the history when the group
+ * has replay enabled, and the most recent message otherwise.
+ * Other files need no per-fid state.
+ */
+void
+xopen(Req *r)
+{
+	File *f = r->fid->file;
+	ushort type;
+	Pipe *p;
+	Client *c;
+
+	type = filetype(f);
+	if(type == Qpipe || type == Qorder){
+		p = f->aux;
+		c = emalloc(sizeof(Client));
+		r->fid->aux = c;
+		c->cursor = p->group->replay
+			? (Write*)p->history
+			: (Write*)p->history->tail;
+	}
+	respond(r, nil);
+}
+
+/*
+ * Answer read request r with the contents of message w.
+ * No more than the number of bytes the reader asked for is copied;
+ * a longer message is truncated rather than written past the end
+ * of the reply buffer.
+ */
+void
+respondread(Req *r, Write *w)
+{
+	uint n;
+
+	n = w->count;
+	if(n > r->ifcall.count)
+		n = r->ifcall.count;
+	r->ofcall.count = n;
+	memmove(r->ofcall.data, w->data, n);
+	respond(r, nil);
+}
+
+/*
+ * Tread on a pipe or order file.  If the fid has messages it has not
+ * seen yet, advance its cursor and return the next one; otherwise
+ * queue the request until pipewrite() delivers a new message.
+ */
+void
+piperead(Req *r)
+{
+	Pipe *p = r->fid->file->aux;
+	Client *c = r->fid->aux;
+	Read *rd;
+	int caughtup;
+
+	/* nothing to hand out: no history at all, or already at its end */
+	caughtup = listempty(p->history) || listend(c->cursor);
+	if(!caughtup){
+		c->cursor = (Write*)c->cursor->link;
+		respondread(r, c->cursor);
+		return;
+	}
+
+	/* delay the response */
+	rd = emalloc(sizeof(Read));
+	rd->r = r;
+	listlink(p->reads, rd);
+}
+
+/*
+ * Allocate a Write with room for n bytes of data in the same
+ * allocation; data points just past the structure.
+ */
+Write*
+writealloc(long n)
+{
+	Write *msg;
+
+	msg = emalloc(sizeof(Write) + n);
+	msg->data = (uchar*)(msg + 1);
+	return msg;
+}
+
+/*
+ * Answer every request waiting in queue with message w, move each
+ * reader's cursor to w, and release the queue entries.
+ */
+static void
+kickreaders(Read *queue, Write *w)
+{
+	Read *rd;
+	Client *c;
+
+	while(!listempty(queue)){
+		rd = (Read*)queue->link;
+		listunlink(rd);
+		/* update the cursor while the request is still ours */
+		c = rd->r->fid->aux;
+		c->cursor = w;
+		respondread(rd->r, w);
+		free(rd);
+	}
+}
+
+/*
+ * Twrite on a pipe: append the data to the pipe's history, append
+ * the pipe's name (including its terminating NUL) to the group's
+ * order history, then wake the readers blocked on either.
+ */
+void
+pipewrite(Req *r)
+{
+	File *f = r->fid->file;
+	Pipe *p = f->aux;
+	Mq *mq = p->group;
+	Write *w, *o;
+	long n;
+
+	/* Commit to history */
+	w = writealloc(r->ifcall.count);
+	w->count = r->ifcall.count;
+	w->offset = r->ifcall.offset;
+	memmove(w->data, r->ifcall.data, w->count);
+	listlink(p->history->tail, w);
+
+	/* Commit to order */
+	n = strlen(f->name)+1;
+	o = writealloc(n);
+	o->offset = 0;
+	o->count = n;
+	memmove(o->data, f->name, n);
+	listlink(mq->order->history->tail, o);
+
+	/* Kick the blocked pipe readers */
+	kickreaders(p->reads, w);
+
+	/* Kick the blocked order readers */
+	kickreaders(mq->order->reads, o);
+
+	r->ofcall.count = r->ifcall.count;
+	respond(r, nil);
+}
+
+/* Indices of the ctl commands listed in mqcmd */
+enum {
+	Cmdreplay,
+	Cmddebug, Cmddebug9p,
+};
+/* Commands accepted on an mq's ctl file; each takes exactly one argument. */
+Cmdtab mqcmd[] = {
+	/* replay on|off */
+	{Cmdreplay, "replay", 2},
+
+	/* debug on|off */
+	{Cmddebug, "debug", 2},
+	/* debug9p on|off */
+	{Cmddebug9p, "debug9p", 2},
+};
+
+/* Parse an on|off argument: 1 for "on", 0 for "off", -1 for anything else. */
+static int
+onoff(char *s)
+{
+	if(strcmp(s, "on") == 0)
+		return 1;
+	if(strcmp(s, "off") == 0)
+		return 0;
+	return -1;
+}
+
+/*
+ * Twrite on a ctl file: parse one command from mqcmd and apply it.
+ * "replay" changes the group's configuration; "debug" and "debug9p"
+ * set the global DEBUG and chatty9p flags.
+ */
+void
+ctlwrite(Req *r)
+{
+	File *f = r->fid->file;
+	Mq *mq = f->aux;
+	char *e = nil;
+	Cmdbuf *cmd;
+	Cmdtab *t;
+	int v;
+
+	cmd = parsecmd(r->ifcall.data, r->ifcall.count);
+	t = lookupcmd(cmd, mqcmd, nelem(mqcmd));
+	if(t == nil){
+		/* cmd is still needed to build the error reply */
+		respondcmderror(r, cmd, "%r");
+		free(cmd);
+		return;
+	}
+	/* every command takes exactly one on|off argument */
+	v = onoff(cmd->f[1]);
+	switch(t->index){
+	case Cmdreplay:
+		if(v < 0)
+			e = "usage: replay on|off";
+		else
+			mq->replay = v;
+		break;
+	case Cmddebug:
+		if(v < 0)
+			e = "usage: debug on|off";
+		else
+			DEBUG = v;
+		break;
+	case Cmddebug9p:
+		if(v < 0)
+			e = "usage: debug9p on|off";
+		else
+			chatty9p = v;
+		break;
+	}
+	free(cmd);
+	/* report the whole write as consumed, as pipewrite() does */
+	if(e == nil)
+		r->ofcall.count = r->ifcall.count;
+	respond(r, e);
+}
+
+/* Twrite dispatcher: only pipes and ctl files can be written. */
+void
+xwrite(Req *r)
+{
+	ushort type;
+
+	type = filetype(r->fid->file);
+	if(type == Qpipe)
+		pipewrite(r);
+	else if(type == Qctl)
+		ctlwrite(r);
+	else
+		respond(r, "forbidden");
+}
+
+/* Tread dispatcher: only pipes and order files can be read. */
+void
+xread(Req *r)
+{
+	ushort type;
+
+	type = filetype(r->fid->file);
+	if(type == Qpipe || type == Qorder){
+		piperead(r);
+		return;
+	}
+	respond(r, "forbidden");
+}
+
+/*
+ * Tflush handler.  If the flushed request is a read waiting on a
+ * pipe or order file, remove it from the queue and answer it with
+ * "interrupted".  The old request is answered only when it was
+ * actually found waiting, so it can never be responded to twice.
+ */
+void
+xflush(Req *r)
+{
+	Req *old = r->oldreq;
+	File *f = old->fid->file;
+
+	switch(filetype(f)){
+	case Qpipe:
+	case Qorder: {
+		Pipe *p = f->aux;
+
+		if(old->ifcall.type != Tread)
+			break;
+		foreach(Read*, p->reads){
+			if(ptr->r == old){
+				free(listunlink(ptr));
+				respond(old, "interrupted");
+				break;
+			}
+		}
+	}}
+	respond(r, nil);
+}
+
+/* Release the per-fid Client, if any, when a fid goes away. */
+void
+xdestroyfid(Fid *fid)
+{
+	free(fid->aux);
+}
+
+/*
+ * File destruction hook given to alloctree(): releases the state
+ * attached to mq directories and pipes.
+ * NOTE(review): order and ctl files are not handled here, so an
+ * order file's Pipe is not released by this hook - confirm intent.
+ */
+void
+xdestroyfile(File *f)
+{
+	ushort type;
+
+	type = filetype(f);
+	if(type == Qmq)
+		mqclose(f);
+	else if(type == Qpipe)
+		pipeclose(f->aux);
+}
+
+/*
+ * 9P service description.  The file tree and its destroy hook
+ * are attached to it in main().
+ */
+Srv fs = {
+	.create = xcreate,
+	.open = xopen,
+	.read = xread,
+	.write = xwrite,
+	.flush = xflush,
+	.destroyfid = xdestroyfid,
+};
+
+void
+usage(void)
+{
+	fprint(2, "usage: %s [-D] [-s name] [-m mtpt]\n", argv0);
+	exits("usage");
+}
+
+/*
+ * mq [-D] [-s name] [-m mtpt]
+ *
+ * With -s and/or -m the file system is posted and/or mounted through
+ * postmountsrv.  With neither, 9P is served on file descriptor 0,
+ * after descriptor 1 has been made a copy of descriptor 2.
+ * -D increments chatty9p.
+ */
+void
+main(int argc, char *argv[])
+{
+	char *srvname, *mtpt;
+
+	srvname = nil;
+	mtpt = nil;
+	ARGBEGIN{
+	case 'D':
+		chatty9p++;
+		break;
+	case 'm':
+		mtpt = EARGF(usage());
+		break;
+	case 's':
+		srvname = EARGF(usage());
+		break;
+	default:
+		usage();
+	}ARGEND;
+
+	fs.tree = alloctree(nil, nil, DMDIR|0777, xdestroyfile);
+	filesettype(fs.tree->root, Qroot);
+
+	if(srvname == nil && mtpt == nil){
+		fs.infd = fs.outfd = 0;
+		dup(2, 1);
+		srv(&fs);
+		exits(nil);
+	}
+	postmountsrv(&fs, srvname, mtpt, MREPL|MCREATE);
+	exits(nil);
+}
--- /dev/null
+++ b/src/util.c
@@ -1,0 +1,39 @@
+#include <u.h>
+#include <libc.h>
+
+#include "util.h"
+
+/* When non-zero, dprint() prints even if its force argument is zero. */
+int DEBUG = 0;
+
+/*
+ * Print a formatted message prefixed with "debug: " on standard
+ * error.  Nothing is printed unless DEBUG is set or force is non-zero.
+ */
+void
+dprint(int force, char *fmt, ...)
+{
+	va_list v;
+
+	/* decide before va_start so that every va_start is paired with va_end */
+	if(DEBUG == 0 && force == 0)
+		return;
+	va_start(v, fmt);
+	fprint(2, "debug: ");
+	vfprint(2, fmt, v);
+	va_end(v);
+}
+
+/*
+ * Allocate sz bytes of zeroed memory, exiting through sysfatal if the
+ * allocation fails.  The block is tagged with the caller's pc.
+ */
+void*
+emalloc(ulong sz)
+{
+	void *p;
+
+	p = malloc(sz);
+	if(p == nil)
+		sysfatal("malloc: %r");
+	memset(p, 0, sz);
+	setmalloctag(p, getcallerpc(&sz));
+	return p;
+}
+
+/*
+ * Duplicate string s, exiting through sysfatal if the allocation
+ * fails.  The copy is tagged with the caller's pc.
+ */
+char*
+estrdup(char *s)
+{
+	char *copy;
+
+	copy = strdup(s);
+	if(copy == nil)
+		sysfatal("strdup: %r");
+	setmalloctag(copy, getcallerpc(&s));
+	return copy;
+}
--- /dev/null
+++ b/src/util.h
@@ -1,0 +1,9 @@
+/* Global debug switch, defined in util.c */
+extern int DEBUG;
+
+/*
+ * Run `code' only when DEBUG is set or `force' is non-zero.
+ * NOTE(review): the expansion is a bare `if' and already ends in a
+ * semicolon, so using D(...) as the body of an if that has an else
+ * will not parse as intended - wrap such uses in braces.
+ */
+#define D(force, code) \
+	if((DEBUG) || (force)) do{code}while(0);
+
+/* print "debug: " and a formatted message on fd 2 when DEBUG or force is set */
+void dprint(int force, char *fmt, ...);
+
+/* allocation helpers that exit through sysfatal on failure; emalloc zeroes the memory */
+void* emalloc(ulong sz);
+char* estrdup(char *s);