ref: a411870ee4640241e3c494367d922847da84f972
dir: /appl/cmd/lockfs.b/
implement Lockfs;

#
# lockfs: a Styx file server that re-exports the plain files of a
# directory while enforcing reader/writer locking on open: any number
# of clients may hold a file open for reading, or exactly one may hold
# it open in any other mode.  Opens that cannot be granted immediately
# are queued and answered when the lock becomes available.
#

include "sys.m";
	sys: Sys;
	stderr: ref Sys->FD;
include "draw.m";
include "styx.m";
	styx: Styx;
	Tmsg, Rmsg: import styx;
include "styxlib.m";
	styxlib: Styxlib;
	Dirtab, Styxserver, Chan,
	devdir,
	Eperm, Ebadfid, Eexists, Enotdir, Enotfound, Einuse: import styxlib;
include "arg.m";
include "keyring.m";
	keyring: Keyring;
include "security.m";
	auth: Auth;
include "dial.m";
	dial: Dial;

Lockfs: module
{
	init: fn(nil: ref Draw->Context, argv: list of string);
	dirgen: fn(srv: ref Styxlib->Styxserver, c: ref Styxlib->Chan,
			tab: array of Styxlib->Dirtab, i: int): (int, Sys->Dir);
};

Elocked: con "file is locked";
Enotopen: con "file not open";

devgen: Dirgenmod;

# a pending Topen waiting for its lock to become available
Openreq: adt {
	srv:	ref Styxserver;
	tag:	int;
	omode:	int;
	c:	ref Chan;
	uproc:	Uproc;
};

# FIFO of pending opens: h is the head (in order), t the tail (reversed)
Lockqueue: adt {
	h:	list of ref Openreq;
	t:	list of ref Openreq;
	put:	fn(q: self ref Lockqueue, s: ref Openreq);
	get:	fn(q: self ref Lockqueue): ref Openreq;
	peek:	fn(q: self ref Lockqueue): ref Openreq;
	flush:	fn(q: self ref Lockqueue, srv: ref Styxserver, tag: int);
};

# one served file; fd is non-nil exactly while somebody holds the lock
Lockfile: adt {
	waitq:	ref Lockqueue;
	fd:	ref Sys->FD;
	readers:	int;
	writers:	int;
	d:	Sys->Dir;
};

# a file operation to be carried out by a (possibly per-user) process
Ureq: adt {
	fname: string;
	pick {
	Open =>
		omode: int;
	Create =>
		omode: int;
		perm: int;
	Remove =>
	Wstat =>
		dir: Sys->Dir;
	}
};

Uproc: type chan of (ref Ureq, chan of (ref Sys->FD, string));

maxqidpath := big 1;
locks: list of ref Lockfile;
lockdir: string;
authinfo: ref Keyring->Authinfo;
timefd: ref Sys->FD;

MAXCONN: con 20;

verbose := 0;

# print the usage message and fail
usage()
{
	sys->fprint(stderr, "usage: lockfs [-A] [-a alg]... [-p addr] dir [mountpoint]\n");
	raise "fail:usage";
}

# report a module that could not be loaded and fail
badmodule(p: string)
{
	sys->fprint(stderr, "lockfs: cannot load %s: %r\n", p);
	raise "fail:bad module";
}

