shithub: dddb

Download patch

ref: 122eaf84ccbf1c6694943bfcce99b47c8c19440b
parent: aee91c4ed0277e343a62cd124ee7175f54ffefb3
author: k <k@santiago>
date: Sun Jul 30 07:19:02 EDT 2023

added a basic node registry

--- a/appl/cmd/config.b
+++ b/appl/cmd/config.b
@@ -7,6 +7,7 @@
 DCFGPATH:	con "/lib/ndb/dddbcfg";
 
 DADDR:		con "tcp!*!dddbctl";
+DPSIZE:		con 3;
 DFSWRKS:	con 10;
 
 # Attrdb constants 
@@ -13,7 +14,9 @@
 KNAME:		con "nodename";
 KSYSNAME:	con "nodesysn";
 KADDR:		con "addr";
+KKEYFILE:	con "keyfile";
 KSTORAGE:	con "storage";
+KPSIZE:		con "psize";
 KFSWRKS:	con "readworkers";
 
 Config.open(nodename: string, path: string): Config
@@ -95,8 +98,11 @@
 	sysname := entry.findfirst(KSYSNAME);
 	addr := entry.findfirst(KADDR);
 	storage := entry.findfirst(KSTORAGE);
+	keyfile := entry.findfirst(KKEYFILE);
+	psize_s := entry.findfirst(KPSIZE);
 	fswrks_s := entry.findfirst(KFSWRKS);
 
+	psize := DPSIZE;
 	fswrks := DFSWRKS;
 
 	if(len name == 0)
@@ -111,8 +117,14 @@
 			error(sys->sprint("malformed fs workers count: %s", fswrks_s));
 		fswrks = fswrks_i;
 	}
+	if(len psize_s != 0) {
+		(psize_i, rm) := strm->toint(psize_s, 10);
+		if(rm != "")
+			error(sys->sprint("malformed pool size: %s", psize_s));
+		psize = psize_i;
+	}
 
 	return NodeConfig(
-		name, sysname, addr, storage,	# basic information
-		fswrks);						# tunable options
+		name, sysname, addr, keyfile, storage,	# basic information
+		psize, fswrks);							# tunable options
 }
--- a/appl/cmd/ctlfs.b
+++ b/appl/cmd/ctlfs.b
@@ -4,6 +4,9 @@
 include "security.m";
 	auth: Auth;
 
+include "keyring.m";
+	keyring: Keyring;
+
 include "styx.m";
 	styx: Styx;
 	Tmsg, Rmsg: import Styx;
@@ -10,100 +13,115 @@
 
 include "styxservers.m";
 	styxservers: Styxservers;
-	Styxserver, Fid, Navigator,
-	Navop, Enotfound, Enotdir: import styxservers;
+	nametree: Nametree;
+	Tree: import nametree;
+	Styxserver, Fid, Navigator, Navop,
+	Eperm, Ecount, Eoffset: import styxservers;
 
-# FS file index
-Qroot, Qctl, Qstats, Qmax: con iota;
-tab := array[] of {
-	(Qroot, ".", Sys->DMDIR|8r555),
-	(Qctl, "ctl", 8r222),
-	(Qstats, "stats", 8r111),
-};
+# Database features
+dbfeatures: list of string;
 
+# Initial fs files
+Qroot, Qctl, Qname, Qstatus: con big iota;
+
 # create ctlfs and the appropriate listeners
