shithub: dddb

ref: 2ab7efc23fd2137ce88d328441900e0a439a3508
dir: /appl/cmd/nodereg.b/

View raw version
# Initialize a node pool.
# When the instance list already holds cfg.psize entries the pool is reported
# as already initialized and 1 is returned; otherwise refresh() is run and 0
# is returned (whatever number of instances refresh() managed to create).
NodePool.init(r: self ref NodePool): int
{
	name := r.cfg.name;
	want := r.cfg.psize;

	if(debug)
		sys->fprint(stderr, "np: init'ing pool %s\n", name);

	if(len r.instances != want) {
		created := r.refresh();

		if(debug)
			sys->fprint(stderr, "np: pool %s init'ed: %d/%d\n", name, created, want);

		return 0;
	}

	sys->fprint(stderr, "np: pool %s already initialized\n", name);
	return 1;
}

# Count the live instances of a pool.
# Every mount point /n/<cfg.name>.<i>, for i in 0..cfg.psize-1, is probed by
# stat'ing its ctl file. A mount point whose ctl file cannot be stat'ed is
# unmounted (the result of the unmount is ignored).
# Returns the number of mount points that answered the stat.
NodePool.check(r: self ref NodePool): int
{
	sc := 0;
	if(debug)
		sys->fprint(stderr, "np: checking pool %s\n", r.cfg.name);

	for(i := 0; i < r.cfg.psize; i++) {
		mtpt := "/n/" + r.cfg.name + "." + sys->sprint("%d", i);
		ctlpath := mtpt + "/ctl";

		if(sys->stat(ctlpath).t0 >= 0)
			sc++;
		else
			sys->unmount(nil, mtpt);	# drop the dead mount
	}

	return sc;
}

# Bring a pool up to strength.
# Every mount point /n/<cfg.name>.<i>, for i in 0..cfg.psize-1, is probed by
# stat'ing its ctl file. Mount points that answer are left alone; the others
# are unmounted and re-created with newinst(). The first newinst() failure is
# reported on stderr and stops the scan.
# Each mount point created here is recorded in r.instances (once), which is
# the list that init() measures and close() unmounts.
# Returns the number of instances created by this call; instances that were
# already alive are not counted.
NodePool.refresh(r: self ref NodePool): int
{
	sc := 0;
	if(debug)
		sys->fprint(stderr, "refreshing pool %s, with %d nodes\n", r.cfg.name, r.cfg.psize);

	for(i := 0; i < r.cfg.psize; i++) {
		mtpt := "/n/" + r.cfg.name + "." + sys->sprint("%d", i);
		ctlpath := mtpt + "/ctl";

		if(sys->stat(ctlpath).t0 >= 0)
			continue;

		# clear whatever is left of a dead mount before mounting again
		sys->unmount(nil, mtpt);

		err := r.newinst(mtpt);
		if(err != nil) {
			sys->fprint(stderr, "np: pool %s: cannot create instance %s: %s\n", r.cfg.name, mtpt, err);
			break;
		}

		# a re-created instance may already be listed; keep one entry per mount point
		known := 0;
		for(l := r.instances; l != nil; l = tl l)
			if(hd l == mtpt)
				known = 1;
		if(!known)
			r.instances = mtpt :: r.instances;

		sc++;
	}

	return sc;
}

# Close a pool: unmount every mount point listed in r.instances and then
# empty the list, so that a later init() does not mistake the closed pool
# for an initialized one. A failed unmount is reported only when debugging
# and does not stop the remaining unmounts.
NodePool.close(r: self ref NodePool)
{
	if(debug)
		sys->fprint(stderr, "np: closing pool %s\n", r.cfg.name);
	for(instances := r.instances; instances != nil; instances = tl instances) {
		instance := hd instances;
		if(debug)
			sys->fprint(stderr, "np: closing %s\n", instance);
		if(sys->unmount(nil, instance) < 0 && debug)
			sys->fprint(stderr, "np: cannot unmount %s: %r\n", instance);
	}
	r.instances = nil;
	sys->fprint(stderr, "np: pool %s closed\n", r.cfg.name);
}