# Parse the command line, load the required modules, then either
# announce on a network address (-p) or mount the served directory
# locally over a pipe.
init(nil: ref Draw->Context, argv: list of string)
{
	sys = load Sys Sys->PATH;
	stderr = sys->fildes(2);
	styx = load Styx Styx->PATH;
	if (styx == nil)
		badmodule(Styx->PATH);
	dial = load Dial Dial->PATH;
	if (dial == nil)
		badmodule(Dial->PATH);
	styx->init();
	styxlib = load Styxlib Styxlib->PATH;
	if (styxlib == nil)
		badmodule(Styxlib->PATH);
	styxlib->init(styx);
	devgen = load Dirgenmod "$self";
	if (devgen == nil)
		badmodule("self as Dirgenmod");
	timefd = sys->open("/dev/time", sys->OREAD);
	if (timefd == nil) {
		sys->fprint(stderr, "lockfs: cannot open /dev/time: %r\n");
		raise "fail:no time";
	}
	arg := load Arg Arg->PATH;
	if (arg == nil)
		badmodule(Arg->PATH);
	arg->init(argv);

	addr := "";
	doauth := 1;
	algs: list of string;
	while ((opt := arg->opt()) != 0) {
		case opt {
		'p' =>
			addr = arg->arg();
			# a missing address would silently select local-mount mode
			if (addr == nil)
				usage();
		'a' =>
			alg := arg->arg();
			if (alg == nil)
				usage();
			algs = alg :: algs;
		'A' =>
			doauth = 0;
		'v' =>
			verbose = 1;
		* =>
			usage();
		}
	}
	argv = arg->argv();
	if (argv == nil || (addr != nil && tl argv != nil))
		usage();
	if (addr == nil)
		doauth = 0;		# no authentication necessary for local mount
	if (doauth) {
		auth = load Auth Auth->PATH;
		if (auth == nil)
			badmodule(Auth->PATH);
		if ((e := auth->init()) != nil) {
			sys->fprint(stderr, "lockfs: cannot init auth: %s\n", e);
			raise "fail:errors";
		}
		keyring = load Keyring Keyring->PATH;
		if (keyring == nil)
			badmodule(Keyring->PATH);
		keyfile := "/usr/" + user() + "/keyring/default";
		authinfo = keyring->readauthinfo(keyfile);
		# without our own credentials every client authentication would fail
		if (authinfo == nil) {
			sys->fprint(stderr, "lockfs: cannot read %s: %r\n", keyfile);
			raise "fail:errors";
		}
	}

	lockdir = hd argv;
	mountpoint := lockdir;
	if (tl argv != nil)
		mountpoint = hd tl argv;

	if (addr != nil) {
		if (doauth && algs == nil)
			algs = "none" :: nil;		# XXX is this default a bad idea?
		srvrq := chan of (ref Sys->FD, string, Uproc);
		srvsync := chan of (int, string);
		spawn listener(addr, srvrq, srvsync, algs);
		(srvpid, err) := <-srvsync;
		srvsync = nil;
		if (srvpid == -1) {
			sys->fprint(stderr, "lockfs: failed to start listener: %s\n", err);
			raise "fail:errors";
		}
		sync := chan of int;
		spawn server(srvrq, sync);
		<-sync;
	} else {
		rq := chan of (ref Sys->FD, string, Uproc);
		fds := array[2] of ref Sys->FD;
		if (sys->pipe(fds) == -1) {
			sys->fprint(stderr, "lockfs: cannot create pipe: %r\n");
			raise "fail:errors";
		}
		sync := chan of int;
		spawn server(rq, sync);
		<-sync;
		# hand over the single local connection, then tell the server
		# to go down as soon as that connection has gone away.
		rq <-= (fds[0], "lock", nil);
		rq <-= (nil, nil, nil);
		if (sys->mount(fds[1], nil, mountpoint, Sys->MREPL | Sys->MCREATE, nil) == -1) {
			sys->fprint(stderr, "lockfs: cannot mount: %r\n");
			raise "fail:cannot mount";
		}
	}
}

# Central server process.  New connections arrive on srvrq as
# (fd, description, uproc); a nil fd asks the server to exit once the
# last client has disconnected.  At most MAXCONN clients are served.
server(srvrq: chan of (ref Sys->FD, string, Uproc), sync: chan of int)
{
	sys->pctl(Sys->FORKNS, nil);
	sync <-= 1;
	down := 0;
	nclient := 0;
	tchans := array[MAXCONN] of chan of ref Tmsg;
	srv := array[MAXCONN] of ref Styxserver;
	uprocs := array[MAXCONN] of Uproc;
	lockinit();
Service:
	for (;;) alt {
	(fd, reqstr, uprocch) := <-srvrq =>
		if (fd == nil) {
			if (verbose && reqstr != nil)
				sys->print("lockfs: localserver going down (reason: %s)\n", reqstr);
			down = 1;
			# nobody connected: there will be no disconnect to trigger the exit
			if (nclient == 0)
				break Service;
		} else {
			if (verbose)
				sys->print("lockfs: got new connection (s == '%s')\n", reqstr);
			for (i := 0; i < len tchans; i++)
				if (tchans[i] == nil) {
					(tchans[i], srv[i]) = Styxserver.new(fd);
					if(verbose)
						sys->print("svc started\n");
					uprocs[i] = uprocch;
					break;
				}
			if (i == len tchans) {
				sys->fprint(stderr, "lockfs: too many clients\n");	# XXX expand arrays
				if (uprocch != nil)
					uprocch <-= (nil, nil);
			} else
				nclient++;
		}
	(n, gm) := <-tchans =>
		if (handletmsg(srv[n], gm, uprocs[n]) == -1) {
			tchans[n] = nil;
			srv[n] = nil;
			if (uprocs[n] != nil) {
				uprocs[n] <-= (nil, nil);	# tell the per-user process to exit
				uprocs[n] = nil;
			}
			if (nclient-- <= 1 && down)
				break Service;
		}
	}
	if (verbose)
		sys->print("lockfs: finished\n");
}