-init_ctlfs(cfg: Config, keyfile: string, algs: list of string)
+run_ctlfs(cfg: Config, dbreg: ref DbRegistry, keyfile: string, algs: list of string)
 {
-	dial = load Dial Dial->PATH;
-	auth = load Auth Auth->PATH;
+	sys->fprint(stderr, "setting up ctlfs\n");
+
+	dbfeatures = DBVER :: dbfeatures;
+
 	styx = load Styx Styx->PATH;
+	styxservers = load Styxservers Styxservers->PATH;
+	nametree = load Nametree Nametree->PATH;
 
-	if(dial == nil)
-		error("ctlfs: dial module not found");
-	if(auth == nil)
-		error("ctlfs: auth module not found");
+	if(debug)
+		sys->fprint(stderr, "ctlfs: checking if modules are loaded\n");
+
 	if(styx == nil)
 		error("ctlfs: styx module not found");
+	if(styxservers == nil)
+		error("ctlfs: styxservers module not found");
+	if(nametree == nil)
+		error("ctlfs: nametree module not found");
 
+	if(debug)
+		sys->fprint(stderr, "ctlfs: initializing modules\n");
+
 	auth->init();
 
 	styx->init();
 	styxservers->init(styx);
-	styxservers->traceset(chatty);
 
+	nametree->init();
+
 	# authinfo init
-	if(debug)
-		sys->fprint(stderr, "ctlfs: reading authinfo");
+
 	authinfo: ref Keyring->Authinfo;
-	if (doauth) {
-		if (keyfile == nil)
-			keyfile = "/usr/" + user() + "/keyring/default";
-		authinfo = keyring->readauthinfo(keyfile);
-		if (authinfo == nil)
-			error(sys->sprint("ctlfs: cannot read %s: %r", keyfile));
-	}
+	if (keyfile == nil)
+		keyfile = "/usr/" + user() + "/keyring/default";
+	if(debug)
+		sys->fprint(stderr, "ctlfs: reading authinfo %s\n", keyfile);
+	authinfo = keyring->readauthinfo(keyfile);
+	if (authinfo == nil)
+		error(sys->sprint("ctlfs: cannot read %s: %r", keyfile));
 
 	# announcing
 	if(debug)
-		sys->fprint(stderr, "ctlfs: announcing dddbctl");
-	addr := dial->netmkaddr(cfg.addr, "tcp", "dddbctl");
-	c := dial->announce(addr);
+		sys->fprint(stderr, "ctlfs: announcing dddbctl\n");
+	# addr := dial->netmkaddr(cfg.addr, "tcp", "dddbctl");
+	c := dial->announce(cfg.addr);
 	if(c == nil)
-		error(sys->sprint("ctlfs: cannot listen on %s\n", addr));
+		error(sys->sprint("ctlfs: cannot listen on %s\n", cfg.addr));
 
 	# bootstrapping
 	if(debug)
-		sys->fprint
+		sys->fprint(stderr, "ctlfs: bootstrapping\n");
 	sys->unmount(nil, "/mnt/keys");
+	sys->unmount(nil, "/mnt");
 
-	navch := chan of ref Navop;
-	spawn ctlfs_navigator(navch);
+	# nametree; this is shared across all attachees
+	(tree, treeop) := nametree->start();
+	tree.create(Qroot, dir(".", 8r555|Sys->DMDIR, Qroot));
+	tree.create(Qroot, dir("ctl", 8r666, Qctl));
+	tree.create(Qroot, dir("status", 8r444, Qstatus));
 
-	nav := Navigator.new(navch);
-	(tc, srv) := Styxserver.new(fildes(0), nav, big Qroot);
+	sys->fprint(stderr, "ctlfs: finished setting up; starting\n");
 
 	# listener entrypoint
-	listener(c, authinfo, algs);
+	ctlfs_listener(cfg, dbreg, c, treeop, authinfo, algs);
+	tree.quit();
 }
 
 # dddbctl listener loop