# Create one instance of the pool's node on mount point mtpt:
# read the authentication info, dial the node, authenticate as a client and
# mount the authenticated connection on mtpt (replacing what is there).
#
# The network and service are taken from cfg.addr, split on "!": the first
# field is used as the network and, when there are three or more fields, the
# third as the service; the defaults are "tcp" and "dddbctl". The host that
# is dialed is cfg.sysn. When cfg.keyfile is empty the key file defaults to
# /usr/<user>/keyring/<net>!<sysn>!<svc>.
#
# Returns nil on success, otherwise a string describing the failure; every
# failure is also reported on stderr.
NodePool.newinst(r: self ref NodePool, mtpt: string): string
{
	(ec, ae) := sys->tokenize(r.cfg.addr, "!");
	defnet := "tcp";
	defsvc := "dddbctl";
	nodename := r.cfg.name;

	# An empty cfg.addr tokenizes to (0, nil); hd of nil would raise an
	# exception, so the defaults are kept in that case.
	# NOTE(review): a one-field addr is used as the network name, not as a
	# host name — confirm that this is what the configuration format intends.
	if(ec >= 1)
		defnet = hd ae;
	if(ec >= 3)
		defsvc = hd tl tl ae;

	keyfile := r.cfg.keyfile;
	if(keyfile == "" || keyfile == nil)
		keyfile = sys->sprint("/usr/%s/keyring/%s!%s!%s", user(), defnet, r.cfg.sysn, defsvc);
	if(debug)
		sys->fprint(stderr, "np: %s: reading keyfile %s\n", nodename, keyfile);
	authinfo := keyring->readauthinfo(keyfile);
	if (authinfo == nil) {
		sys->fprint(stderr, "np: %s error: %r\n", nodename);
		return sys->sprint("cannot read %s", keyfile);
	}

	addr := dial->netmkaddr(r.cfg.sysn, defnet, defsvc);

	if(debug)
		sys->fprint(stderr, "np: %s: dialing %s\n", nodename, addr);
	(ok, c) := sys->dial(addr, nil);
	if(ok < 0) {
		sys->fprint(stderr, "np: %s: unable to dial %s: %r\n", nodename, addr);
		return sys->sprint("unable to dial %s", addr);
	}

	(fd, err) := auth->client("", authinfo, c.dfd);
	if(fd == nil) {
		sys->fprint(stderr, "np: %s: error authenticating: %s\n", nodename, err);
		return err;
	}

	ok = sys->mount(fd, nil, mtpt, Sys->MREPL, nil);
	if(ok < 0) {
		sys->fprint(stderr, "np: %s: unable to mount %s: %r\n", nodename, mtpt);
		# no trailing newline: the caller gets a message, not a log line
		return sys->sprint("unable to mount %s", mtpt);
	}

	return nil;
}

# Create an uninitialized registry holding one NodePool per configuration.
# Each pool is built from its configuration and nil, and is consed onto the
# front of the list, so the pools end up in the reverse order of cfgs.
DbRegistry.new(cfgs: list of NodeConfig): ref DbRegistry
{
	sys->fprint(stderr, "dbreg: creating up database registry\n");

	pools: list of ref NodePool;
	for(rest := cfgs; rest != nil; rest = tl rest)
		pools = ref NodePool(hd rest, nil) :: pools;

	# rchans: list of chan of ref RegRMsg;
	# tchans: list of chan of ref RegTMsg;

	return ref DbRegistry(pools);
}

# Initialize the registry: run init() on every pool and report on stderr how
# many of the pools were initialized by this call.
DbRegistry.init(r: self ref DbRegistry)
{
	count := 0;

	sys->fprint(stderr, "dbreg: initializing pools\n");
	for(nodepools := r.nodepools; nodepools != nil; nodepools = tl nodepools) {
		pool := hd nodepools;
		# NodePool.init returns 0 after it has run its refresh and 1 when
		# it refused because the pool was already initialized, so only a
		# zero result counts as "initialized" here.
		if(pool.init() == 0)
			count++;
	}

	sys->fprint(stderr, "dbreg: initialized %d out of %d pools\n", count, len r.nodepools);
}

