shithub: dddb

Download patch

ref: 349bee80a965f9bb2a67965ecaefe1a2abe54cf9
parent: 122eaf84ccbf1c6694943bfcce99b47c8c19440b
author: k <k@santiago>
date: Sun Jul 30 07:20:24 EDT 2023

added a basic node registry

--- /dev/null
+++ b/appl/cmd/nodereg.b
@@ -1,0 +1,231 @@
+# Bring a node pool up by (re)mounting its instances.
+# Returns 1 without doing anything when the pool already tracks
+# cfg.psize instances, and 0 after a refresh pass has been run.
+NodePool.init(r: self ref NodePool): int
+{
+	name := r.cfg.name;
+	want := r.cfg.psize;
+
+	if(debug)
+		sys->fprint(stderr, "np: init'ing pool %s\n", name);
+
+	# every slot is already accounted for: nothing to mount
+	if(len r.instances == want) {
+		sys->fprint(stderr, "np: pool %s already initialized\n", name);
+		return 1;
+	}
+
+	mounted := r.refresh();
+	if(debug)
+		sys->fprint(stderr, "np: pool %s init'ed: %d/%d\n", name, mounted, want);
+
+	return 0;
+}
+
+# Probe every slot of the pool and return how many have a ctl file
+# that can be stat'ed. Mount points that fail the probe are unmounted.
+NodePool.check(r: self ref NodePool): int
+{
+	alive := 0;
+
+	for(slot := 0; slot < r.cfg.psize; slot++) {
+		mnt := sys->sprint("/n/%s.%d", r.cfg.name, slot);
+		(ok, nil) := sys->stat(mnt + "/ctl");
+		if(ok < 0) {
+			# nothing usable behind this mount point: drop it
+			sys->unmount(nil, mnt);
+			continue;
+		}
+		alive++;
+	}
+
+	return alive;
+}
+
+# Walk every slot of the pool and mount a new instance wherever the
+# slot's ctl file cannot be stat'ed. Stops at the first slot that
+# fails to mount.
+# Returns the number of instances newly mounted by this call (slots
+# that were already live are not counted).
+# Every live or newly mounted mount point is recorded in r.instances,
+# without duplicates, so that NodePool.close can unmount it later and
+# NodePool.init can tell when the pool is fully populated.
+NodePool.refresh(r: self ref NodePool): int
+{
+	sc := 0;
+
+	for(i := 0; i < r.cfg.psize; i++) {
+		mtpt := "/n/" + r.cfg.name + "." + sys->sprint("%d", i);
+		ctlpath := mtpt + "/ctl";
+
+		if(sys->stat(ctlpath).t0 < 0) {
+			# clear any stale mount before trying again
+			sys->unmount(nil, mtpt);
+
+			err := r.newinst(mtpt);
+			if(err != nil)
+				break;
+
+			sc++;
+		}
+
+		# the slot is live at this point; track it once
+		known := 0;
+		for(l := r.instances; l != nil; l = tl l) {
+			if(hd l == mtpt)
+				known = 1;
+		}
+		if(!known)
+			r.instances = mtpt :: r.instances;
+	}
+
+	return sc;
+}
+
+# Dial the node, authenticate with the pool's key file and mount the
+# resulting connection on mtpt (replacing whatever is mounted there).
+#
+# The network and service are taken from cfg.addr, split on "!":
+# the first field overrides the network when there are at least two
+# fields, the third overrides the service when there are at least
+# three; otherwise "tcp" and "dddbctl" are used. The host always
+# comes from cfg.sysn.
+#
+# Returns nil on success, or a string describing the step that failed.
+NodePool.newinst(r: self ref NodePool, mtpt: string): string
+{
+	(ec, ae) := sys->tokenize(r.cfg.addr, "!");
+	defnet := "tcp";
+	defsvc := "dddbctl";
+	nodename := r.cfg.name;
+
+	# guard on the field count so an empty addr keeps the defaults
+	# instead of taking hd of an empty list
+	if(ec >= 2)
+		defnet = hd ae;
+	if(ec >= 3)
+		defsvc = hd tl tl ae;
+
+	# in Limbo the empty string and nil compare equal
+	keyfile := r.cfg.keyfile;
+	if(keyfile == nil)
+		keyfile = sys->sprint("/usr/%s/keyring/%s!%s!%s", user(), defnet, r.cfg.sysn, defsvc);
+	if(debug)
+		sys->fprint(stderr, "np: %s: reading keyfile %s\n", nodename, keyfile);
+	authinfo := keyring->readauthinfo(keyfile);
+	if (authinfo == nil) {
+		sys->fprint(stderr, "np: %s error: %r\n", nodename);
+		return sys->sprint("cannot read %s", keyfile);
+	}
+
+	addr := dial->netmkaddr(r.cfg.sysn, defnet, defsvc);
+
+	# report the address being dialed, not the key file
+	if(debug)
+		sys->fprint(stderr, "np: %s: dialing %s\n", nodename, addr);
+	(ok, c) := sys->dial(addr, nil);
+	if(ok < 0)
+		return sys->sprint("unable to dial %s", addr);
+
+	(fd, err) := auth->client("", authinfo, c.dfd);
+	if(fd == nil) {
+		sys->fprint(stderr, "np: %s: error authenticating: %s\n", nodename, err);
+		return err;
+	}
+
+	ok = sys->mount(fd, nil, mtpt, Sys->MREPL, nil);
+	if(ok < 0) {
+		sys->fprint(stderr, "np: %s: unable to mount %s\n", nodename, mtpt);
+		# no trailing newline: this is a returned error string, like the others
+		return sys->sprint("unable to mount %s", mtpt);
+	}
+
+	return nil;
+}
+
+# Unmount every instance tracked by the pool and forget them, so the
+# pool no longer looks initialized once it has been closed.
+NodePool.close(r: self ref NodePool)
+{
+	# walk with a nil test rather than len, which rescans the list each time
+	for(l := r.instances; l != nil; l = tl l) {
+		instance := hd l;
+		if(debug)
+			sys->fprint(stderr, "np: closing %s\n", instance);
+		sys->unmount(nil, instance);
+	}
+
+	# the mount points are gone; do not leave stale entries behind
+	r.instances = nil;
+
+	sys->fprint(stderr, "np: pool %s closed\n", r.cfg.name);
+}
+
+# Build a registry holding one NodePool per configuration entry.
+# The pools are created with no instances; nothing is mounted here.
+DbRegistry.new(cfgs: list of NodeConfig): ref DbRegistry
+{
+	sys->fprint(stderr, "dbreg: creating up database registry\n");
+
+	# pools are consed on, so they end up in the reverse order of cfgs
+	pools: list of ref NodePool;
+	for(rest := cfgs; rest != nil; rest = tl rest)
+		pools = ref NodePool(hd rest, nil) :: pools;
+
+	# rchans: list of chan of ref RegRMsg;
+	# tchans: list of chan of ref RegTMsg;
+
+	return ref DbRegistry(pools);
+}
+
+# Initialize every pool in the registry and report on stderr how many
+# of them were set up by this call.
+DbRegistry.init(r: self ref DbRegistry)
+{
+	count := 0;
+
+	sys->fprint(stderr, "dbreg: initializing pools\n");
+	for(l := r.nodepools; l != nil; l = tl l) {
+		pool := hd l;
+		# NodePool.init returns 0 when it went on to set the pool up
+		# and non-zero when it skipped it, so count the zero results
+		if(pool.init() == 0)
+			count++;
+	}
+
+	sys->fprint(stderr, "dbreg: initialized %d out of %d pools\n", count, len r.nodepools);
+}
+
+# Close every pool in the registry; each pool is closed in its own
+# spawned thread and this function does not wait for them.
+DbRegistry.close(r: self ref DbRegistry)
+{
+	sys->fprint(stderr, "dbreg: closing all pools\n");
+
+	for(rest := r.nodepools; rest != nil; rest = tl rest) {
+		p := hd rest;
+		spawn p.close();
+	}
+}
+
+# Return the first pool in the registry whose configured name equals
+# name, or nil when there is none.
+get_pool(r: ref DbRegistry, name: string): ref NodePool
+{
+	for(rest := r.nodepools; rest != nil; rest = tl rest) {
+		candidate := hd rest;
+		if(candidate.cfg.name == name)
+			return candidate;
+	}
+
+	return nil;
+}
+
+# Serve registry requests: read messages from tx until a ChanClose
+# arrives, and answer every other request with one message on rx.
+#   GetNodes - NodeList with the name of every pool
+#   Check    - Status(live instances, pool size)
+#   Refresh  - Status(newly mounted instances, pool size)
+#   Close    - closes the pool, then Status(0, pool size)
+# A request naming an unknown pool is answered with
+# Error(Epoolnotfound) and nothing else.
+run_chans(r: ref DbRegistry, tx: chan of ref RegTMsg, rx: chan of ref RegRMsg)
+{
+	active := 1;
+	while(active) {
+		tm := <-tx;
+		pick msg := tm {
+			ChanClose =>
+				active = 0;
+			GetNodes =>
+				nodes: list of string;
+				for(l := r.nodepools; l != nil; l = tl l) {
+					pool := hd l;
+					nodes = pool.cfg.name :: nodes;
+				}
+				rx <-= ref RegRMsg.NodeList(nodes);
+			Check =>
+				pool := get_pool(r, msg.nodename);
+				# reply with the error OR the status, never both:
+				# touching a nil pool would fault this thread
+				if(pool == nil)
+					rx <-= ref RegRMsg.Error(Epoolnotfound);
+				else {
+					c := pool.check();
+					rx <-= ref RegRMsg.Status(c, pool.cfg.psize);
+				}
+			Refresh =>
+				pool := get_pool(r, msg.nodename);
+				if(pool == nil)
+					rx <-= ref RegRMsg.Error(Epoolnotfound);
+				else {
+					c := pool.refresh();
+					rx <-= ref RegRMsg.Status(c, pool.cfg.psize);
+				}
+			Close =>
+				pool := get_pool(r, msg.nodename);
+				if(pool == nil)
+					rx <-= ref RegRMsg.Error(Epoolnotfound);
+				else {
+					pool.close();
+					rx <-= ref RegRMsg.Status(0, pool.cfg.psize);
+				}
+		}
+	}
+}
+
+# Create a request/reply channel pair for talking to the registry and
+# spawn a run_chans thread to serve it.
+# Returns (request channel, reply channel).
+DbRegistry.changen(r: self ref DbRegistry): (chan of ref RegTMsg, chan of ref RegRMsg)
+{
+	requests := chan of ref RegTMsg;
+	replies := chan of ref RegRMsg;
+
+	spawn run_chans(r, requests, replies);
+
+	return (requests, replies);
+}