-ctlfs_listener(c: ref Dial->Connection, authinfo: ref Keyring->Authinfo, algs: list of string)
+ctlfs_listener(cfg: Config, dbreg: ref DbRegistry, c: ref Dial->Connection, treeop: chan of ref Styxservers->Navop, authinfo: ref Keyring->Authinfo, algs: list of string)
 {
-	for (;;) {
+	loop: for (;;) {
 		nc := dial->listen(c);
 		if (nc == nil)
 			error(sys->sprint("listen failed: %r"));
 		if (debug)
-			sys->fprint(stderr, "ctlfs: got connection from %s\n",
+			sys->fprint(stderr, "ctlfs: got connection from %s",
 						readfile(nc.dir + "/remote"));
 		dfd := dial->accept(nc);
-		if (dfd != nil) {
-			if(nc.cfd != nil)
-				sys->fprint(nc.cfd, "keepalive");
-			hostname: string;
-			if(passhostnames){
-				hostname = readfile(nc.dir + "/remote");
-				if(hostname != nil)
-					hostname = hostname[0:len hostname - 1];
-			}
+		if (dfd == nil)
+			continue loop;
 
-			spawn ctlfs_authenticator(dfd, authinfo, algs, hostname);
-		}
+		if(nc.cfd != nil)
+			sys->fprint(nc.cfd, "keepalive");
+
+		hostname: string;
+		hostname = readfile(nc.dir + "/remote");
+		if(hostname != nil)
+			hostname = hostname[0:len hostname - 1];
+
+		regchan := dbreg.changen();
+		spawn ctlfs_authenticator(cfg, regchan, dfd, treeop, authinfo, algs, hostname);
 	}
 }
 
 # authenticate a connection and set the user id.
-ctlfs_authenticator(dfd: ref Sys->FD, authinfo: ref Keyring->Authinfo,
+ctlfs_authenticator(cfg: Config, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), dfd: ref Sys->FD, treeop: chan of ref Styxservers->Navop, authinfo: ref Keyring->Authinfo,
 		algs: list of string, hostname: string)
 {
 	# authenticate and change user id appropriately
@@ -110,113 +128,160 @@
 	(fd, err) := auth->server(algs, authinfo, dfd, 1);
 	if (fd == nil) {
 		if (debug)
-			sys->fprint(stderr(), "ctlfs: authentication failed: %s\n", err);
+			sys->fprint(stderr, "ctlfs: authentication failed: %s\n", err);
 		return;
 	}
 	if (debug)
-		sys->fprint(stderr(), "ctlfs: client authenticated as %s\n", err);
+		sys->fprint(stderr, "ctlfs: client authenticated as %s\n", err);
 
-	spawn exportproc(sync, mfd, err, hostname, fd);
+	spawn ctlfs_loop(cfg, regchan, fd, treeop, hostname);
 }
 
-ctlfs_loop()
+# Per-connection filesystem loop: serves the Styx requests arriving on fd
+# against the shared nametree (treeop) until the request channel yields nil.
+# cfg supplies the node details reported by the name and status files;
+# regchan is the (request, reply) channel pair used to query the node registry.
+# nb: the unnamed string argument (hostname) will be later used for stats.
+ctlfs_loop(cfg: Config, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), fd: ref Sys->FD, treeop: chan of ref Styxservers->Navop, nil: string)
 {
+	(tc, srv) := Styxserver.new(fd, Navigator.new(treeop), big Qroot);
+
+	# registry rx/tx
+	(tx, rx) := regchan;
+
 	# Primary server loop
 	loop:
-	while((tmsg := <-tc) != nil) {
+	while((tm := <-tc) != nil) {
 		# Switch on operations being performed on a given Fid
-		pick msg := tmsg {
+		pick t := tm {
+		# Open operation
 		Open =>
-			srv.default(msg);
-		Read =>
-			fid := srv.getfid(msg.fid);
-
-			if(fid.qtype & Sys->QTDIR) {
-				# This is a directory read
-				srv.default(msg);
+			(f, mode, d, err) := srv.canopen(t);
+			if(f == nil){
+				srv.reply(ref Rmsg.Error(t.tag, err));
 				continue loop;
 			}
+			# NOTE(review): the fid is opened before the per-file mode checks below; confirm it should stay open when they reply Eperm
+			f.open(mode, d.qid);
 
-			case int fid.path {
-			Qlog =>
-				# A read on our log file, tell them what they've already said ?
-				s := "";
+			case f.path {
 
-				for(l := log; l != nil; l = tl l)
-					s = hd l + s;
+			# Qroot
+			Qroot =>
+				if(t.mode != Sys->OREAD) {
+					srv.reply(ref Rmsg.Error(t.tag, Eperm));
+					continue loop;
+				}
+				srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));
 
-				srv.reply(styxservers->readstr(msg, s));
+			# Qctl
+			Qctl =>
+				if((t.mode & (Sys->OTRUNC | Sys->ORCLOSE | Sys->OEXCL)) != 0) {
+					srv.reply(ref Rmsg.Error(t.tag, Eperm));
+					continue loop;
+				}
+				srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));
+			# Qname
+			Qname =>
+				if((t.mode & (Sys->OTRUNC | Sys->ORCLOSE | Sys->OEXCL)) != 0) {
+					srv.reply(ref Rmsg.Error(t.tag, Eperm));
+					continue loop;
+				}
+				srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));
 
-			* =>
-				srv.default(msg);
+			# Qstatus
+			Qstatus =>
+				if(t.mode != Sys->OREAD) {
+					srv.reply(ref Rmsg.Error(t.tag, Eperm));
+					continue loop;
+				}
+				srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));
+
+			# Default reply
+			* => srv.default(t);
 			}
 