# Directory generator used by styxlib: returns the s'th lock file's
# directory entry, or -1 when s is beyond the end of the list.
dirgen(nil: ref Styxserver, nil: ref Styxlib->Chan,
		nil: array of Dirtab, s: int): (int, Sys->Dir)
{
	d: Sys->Dir;
	ll := locks;
	for (i := 0; i < s && ll != nil; i++)
		ll = tl ll;
	if (ll == nil)
		return (-1, d);
	return (1, (hd ll).d);
}

# Handle one T-message from a client.  Returns -1 when the connection
# has gone away (read error or eof), 0 otherwise.  A "panic:*"
# exception raised while handling a message is turned into an Rerror.
handletmsg(srv: ref Styxserver, gm: ref Tmsg, uproc: Uproc): int
{
	{
		if (gm == nil)
			gm = ref Tmsg.Readerror(-1, "eof");
		if(verbose)
			sys->print("<- %s\n", gm.text());
		pick m := gm {
		Readerror =>
			# connection lost: drop its queued opens and release its locks.
			# could be more efficient...
			for (cl := srv.chanlist(); cl != nil; cl = tl cl) {
				c := hd cl;
				for (ll := locks; ll != nil; ll = tl ll) {
					if ((hd ll).d.qid.path == c.qid.path) {
						l := hd ll;
						l.waitq.flush(srv, -1);
						if (c.open)
							unlocked(l);
						break;
					}
				}
			}
			if (m.error != "eof")
				sys->fprint(stderr, "lockfs: read error: %s\n", m.error);
			return -1;
		Version =>
			srv.devversion(m);
		Auth =>
			srv.devauth(m);
		Walk =>
			c := fid2chan(srv, m.fid);
			qids: array of Sys->Qid;
			cc := ref *c;
			if (len m.names > 0) {
				qids = array[1] of Sys->Qid;	# it's just one level
				if ((cc.qid.qtype & Sys->QTDIR) == 0) {
					srv.reply(ref Rmsg.Error(m.tag, Enotdir));
					break;
				}
				for (ll := locks; ll != nil; ll = tl ll)
					if ((hd ll).d.name == m.names[0])
						break;
				if (ll == nil) {
					srv.reply(ref Rmsg.Error(m.tag, Enotfound));
					break;
				}
				d := (hd ll).d;
				cc.qid = d.qid;
				cc.path = d.name;
				# report the qid walked to, not the one walked from
				qids[0] = cc.qid;
				if (len m.names > 1) {
					# nothing lies below a plain file, so the walk is only
					# partially successful: newfid must be left untouched.
					srv.reply(ref Rmsg.Walk(m.tag, qids));
					break;
				}
			}
			if(m.newfid != m.fid){
				nc := srv.clone(cc, m.newfid);
				if(nc == nil){
					srv.reply(ref Rmsg.Error(m.tag, Einuse));
					break;
				}
			}else{
				c.qid = cc.qid;
				c.path = cc.path;
			}
			srv.reply(ref Rmsg.Walk(m.tag, qids));
		Open =>
			c := fid2chan(srv, m.fid);
			if (c.qid.qtype & Sys->QTDIR) {
				srv.reply(ref Rmsg.Open(m.tag, c.qid, Styx->MAXFDATA));
				break;
			}
			for (ll := locks; ll != nil; ll = tl ll)
				if ((hd ll).d.qid.path == c.qid.path)
					break;
			if (ll == nil) {
				srv.reply(ref Rmsg.Error(m.tag, Enotfound));
				break;
			}
			l := hd ll;
			req := ref Openreq(srv, m.tag, m.mode, c, uproc);
			# grant at once if unlocked, or if a reader joins other readers;
			# otherwise the reply is deferred until the lock is released.
			if (l.fd == nil || (m.mode == Sys->OREAD && l.writers == 0)) {
				openlockfile(l, req);
			} else {
				l.waitq.put(req);
			}
			req = nil;
		Create =>
			c := fid2chan(srv, m.fid);
			if ((c.qid.qtype & Sys->QTDIR) == 0) {
				srv.reply(ref Rmsg.Error(m.tag, Enotdir));
				break;
			}
			if (m.perm & Sys->DMDIR) {
				srv.reply(ref Rmsg.Error(m.tag, Eperm));
				break;
			}
			for (ll := locks; ll != nil; ll = tl ll)
				if ((hd ll).d.name == m.name)
					break;
			if (ll != nil) {
				srv.reply(ref Rmsg.Error(m.tag, Eexists));
				break;
			}
			(fd, err) := create(uproc, lockdir + "/" + m.name, m.mode, m.perm);
			if (fd == nil) {
				srv.reply(ref Rmsg.Error(m.tag, err));
				break;
			}
			(ok, d) := sys->fstat(fd);
			if (ok == -1) {
				srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r")));
				break;
			}
			l := ref Lockfile(ref Lockqueue, fd, 0, 0, d);
			l.d.qid = (maxqidpath++, 0, Sys->QTFILE);
			l.d.mtime = l.d.atime = now();
			# the creator holds the lock on the new file
			if (m.mode == Sys->OREAD)
				l.readers = 1;
			else
				l.writers = 1;
			locks = l :: locks;
			# the fid now refers to the new file: replace the whole qid
			# (not just the path) so it stops looking like a directory.
			c.qid = l.d.qid;
			c.path = l.d.name;
			c.open = 1;
			srv.reply(ref Rmsg.Create(m.tag, c.qid, Styx->MAXFDATA));
		Read =>
			c := fid2chan(srv, m.fid);
			if (c.qid.qtype & Sys->QTDIR)
				srv.devdirread(m, devgen, nil);
			else {
				l := qid2lock(c.qid);
				if (l == nil)
					srv.reply(ref Rmsg.Error(m.tag, Enotfound));
				else if (c.open == 0 || l.fd == nil)
					# only a fid that has opened the file may use its descriptor
					srv.reply(ref Rmsg.Error(m.tag, Enotopen));
				else {
					count := m.count;
					if (count < 0)
						count = 0;
					if (count > Styx->MAXFDATA)
						count = Styx->MAXFDATA;
					d := array[count] of byte;
					sys->seek(l.fd, m.offset, Sys->SEEKSTART);
					n := sys->read(l.fd, d, count);
					if (n == -1)
						srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r")));
					else {
						srv.reply(ref Rmsg.Read(m.tag, d[0:n]));
						l.d.atime = now();
					}
				}
			}
		Write =>
			c := fid2chan(srv, m.fid);
			if (c.qid.qtype & Sys->QTDIR) {
				srv.reply(ref Rmsg.Error(m.tag, Eperm));
				break;
			}
			l := qid2lock(c.qid);
			if (l == nil) {
				srv.reply(ref Rmsg.Error(m.tag, Enotfound));
				break;
			}
			if (c.open == 0 || l.fd == nil) {
				srv.reply(ref Rmsg.Error(m.tag, Enotopen));
				break;
			}
			sys->seek(l.fd, m.offset, Sys->SEEKSTART);
			n := sys->write(l.fd, m.data, len m.data);
			if (n == -1)
				srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r")));
			else {
				srv.reply(ref Rmsg.Write(m.tag, n));
				nlength := m.offset + big n;
				if (nlength > l.d.length)
					l.d.length = nlength;
				l.d.mtime = now();
				l.d.qid.vers++;
			}
		Clunk =>
			c := srv.devclunk(m);
			if (c != nil && c.open && (l := qid2lock(c.qid)) != nil)
				unlocked(l);
		Flush =>
			# the request being cancelled is identified by oldtag;
			# m.tag is the tag of the Tflush itself.
			for (ll := locks; ll != nil; ll = tl ll)
				(hd ll).waitq.flush(srv, m.oldtag);
			srv.reply(ref Rmsg.Flush(m.tag));
		Stat =>
			srv.devstat(m, devgen, nil);
		Remove =>
			c := fid2chan(srv, m.fid);
			srv.chanfree(c);	# remove always clunks the fid
			if (c.qid.qtype & Sys->QTDIR) {
				srv.reply(ref Rmsg.Error(m.tag, Eperm));
				break;
			}
			l := qid2lock(c.qid);
			if (l == nil) {
				srv.reply(ref Rmsg.Error(m.tag, Enotfound));
				break;
			}
			# the fid is gone whatever happens next, so a lock held
			# through it must be released (as for Clunk).
			if (c.open)
				unlocked(l);
			if (l.fd != nil) {
				srv.reply(ref Rmsg.Error(m.tag, Elocked));
				break;
			}
			if ((err := remove(uproc, lockdir + "/" + l.d.name)) != nil) {
				srv.reply(ref Rmsg.Error(m.tag, err));
				break;
			}
			# drop l from the list, keeping the remaining entries in order
			rest: list of ref Lockfile;
			for (; locks != nil; locks = tl locks)
				if (hd locks != l)
					rest = hd locks :: rest;
			for (locks = nil; rest != nil; rest = tl rest)
				locks = hd rest :: locks;
			srv.reply(ref Rmsg.Remove(m.tag));
		Wstat =>
			c := fid2chan(srv, m.fid);
			if (c.qid.qtype & Sys->QTDIR) {
				srv.reply(ref Rmsg.Error(m.tag, Eperm));
				break;
			}
			l := qid2lock(c.qid);
			if (l == nil) {
				srv.reply(ref Rmsg.Error(m.tag, Enotfound));
				break;
			}
			if ((err := wstat(uproc, lockdir + "/" + l.d.name, m.stat)) != nil) {
				srv.reply(ref Rmsg.Error(m.tag, err));
				break;
			}
			# an empty name in a wstat means "leave the name unchanged"
			name := m.stat.name;
			if (name == nil)
				name = l.d.name;
			(ok, d) := sys->stat(lockdir + "/" + name);
			if (ok == -1) {
				srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r")));
				break;
			}
			d.qid = l.d.qid;
			l.d = d;
			srv.reply(ref Rmsg.Wstat(m.tag));
		Attach =>
			srv.devattach(m);
		}
		return 0;
	}
	exception e{
	"panic:*" =>
		sys->fprint(stderr, "lockfs: %s\n", e);
		srv.reply(ref Rmsg.Error(gm.tag, e[len "panic:":]));
		return 0;
	}
}

