shithub: purgatorio

ref: d09cf07a82cf4ffb846a31d0384e774b4c4661e1
dir: /appl/collab/servers/mpx.b/

View raw version
implement Service;

#
# 1 to many and many to 1 multiplexor
#

include "sys.m";
	sys: Sys;
	Qid: import Sys;

include "styx.m";
	styx: Styx;
	Tmsg, Rmsg: import styx;

include "styxservers.m";
	styxservers: Styxservers;
	Styxserver, Navigator: import styxservers;
	nametree: Nametree;
	Tree: import nametree;

include "service.m";

include "messages.m";
	messages: Messages;
	Msg, Msglist, Readreq, User: import messages;

Qdir, Qroot, Qusers, Qleaf: con iota;

srv: ref Styxserver;
clientidgen := 0;

Einactive: con "not currently active";

toleaf: ref Msglist;
toroot: ref Msglist;
userlist: list of ref User;

user := "inferno";

dir(name: string, perm: int, path: int): Sys->Dir
{
	d := sys->zerodir;
	d.name = name;
	d.uid = user;
	d.gid = user;
	d.qid.path = big path;
	if(perm & Sys->DMDIR)
		d.qid.qtype = Sys->QTDIR;
	else
		d.qid.qtype = Sys->QTFILE;
	d.mode = perm;
	return d;
}

init(nil: list of string): (string, string, ref Sys->FD)
{
	sys = load Sys Sys->PATH;
	styx = load Styx Styx->PATH;
	if(styx == nil)
		return (sys->sprint("can't load %s: %r", Styx->PATH), nil, nil);
	styxservers = load Styxservers Styxservers->PATH;
	if(styxservers == nil)
		return (sys->sprint("can't load %s: %r", Styxservers->PATH), nil, nil);
	nametree = load Nametree Nametree->PATH;
	if(nametree == nil)
		return (sys->sprint("can't load %s: %r", Nametree->PATH), nil, nil);
	styx->init();
	styxservers->init(styx);
styxservers->traceset(1);
	nametree->init();
	messages = load Messages Messages->PATH;
	if(messages == nil)
		return (sys->sprint("can't load %s: %r", Messages->PATH), nil, nil);

	(tree, treeop) := nametree->start();
	tree.create(big Qdir, dir(".", Sys->DMDIR|8r555, Qdir));
	tree.create(big Qdir, dir("leaf", 8r666, Qleaf));
	tree.create(big Qdir, dir("root", 8r666, Qroot));
	tree.create(big Qdir, dir("users", 8r444, Qusers));
	
	p := array [2] of ref Sys->FD;
	if (sys->pipe(p) < 0){
		tree.quit();
		return (sys->sprint("can't create pipe: %r"), nil, nil);
	}

	toleaf = Msglist.new();
	toroot = Msglist.new();

	tc: chan of ref Tmsg;
	(tc, srv) = Styxserver.new(p[1], Navigator.new(treeop), big Qdir);
	spawn mpx(tc, tree);

	return (nil, "/", p[0]);
}