-		Write =>
-			fid := srv.getfid(msg.fid);
+		# Read operation
+		Read =>
+			(f, err) := srv.canread(t);
+			if(f == nil) {
+				srv.reply(ref Rmsg.Error(t.tag, err));
+				break;
+			}
+			if(f.qtype & Sys->QTDIR){
+				srv.read(t);
+				continue loop;
+			}
 
-			case int fid.path {
+			case f.path {
+
+			# Qctl
 			Qctl =>
-				# Don't care about offset
-				cmd := string msg.data;
+				# readstr (styxservers(2)) clamps offset and count against the byte length, so reads past the end return no data
+				ctlmsg := joinstr(dbfeatures, "\n") + "\n";
+				srv.reply(styxservers->readstr(t, ctlmsg));
 
-				reply: ref Rmsg = ref Rmsg.Write(msg.tag, len msg.data);
+			# Qname
+			Qname =>
+				namemsg := cfg.name + "\n";
+				srv.reply(styxservers->readstr(t, namemsg));
+			# Qstatus
+			Qstatus =>
+				info: list of string;
+				info = "name		" + cfg.name :: info;
+				info = "sysname		" + cfg.sysn :: info;
+				info = "addr		" + cfg.addr :: info;
+				info = "storage		" + cfg.storage :: info;
+				info = "fsworkers	" + sys->sprint("%d", cfg.fswrks) :: info;
+				info = "" :: info;
+				info = "nodes" :: info;
 
-				case cmd {
-				* =>
-					# Ignore empty writes
-					if(cmd != nil)
-						log = cmd :: log;
-					else
-						reply = ref Rmsg.Error(msg.tag, "empty write!");
-				}
-				srv.reply(reply);
-				
-			* =>
-				srv.default(msg);
-			}
+				# ask the registry for the node names, then append one status line per node
+				tx <-= ref RegTMsg.GetNodes();
+				reply := <- rx;
 
-		* =>
-			srv.default(msg);
-		}
-	}
+				pick r := reply {
+					Error =>
+						srv.reply(ref Rmsg.Error(t.tag, r.err));
+						continue loop;	# the error is the only reply for this tag; skip the Read reply below
+					NodeList =>
+						names := lists->reverse(r.names);
+						while(len names != 0) {
+							node := hd names;
+							sline := "";
 
-	exit;
-}
+							# per-node line: "<count>	<poolsize>" on Status, otherwise the error text
+							tx <-= ref RegTMsg.Check(node);
+							crep := <- rx;
+							pick cr := crep {
+								Error => sline = cr.err;
+								Status =>
+									up := cr.count;
+									ps := cr.poolsize;
+									sline = sys->sprint("%d	%d", up, ps);
+								* => sline = "unsupported message";
+							}
 
-# Navigator function for moving around under /
-ctlfs_navigator(c: chan of ref Navop) {
-	loop: 
-	for(;;) {
-		navop := <-c;
-		pick op := navop {
-		Stat =>
-			op.reply <-= (dir(int op.path), nil);
-			
-		Walk =>
-			if(op.name == "..") {
-				op.reply <-= (dir(Qroot), nil);
-				continue loop;
-			}
+							info = node + "		" +  sline :: info;
+							names = tl names;
+						}
+					* =>
+						srv.reply(ref Rmsg.Error(t.tag, "unsupported version"));
+						continue loop;	# likewise: never follow an error with a Read reply
+				}
 
-			case int op.path&16rff {
+				statusmsg := joinstr(lists->reverse(info), "\n") + "\n";
+				srv.reply(styxservers->readstr(t, statusmsg));
 
-			Qroot =>
-				for(i := 1; i < Qmax; i++)
-					if(tab[i].t1 == op.name) {
-						op.reply <-= (dir(i), nil);
-						continue loop;
-					}
-
-				op.reply <-= (nil, Enotfound);
-			* =>
-				op.reply <-= (nil, Enotdir);
+			# Default reply
+			* => srv.default(t);
 			}
-			
-		Readdir =>
-			for(i := 0; i < op.count && i + op.offset < (len tab) - 1; i++)
-				op.reply <-= (dir(Qroot+1+i+op.offset), nil);
+		# Write operation
+		Write =>
+			srv.default(t);
 
-			op.reply <-= (nil, nil);
+		# Default action
+		* => srv.default(t);
 		}
 	}
 }
