shithub: dddb

ref: 2ab7efc23fd2137ce88d328441900e0a439a3508
dir: /appl/cmd/ctlfs.b/

View raw version
include "dial.m";
	dial: Dial;

include "security.m";
	auth: Auth;

include "keyring.m";
	keyring: Keyring;

include "styx.m";
	styx: Styx;
	Tmsg, Rmsg: import Styx;

include "styxservers.m";
	styxservers: Styxservers;
	nametree: Nametree;
	Tree: import nametree;
	Styxserver, Fid, Navigator, Navop,
	Eperm, Ecount, Eoffset, Ebadarg: import styxservers;

# Database features: list of feature strings advertised to clients.
# run_ctlfs prepends DBVER and "export"; reading the ctl file returns
# the list joined with newlines.
dbfeatures: list of string;

# Initial fs files: qid paths of the entries created in every per-mount
# name tree (root directory, ctl, name, status, storage/ and nodes/).
Qroot, Qctl, Qname, Qstatus, Qstorage, Qnodes: con big iota;

# helper functions
# Predicate for lists->filter: yields 1 when s holds at least one
# character and 0 when it is empty.
is_nonempty(s: string): int
{
	return len s > 0;
}

# Create ctlfs and the appropriate listeners.
#
# Loads and initialises the Styx, Styxservers, Nametree, Auth, Dial and
# Keyring modules, reads the server's authinfo from keyfile, announces
# cfg.addr and then hands over to ctlfs_listener.
#
#	cfg	node configuration; cfg.addr is the address that is announced
#	dbreg	registry handle, passed through to the listener
#	keyfile	authinfo file; nil selects /usr/<user>/keyring/default
#	algs	algorithm list passed on to auth->server for each connection
#
# Every setup failure is reported through error().
run_ctlfs(cfg: Config, dbreg: ref DbRegistry, keyfile: string, algs: list of string)
{
	sys->fprint(stderr, "setting up ctlfs\n");

	dbfeatures = DBVER :: "export" :: dbfeatures;

	styx = load Styx Styx->PATH;
	styxservers = load Styxservers Styxservers->PATH;
	nametree = load Nametree Nametree->PATH;

	# auth, dial and keyring are all called below; load them here unless
	# some other part of the program has already done so, so that a
	# missing module is reported instead of faulting on a nil handle.
	if(auth == nil)
		auth = load Auth Auth->PATH;
	if(dial == nil)
		dial = load Dial Dial->PATH;
	if(keyring == nil)
		keyring = load Keyring Keyring->PATH;

	if(debug)
		sys->fprint(stderr, "ctlfs: checking if modules are loaded\n");

	if(styx == nil)
		error("ctlfs: styx module not found");
	if(styxservers == nil)
		error("ctlfs: styxservers module not found");
	if(nametree == nil)
		error("ctlfs: nametree module not found");
	if(auth == nil)
		error("ctlfs: auth module not found");
	if(dial == nil)
		error("ctlfs: dial module not found");
	if(keyring == nil)
		error("ctlfs: keyring module not found");

	if(debug)
		sys->fprint(stderr, "ctlfs: initializing modules\n");

	# Auth->init returns a diagnostic string on failure; do not drop it.
	if((e := auth->init()) != nil)
		error("ctlfs: auth init failed: " + e);

	styx->init();
	styxservers->init(styx);

	nametree->init();

	# authinfo init

	authinfo: ref Keyring->Authinfo;
	if (keyfile == nil)
		keyfile = "/usr/" + user() + "/keyring/default";
	if(debug)
		sys->fprint(stderr, "ctlfs: reading authinfo %s\n", keyfile);
	authinfo = keyring->readauthinfo(keyfile);
	if (authinfo == nil)
		error(sys->sprint("ctlfs: cannot read %s: %r", keyfile));

	# announcing
	if(debug)
		sys->fprint(stderr, "ctlfs: announcing dddbctl\n");
	# addr := dial->netmkaddr(cfg.addr, "tcp", "dddbctl");
	c := dial->announce(cfg.addr);
	if(c == nil)
		error(sys->sprint("ctlfs: cannot listen on %s: %r", cfg.addr));

	# bootstrapping
	# NOTE(review): the unmounts are best effort (results ignored);
	# presumably they keep key material out of the namespace seen by the
	# served connections - confirm.
	if(debug)
		sys->fprint(stderr, "ctlfs: bootstrapping\n");
	sys->unmount(nil, "/mnt/keys");
	sys->unmount(nil, "/mnt");

	sys->fprint(stderr, "ctlfs: finished setting up; starting\n");

	# listener entrypoint
	ctlfs_listener(cfg, dbreg, c, authinfo, algs);
}