mpx(tc: chan of ref Tmsg, tree: ref Tree)
{
	root: ref User;
	while((tmsg := <-tc) != nil){
		pick tm := tmsg {
		Readerror =>
			break;
		Open =>
			c := srv.getfid(tm.fid);
			if(c == nil || c.isopen){
				srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Ebadfid));
				continue;
			}
			case int c.path {
			Qroot =>
				if(root != nil){
					srv.reply(ref Rmsg.Error(tm.tag, sys->sprint("interaction already directed by %s", root.name)));
					continue;
				}
				c = srv.open(tm);
				if (c == nil)
					continue;
				root = ref User(0, tm.fid, c.uname, nil);
				root.initqueue(toroot);
			Qleaf =>
				if(root == nil){
					srv.reply(ref Rmsg.Error(tm.tag, Einactive));
					continue;
				}
				c = srv.open(tm);
				if (c == nil)
					continue;
				userarrives(tm.fid, c.uname);
				# mpxdir[1].qid.vers++;	# TO DO
			* =>
				srv.open(tm);
			}
		Read =>
			c := srv.getfid(tm.fid);
			if (c == nil || !c.isopen) {
				srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Ebadfid));
				continue;
			}
			case int c.path {
			Qdir =>
				srv.read(tm);
			Qroot =>
				tm.offset = big 0;
				m := qread(toroot, root, tm, 1);
				if(m != nil)
					srv.reply(ref Rmsg.Read(tm.tag, m.data));
			Qleaf =>
				u := fid2user(tm.fid);
				if (u == nil) {
					srv.reply(ref Rmsg.Error(tm.tag, "internal error -- lost user"));
					continue;
				}
				tm.offset = big 0;
				m := qread(toleaf, u, tm, 0);
				if(m == nil){
					if(root == nil)
						srv.reply(ref Rmsg.Read(tm.tag, nil));
					else
						qread(toleaf, u, tm, 1);	# put us on the wait queue
				}else
					srv.reply(ref Rmsg.Read(tm.tag, m.data));
			Qusers =>
				srv.reply(styxservers->readstr(tm, usernames()));
			* =>
				srv.reply(ref Rmsg.Error(tm.tag, "phase error -- bad path"));
			}
		Write =>
			c := srv.getfid(tm.fid);
			if (c == nil || !c.isopen) {
				srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Ebadfid));
				continue;
			}
			case int c.path {
			Qroot =>
				qwrite(toleaf, msg(root, 'M', tm.data));
				srv.reply(ref Rmsg.Write(tm.tag, len tm.data));
			Qleaf =>
				u := fid2user(tm.fid);
				if(u == nil) {
					srv.reply(ref Rmsg.Error(tm.tag, "internal error -- lost user"));
					continue;
				}
				if(root == nil){
					srv.reply(ref Rmsg.Error(tm.tag, Einactive));
					continue;
				}
				qwrite(toroot, msg(u, 'm', tm.data));
				srv.reply(ref Rmsg.Write(tm.tag, len tm.data));
			* =>
				srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Eperm));
			}
		Flush =>
			cancelpending(tm.tag);
			srv.reply(ref Rmsg.Flush(tm.tag));
		Clunk =>
			c := srv.getfid(tm.fid);
			if(c.isopen){
				case int c.path {
				Qroot =>
					# shut down?
					qwrite(toleaf, msg(root, 'L', nil));
					root = nil;
				Qleaf =>
					userleaves(tm.fid);
					# mpxdir[1].qid.vers++;	# TO DO
				}
			}
		* =>
			srv.default(tmsg);
		}
	}
	tree.quit();
	sys->print("mpx exit\n");
}

mpxseqgen := 0;

time(): int
{
	return ++mpxseqgen;	# server time; assumes 2^31-1 is large enough
}

userarrives(fid: int, name: string)
{
	u := User.new(fid, name);
	qwrite(toroot, msg(u, 'a', nil));
	u.initqueue(toleaf);	# sees leaf messages from now on
	userlist = u :: userlist;
}

fid2user(fid: int): ref User
{
	for(ul := userlist; ul != nil; ul = tl ul)
		if((u := hd ul).fid == fid)
			return u;
	return nil;
}

userleaves(fid: int)
{
	ul := userlist;
	userlist = nil;
	u: ref User;
	for(; ul != nil; ul = tl ul)
		if((hd ul).fid != fid)
			userlist = hd ul :: userlist;
		else
			u = hd ul;
	if(u != nil)
		qwrite(toroot, msg(u, 'l', nil));
}

usernames(): string
{
	s := "";
	for(ul := userlist; ul != nil; ul = tl ul){
		u := hd ul;
		s += string u.id+" "+u.name+"\n";
	}
	return s;
}

qwrite(msgs: ref Msglist, m: ref Msg)
{
	pending := msgs.write(m);
	for(; pending != nil; pending = tl pending){
		(u, req) := hd pending;
		m = u.read();	# must succeed, or the code is wrong
		data := m.data;
		if(req.count < len data)
			data = data[0:req.count];
		srv.reply(ref Rmsg.Read(req.tag, data));
	}
}

qread(msgs: ref Msglist, u: ref User, tm: ref Tmsg.Read, wait: int): ref Msg
{
	m := u.read();
	if(m != nil){
		if(tm.count < len m.data)
			m.data = m.data[0:tm.count];
	}else if(wait)
		msgs.wait(u, ref Readreq(tm.tag, tm.fid, tm.count, tm.offset));
	return m;
}

cancelpending(tag: int)
{
	toroot.flushtag(tag);
	toleaf.flushtag(tag);
}

msg(u: ref User, op: int, data: array of byte): ref Msg
{
	a := sys->aprint("%ud %d %c %s ", time(), u.id, op, u.name);
	m := ref Msg(u, array[len a + len data] of byte, nil);
	m.data[0:] = a;
	m.data[len a:] = data;
	return m;
}