ref: 2ab7efc23fd2137ce88d328441900e0a439a3508
dir: /appl/cmd/nodereg.b/
# initialize a node pool NodePool.init(r: self ref NodePool): int { if(debug) sys->fprint(stderr, "np: init'ing pool %s\n", r.cfg.name); if(len r.instances == r.cfg.psize) { sys->fprint(stderr, "np: pool %s already initialized\n", r.cfg.name); return 1; } c := r.refresh(); if(debug) sys->fprint(stderr, "np: pool %s init'ed: %d/%d\n", r.cfg.name, c, r.cfg.psize); return 0; } NodePool.check(r: self ref NodePool): int { sc := 0; if(debug) sys->fprint(stderr, "np: refreshing pool %s\n", r.cfg.name); for(i := 0; i < r.cfg.psize; i++) { mtpt := "/n/" + r.cfg.name + "." + sys->sprint("%d", i); ctlpath := mtpt + "/ctl"; if(sys->stat(ctlpath).t0 >= 0) sc++; else sys->unmount(nil, mtpt); } return sc; } NodePool.refresh(r: self ref NodePool): int { sc := 0; if(debug) sys->fprint(stderr, "refreshing pool %s, with %d nodes\n", r.cfg.name, r.cfg.psize); for(i := 0; i < r.cfg.psize; i++) { mtpt := "/n/" + r.cfg.name + "." + sys->sprint("%d", i); ctlpath := mtpt + "/ctl"; if(sys->stat(ctlpath).t0 >= 0) continue; sys->unmount(nil, mtpt); err := r.newinst(mtpt); if(err != nil) break; sc++; } return sc; } NodePool.close(r: self ref NodePool) { if(debug) sys->fprint(stderr, "np: closing pool %s\n", r.cfg.name); instances := r.instances; while(len instances != 0) { instance := hd instances; instances = tl instances; if(debug) sys->fprint(stderr, "np: closing %s\n", instance); sys->unmount(nil, instance); } sys->fprint(stderr, "np: pool %s closed\n", r.cfg.name); } NodePool.newinst(r: self ref NodePool, mtpt: string): string { (ec, ae) := sys->tokenize(r.cfg.addr, "!"); defnet := "tcp"; defsvc := "dddbctl"; nodename := r.cfg.name; case ec { 1 or 2 => defnet = hd ae; * => defnet = hd ae; defsvc = hd tl tl ae; } keyfile := r.cfg.keyfile; if(keyfile == "" || keyfile == nil) keyfile = sys->sprint("/usr/%s/keyring/%s!%s!%s", user(), defnet, r.cfg.sysn, defsvc); if(debug) sys->fprint(stderr, "np: %s: reading keyfile %s\n", nodename, keyfile); authinfo := keyring->readauthinfo(keyfile); if (authinfo == nil) { sys->fprint(stderr, "np: %s error: %r\n", nodename); return sys->sprint("cannot read %s", keyfile); } addr := dial->netmkaddr(r.cfg.sysn, defnet, defsvc); if(debug) sys->fprint(stderr, "np: %s: dialing %s\n", nodename, addr); (ok, c) := sys->dial(addr, nil); if(ok < 0) return sys->sprint("unable to dial %s", addr); (fd, err) := auth->client("", authinfo, c.dfd); if(fd == nil) { sys->fprint(stderr, "np: %s: error authenticating: %s\n", nodename, err); return err; } ok = sys->mount(fd, nil, mtpt, Sys->MREPL, nil); if(ok < 0) { sys->fprint(stderr, "np: %s: unable to mount %s\n", nodename, mtpt); return sys->sprint("unable to mount %s\n", mtpt); } return nil; } # create an uninitialized registry DbRegistry.new(cfgs: list of NodeConfig): ref DbRegistry { sys->fprint(stderr, "dbreg: creating up database registry\n"); nodepools: list of ref NodePool; while(len cfgs != 0) { nodepools = ref NodePool(hd cfgs, nil) :: nodepools; cfgs = tl cfgs; } # rchans: list of chan of ref RegRMsg; # tchans: list of chan of ref RegTMsg; return ref DbRegistry(nodepools); } # initialize the registry DbRegistry.init(r: self ref DbRegistry) { nodepools := r.nodepools; count := 0; sys->fprint(stderr, "dbreg: initializing pools\n"); while(len nodepools != 0) { pool := hd nodepools; nodepools = tl nodepools; err := pool.init(); if(err) count++; } sys->fprint(stderr, "dbreg: initialized %d out of %d pools\n", count, len r.nodepools); } DbRegistry.close(r: self ref DbRegistry) { nodepools := r.nodepools; sys->fprint(stderr, "dbreg: closing all pools\n"); while(len nodepools != 0) { pool := hd nodepools; nodepools = tl nodepools; spawn pool.close(); } } get_pool(r: ref DbRegistry, name: string): ref NodePool { nodepools := r.nodepools; while(len nodepools != 0) { pool := hd nodepools; nodepools = tl nodepools; if(pool.cfg.name == name) return pool; } return nil; } run_chans(r: ref DbRegistry, tx: chan of ref RegTMsg, rx: chan of ref RegRMsg) { active := 1; while(active) { tm := <-tx; pick msg := tm { ChanClose => active = 0; GetNodes => nodes: list of string; nodepools := r.nodepools; while(len nodepools != 0) { pool := hd nodepools; nodepools = tl nodepools; nodes = pool.cfg.name :: nodes; } rx <-= ref RegRMsg.NodeList(nodes); Check => pool := get_pool(r, msg.nodename); if(pool == nil) rx <-= ref RegRMsg.Error(Epoolnotfound); c := pool.check(); rx <-= ref RegRMsg.Status(c, pool.cfg.psize); Refresh => pool := get_pool(r, msg.nodename); if(pool == nil) rx <-= ref RegRMsg.Error(Epoolnotfound); c := pool.refresh(); rx <-= ref RegRMsg.Status(c, pool.cfg.psize); RefreshAll => nodepools := r.nodepools; status: list of ref RegRMsg.Status; for(i := 0; i < len nodepools; i++) { sys->fprint(stderr, "idx: %d\n", i); pool := hd nodepools; nodepools = tl nodepools; c := pool.refresh(); status = ref RegRMsg.Status(c, pool.cfg.psize) :: status; } rx <-= ref RegRMsg.StatusAll(status); Close => pool := get_pool(r, msg.nodename); if(pool == nil) rx <-= ref RegRMsg.Error(Epoolnotfound); pool.close(); rx <-= ref RegRMsg.Status(0, pool.cfg.psize); } } } DbRegistry.changen(r: self ref DbRegistry): (chan of ref RegTMsg, chan of ref RegRMsg) { tx := chan of ref RegTMsg; rx := chan of ref RegRMsg; spawn run_chans(r, tx, rx); return (tx, rx); }