ref: 2ab7efc23fd2137ce88d328441900e0a439a3508
dir: /appl/cmd/ctlfs.b/
include "dial.m"; dial: Dial; include "security.m"; auth: Auth; include "keyring.m"; keyring: Keyring; include "styx.m"; styx: Styx; Tmsg, Rmsg: import Styx; include "styxservers.m"; styxservers: Styxservers; nametree: Nametree; Tree: import nametree; Styxserver, Fid, Navigator, Navop, Eperm, Ecount, Eoffset, Ebadarg: import styxservers; # Database features dbfeatures: list of string; # Initial fs files Qroot, Qctl, Qname, Qstatus, Qstorage, Qnodes: con big iota; # helper functions is_nonempty(s: string): int { if(len s == 0) return 0; return 1; } # create ctlfs and the appropriate listeners run_ctlfs(cfg: Config, dbreg: ref DbRegistry, keyfile: string, algs: list of string) { sys->fprint(stderr, "setting up ctlfs\n"); dbfeatures = DBVER :: "export" :: dbfeatures; styx = load Styx Styx->PATH; styxservers = load Styxservers Styxservers->PATH; nametree = load Nametree Nametree->PATH; if(debug) sys->fprint(stderr, "ctlfs: checking if modules are loaded\n"); if(styx == nil) error("ctlfs: styx module not found"); if(styxservers == nil) error("ctlfs: styxservers module not found"); if(nametree == nil) error("ctlfs: nametree module not found"); if(debug) sys->fprint(stderr, "ctlfs: initializing modules\n"); auth->init(); styx->init(); styxservers->init(styx); nametree->init(); # authinfo init authinfo: ref Keyring->Authinfo; if (keyfile == nil) keyfile = "/usr/" + user() + "/keyring/default"; if(debug) sys->fprint(stderr, "ctlfs: reading authinfo %s\n", keyfile); authinfo = keyring->readauthinfo(keyfile); if (authinfo == nil) error(sys->sprint("ctlfs: cannot read %s: %r", keyfile)); # announcing if(debug) sys->fprint(stderr, "ctlfs: announcing dddbctl\n"); # addr := dial->netmkaddr(cfg.addr, "tcp", "dddbctl"); c := dial->announce(cfg.addr); if(c == nil) error(sys->sprint("ctlfs: cannot listen on %s\n", cfg.addr)); # bootstrapping if(debug) sys->fprint(stderr, "ctlfs: bootstrapping\n"); sys->unmount(nil, "/mnt/keys"); sys->unmount(nil, "/mnt"); sys->fprint(stderr, "ctlfs: finished setting up; starting\n"); # listener entrypoint ctlfs_listener(cfg, dbreg, c, authinfo, algs); } # dddbctl listener loop ctlfs_listener(cfg: Config, dbreg: ref DbRegistry, c: ref Dial->Connection, authinfo: ref Keyring->Authinfo, algs: list of string) { loop: for (;;) { nc := dial->listen(c); if (nc == nil) error(sys->sprint("listen failed: %r")); if (debug) sys->fprint(stderr, "ctlfs: got connection from %s", readfile(nc.dir + "/remote")); dfd := dial->accept(nc); if (dfd == nil) continue loop; if(nc.cfd != nil) sys->fprint(nc.cfd, "keepalive"); hostname: string; hostname = readfile(nc.dir + "/remote"); if(hostname != nil) hostname = hostname[0:len hostname - 1]; regchan := dbreg.changen(); spawn ctlfs_authenticator(cfg, nametree, regchan, dfd, authinfo, algs, hostname); } } # authenticate a connection and set the user id. ctlfs_authenticator(cfg: Config, nametree: Nametree, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), dfd: ref Sys->FD, authinfo: ref Keyring->Authinfo, algs: list of string, hostname: string) { # authenticate and change user id appropriately (fd, err) := auth->server(algs, authinfo, dfd, 1); if (fd == nil) { if (debug) sys->fprint(stderr, "ctlfs: authentication failed: %s\n", err); return; } if (debug) sys->fprint(stderr, "ctlfs: client authenticated as %s\n", err); spawn ctlfs_loop(cfg, nametree, regchan, fd, hostname); } # filesystem loop; nb: hostname will be later used for stats ctlfs_loop(cfg: Config, nametree: Nametree, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), fd: ref Sys->FD, nil: string) { # nametree; this is per mount (tree, treeop) := nametree->start(); tree.create(Qroot, dir(".", 8r555|Sys->DMDIR, Qroot)); tree.create(Qroot, dir("ctl", 8r640, Qctl)); tree.create(Qroot, dir("name", 8r444, Qname)); tree.create(Qroot, dir("status", 8r440, Qstatus)); tree.create(Qroot, dir("storage", 8r555|Sys->DMDIR, Qstorage)); tree.create(Qroot, dir("nodes", 8r555|Sys->DMDIR, Qnodes)); # styxserver start (tc, srv) := Styxserver.new(fd, Navigator.new(treeop), big Qroot); # registry rx/tx (tx, rx) := regchan; (btos, nil) := convcs->getbtos(DCS); # Primary server loop loop: while((tm := <-tc) != nil) { # Switch on operations being performed on a given Fid pick t := tm { # Open operation Open => srv.default(t); # Read operation Read => (f, err) := srv.canread(t); if(f == nil) { srv.reply(ref Rmsg.Error(t.tag, err)); break; } if(f.qtype & Sys->QTDIR){ srv.read(t); break; } case f.path { # Qctl Qctl => ctlmsg := joinstr(dbfeatures, "\n") + "\n"; ctlmsgbuf := array of byte ctlmsg; rend := int t.offset + t.count; if(rend > len ctlmsg) rend = len ctlmsg; srv.reply(ref Rmsg.Read(t.tag, ctlmsgbuf[(int t.offset):rend])); # Qname Qname => namemsg := cfg.name + "\n"; namemsgbuf := array of byte namemsg; rend := int t.offset + t.count; if(rend > len namemsg) rend = len namemsg; srv.reply(ref Rmsg.Read(t.tag, namemsgbuf[(int t.offset):rend])); # Qstatus Qstatus => info: list of string; info = "name " + cfg.name :: info; info = "sysname " + cfg.sysn :: info; info = "addr " + cfg.addr :: info; info = "storage " + cfg.storage :: info; info = "fsworkers " + sys->sprint("%d", cfg.fswrks) :: info; info = "" :: info; info = "nodes" :: info; tx <-= ref RegTMsg.GetNodes(); reply := <- rx; pick r := reply { Error => srv.reply(ref Rmsg.Error(t.tag, r.err)); NodeList => names := lists->reverse(r.names); while(len names != 0) { node := hd names; sline := ""; tx <-= ref RegTMsg.Check(node); crep := <- rx; pick cr := crep { Error => sline = cr.err; Status => up := cr.count; ps := cr.poolsize; sline = sys->sprint("%d %d", up, ps); * => sline = "unsupported message"; } info = node + " " + sline :: info; names = tl names; } * => srv.reply(ref Rmsg.Error(t.tag, "unsupported version")); } statusmsg := joinstr(lists->reverse(info), "\n") + "\n"; statusmsgbuf := array of byte statusmsg; rend := int t.offset + t.count; if(rend > len statusmsg) rend = len statusmsg; srv.reply(ref Rmsg.Read(t.tag, statusmsgbuf[(int t.offset):rend])); # Default reply * => srv.default(t); } # Write operation Write => (f, nil) := srv.canwrite(t); if(f == nil) { srv.reply(ref Rmsg.Error(t.tag, Eperm)); break; } case f.path { # Qctl Qctl => (nil, csargs, nil) := btos->btos(nil, t.data, len t.data); (nil, crargs) := sys->tokenize(csargs, " \n"); cargs := lists->filter(is_nonempty, crargs); case hd cargs { "refresh" => case len cargs { 1 => tx <-= ref RegTMsg.RefreshAll(); <-rx; srv.reply(ref Rmsg.Write(t.tag, len t.data)); * => pools := tl cargs; for(i := 0; i < len pools; i++) { pool := hd pools; pools = tl pools; tx <-= ref RegTMsg.Refresh(pool); <-rx; } srv.reply(ref Rmsg.Write(t.tag, len t.data)); } "check" => case len cargs { 1 => srv.reply(ref Rmsg.Error(t.tag, Ebadarg)); * => pools := tl cargs; for(i := 0; i < len pools; i++) { pool := hd pools; pools = tl pools; tx <-= ref RegTMsg.Check(pool); <-rx; } srv.reply(ref Rmsg.Write(t.tag, len t.data)); } "close" => case len tl cargs { 1 => pool := hd tl cargs; tx <-= ref RegTMsg.Close(pool); <-rx; srv.reply(ref Rmsg.Write(t.tag, len t.data)); * => srv.reply(ref Rmsg.Error(t.tag, Ebadarg)); } * => if(hd cargs != "refresh") { sys->fprint(stderr, "is refresh\n"); } srv.reply(ref Rmsg.Error(t.tag, Ebadarg)); } # Default reply * => srv.default(t); } # Default action * => srv.default(t); } } tree.quit(); }