# One holder of l has let go.  When the last holder has gone, close the
# file and grant the lock to as many queued requests as are compatible:
# either a run of readers or a single writer.
unlocked(l: ref Lockfile)
{
	if (l.readers > 0)
		l.readers--;
	else if (l.writers > 0)
		l.writers--;
	if (l.readers > 0)
		return;
	l.fd = nil;

	# unblock all readers at the head of the queue.
	# XXX should we queuejump other readers?
	while ((nreq := l.waitq.peek()) != nil && l.writers == 0) {
		if (nreq.omode != Sys->OREAD && l.readers > 0)
			break;
		openlockfile(l, nreq);
		l.waitq.get();
	}
}

# Open the underlying file on behalf of req, reply to the client and
# account for the new holder.  Returns 0 on success, -1 if the open
# failed (an Rerror has then been sent and l is left unchanged).
openlockfile(l: ref Lockfile, req: ref Openreq): int
{
	(fd, err) := open(req.uproc, lockdir + "/" + l.d.name, req.omode);
	if (fd == nil) {
		# leave l.fd alone: other readers may still be using it
		req.srv.reply(ref Rmsg.Error(req.tag, err));
		return -1;
	}
	l.fd = fd;
	req.c.open = 1;
	if (req.omode & Sys->OTRUNC)
		l.d.length = big 0;
	req.srv.reply(ref Rmsg.Open(req.tag, l.d.qid, Styx->MAXFDATA));
	if (req.omode == Sys->OREAD)
		l.readers++;
	else
		l.writers++;
	return 0;
}

