ref: c22ebef4e1221a3fb276524a86bc3b25d2662653
dir: /sys/src/cmd/venti/srv/arena.c/
#include "stdinc.h" #include "dat.h" #include "fns.h" typedef struct ASum ASum; struct ASum { Arena *arena; ASum *next; }; static void sealarena(Arena *arena); static int okarena(Arena *arena); static int loadarena(Arena *arena); static CIBlock *getcib(Arena *arena, int clump, int writing, CIBlock *rock); static void putcib(Arena *arena, CIBlock *cib); static void sumproc(void *); static void loadcig(Arena *arena); static QLock sumlock; static Rendez sumwait; static ASum *sumq; static ASum *sumqtail; static uchar zero[8192]; int arenasumsleeptime; int initarenasum(void) { needzeroscore(); /* OS X */ sumwait.l = &sumlock; if(vtproc(sumproc, nil) < 0){ seterr(EOk, "can't start arena checksum slave: %r"); return -1; } return 0; } /* * make an Arena, and initialize it based upon the disk header and trailer. */ Arena* initarena(Part *part, u64int base, u64int size, u32int blocksize) { Arena *arena; arena = MKZ(Arena); arena->part = part; arena->blocksize = blocksize; arena->clumpmax = arena->blocksize / ClumpInfoSize; arena->base = base + blocksize; arena->size = size - 2 * blocksize; if(loadarena(arena) < 0){ seterr(ECorrupt, "arena header or trailer corrupted"); freearena(arena); return nil; } if(okarena(arena) < 0){ freearena(arena); return nil; } if(arena->diskstats.sealed && scorecmp(zeroscore, arena->score)==0) sealarena(arena); return arena; } void freearena(Arena *arena) { if(arena == nil) return; free(arena); } Arena* newarena(Part *part, u32int vers, char *name, u64int base, u64int size, u32int blocksize) { int bsize; Arena *arena; if(nameok(name) < 0){ seterr(EOk, "illegal arena name", name); return nil; } arena = MKZ(Arena); arena->part = part; arena->version = vers; if(vers == ArenaVersion4) arena->clumpmagic = _ClumpMagic; else{ do arena->clumpmagic = fastrand(); while(arena->clumpmagic==_ClumpMagic || arena->clumpmagic==0); } arena->blocksize = blocksize; arena->clumpmax = arena->blocksize / ClumpInfoSize; arena->base = base + blocksize; arena->size = size - 2 * blocksize; namecp(arena->name, name); bsize = sizeof zero; if(bsize > arena->blocksize) bsize = arena->blocksize; if(wbarena(arena)<0 || wbarenahead(arena)<0 || writepart(arena->part, arena->base, zero, bsize)<0){ freearena(arena); return nil; } return arena; } int readclumpinfo(Arena *arena, int clump, ClumpInfo *ci) { CIBlock *cib, r; cib = getcib(arena, clump, 0, &r); if(cib == nil) return -1; unpackclumpinfo(ci, &cib->data->data[cib->offset]); putcib(arena, cib); return 0; } int readclumpinfos(Arena *arena, int clump, ClumpInfo *cis, int n) { CIBlock *cib, r; int i; /* * because the clump blocks are laid out * in reverse order at the end of the arena, * it can be a few percent faster to read * the clumps backwards, which reads the * disk blocks forwards. */ for(i = n-1; i >= 0; i--){ cib = getcib(arena, clump + i, 0, &r); if(cib == nil){ n = i; continue; } unpackclumpinfo(&cis[i], &cib->data->data[cib->offset]); putcib(arena, cib); } return n; } /* * write directory information for one clump * must be called the arena locked */ int writeclumpinfo(Arena *arena, int clump, ClumpInfo *ci) { CIBlock *cib, r; cib = getcib(arena, clump, 1, &r); if(cib == nil) return -1; dirtydblock(cib->data, DirtyArenaCib); packclumpinfo(ci, &cib->data->data[cib->offset]); putcib(arena, cib); return 0; } u64int arenadirsize(Arena *arena, u32int clumps) { return ((clumps / arena->clumpmax) + 1) * arena->blocksize; } /* * read a clump of data * n is a hint of the size of the data, not including the header * make sure it won't run off the end, then return the number of bytes actually read */ u32int readarena(Arena *arena, u64int aa, u8int *buf, long n) { DBlock *b; u64int a; u32int blocksize, off, m; long nn; if(n == 0) return -1; qlock(&arena->lock); a = arena->size - arenadirsize(arena, arena->memstats.clumps); qunlock(&arena->lock); if(aa >= a){ seterr(EOk, "reading beyond arena clump storage: clumps=%d aa=%lld a=%lld -1 clumps=%lld\n", arena->memstats.clumps, aa, a, arena->size - arenadirsize(arena, arena->memstats.clumps - 1)); return -1; } if(aa + n > a) n = a - aa; blocksize = arena->blocksize; a = arena->base + aa; off = a & (blocksize - 1); a -= off; nn = 0; for(;;){ b = getdblock(arena->part, a, OREAD); if(b == nil) return -1; m = blocksize - off; if(m > n - nn) m = n - nn; memmove(&buf[nn], &b->data[off], m); putdblock(b); nn += m; if(nn == n) break; off = 0; a += blocksize; } return n; } /* * write some data to the clump section at a given offset * used to fix up corrupted arenas. */ u32int writearena(Arena *arena, u64int aa, u8int *clbuf, u32int n) { DBlock *b; u64int a; u32int blocksize, off, m; long nn; int ok; if(n == 0) return -1; qlock(&arena->lock); a = arena->size - arenadirsize(arena, arena->memstats.clumps); if(aa >= a || aa + n > a){ qunlock(&arena->lock); seterr(EOk, "writing beyond arena clump storage"); return -1; } blocksize = arena->blocksize; a = arena->base + aa; off = a & (blocksize - 1); a -= off; nn = 0; for(;;){ b = getdblock(arena->part, a, off != 0 || off + n < blocksize ? ORDWR : OWRITE); if(b == nil){ qunlock(&arena->lock); return -1; } dirtydblock(b, DirtyArena); m = blocksize - off; if(m > n - nn) m = n - nn; memmove(&b->data[off], &clbuf[nn], m); ok = 0; putdblock(b); if(ok < 0){ qunlock(&arena->lock); return -1; } nn += m; if(nn == n) break; off = 0; a += blocksize; } qunlock(&arena->lock); return n; } /* * allocate space for the clump and write it, * updating the arena directory ZZZ question: should this distinguish between an arena filling up and real errors writing the clump? */ u64int writeaclump(Arena *arena, Clump *c, u8int *clbuf) { DBlock *b; u64int a, aa; u32int clump, n, nn, m, off, blocksize; int ok; n = c->info.size + ClumpSize + U32Size; qlock(&arena->lock); aa = arena->memstats.used; if(arena->memstats.sealed || aa + n + U32Size + arenadirsize(arena, arena->memstats.clumps + 1) > arena->size){ if(!arena->memstats.sealed){ logerr(EOk, "seal memstats %s", arena->name); arena->memstats.sealed = 1; wbarena(arena); } qunlock(&arena->lock); return TWID64; } if(packclump(c, &clbuf[0], arena->clumpmagic) < 0){ qunlock(&arena->lock); return TWID64; } /* * write the data out one block at a time */ blocksize = arena->blocksize; a = arena->base + aa; off = a & (blocksize - 1); a -= off; nn = 0; for(;;){ b = getdblock(arena->part, a, off != 0 ? ORDWR : OWRITE); if(b == nil){ qunlock(&arena->lock); return TWID64; } dirtydblock(b, DirtyArena); m = blocksize - off; if(m > n - nn) m = n - nn; memmove(&b->data[off], &clbuf[nn], m); ok = 0; putdblock(b); if(ok < 0){ qunlock(&arena->lock); return TWID64; } nn += m; if(nn == n) break; off = 0; a += blocksize; } arena->memstats.used += c->info.size + ClumpSize; arena->memstats.uncsize += c->info.uncsize; if(c->info.size < c->info.uncsize) arena->memstats.cclumps++; clump = arena->memstats.clumps; if(clump % ArenaCIGSize == 0){ if(arena->cig == nil){ loadcig(arena); if(arena->cig == nil) goto NoCIG; } /* add aa as start of next cig */ if(clump/ArenaCIGSize != arena->ncig){ fprint(2, "bad arena cig computation %s: writing clump %d but %d cigs\n", arena->name, clump, arena->ncig); arena->ncig = -1; vtfree(arena->cig); arena->cig = nil; goto NoCIG; } arena->cig = vtrealloc(arena->cig, (arena->ncig+1)*sizeof arena->cig[0]); arena->cig[arena->ncig++].offset = aa; } NoCIG: arena->memstats.clumps++; if(arena->memstats.clumps == 0) sysfatal("clumps wrapped"); arena->wtime = now(); if(arena->ctime == 0) arena->ctime = arena->wtime; writeclumpinfo(arena, clump, &c->info); wbarena(arena); qunlock(&arena->lock); return aa; } int atailcmp(ATailStats *a, ATailStats *b) { /* good test */ if(a->used < b->used) return -1; if(a->used > b->used) return 1; /* suspect tests - why order this way? (no one cares) */ if(a->clumps < b->clumps) return -1; if(a->clumps > b->clumps) return 1; if(a->cclumps < b->cclumps) return -1; if(a->cclumps > b->cclumps) return 1; if(a->uncsize < b->uncsize) return -1; if(a->uncsize > b->uncsize) return 1; if(a->sealed < b->sealed) return -1; if(a->sealed > b->sealed) return 1; /* everything matches */ return 0; } void setatailstate(AState *as) { int i, j, osealed; Arena *a; Index *ix; trace(0, "setatailstate %s 0x%llux clumps %d", as->arena->name, as->aa, as->stats.clumps); /* * Look up as->arena to find index. */ needmainindex(); /* OS X linker */ ix = mainindex; for(i=0; i<ix->narenas; i++) if(ix->arenas[i] == as->arena) break; if(i==ix->narenas || as->aa < ix->amap[i].start || as->aa >= ix->amap[i].stop || as->arena != ix->arenas[i]){ fprint(2, "funny settailstate 0x%llux\n", as->aa); return; } for(j=0; j<=i; j++){ a = ix->arenas[j]; if(atailcmp(&a->diskstats, &a->memstats) == 0) continue; qlock(&a->lock); osealed = a->diskstats.sealed; if(j == i) a->diskstats = as->stats; else a->diskstats = a->memstats; wbarena(a); if(a->diskstats.sealed != osealed && !a->inqueue) sealarena(a); qunlock(&a->lock); } } /* * once sealed, an arena never has any data added to it. * it should only be changed to fix errors. * this also syncs the clump directory. */ static void sealarena(Arena *arena) { arena->inqueue = 1; backsumarena(arena); } void backsumarena(Arena *arena) { ASum *as; if(sumwait.l == nil) return; as = MK(ASum); if(as == nil) return; qlock(&sumlock); as->arena = arena; as->next = nil; if(sumq) sumqtail->next = as; else sumq = as; sumqtail = as; rwakeup(&sumwait); qunlock(&sumlock); } static void sumproc(void *unused) { ASum *as; Arena *arena; USED(unused); for(;;){ qlock(&sumlock); while(sumq == nil) rsleep(&sumwait); as = sumq; sumq = as->next; qunlock(&sumlock); arena = as->arena; free(as); sumarena(arena); } } void sumarena(Arena *arena) { ZBlock *b; DigestState s; u64int a, e; u32int bs; int t; u8int score[VtScoreSize]; bs = MaxIoSize; if(bs < arena->blocksize) bs = arena->blocksize; /* * read & sum all blocks except the last one */ flushdcache(); memset(&s, 0, sizeof s); b = alloczblock(bs, 0, arena->part->blocksize); e = arena->base + arena->size; for(a = arena->base - arena->blocksize; a + arena->blocksize <= e; a += bs){ disksched(); while((t=arenasumsleeptime) == SleepForever){ sleep(1000); disksched(); } sleep(t); if(a + bs > e) bs = arena->blocksize; if(readpart(arena->part, a, b->data, bs) < 0) goto ReadErr; addstat(StatSumRead, 1); addstat(StatSumReadBytes, bs); sha1(b->data, bs, nil, &s); } /* * the last one is special, since it may already have the checksum included */ bs = arena->blocksize; if(readpart(arena->part, e, b->data, bs) < 0){ ReadErr: logerr(EOk, "sumarena can't sum %s, read at %lld failed: %r", arena->name, a); freezblock(b); return; } addstat(StatSumRead, 1); addstat(StatSumReadBytes, bs); sha1(b->data, bs-VtScoreSize, nil, &s); sha1(zeroscore, VtScoreSize, nil, &s); sha1(nil, 0, score, &s); /* * check for no checksum or the same */ if(scorecmp(score, &b->data[bs - VtScoreSize]) != 0 && scorecmp(zeroscore, &b->data[bs - VtScoreSize]) != 0) logerr(EOk, "overwriting mismatched checksums for arena=%s, found=%V calculated=%V", arena->name, &b->data[bs - VtScoreSize], score); freezblock(b); qlock(&arena->lock); scorecp(arena->score, score); wbarena(arena); qunlock(&arena->lock); } /* * write the arena trailer block to the partition */ int wbarena(Arena *arena) { DBlock *b; int bad; if((b = getdblock(arena->part, arena->base + arena->size, OWRITE)) == nil){ logerr(EAdmin, "can't write arena trailer: %r"); return -1; } dirtydblock(b, DirtyArenaTrailer); bad = okarena(arena)<0 || packarena(arena, b->data)<0; scorecp(b->data + arena->blocksize - VtScoreSize, arena->score); putdblock(b); if(bad) return -1; return 0; } int wbarenahead(Arena *arena) { ZBlock *b; ArenaHead head; int bad; namecp(head.name, arena->name); head.version = arena->version; head.size = arena->size + 2 * arena->blocksize; head.blocksize = arena->blocksize; head.clumpmagic = arena->clumpmagic; b = alloczblock(arena->blocksize, 1, arena->part->blocksize); if(b == nil){ logerr(EAdmin, "can't write arena header: %r"); /* ZZZ add error message? */ return -1; } /* * this writepart is okay because it only happens * during initialization. */ bad = packarenahead(&head, b->data)<0 || writepart(arena->part, arena->base - arena->blocksize, b->data, arena->blocksize)<0 || flushpart(arena->part)<0; freezblock(b); if(bad) return -1; return 0; } /* * read the arena header and trailer blocks from disk */ static int loadarena(Arena *arena) { ArenaHead head; ZBlock *b; b = alloczblock(arena->blocksize, 0, arena->part->blocksize); if(b == nil) return -1; if(readpart(arena->part, arena->base + arena->size, b->data, arena->blocksize) < 0){ freezblock(b); return -1; } if(unpackarena(arena, b->data) < 0){ freezblock(b); return -1; } if(arena->version != ArenaVersion4 && arena->version != ArenaVersion5){ seterr(EAdmin, "unknown arena version %d", arena->version); freezblock(b); return -1; } scorecp(arena->score, &b->data[arena->blocksize - VtScoreSize]); if(readpart(arena->part, arena->base - arena->blocksize, b->data, arena->blocksize) < 0){ logerr(EAdmin, "can't read arena header: %r"); freezblock(b); return 0; } if(unpackarenahead(&head, b->data) < 0) logerr(ECorrupt, "corrupted arena header: %r"); else if(namecmp(arena->name, head.name)!=0 || arena->clumpmagic != head.clumpmagic || arena->version != head.version || arena->blocksize != head.blocksize || arena->size + 2 * arena->blocksize != head.size){ if(namecmp(arena->name, head.name)!=0) logerr(ECorrupt, "arena tail name %s head %s", arena->name, head.name); else if(arena->clumpmagic != head.clumpmagic) logerr(ECorrupt, "arena %d tail clumpmagic 0x%lux head 0x%lux", debugarena, (ulong)arena->clumpmagic, (ulong)head.clumpmagic); else if(arena->version != head.version) logerr(ECorrupt, "arena tail version %d head version %d", arena->version, head.version); else if(arena->blocksize != head.blocksize) logerr(ECorrupt, "arena tail block size %d head %d", arena->blocksize, head.blocksize); else if(arena->size+2*arena->blocksize != head.size) logerr(ECorrupt, "arena tail size %lud head %lud", (ulong)arena->size+2*arena->blocksize, head.size); else logerr(ECorrupt, "arena header inconsistent with arena data"); } freezblock(b); return 0; } static int okarena(Arena *arena) { u64int dsize; int ok; ok = 0; dsize = arenadirsize(arena, arena->diskstats.clumps); if(arena->diskstats.used + dsize > arena->size){ seterr(ECorrupt, "arena %s used > size", arena->name); ok = -1; } if(arena->diskstats.cclumps > arena->diskstats.clumps) logerr(ECorrupt, "arena %s has more compressed clumps than total clumps", arena->name); /* * This need not be true if some of the disk is corrupted. * if(arena->diskstats.uncsize + arena->diskstats.clumps * ClumpSize + arena->blocksize < arena->diskstats.used) logerr(ECorrupt, "arena %s uncompressed size inconsistent with used space %lld %d %lld", arena->name, arena->diskstats.uncsize, arena->diskstats.clumps, arena->diskstats.used); */ /* * this happens; it's harmless. * if(arena->ctime > arena->wtime) logerr(ECorrupt, "arena %s creation time after last write time", arena->name); */ return ok; } static CIBlock* getcib(Arena *arena, int clump, int writing, CIBlock *rock) { int mode; CIBlock *cib; u32int block, off; if(clump >= arena->memstats.clumps){ seterr(EOk, "clump directory access out of range"); return nil; } block = clump / arena->clumpmax; off = (clump - block * arena->clumpmax) * ClumpInfoSize; cib = rock; cib->block = block; cib->offset = off; if(writing){ if(off == 0 && clump == arena->memstats.clumps-1) mode = OWRITE; else mode = ORDWR; }else mode = OREAD; cib->data = getdblock(arena->part, arena->base + arena->size - (block + 1) * arena->blocksize, mode); if(cib->data == nil) return nil; return cib; } static void putcib(Arena *arena, CIBlock *cib) { USED(arena); putdblock(cib->data); cib->data = nil; } /* * For index entry readahead purposes, the arenas are * broken into smaller subpieces, called clump info groups * or cigs. Each cig has ArenaCIGSize clumps (ArenaCIGSize * is chosen to make the index entries take up about half * a megabyte). The index entries do not contain enough * information to determine what the clump index is for * a given address in an arena. That info is needed both for * figuring out which clump group an address belongs to * and for prefetching a clump group's index entries from * the arena table of contents. The first time clump groups * are accessed, we scan the entire arena table of contents * (which might be 10s of megabytes), recording the data * offset of each clump group. */ /* * load clump info group information by scanning entire toc. */ static void loadcig(Arena *arena) { u32int i, j, ncig, nci; ArenaCIG *cig; ClumpInfo *ci; u64int offset; int ms; if(arena->cig || arena->ncig < 0) return; // fprint(2, "loadcig %s\n", arena->name); ncig = (arena->memstats.clumps+ArenaCIGSize-1) / ArenaCIGSize; if(ncig == 0){ arena->cig = vtmalloc(1); arena->ncig = 0; return; } ms = msec(); cig = vtmalloc(ncig*sizeof cig[0]); ci = vtmalloc(ArenaCIGSize*sizeof ci[0]); offset = 0; for(i=0; i<ncig; i++){ nci = readclumpinfos(arena, i*ArenaCIGSize, ci, ArenaCIGSize); cig[i].offset = offset; for(j=0; j<nci; j++) offset += ClumpSize + ci[j].size; if(nci < ArenaCIGSize){ if(i != ncig-1){ vtfree(ci); vtfree(cig); arena->ncig = -1; fprint(2, "loadcig %s: got %ud cigs, expected %ud\n", arena->name, i+1, ncig); goto out; } } } vtfree(ci); arena->ncig = ncig; arena->cig = cig; out: ms = msec() - ms; addstat2(StatCigLoad, 1, StatCigLoadTime, ms); } /* * convert arena address into arena group + data boundaries. */ int arenatog(Arena *arena, u64int addr, u64int *gstart, u64int *glimit, int *g) { int r, l, m; qlock(&arena->lock); if(arena->cig == nil) loadcig(arena); if(arena->cig == nil || arena->ncig == 0){ qunlock(&arena->lock); return -1; } l = 1; r = arena->ncig - 1; while(l <= r){ m = (r + l) / 2; if(arena->cig[m].offset <= addr) l = m + 1; else r = m - 1; } l--; *g = l; *gstart = arena->cig[l].offset; if(l+1 < arena->ncig) *glimit = arena->cig[l+1].offset; else *glimit = arena->memstats.used; qunlock(&arena->lock); return 0; } /* * load the clump info for group g into the index entries. */ int asumload(Arena *arena, int g, IEntry *entries, int nentries) { int i, base, limit; u64int addr; ClumpInfo ci; IEntry *ie; if(nentries < ArenaCIGSize){ fprint(2, "asking for too few entries\n"); return -1; } qlock(&arena->lock); if(arena->cig == nil) loadcig(arena); if(arena->cig == nil || arena->ncig == 0 || g >= arena->ncig){ qunlock(&arena->lock); return -1; } addr = 0; base = g*ArenaCIGSize; limit = base + ArenaCIGSize; if(base > arena->memstats.clumps) base = arena->memstats.clumps; ie = entries; for(i=base; i<limit; i++){ if(readclumpinfo(arena, i, &ci) < 0) break; if(ci.type != VtCorruptType){ scorecp(ie->score, ci.score); ie->ia.type = ci.type; ie->ia.size = ci.uncsize; ie->ia.blocks = (ci.size + ClumpSize + (1<<ABlockLog) - 1) >> ABlockLog; ie->ia.addr = addr; ie++; } addr += ClumpSize + ci.size; } qunlock(&arena->lock); return ie - entries; }