--- a/appl/cmd/dddb.b
+++ b/appl/cmd/dddb.b
@@ -6,13 +6,18 @@
 include "draw.m";
 include "string.m";
 	strm: String;
+include "lists.m";
+	lists: Lists;
 
 include "config.b";
 include "ctlfs.b";
+include "nodereg.b";
 
 stderr: ref Sys->FD;
 debug: int;
 
+DBVER: con "v0.1.0";
+
 error(s: string)
 {
 	sys->fprint(stderr, "dddb: %s\n", s);
@@ -19,10 +24,13 @@
 	raise "dddb:error";
 }
 
+Epoolnotfound	: con "pool not found";
+Epoolavail		: con "pool not available";
+
 Dddb: module {
 	init: fn(nil: ref Draw->Context, args: list of string);
-	run_fs: fn(cfg: Config);
 
+	# Configuration section
 	Config: adt {
 		name:		string;
 		sysn:		string;
@@ -31,7 +39,7 @@
 		fswrks:		int;
 		nodes:		list of NodeConfig;
 
-		open:		fn(nodename: string, path: string): Config;
+		open:		fn(nodename: string, mtpt: string): Config;
 	};
 
 	NodeConfig: adt {
@@ -38,12 +46,58 @@
 		name:		string;
 		sysn:		string;
 		addr:		string;
+		keyfile:	string;
 		storage:	string;
+		psize:		int;
 		fswrks:		int;
 
 		new:		fn(entry: ref Dbentry): NodeConfig;
 	};
 
+	# Registry section
+	RegTMsg: adt {
+		pick {
+		GetNodes =>
+		ChanClose =>
+		Check or Refresh or Close =>
+			nodename:	string;
+		}
+	};
+
+	RegRMsg: adt {
+		pick {
+		Error =>
+			err:		string;
+		Status =>
+			count:		int;
+			poolsize:	int;
+		NodeList =>
+			names:		list of string;
+		}
+	};
+
+	NodePool: adt {
+		cfg:		NodeConfig;
+		instances:	list of string;
+
+		init:		fn(r: self ref NodePool): int;
+		check:		fn(r: self ref NodePool): int;
+		refresh:	fn(r: self ref NodePool): int;
+		newinst:	fn(r: self ref NodePool, mtpt: string): string;
+		close:		fn(r: self ref NodePool);
+	};
+
+	DbRegistry: adt {
+		nodepools:	list of ref NodePool;
+		# rchans:		list of chan of ref RegRMsg;
+		# tchans:		list of chan of ref RegTMsg;
+
+		new:		fn(cfgs: list of NodeConfig): ref DbRegistry;
+		init:		fn(r: self ref DbRegistry);
+		# run:		fn(r: self ref DbRegistry);
+		changen:	fn(r: self ref DbRegistry): (chan of ref RegTMsg, chan of ref RegRMsg);
+		close:		fn(r: self ref DbRegistry);
+	};
 };
 
 init(nil: ref Draw->Context, args: list of string)
@@ -51,19 +105,43 @@
 	sys = load Sys Sys->PATH;
 	arg := load Arg Arg->PATH;
 	strm = load String String->PATH;
+	lists = load Lists Lists->PATH;
+	dial = load Dial Dial->PATH;
+	auth = load Auth Auth->PATH;
+	keyring = load Keyring Keyring->PATH;
 