# find the lock file with the given qid path, or nil
qid2lock(q: Sys->Qid): ref Lockfile
{
	for (ll := locks; ll != nil; ll = tl ll)
		if ((hd ll).d.qid.path == q.path)
			return hd ll;
	return nil;
}

# Build the initial list of lock files from the contents of lockdir:
# one entry per distinct name, directories excluded.
lockinit()
{
	fd := sys->open(lockdir, Sys->OREAD);
	if (fd == nil)
		return;

	lockl: list of ref Lockfile;
	# XXX if O(n²) behaviour is a problem, use Readdir module
	for(;;){
		(n, e) := sys->dirread(fd);
		if(n <= 0)
			break;
		for (i := 0; i < n; i++) {
			for (l := lockl; l != nil; l = tl l)
				if ((hd l).d.name == e[i].name)
					break;
			if (l == nil) {
				e[i].qid = (maxqidpath++, 0, Sys->QTFILE);
				lockl = ref Lockfile(ref Lockqueue, nil, 0, 0, e[i]) :: lockl;
			}
		}
	}
	# remove all directories from list
	for (locks = nil; lockl != nil; lockl = tl lockl)
		if (((hd lockl).d.mode & Sys->DMDIR) == 0)
			locks = hd lockl :: locks;
}

# map a fid to its channel; raises "panic:bad fid" if it is unknown
fid2chan(srv: ref Styxserver, fid: int): ref Chan
{
	c := srv.fidtochan(fid);
	if (c == nil)
		raise "panic:bad fid";
	return c;
}

