ref: 2ab7efc23fd2137ce88d328441900e0a439a3508
parent: 349bee80a965f9bb2a67965ecaefe1a2abe54cf9
author: k <k@midgaard>
date: Tue Nov 7 13:37:27 EST 2023
WIP update
--- a/README
+++ b/README
@@ -29,4 +29,11 @@
* SQL parser - obviously
* statement optimizer - obviously
* wm/dddbmon - a nice admin monitor would be nice
+
+Mirrors
+
+https://git.disroot.org/kitzman/dddb
+
+http://shithub.us/kitzman/dddb/HEAD/info.html
+
* dddbjdbc/dddbodbc - JDBC and ODBC drivers are mandatory
--- a/appl/cmd/ctlfs.b
+++ b/appl/cmd/ctlfs.b
@@ -16,20 +16,28 @@
nametree: Nametree;
Tree: import nametree;
Styxserver, Fid, Navigator, Navop,
- Eperm, Ecount, Eoffset: import styxservers;
+ Eperm, Ecount, Eoffset, Ebadarg: import styxservers;
# Database features
dbfeatures: list of string;
# Initial fs files
-Qroot, Qctl, Qname, Qstatus: con big iota;
+Qroot, Qctl, Qname, Qstatus, Qstorage, Qnodes: con big iota;
+# helper functions
+is_nonempty(s: string): int # predicate: 1 when s holds at least one character, else 0
+{
+ if(len s > 0)
+ return 1;
+ return 0;
+}
+
# create ctlfs and the appropriate listeners
run_ctlfs(cfg: Config, dbreg: ref DbRegistry, keyfile: string, algs: list of string)
{
sys->fprint(stderr, "setting up ctlfs\n");
- dbfeatures = DBVER :: dbfeatures;
+ dbfeatures = DBVER :: "export" :: dbfeatures;
styx = load Styx Styx->PATH;
styxservers = load Styxservers Styxservers->PATH;
@@ -80,21 +88,14 @@
sys->unmount(nil, "/mnt/keys");
sys->unmount(nil, "/mnt");
- # nametree; this is shared across all attachees
- (tree, treeop) := nametree->start();
- tree.create(Qroot, dir(".", 8r555|Sys->DMDIR, Qroot));
- tree.create(Qroot, dir("ctl", 8r666, Qctl));
- tree.create(Qroot, dir("status", 8r444, Qstatus));
-
sys->fprint(stderr, "ctlfs: finished setting up; starting\n");
# listener entrypoint
- ctlfs_listener(cfg, dbreg, c, treeop, authinfo, algs);
- tree.quit();
+ ctlfs_listener(cfg, dbreg, c, authinfo, algs);
}
# dddbctl listener loop
-ctlfs_listener(cfg: Config, dbreg: ref DbRegistry, c: ref Dial->Connection, treeop: chan of ref Styxservers->Navop, authinfo: ref Keyring->Authinfo, algs: list of string)
+ctlfs_listener(cfg: Config, dbreg: ref DbRegistry, c: ref Dial->Connection, authinfo: ref Keyring->Authinfo, algs: list of string)
{
loop: for (;;) {
nc := dial->listen(c);
@@ -116,13 +117,12 @@
hostname = hostname[0:len hostname - 1];
regchan := dbreg.changen();
- spawn ctlfs_authenticator(cfg, regchan, dfd, treeop, authinfo, algs, hostname);
+ spawn ctlfs_authenticator(cfg, nametree, regchan, dfd, authinfo, algs, hostname);
}
}
# authenticate a connection and set the user id.
-ctlfs_authenticator(cfg: Config, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), dfd: ref Sys->FD, treeop: chan of ref Styxservers->Navop, authinfo: ref Keyring->Authinfo,
- algs: list of string, hostname: string)
+ctlfs_authenticator(cfg: Config, nametree: Nametree, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), dfd: ref Sys->FD, authinfo: ref Keyring->Authinfo, algs: list of string, hostname: string)
{
# authenticate and change user id appropriately
(fd, err) := auth->server(algs, authinfo, dfd, 1);
@@ -134,17 +134,29 @@
if (debug)
sys->fprint(stderr, "ctlfs: client authenticated as %s\n", err);
- spawn ctlfs_loop(cfg, regchan, fd, treeop, hostname);
+ spawn ctlfs_loop(cfg, nametree, regchan, fd, hostname);
}
# filesystem loop; nb: hostname will be later used for stats
-ctlfs_loop(cfg: Config, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), fd: ref Sys->FD, treeop: chan of ref Styxservers->Navop, nil: string)
+ctlfs_loop(cfg: Config, nametree: Nametree, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), fd: ref Sys->FD, nil: string)
{
+ # nametree; this is per mount
+ (tree, treeop) := nametree->start();
+ tree.create(Qroot, dir(".", 8r555|Sys->DMDIR, Qroot));
+ tree.create(Qroot, dir("ctl", 8r640, Qctl));
+ tree.create(Qroot, dir("name", 8r444, Qname));
+ tree.create(Qroot, dir("status", 8r440, Qstatus));
+ tree.create(Qroot, dir("storage", 8r555|Sys->DMDIR, Qstorage));
+ tree.create(Qroot, dir("nodes", 8r555|Sys->DMDIR, Qnodes));
+
+ # styxserver start
(tc, srv) := Styxserver.new(fd, Navigator.new(treeop), big Qroot);
# registry rx/tx
(tx, rx) := regchan;
+ (btos, nil) := convcs->getbtos(DCS);
+
# Primary server loop
loop:
while((tm := <-tc) != nil) {
@@ -152,50 +164,7 @@
pick t := tm {
# Open operation
Open =>
- (f, mode, d, err) := srv.canopen(t);
- if(f == nil){
- srv.reply(ref Rmsg.Error(t.tag, err));
- continue loop;
- }
- f.open(mode, d.qid);
-
- case f.path {
-
- # Qroot
- Qroot =>
- if(t.mode != Sys->OREAD) {
- srv.reply(ref Rmsg.Error(t.tag, Eperm));
- continue loop;
- }
- srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));
-
- # Qctl
- Qctl =>
- if((t.mode & (Sys->OTRUNC | Sys->ORCLOSE | Sys->OEXCL)) != 0) {
- srv.reply(ref Rmsg.Error(t.tag, Eperm));
- continue loop;
- }
- srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));
- # Qname
- Qname =>
- if((t.mode & (Sys->OTRUNC | Sys->ORCLOSE | Sys->OEXCL)) != 0) {
- srv.reply(ref Rmsg.Error(t.tag, Eperm));
- continue loop;
- }
- srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));
-
- # Qstatus
- Qstatus =>
- if(t.mode != Sys->OREAD) {
- srv.reply(ref Rmsg.Error(t.tag, Eperm));
- continue loop;
- }
- srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));
-
- # Default reply
- * => srv.default(t);
- }
-
+ srv.default(t);
# Read operation
Read =>
(f, err) := srv.canread(t);
@@ -205,7 +174,7 @@
}
if(f.qtype & Sys->QTDIR){
srv.read(t);
- continue loop;
+ break;
}
case f.path {
@@ -278,10 +247,71 @@
}
# Write operation
Write =>
- srv.default(t);
+ (f, nil) := srv.canwrite(t);
+ if(f == nil) {
+ srv.reply(ref Rmsg.Error(t.tag, Eperm));
+ break;
+ }
+ case f.path {
+ # Qctl: a write carries one command word (refresh, check, close) followed by pool names
+ Qctl =>
+ (nil, csargs, nil) := btos->btos(nil, t.data, len t.data);
+ (nil, crargs) := sys->tokenize(csargs, " \n");
+ cargs := lists->filter(is_nonempty, crargs);
+ # hd of an empty list faults; substitute "" so a blank write is answered with Ebadarg
+ if(cargs == nil)
+ cargs = "" :: nil;
+ case hd cargs {
+ "refresh" =>
+ case len cargs {
+ 1 =>
+ tx <-= ref RegTMsg.RefreshAll();
+ <-rx;
+ srv.reply(ref Rmsg.Write(t.tag, len t.data));
+ * =>
+ pools := tl cargs;
+ while(pools != nil) { # test the list itself: len pools shrinks as entries are popped
+ pool := hd pools;
+ pools = tl pools;
+ tx <-= ref RegTMsg.Refresh(pool);
+ <-rx;
+ }
+ srv.reply(ref Rmsg.Write(t.tag, len t.data));
+ }
+ "check" =>
+ case len cargs {
+ 1 => srv.reply(ref Rmsg.Error(t.tag, Ebadarg));
+ * =>
+ pools := tl cargs;
+ while(pools != nil) { # visit every named pool, not only the first half
+ pool := hd pools;
+ pools = tl pools;
+ tx <-= ref RegTMsg.Check(pool);
+ <-rx;
+ }
+ srv.reply(ref Rmsg.Write(t.tag, len t.data));
+ }
+ "close" =>
+ case len tl cargs { # exactly one pool name must follow the command word
+ 1 =>
+ pool := hd tl cargs;
+ tx <-= ref RegTMsg.Close(pool);
+ <-rx;
+ srv.reply(ref Rmsg.Write(t.tag, len t.data));
+ * => srv.reply(ref Rmsg.Error(t.tag, Ebadarg));
+ }
+ * =>
+ srv.reply(ref Rmsg.Error(t.tag, Ebadarg));
+ }
+ # Default reply
+ * => srv.default(t);
+ }
+
# Default action
* => srv.default(t);
}
}
+
+ tree.quit();
}
--- a/appl/cmd/dddb.b
+++ b/appl/cmd/dddb.b
@@ -4,10 +4,16 @@
sys: Sys;
include "arg.m";
include "draw.m";
+include "readdir.m";
+ readdir: Readdir;
include "string.m";
strm: String;
include "lists.m";
lists: Lists;
+include "arrays.m";
+ arrays: Arrays;
+include "convcs.m";
+ convcs: Convcs;
include "config.b";
include "ctlfs.b";
@@ -16,7 +22,8 @@
stderr: ref Sys->FD;
debug: int;
-DBVER: con "v0.1.0";
+DBVER: con "v0.1.0";
+DCS: con "utf-8";
error(s: string)
{
@@ -59,6 +66,7 @@
pick {
GetNodes =>
ChanClose =>
+ RefreshAll =>
Check or Refresh or Close =>
nodename: string;
}
@@ -68,6 +76,8 @@
pick {
Error =>
err: string;
+ StatusAll =>
+ status: list of ref RegRMsg.Status;
Status =>
count: int;
poolsize: int;
@@ -104,8 +114,11 @@
{
sys = load Sys Sys->PATH;
arg := load Arg Arg->PATH;
+ readdir = load Readdir Readdir->PATH;
strm = load String String->PATH;
lists = load Lists Lists->PATH;
+ arrays = load Arrays Arrays->PATH;
+ convcs = load Convcs Convcs->PATH;
dial = load Dial Dial->PATH;
auth = load Auth Auth->PATH;
keyring = load Keyring Keyring->PATH;
@@ -114,10 +127,14 @@
error("dddb: sys module not found");
if(arg == nil)
error("dddb: arg module not found");
+ if(readdir == nil)
+ error("dddb: readdir module not found");
if(strm == nil)
error("dddb: strm module not found");
if(lists == nil)
error("dddb: lists module not found");
+ if(convcs == nil)
+ error("dddb: convcs module not found");
if(dial == nil)
error("dddb: dial module not found");
if(auth == nil)
@@ -125,6 +142,14 @@
if(keyring == nil)
error("dddb: keyring module not found");
+ convcs->init(nil);
+ (btos, btoserr) := convcs->getbtos(DCS);
+ if(btos == nil)
+ error(sys->sprint("dddb: %s character set: %s", DCS, btoserr));
+ (stob, stoberr) := convcs->getstob(DCS);
+ if(stob == nil)
+ error(sys->sprint("dddb: %s character set: %s", DCS, stoberr));
+
stderr = sys->fildes(2);
cfgpath: string = "";
keyfile: string = nil;
@@ -147,13 +172,13 @@
args = arg->argv();
- nodename := hd args;
-
- if(nodename == nil) {
- sys->fprint(stderr, "dddb: no nodename supplied\n");
+ if(len args != 1) {
+ sys->fprint(stderr, "dddb: bad usage\n");
arg->usage();
}
+ nodename := hd args;
+
if(debug)
sys->fprint(stderr, "dddb: opening config file\n");
cfg := Config.open(nodename, cfgpath);
@@ -224,6 +249,21 @@
{
d := sys->zerodir;
user := user();
+ d.name = name;
+ d.uid = user;
+ d.gid = user;
+ d.qid.path = qid;
+ if (perm & Sys->DMDIR)
+ d.qid.qtype = Sys->QTDIR;
+ else
+ d.qid.qtype = Sys->QTFILE;
+ d.mode = perm;
+ return d;
+}
+
+diru(name: string, perm: int, qid: big, user: string): Sys->Dir
+{
+ d := sys->zerodir;
d.name = name;
d.uid = user;
d.gid = user;
--- a/appl/cmd/nodereg.b
+++ b/appl/cmd/nodereg.b
@@ -20,6 +20,8 @@
NodePool.check(r: self ref NodePool): int
{
sc := 0;
+ if(debug)
+ sys->fprint(stderr, "np: checking pool %s\n", r.cfg.name);
for(i := 0; i < r.cfg.psize; i++) {
mtpt := "/n/" + r.cfg.name + "." + sys->sprint("%d", i);
@@ -37,6 +39,8 @@
NodePool.refresh(r: self ref NodePool): int
{
sc := 0;
+ if(debug)
+ sys->fprint(stderr, "refreshing pool %s, with %d nodes\n", r.cfg.name, r.cfg.psize);
for(i := 0; i < r.cfg.psize; i++) {
mtpt := "/n/" + r.cfg.name + "." + sys->sprint("%d", i);
@@ -57,6 +61,21 @@
return sc;
}
+NodePool.close(r: self ref NodePool)
+{
+ if(debug)
+ sys->fprint(stderr, "np: closing pool %s\n", r.cfg.name);
+ # hand each entry of r.instances to sys->unmount, in list order
+ for(rest := r.instances; rest != nil; rest = tl rest) {
+ mtpt := hd rest;
+ if(debug)
+ sys->fprint(stderr, "np: closing %s\n", mtpt);
+ sys->unmount(nil, mtpt);
+ }
+ # this line is printed regardless of the debug flag
+ sys->fprint(stderr, "np: pool %s closed\n", r.cfg.name);
+}
+
NodePool.newinst(r: self ref NodePool, mtpt: string): string
{
(ec, ae) := sys->tokenize(r.cfg.addr, "!");
@@ -65,8 +84,7 @@
nodename := r.cfg.name;
case ec {
- 1 =>
- 2 =>
+ 1 or 2 =>
defnet = hd ae;
* =>
defnet = hd ae;
@@ -87,7 +105,7 @@
addr := dial->netmkaddr(r.cfg.sysn, defnet, defsvc);
if(debug)
- sys->fprint(stderr, "np: %s: dialing %s\n", nodename, keyfile);
+ sys->fprint(stderr, "np: %s: dialing %s\n", nodename, addr);
(ok, c) := sys->dial(addr, nil);
if(ok < 0)
return sys->sprint("unable to dial %s", addr);
@@ -107,19 +125,6 @@
return nil;
}
-NodePool.close(r: self ref NodePool)
-{
- instances := r.instances;
- while(len instances != 0) {
- instance := hd instances;
- instances = tl instances;
- if(debug)
- sys->fprint(stderr, "np: closing %s\n", instance);
- sys->unmount(nil, instance);
- }
- sys->fprint(stderr, "np: pool %s closed\n", r.cfg.name);
-}
-
# create an uninitialized registry
DbRegistry.new(cfgs: list of NodeConfig): ref DbRegistry
{
@@ -209,6 +214,17 @@
c := pool.refresh();
rx <-= ref RegRMsg.Status(c, pool.cfg.psize);
+ RefreshAll =>
+ nodepools := r.nodepools;
+ status: list of ref RegRMsg.Status;
+ while(nodepools != nil) {
+ # test the list itself: an index compared against the shrinking len stops halfway
+ pool := hd nodepools;
+ nodepools = tl nodepools;
+ c := pool.refresh();
+ status = ref RegRMsg.Status(c, pool.cfg.psize) :: status; # prepending: result is in reverse pool order
+ }
+ rx <-= ref RegRMsg.StatusAll(status);
Close =>
pool := get_pool(r, msg.nodename);
if(pool == nil)
--- a/appl/lib/mkfile
+++ b/appl/lib/mkfile
@@ -1,11 +1,14 @@
<../../mkconfig
TARG=\
+ mtptmirror.dis\
MODULES=\
SYSMODULES=\
+ hash.m\
+ mtptmirror.m\
-DISBIN=$ROOT/dis/lib
+DISBIN=$home/dis/lib
<$ROOT/mkfiles/mkdis
--- /dev/null
+++ b/appl/lib/mtptmirror.b
@@ -1,0 +1,73 @@
+implement MtptMirror;
+
+include "sys.m";
+ sys: Sys;
+include "hash.m";
+ hash: Hash;
+include "nametree.m";
+ nametree: Nametree;
+
+init() # loads Sys, Hash and Nametree into the handles declared above; load results are not checked
+{
+ sys = load Sys Sys->PATH;
+ hash = load Hash Hash->PATH;
+ nametree = load Nametree Nametree->PATH;
+}
+
+new(mtpts: list of string, lower, upper: big) # NOTE(review): returns m but declares no return type — confirm against the module declaration
+{
+ m: ref MtptMirror; # NOTE(review): declared but never allocated here, so the field stores below look like a nil dereference — confirm
+ m.mtpts = mtpts;
+ m.qidmap = hash->new(HSSZ); # HSSZ is not defined in this file; presumably declared in mtptmirror.m, which this file does not include — verify
+ m.lower = lower;
+ m.upper = upper ;
+ return m;
+}
+
+MtptMirror.refresh(m: self ref MtptMirror): string # WIP: selects a mount point, then starts a directory walk; returns Emmnopath when none qualifies
+{
+ # find a non-empty mountpoint: the first one that opens and whose dirread count is positive
+ mtpt := "";
+ for(i := 0; i < len m.mtpts; i++) {
+ fd := sys->open(m.mtpts[i], Sys->OREAD); # NOTE(review): new() takes mtpts as a list of string, yet it is indexed here — confirm the field type
+ if(fd == nil)
+ continue;
+ (ec, nil) := sys->dirread(fd);
+ if(ec > 0) {
+ mtpt = m.mtpts[i];
+ break;
+ }
+ }
+ if(mtpt == "")
+ return Emmnopath;
+
+ queue: array of ref Sys->Dir; # NOTE(review): never allocated before being indexed below
+ visited_nodes: array of string; # not referenced anywhere else in this function
+ qi := 1;
+
+ # queue[0] = mtpt; (seed for the walk, currently disabled)
+ walkloop:
+ for(i := 0; i < qi; i++) {
+ rpath := queue[i];
+ (content, nil) := readdir->init(rpath, readdir->NAME|readdir->COMPACT); # NOTE(review): readdir is neither included nor loaded in this file — confirm
+ for(j := 0; j < len content; j++) {
+ # if(rpath == "nodes")
+ queue[qi] = content[j];
+ qi++;
+ }
+ }
+} # NOTE(review): no return statement on this path although the function is declared to return string
+
+MtptMirror.get_path(m: self ref MtptMirror, path: string): big # looks path up in m.qidmap
+{
+ v := m.qidmap.find(path);
+ if(v == nil)
+ return -1; # sentinel for "path not in the map"; NOTE(review): literal is not written as big — confirm it type-checks
+ return v.i; # the i field of the found entry; NOTE(review): presumably an int, while the declared result is big — confirm
+}
+
+# mountpoint serving functions
+mtpt_walk(tree: Tree, mtpt: string, Cnode, Mnodes: int): int # stub: body is empty
+{
+
+} # NOTE(review): declared to return int but has no return statement yet
--- a/mkfile
+++ b/mkfile
@@ -2,5 +2,6 @@
DIRS=\
appl\
+ module\
<$ROOT/mkfiles/mksubdirs