+	if(sys == nil)
+		error("dddb: sys module not found");
+	if(arg == nil)
+		error("dddb: arg module not found");
+	if(strm == nil)
+		error("dddb: strm module not found");
+	if(lists == nil)
+		error("dddb: lists module not found");
+	if(dial == nil)
+		error("dddb: dial module not found");
+	if(auth == nil)
+		error("dddb: auth module not found");
+	if(keyring == nil)
+		error("dddb: keyring module not found");
+
 	stderr = sys->fildes(2);
 	cfgpath: string = "";
+	keyfile: string = nil;
+	algs: list of string = nil;
 
 	arg->init(args);
-	arg->setusage(arg->progname()+ " [-d] [-c config] nodename");
+	arg->setusage(arg->progname()+ " [-d] [-k keyfile] [-C algs] [-c config] nodename");
 	while((c := arg->opt()) != 0)
 		case c {
 		'd' => debug++;
-		'c' =>
-			cfgpath = arg->earg();
+		'c' => cfgpath = arg->earg();
+		'k' => keyfile = arg->earg();
+		'C' =>
+			algsstr := arg->earg();
+			(nil, algs) = sys->tokenize(algsstr, ",");
 		* =>
-			sys->fprint(sys->fildes(2), "bad option: -%c\n", c);
+			sys->fprint(stderr, "bad option: -%c\n", c);
 			arg->usage();
 		}
 
@@ -88,7 +166,86 @@
 		sys->fprint(stderr, "cfg.fswrks: %d\n", cfg.fswrks);
 	}
 
-	run_fs(cfg);
+	if(debug)
+		sys->fprint(stderr, "dddb: creating and running node registry\n");
+
+	sys->pctl(Sys->NEWPGRP, nil);
+
+	dbreg := DbRegistry.new(cfg.nodes);
+	spawn dbreg.init();
+	# spawn dbreg.run();
+
+	if(debug)
+		sys->fprint(stderr, "dddb: running ctlfs\n");
+
+	run_ctlfs(cfg, dbreg, keyfile, algs);
+
+	sys->fprint(stderr, "dddb: performing shutdown\n");
+	dbreg.close();
+	sys->fprint(stderr, "dddb: all components shut off\n");
 }
 
+user(): string
+{
+	# Name of the current user as read from #c/user; "none" when nothing could be read.
+	name := readfile("#c/user");
+	if(name == nil) return "none";
 
+	return name;
+}
+
+readfile(file: string): string
+{
+	# Result of a single read of at most 1024 bytes from file, as a string; nil if the open or read fails.
+	f := sys->open(file, Sys->OREAD);
+	if(f == nil)
+		return nil;
+
+	data := array[1024] of byte;
+	got := sys->read(f, data, len data);
+	if(got >= 0)
+		return string data[0:got];
+	return nil;
+}
+
+writefile(file: string, s: string): int
+{
+	# Open file with Sys->OWRITE and write s to it; returns the write's result, or -1 if the open fails.
+	out := sys->open(file, Sys->OWRITE);
+	if(out == nil)
+		return -1;
+
+	data := array of byte s;
+
+	return sys->write(out, data, len data);
+}
+
+dir(name: string, perm: int, qid: big): Sys->Dir
+{
+	# Build a Sys->Dir with the given name, mode and qid path; uid and gid are the current user.
+	owner := user();
+	d := sys->zerodir;
+	d.name = name;
+	d.uid = owner;
+	d.gid = owner;
+	d.mode = perm;
+	d.qid.path = qid;
+	d.qid.qtype = Sys->QTFILE;
+	if (perm & Sys->DMDIR)
+		d.qid.qtype = Sys->QTDIR;
+	return d;
+}
+
+joinstr(items: list of string, sep: string): string
+{
+	# Concatenate items with sep between them; an empty list yields "" instead of faulting on hd of nil.
+	if(items == nil)
+		return "";
+	s := hd items;
+	items = tl items;
+	while(items != nil) {
+		s = s + sep + hd items;
+		items = tl items;
+	}
+	return s;
+}