# append s to the queue
Lockqueue.put(q: self ref Lockqueue, s: ref Openreq)
{
	q.t = s :: q.t;
}

# remove and return the oldest request, or nil if the queue is empty
Lockqueue.get(q: self ref Lockqueue): ref Openreq
{
	s: ref Openreq;
	if(q.h == nil)
		(q.h, q.t) = (revrqlist(q.t), nil);

	if(q.h != nil)
		(s, q.h) = (hd q.h, tl q.h);

	return s;
}

# return the oldest request without removing it, or nil
Lockqueue.peek(q: self ref Lockqueue): ref Openreq
{
	s := q.get();
	if (s != nil)
		q.h = s :: q.h;
	return s;
}

# Return l without the requests that belong to srv and carry the given
# tag (any tag if tag is -1); order is preserved.
doflush(l: list of ref Openreq, srv: ref Styxserver, tag: int): list of ref Openreq
{
	oldl := l;
	nl: list of ref Openreq;
	doneone := 0;
	while (l != nil) {
		oreq := hd l;
		if (oreq.srv != srv || (tag != -1 && oreq.tag != tag))
			nl = oreq :: nl;
		else
			doneone = 1;
		l = tl l;
	}
	if (doneone)
		return revrqlist(nl);
	else
		return oldl;
}

# discard queued requests from srv with the given tag (-1: all of them)
Lockqueue.flush(q: self ref Lockqueue, srv: ref Styxserver, tag: int)
{
	q.h = doflush(q.h, srv, tag);
	q.t = doflush(q.t, srv, tag);
}

# or inline
revrqlist(ls: list of ref Openreq) : list of ref Openreq
{
	rs: list of ref Openreq;
	while(ls != nil){
		rs = hd ls :: rs;
		ls = tl ls;
	}
	return rs;
}

# Announce on addr and pass each incoming connection to the server on
# ch, via an authenticator process when algs is non-nil.  The outcome
# of the announce is reported on sync as (pid, nil) or (-1, error).
# addr should be, e.g. tcp!*!2345
listener(addr: string, ch: chan of (ref Sys->FD, string, Uproc),
		sync: chan of (int, string), algs: list of string)
{
	addr = dial->netmkaddr(addr, "tcp", "33234");
	c := dial->announce(addr);
	if (c == nil) {
		sync <-= (-1, sys->sprint("cannot anounce on %s: %r", addr));
		return;
	}
	sync <-= (sys->pctl(0, nil), nil);
	for (;;) {
		nc := dial->listen(c);
		if (nc == nil) {
			ch <-= (nil, sys->sprint("listen failed: %r"), nil);
			return;
		}
		dfd := sys->open(nc.dir + "/data", Sys->ORDWR);
		if (dfd != nil) {
			if (algs == nil)
				ch <-= (dfd, nil, nil);
			else
				spawn authenticator(dfd, ch, algs);
		}
	}
}