# Close every pool of the registry. Each pool's close() is spawned, so this
# function returns without waiting for the pools to finish closing.
DbRegistry.close(r: self ref DbRegistry)
{
	sys->fprint(stderr, "dbreg: closing all pools\n");
	for(rest := r.nodepools; rest != nil; rest = tl rest) {
		pool := hd rest;
		spawn pool.close();
	}
}

# Look up a pool by the name in its configuration.
# Returns the first pool of r.nodepools whose cfg.name equals name, or nil
# when there is none.
get_pool(r: ref DbRegistry, name: string): ref NodePool
{
	for(rest := r.nodepools; rest != nil; rest = tl rest) {
		candidate := hd rest;
		if(candidate.cfg.name == name)
			return candidate;
	}
	return nil;
}

# Serve registry requests: receive RegTMsg requests on tx and answer each
# one, except ChanClose, with exactly one RegRMsg on rx. The loop ends when
# a ChanClose request arrives.
#
#	GetNodes	NodeList of the names of all pools
#	Check		Status(live instances, pool size) of the named pool
#	Refresh		Status(instances created, pool size) of the named pool
#	RefreshAll	StatusAll with one Status per pool
#	Close		closes the named pool, answers Status(0, pool size)
#
# Check, Refresh and Close answer Error(Epoolnotfound), and nothing else,
# when no pool has the requested name.
run_chans(r: ref DbRegistry, tx: chan of ref RegTMsg, rx: chan of ref RegRMsg)
{
	active := 1;
	while(active) {
		tm := <-tx;
		pick msg := tm {
			ChanClose =>
				active = 0;
			GetNodes =>
				nodes: list of string;
				for(nodepools := r.nodepools; nodepools != nil; nodepools = tl nodepools) {
					pool := hd nodepools;
					nodes = pool.cfg.name :: nodes;
				}
				rx <-= ref RegRMsg.NodeList(nodes);
			Check =>
				pool := get_pool(r, msg.nodename);
				if(pool == nil)
					rx <-= ref RegRMsg.Error(Epoolnotfound);
				else {
					c := pool.check();
					rx <-= ref RegRMsg.Status(c, pool.cfg.psize);
				}
			Refresh =>
				pool := get_pool(r, msg.nodename);
				if(pool == nil)
					rx <-= ref RegRMsg.Error(Epoolnotfound);
				else {
					c := pool.refresh();
					rx <-= ref RegRMsg.Status(c, pool.cfg.psize);
				}
			RefreshAll =>
				nodepools := r.nodepools;
				status: list of ref RegRMsg.Status;
				# The list shrinks as it is walked, so the loop must stop on
				# the empty list; comparing a counter against its length
				# would stop half way through.
				for(i := 0; nodepools != nil; i++) {
					if(debug)
						sys->fprint(stderr, "idx: %d\n", i);
					pool := hd nodepools;
					nodepools = tl nodepools;
					c := pool.refresh();
					status = ref RegRMsg.Status(c, pool.cfg.psize) :: status;
				}
				rx <-= ref RegRMsg.StatusAll(status);
			Close =>
				pool := get_pool(r, msg.nodename);
				if(pool == nil)
					rx <-= ref RegRMsg.Error(Epoolnotfound);
				else {
					pool.close();
					rx <-= ref RegRMsg.Status(0, pool.cfg.psize);
				}
		}
	}
}

# Create a fresh request/reply channel pair for this registry and spawn
# run_chans to serve it. Returns (request channel, reply channel).
DbRegistry.changen(r: self ref DbRegistry): (chan of ref RegTMsg, chan of ref RegRMsg)
{
	requests := chan of ref RegTMsg;
	replies := chan of ref RegRMsg;
	spawn run_chans(r, requests, replies);
	return (requests, replies);
}