# dddbctl listener loop: waits for incoming calls on the announced
# connection c and starts one ctlfs_authenticator thread per accepted
# call. A failed listen is reported through error(); a failed accept
# just drops that call and waits for the next one.
ctlfs_listener(cfg: Config, dbreg: ref DbRegistry, c: ref Dial->Connection, authinfo: ref Keyring->Authinfo, algs: list of string)
{
	for (;;) {
		call := dial->listen(c);
		if (call == nil)
			error(sys->sprint("listen failed: %r"));

		remotefile := call.dir + "/remote";
		if (debug)
			sys->fprint(stderr, "ctlfs: got connection from %s",
						readfile(remotefile));

		datafd := dial->accept(call);
		if (datafd != nil) {
			if (call.cfd != nil)
				sys->fprint(call.cfd, "keepalive");

			# peer address with its final character (the line
			# terminator of the remote file) cut off
			peer := readfile(remotefile);
			if (peer != nil)
				peer = peer[0:len peer - 1];

			# every connection gets its own registry channel pair
			chans := dbreg.changen();
			spawn ctlfs_authenticator(cfg, nametree, chans, datafd, authinfo, algs, peer);
		}
	}
}

# Run the server side of authentication on dfd (with the set-id flag
# on) and, when it succeeds, start the file server loop on the
# resulting descriptor. A failed authentication ends the thread
# without serving anything.
ctlfs_authenticator(cfg: Config, nametree: Nametree, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), dfd: ref Sys->FD, authinfo: ref Keyring->Authinfo, algs: list of string, hostname: string)
{
	# info carries the diagnostic on failure; on success it is logged
	# as the identity of the client
	(authfd, info) := auth->server(algs, authinfo, dfd, 1);

	if (authfd != nil) {
		if (debug)
			sys->fprint(stderr, "ctlfs: client authenticated as %s\n", info);
		spawn ctlfs_loop(cfg, nametree, regchan, authfd, hostname);
		return;
	}

	if (debug)
		sys->fprint(stderr, "ctlfs: authentication failed: %s\n", info);
}