# authenticate a connection, setting the user id appropriately,
# and then act as a server, performing file operations
# on behalf of the central process.
authenticator(dfd: ref Sys->FD, ch: chan of (ref Sys->FD, string, Uproc),
		algs: list of string)
{
	# on success the second result is handed to the server as the
	# connection description; on failure it holds the error text.
	(afd, info) := auth->server(algs, authinfo, dfd, 1);
	if (afd == nil) {
		if (verbose)
			sys->fprint(stderr, "lockfs: authentication failed: %s\n", info);
		return;
	}

	requests := chan of (ref Ureq, chan of (ref Sys->FD, string));
	ch <-= (afd, info, requests);

	# serve file operations until the server sends a nil request
	for (;;) {
		(job, answer) := <-requests;
		if (job == nil)
			exit;
		(rfd, rerr) := doreq(job);
		answer <-= (rfd, rerr);
	}
}

# create file via uproc (or directly if uproc is nil)
create(uproc: Uproc, file: string, omode: int, perm: int): (ref Sys->FD, string)
{
	rq := ref Ureq.Create(file, omode, perm);
	return proxydoreq(uproc, rq);
}

# open file via uproc (or directly if uproc is nil)
open(uproc: Uproc, file: string, omode: int): (ref Sys->FD, string)
{
	rq := ref Ureq.Open(file, omode);
	return proxydoreq(uproc, rq);
}

# remove file via uproc; returns nil on success, else the error text
remove(uproc: Uproc, file: string): string
{
	(nil, e) := proxydoreq(uproc, ref Ureq.Remove(file));
	return e;
}

# wstat file via uproc; returns nil on success, else the error text
wstat(uproc: Uproc, file: string, d: Sys->Dir): string
{
	(nil, e) := proxydoreq(uproc, ref Ureq.Wstat(file, d));
	return e;
}

# Carry out req: in this process when there is no per-user process,
# otherwise by sending it to uproc and waiting for the answer.
proxydoreq(uproc: Uproc, req: ref Ureq): (ref Sys->FD, string)
{
	if (uproc != nil) {
		answer := chan of (ref Sys->FD, string);
		uproc <-= (req, answer);
		return <-answer;
	}
	return doreq(req);
}

# Perform the file operation described by greq in the calling process.
# Returns the new file descriptor (Open and Create only) and an error
# string, which is nil when the operation succeeded.
doreq(greq: ref Ureq): (ref Sys->FD, string)
{
	rfd: ref Sys->FD;
	e: string;
	pick r := greq {
	Open =>
		rfd = sys->open(r.fname, r.omode);
		if (rfd == nil)
			e = sys->sprint("%r");
	Create =>
		rfd = sys->create(r.fname, r.omode, r.perm);
		if (rfd == nil)
			e = sys->sprint("%r");
	Remove =>
		if (sys->remove(r.fname) == -1)
			e = sys->sprint("%r");
	Wstat =>
		if (sys->wstat(r.fname, r.dir) == -1)
			e = sys->sprint("%r");
	}
	return (rfd, e);
}

# return the contents of /dev/user; fails if it cannot be read
user(): string
{
	ufd := sys->open("/dev/user", Sys->OREAD);
	if (ufd == nil) {
		sys->fprint(stderr, "lockfs: can't open /dev/user: %r\n");
		raise "fail:no user";
	}

	name := array[Sys->NAMEMAX] of byte;
	nr := sys->read(ufd, name, len name);
	if (nr < 0) {
		sys->fprint(stderr, "lockfs: failed to read /dev/user: %r\n");
		raise "fail:no user";
	}
	return string name[0:nr];
}

# return the value read from /dev/time divided by 1000000,
# or 0 if it cannot be read
now(): int
{
	raw := array[128] of byte;
	sys->seek(timefd, big 0, Sys->SEEKSTART);
	nr := sys->read(timefd, raw, len raw);
	if (nr < 0)
		return 0;
	t := big string raw[0:nr];
	return int (t / big 1000000);
}