# filesystem loop; nb: hostname will be later used for stats
#
# Serves one authenticated connection: builds a private name tree
# (ctl, name, status, storage/, nodes/), starts a Styxserver on fd and
# answers Styx requests until the request channel delivers nil.
#
#	cfg	node configuration shown through the name and status files
#	nametree	loaded Nametree module used to build the tree
#	regchan	(tx, rx) channel pair for talking to the registry
#	fd	authenticated connection to serve
#	(last)	client host name, currently unused
#
# Reads:
#	ctl	the feature list, one feature per line
#	name	cfg.name
#	status	configuration summary followed by one line per node
# Writes to ctl accept the commands
#	refresh [pool ...]	refresh the named pools, or all when none given
#	check pool ...	check every named pool
#	close pool	close exactly one pool
# and anything else is rejected with Ebadarg.
ctlfs_loop(cfg: Config, nametree: Nametree, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), fd: ref Sys->FD, nil: string)
{
	# nametree; this is per mount
	(tree, treeop) := nametree->start();
	tree.create(Qroot, dir(".", 8r555|Sys->DMDIR, Qroot));
	tree.create(Qroot, dir("ctl", 8r640, Qctl));
	tree.create(Qroot, dir("name", 8r444, Qname));
	tree.create(Qroot, dir("status", 8r440, Qstatus));
	tree.create(Qroot, dir("storage", 8r555|Sys->DMDIR, Qstorage));
	tree.create(Qroot, dir("nodes", 8r555|Sys->DMDIR, Qnodes));

	# styxserver start
	(tc, srv) := Styxserver.new(fd, Navigator.new(treeop), big Qroot);

	# registry rx/tx
	(tx, rx) := regchan;

	# Converter used to decode ctl writes. When it cannot be obtained
	# the failure is logged once and writes fall back to Limbo's native
	# UTF-8 conversion instead of faulting on a nil module.
	(btos, cserr) := convcs->getbtos(DCS);
	if(btos == nil)
		sys->fprint(stderr, "ctlfs: charset converter unavailable: %s\n", cserr);

	# Primary server loop
	while((tm := <-tc) != nil) {
		# Switch on operations being performed on a given Fid
		pick t := tm {
		# Open operation
		Open =>
			srv.default(t);
		# Read operation
		Read =>
			(f, err) := srv.canread(t);
			if(f == nil) {
				srv.reply(ref Rmsg.Error(t.tag, err));
				break;
			}
			if(f.qtype & Sys->QTDIR){
				srv.read(t);
				break;
			}

			# readstr clamps offset and count against the encoded byte
			# length, so reads at or beyond end of file yield an empty
			# reply instead of an out-of-range slice.
			case f.path {

			# Qctl
			Qctl =>
				srv.reply(styxservers->readstr(t, joinstr(dbfeatures, "\n") + "\n"));

			# Qname
			Qname =>
				srv.reply(styxservers->readstr(t, cfg.name + "\n"));

			# Qstatus
			Qstatus =>
				# info is built back to front and reversed before sending
				info: list of string;
				info = "name		" + cfg.name :: info;
				info = "sysname		" + cfg.sysn :: info;
				info = "addr		" + cfg.addr :: info;
				info = "storage		" + cfg.storage :: info;
				info = "fsworkers	" + sys->sprint("%d", cfg.fswrks) :: info;
				info = "" :: info;
				info = "nodes" :: info;

				tx <-= ref RegTMsg.GetNodes();
				reply := <- rx;

				# exactly one reply may be sent per tag: remember a
				# failure here and answer once, after the pick
				failed := 0;
				failure := "";
				pick r := reply {
				Error =>
					failed = 1;
					failure = r.err;
				NodeList =>
					for(names := lists->reverse(r.names); names != nil; names = tl names) {
						node := hd names;
						sline := "";

						tx <-= ref RegTMsg.Check(node);
						crep := <- rx;
						pick cr := crep {
						Error =>
							sline = cr.err;
						Status =>
							sline = sys->sprint("%d	%d", cr.count, cr.poolsize);
						* =>
							sline = "unsupported message";
						}

						info = node + "		" + sline :: info;
					}
				* =>
					failed = 1;
					failure = "unsupported version";
				}

				if(failed)
					srv.reply(ref Rmsg.Error(t.tag, failure));
				else
					srv.reply(styxservers->readstr(t, joinstr(lists->reverse(info), "\n") + "\n"));

			# Default reply
			* => srv.default(t);
			}
		# Write operation
		Write =>
			(f, werr) := srv.canwrite(t);
			if(f == nil) {
				srv.reply(ref Rmsg.Error(t.tag, werr));
				break;
			}

			case f.path {
			# Qctl
			Qctl =>
				csargs: string;
				if(btos != nil)
					(nil, csargs, nil) = btos->btos(nil, t.data, len t.data);
				else
					csargs = string t.data;
				(nil, crargs) := sys->tokenize(csargs, " 	\n");
				cargs := lists->filter(is_nonempty, crargs);

				# an empty or all-blank write carries no command
				if(cargs == nil) {
					srv.reply(ref Rmsg.Error(t.tag, Ebadarg));
					break;
				}

				cmd := hd cargs;
				pools := tl cargs;
				case cmd {
				"refresh" =>
					if(pools == nil) {
						tx <-= ref RegTMsg.RefreshAll();
						<-rx;
					} else {
						# walk the whole list; every pool is refreshed
						for(; pools != nil; pools = tl pools) {
							tx <-= ref RegTMsg.Refresh(hd pools);
							<-rx;
						}
					}
					srv.reply(ref Rmsg.Write(t.tag, len t.data));
				"check" =>
					if(pools == nil)
						srv.reply(ref Rmsg.Error(t.tag, Ebadarg));
					else {
						for(; pools != nil; pools = tl pools) {
							tx <-= ref RegTMsg.Check(hd pools);
							<-rx;
						}
						srv.reply(ref Rmsg.Write(t.tag, len t.data));
					}
				"close" =>
					if(len pools == 1) {
						tx <-= ref RegTMsg.Close(hd pools);
						<-rx;
						srv.reply(ref Rmsg.Write(t.tag, len t.data));
					} else
						srv.reply(ref Rmsg.Error(t.tag, Ebadarg));
				* =>
					if(debug)
						sys->fprint(stderr, "ctlfs: unknown ctl command: %s\n", cmd);
					srv.reply(ref Rmsg.Error(t.tag, Ebadarg));
				}
			# Default reply
			* => srv.default(t);
			}

		# Default action
		* => srv.default(t);
		}
	}

	tree.quit();
}