ref: 58fa29447b845f91dfc2a6734f525ed47375393b
parent: 03e60450c2acc20866867cc5d3649aaed07d0326
author: aiju <devnull@localhost>
date: Sat Dec 8 10:07:53 EST 2018
dtracy: add support for aggregations
--- a/sys/include/dtracy.h
+++ b/sys/include/dtracy.h
@@ -9,7 +9,14 @@
enum {
DTSTRMAX = 256,
DTRECMAX = 1024,
+
+ DTMAXAGGBUF = 16,
+
+ DTBUFSZ = 65536,
+ DTANUMBUCKETS = 1024,
+ DTABUCKETS = DTBUFSZ - 4 * DTANUMBUCKETS,
};
+#define DTANIL ((u32int)-1)
typedef struct DTName DTName;
typedef struct DTProbe DTProbe;
@@ -21,6 +28,7 @@
typedef struct DTChan DTChan;
typedef struct DTExpr DTExpr;
typedef struct DTProvider DTProvider;
+typedef struct DTAgg DTAgg;
typedef struct DTBuf DTBuf;
struct DTName {
@@ -30,7 +38,7 @@
};
/*
- we assign all pairs (probe,action-group) (called an enabling or DTEnab) a unique ID.
+ we assign all pairs (probe,action-group) (called an enabling or DTEnab) a unique ID called EPID.
we could also use probe IDs and action group IDs but using a single 32-bit ID for both is more flexible/efficient.
*/
struct DTEnab {
@@ -123,14 +131,52 @@
u32int *b;
};
+/*
+ aggregation buffers are hashtables and use a different record format.
+ there are DTANUMBUCKETS 4-byte buckets at the end of the buffer.
+ each entry is (link,id,key,val) with a 4-byte link field for the hash chains and a 4-byte aggregation id.
+
+ the aggregation id actually contains all the data in the DTAgg struct:
+ 4-bit type
+ 12-bit keysize in qwords
+ 16-bit unique id
+
+ the struct is just for kernel convenience
+*/
+
+enum { /* aggregation types; stored in the top 4 bits of the aggregation id */
+ AGGCNT, /* number of times the key was recorded */
+ AGGSUM, /* running sum of the values */
+ AGGAVG, /* running sum and count; the reader does the division */
+ AGGSTD, /* running sum, count and 128-bit sum of squares */
+ AGGMIN, /* smallest value recorded */
+ AGGMAX, /* largest value recorded */
+};
+
+struct DTAgg { /* decoded form of an aggregation id; keysize, recsize and type are filled in by dtaunpackid() */
+ int id; /* packed id: 4-bit type, 12-bit key size in qwords, 16-bit unique id */
+ u16int keysize; /* in bytes */
+ u16int recsize; /* whole record in bytes: 8-byte (link,id) header + key + value words */
+ uchar type; /* AGGCNT ... AGGMAX */
+};
+
/* an action is an expression, plus info about what to do with the result */
struct DTAct {
enum {
ACTTRACE, /* record the result. size is the number of bytes used. 0 <= size <= 8 */
ACTTRACESTR, /* take the result to be a pointer to a null-terminated string. store it as zero-padded char[size]. */
+ /*
+ ACTAGGKEY and ACTAGGVAL together record a value in an aggregation.
+ they must occur as a pair: the key action comes immediately before its value action and both carry the same agg.id.
+ currently 0 <= size <= 8.
+ */
+ ACTAGGKEY,
+ ACTAGGVAL,
+ ACTCANCEL, /* (must be last action) don't write anything into the main buffer. used to avoid pointless records when using aggregations. */
} type;
DTExpr *p;
int size;
+ DTAgg agg; /* aggregation descriptor; only packed and used for ACTAGGKEY and ACTAGGVAL */
};
/* an action group is an optional predicate and a set of actions. */
@@ -144,7 +190,7 @@
int reclen; /* record size, including 12-byte header */
};
-/* a clause list probe wildcard expressions and an action group. only used during set-up. */
+/* a clause lists probe wildcard expressions and an action group. only used during set-up. */
struct DTClause {
int nprob;
char **probs;
@@ -151,7 +197,6 @@
DTActGr *gr;
};
-enum { DTBUFSZ = 65536 };
struct DTBuf {
int wr;
uchar data[DTBUFSZ];
@@ -170,6 +215,9 @@
/* we have 2 buffers per cpu, one for writing and one for reading. dtcread() swaps them if empty. */
DTBuf **wrbufs;
DTBuf **rdbufs;
+ /* aggregations use separate buffers */
+ DTBuf **aggwrbufs;
+ DTBuf **aggrdbufs;
/* list of enablings. */
DTEnab *enab;
@@ -191,7 +239,7 @@
/* action group functions */
void dtgpack(Fmt *, DTActGr *);
char *dtgunpack(char *, DTActGr **);
-int dtgverify(DTActGr *);
+int dtgverify(DTChan *, DTActGr *);
void dtgfree(DTActGr *);
/* clause functions */
@@ -205,8 +253,13 @@
int dtcaddgr(DTChan *, DTName, DTActGr *);
int dtcaddcl(DTChan *, DTClause *);
int dtcread(DTChan *, void *, int);
+int dtcaggread(DTChan *, void *, int);
void dtcreset(DTChan *);
void dtcrun(DTChan *, int);
+
+/* aggbuf functions */
+int dtaunpackid(DTAgg *);
+void dtarecord(DTChan *, int, DTAgg *, uchar *, int, s64int);
extern DTProvider *dtproviders[];
extern int dtnmach;
--- a/sys/src/9/port/devdtracy.c
+++ b/sys/src/9/port/devdtracy.c
@@ -38,7 +38,7 @@
dtclfree(c);
if(rc < 0){
dtcreset(p->ch);
- error("failed to add clause");
+ error(up->syserrstr);
}
}
}
@@ -54,6 +54,7 @@
Qprog,
Qbuf,
Qepid,
+ Qaggbuf,
};
static Dirtab dtracydir[] = {
@@ -61,6 +62,7 @@
"prog", { Qprog, 0, 0 }, 0, 0660,
"buf", { Qbuf, 0, 0, }, 0, 0440,
"epid", { Qepid, 0, 0 }, 0, 0440,
+ "aggbuf", { Qaggbuf, 0, 0 }, 0, 0440,
};
enum {
@@ -270,10 +272,49 @@
}
+/*
+ * call readf(c, a, n) with dtracylock held and return its result.
+ * the lock is taken before the error label is armed: eqlock raises an
+ * error when it is interrupted, and the recovery code must not release
+ * a lock that was never acquired. an error raised inside readf still
+ * releases the lock before being passed on.
+ */
static long
+lockedread(DTChan *c, void *a, long n, int(*readf)(DTChan *, void *, int))
+{
+	long rc;
+
+	eqlock(&dtracylock);
+	if(waserror()){
+		qunlock(&dtracylock);
+		nexterror();
+	}
+	rc = readf(c, a, n);
+	qunlock(&dtracylock);
+	poperror();
+	return rc;
+}
+
+/*
+ * read from a channel buffer through readf.
+ * polls every 250ms until the first data shows up, then makes up to three
+ * more attempts, 50ms apart, while less than half of the caller's buffer
+ * is filled. returns the number of bytes stored in a, or -1 if the very
+ * first successful poll is preceded by a read error.
+ */
+static long
+handleread(DTChan *c, void *a, long n, int(*readf)(DTChan *, void *, int))
+{
+	long got, total;
+	int tries;
+
+	while((got = lockedread(c, a, n, readf)) == 0)
+		tsleep(&up->sleep, return0, 0, 250);
+	if(got < 0)
+		return -1;
+	total = got;
+	tries = 0;
+	while(tries < 3 && total < n/2){
+		tsleep(&up->sleep, return0, 0, 50);
+		got = lockedread(c, (uchar *)a + total, n - total, readf);
+		if(got < 0)
+			break;	/* keep what was already collected */
+		total += got;
+		tries++;
+	}
+	return total;
+}
+
+static long
dtracyread(Chan *c, void *a, long n, vlong off)
{
int rc;
DTKChan *p;
+ DTChan *ch;
eqlock(&dtracylock);
if(waserror()){
@@ -299,9 +340,15 @@
rc = readstr(off, a, n, up->genbuf);
break;
case Qbuf:
- while(rc = dtcread(p->ch, a, n), rc == 0)
- tsleep(&up->sleep, return0, 0, 250);
- break;
+ ch = p->ch;
+ qunlock(&dtracylock);
+ poperror();
+ return handleread(ch, a, n, dtcread);
+ case Qaggbuf:
+ ch = p->ch;
+ qunlock(&dtracylock);
+ poperror();
+ return handleread(ch, a, n, dtcaggread);
case Qepid:
rc = epidread(c->aux, p->ch, a, n, off);
break;
@@ -460,8 +507,6 @@
switch(v){
case DTV_PID:
return up != nil ? up->pid : 0;
- case DTV_MACHNO:
- return m->machno;
default:
return 0;
}
--- a/sys/src/cmd/dtracy/act.c
+++ b/sys/src/cmd/dtracy/act.c
@@ -55,6 +55,27 @@
clause->probs[clause->nprob++] = strdup(s);
}
+/* script-level names of the aggregation functions, indexed by AGG* type */
+static char *aggtypes[] = {
+	[AGGCNT] "count",
+	[AGGSUM] "sum",
+	[AGGAVG] "avg",
+	[AGGSTD] "std",
+	[AGGMIN] "min",
+	[AGGMAX] "max",
+};
+
+/*
+ * map the function name in an aggregation statement to its AGG* type.
+ * reports an error for an unknown name and then yields 0.
+ */
+int
+aggtype(Symbol *s)
+{
+	char **t;
+
+	for(t = aggtypes; t < aggtypes + nelem(aggtypes); t++)
+		if(strcmp(s->name, *t) == 0)
+			return t - aggtypes;
+	error("%s unknown aggregation type", s->name);
+	return 0;
+}
+
void
addstat(int type, ...)
{
@@ -73,6 +94,19 @@
case STATPRINT:
case STATPRINTF:
break;
+ case STATAGG:
+ s->agg.name = va_arg(va, Symbol *);
+ s->agg.key = va_arg(va, Node *);
+ s->agg.type = aggtype(va_arg(va, Symbol *));
+ s->agg.value = va_arg(va, Node *);
+ if(s->agg.type == AGGCNT){
+ if(s->agg.value != nil)
+ error("too many arguments for count()");
+ }else{
+ if(s->agg.value == nil)
+ error("need argument for %s()", aggtypes[s->agg.type]);
+ }
+ break;
default:
sysfatal("addstat: unknown type %d", type);
}
@@ -158,6 +192,19 @@
(*arg)->str = fmtstrflush(&f);
}
+int aggid;
+
+/* report whether every statement of the clause is an aggregation (1) or not (0) */
+int
+allagg(Clause *c)
+{
+	int i;
+
+	for(i = 0; i < c->nstats; i++)
+		if(c->stats[i].type != STATAGG)
+			return 0;
+	return 1;
+}
+
DTClause *
mkdtclause(Clause *c)
{
@@ -164,6 +211,7 @@
DTClause *d;
Stat *s;
int recoff, i;
+ Node *n;
d = emalloc(sizeof(DTClause));
d->nprob = c->nprob;
@@ -175,7 +223,7 @@
for(s = c->stats; s < c->stats + c->nstats; s++)
switch(s->type){
case STATEXPR:
- actgradd(d->gr, (DTAct){ACTTRACE, codegen(s->n), 0});
+ actgradd(d->gr, (DTAct){ACTTRACE, codegen(s->n), 0, noagg});
break;
case STATPRINT:
for(i = 0; i < s->narg; i++)
@@ -184,7 +232,22 @@
case STATPRINTF:
prepprintf(s->arg, s->narg, d->gr, &recoff);
break;
+ case STATAGG: {
+ DTAgg agg = {.id = s->agg.type << 28 | 1 << 16 | aggid++};
+ assert(dtaunpackid(&agg) >= 0);
+ aggs = realloc(aggs, sizeof(Agg) * aggid);
+ memset(&aggs[aggid-1], 0, sizeof(Agg));
+ aggs[aggid-1].DTAgg = agg;
+ aggs[aggid-1].name = strdup(s->agg.name == nil ? "" : s->agg.name->name);
+ actgradd(d->gr, (DTAct){ACTAGGKEY, codegen(s->agg.key), 8, agg});
+ n = s->agg.value;
+ if(n == nil) n = node(ONUM, 0ULL);
+ actgradd(d->gr, (DTAct){ACTAGGVAL, codegen(n), 8, agg});
+ break;
}
+ }
+ if(allagg(c))
+ actgradd(d->gr, (DTAct){ACTCANCEL, codegen(node(ONUM, 0)), 0, noagg});
return d;
}
@@ -392,6 +455,7 @@
case STATPRINTF:
execprintf(s->arg, s->narg, p, e, en);
break;
+ case STATAGG: break;
default:
sysfatal("parseclause: unknown type %d", s->type);
}
@@ -546,6 +610,17 @@
print("\t\ttrace string (%d bytes)\n", a->size);
dumpexpr(a->p, "\t\t\t");
break;
+ case ACTAGGKEY:
+ print("\t\taggregation key (%s,%d,%d)\n", a->agg.type >= nelem(aggtypes) ? "???" : aggtypes[a->agg.type], a->agg.keysize, (u16int)a->agg.id);
+ dumpexpr(a->p, "\t\t\t");
+ break;
+ case ACTAGGVAL:
+ print("\t\taggregation value (%s,%d,%d)\n", a->agg.type >= nelem(aggtypes) ? "???" : aggtypes[a->agg.type], a->agg.keysize, (u16int)a->agg.id);
+ dumpexpr(a->p, "\t\t\t");
+ break;
+ case ACTCANCEL:
+ print("\t\tcancel record\n");
+ break;
default:
print("\t\t??? %d\n", a->type);
}
@@ -563,6 +638,8 @@
print("\t\tprintf\n");
for(j = 0; j < s->narg; j++)
print("\t\t\targ %ε\n", s->arg[j]);
+ break;
+ case STATAGG:
break;
default:
print("\t\t??? %d\n", s->type);
--- /dev/null
+++ b/sys/src/cmd/dtracy/agg.c
@@ -1,0 +1,199 @@
+#include <u.h>
+#include <libc.h>
+#include <dtracy.h>
+#include <bio.h>
+#include <avl.h>
+#include <mp.h>
+#include "dat.h"
+#include "fns.h"
+
+typedef struct ANode ANode;
+
+struct ANode { /* one (key, value) entry in the AVL tree of an aggregation */
+ Avl;
+ s64int val, cnt; /* val: sum, min or max depending on the type; cnt: count */
+ u64int sq[2]; /* AGGSTD only: 128-bit sum of squares, low word first */
+ int keysize; /* number of key bytes compared by aggcmp */
+ uchar key[1]; /* really keysize bytes; nodes are allocated with the extra room */
+};
+
+Agg *aggs;
+static Avltree **trees;
+static ANode *key;
+int interrupted;
+
+/* tree order: bytewise comparison of the keys, over the first node's keysize */
+static int
+aggcmp(Avl *ap, Avl *bp)
+{
+	ANode *x, *y;
+
+	x = (ANode *)ap;
+	y = (ANode *)bp;
+	return memcmp(x->key, y->key, x->keysize);
+}
+
+/*
+ * initialise a fresh node from the value words q of a kernel record:
+ * q[0] is the value (the count for AGGCNT), q[1] the count for avg and std,
+ * q[2] and q[3] the sum of squares for std. aborts on an unknown type.
+ */
+static void
+createrecord(int type, ANode *n, s64int *q)
+{
+	switch(type){
+	case AGGCNT:
+		n->cnt = q[0];
+		break;
+	case AGGSUM:
+	case AGGMIN:
+	case AGGMAX:
+		n->val = q[0];
+		break;
+	case AGGSTD:
+		n->sq[0] = q[2];
+		n->sq[1] = q[3];
+		/* fall through: std also carries sum and count */
+	case AGGAVG:
+		n->val = q[0];
+		n->cnt = q[1];
+		break;
+	default:
+		abort();
+	}
+}
+
+/*
+ * merge the value words q of a kernel record into a node that already
+ * exists for the same key. q[0] is the value (the count for AGGCNT),
+ * q[1] the count for avg and std, q[2] and q[3] the 128-bit sum of squares.
+ * aborts on an unknown aggregation type.
+ */
+static void
+updaterecord(int type, ANode *n, s64int *q)
+{
+	u64int lo;
+
+	switch(type){
+	case AGGCNT: n->cnt += q[0]; break;
+	case AGGSUM: n->val += q[0]; break;
+	case AGGAVG: n->cnt += q[1]; n->val += q[0]; break;
+	/* the same key can come back in a later buffer, so min and max must merge too */
+	case AGGMIN: if(q[0] < n->val) n->val = q[0]; break;
+	case AGGMAX: if(q[0] > n->val) n->val = q[0]; break;
+	case AGGSTD:
+		n->cnt += q[1];
+		n->val += q[0];
+		/* 128-bit addition: a carry out of the low word bumps the high word */
+		lo = n->sq[0] + (u64int)q[2];
+		if(lo < (u64int)q[2]) n->sq[1]++;
+		n->sq[0] = lo;
+		n->sq[1] += q[3];
+		break;
+	default: abort();
+	}
+}
+
+
+/*
+ * parse n bytes of aggregation records and merge them into the trees.
+ * every record is (4-byte link, 4-byte id, key, value words); the id is
+ * checked against the aggregations declared by the script before the
+ * rest of the record is used.
+ * returns 0 on success, -1 after printing a diagnostic for a bad record.
+ */
+int
+aggparsebuf(uchar *p, int n)
+{
+	uchar *e, *val;
+	Agg *a;
+	u32int id;
+	Avltree *tp;
+	ANode *np;
+
+	e = p + n;
+	for(; p + 8 < e; p += a->recsize){
+		id = *(u32int*)&p[4];
+		if((u16int)id >= aggid){
+		inval:
+			fprint(2, "invalid record in aggregation buffer\n");
+			return -1;
+		}
+		a = &aggs[(u16int)id];
+		if(a->type != id>>28) goto inval;
+		if(a->keysize != (id>>13&0x7ff8)) goto inval;
+		if(p + a->recsize > e) goto inval;
+		tp = trees[(u16int)id];
+		val = &p[8+a->keysize];
+		key->keysize = a->keysize;
+		memcpy(key->key, &p[8], a->keysize);
+		np = (ANode *) avllookup(tp, key, 0);
+		if(np != nil){
+			updaterecord(a->type, np, (s64int*)val);
+			continue;
+		}
+		np = emalloc(sizeof(ANode) - 1 + a->keysize);
+		*np = *key;
+		/* the struct copy stops at sizeof(ANode); the rest of the key needs its own copy */
+		memcpy(np->key, key->key, a->keysize);
+		createrecord(a->type, np, (s64int*)val);
+		avlinsert(tp, np);
+	}
+	return 0;
+}
+
+/*
+ * create one empty tree per declared aggregation and allocate the scratch
+ * lookup node, sized for the largest key of any aggregation.
+ */
+void
+agginit(void)
+{
+	int i, maxkey;
+
+	maxkey = 0;
+	trees = emalloc(sizeof(Avltree *) * aggid);
+	for(i = 0; i < aggid; i++){
+		if(maxkey < aggs[i].keysize)
+			maxkey = aggs[i].keysize;
+		trees[i] = avlcreate(aggcmp);
+	}
+	key = emalloc(sizeof(ANode) - 1 + maxkey);
+}
+
+/*
+ * note handler: the first "interrupt" note sets the interrupted flag and is
+ * reported as handled (1); any other note, or a second interrupt, returns 0.
+ */
+int
+aggnote(void *, char *note)
+{
+	if(interrupted)
+		return 0;
+	if(strcmp(note, "interrupt") != 0)
+		return 0;
+	interrupted = 1;
+	return 1;
+}
+
+/* print the key of a node as one 64-bit integer in a 20 character column */
+void
+aggkeyprint(Fmt *f, Agg *, ANode *a)
+{
+	u64int k;
+
+	k = *(u64int*)a->key;
+	fmtprint(f, "%20lld ", k);
+}
+
+/*
+ * population variance of the values aggregated in a std node.
+ * with n = cnt, S = val (sum) and Q = sq (128-bit sum of squares) this is
+ * (n*Q - S*S) / n^2; the numerator is computed exactly with mpints and
+ * only the final division is done in floating point.
+ */
+static double
+variance(ANode *a)
+{
+	mpint *sum, *sq, *t;
+	double r;
+
+	sum = vtomp(a->val, nil);
+	sq = uvtomp(a->sq[0], nil);
+	t = vtomp(a->sq[1], nil);
+	mpleft(t, 64, t);
+	mpadd(t, sq, sq);	/* sq = high<<64 + low */
+	vtomp(a->cnt, t);	/* t = n */
+	mpmul(sum, sum, sum);	/* sum = S*S */
+	mpmul(sq, t, sq);	/* sq = n*Q */
+	mpsub(sq, sum, sum);	/* sum = n*Q - S*S */
+	/* divide by n twice: dividing once would leave n times the variance */
+	r = mptod(sum) / a->cnt / a->cnt;
+	mpfree(sum);
+	mpfree(sq);
+	mpfree(t);
+	return r;
+}
+
+/*
+ * print the value of a node according to the aggregation type:
+ * the count, the stored value (sum, min, max), the mean (avg), or the mean
+ * followed by the square root of variance() (std; "NaN" if that is negative).
+ * aborts on an unknown type.
+ */
+void
+aggvalprint(Fmt *f, int type, ANode *a)
+{
+	double mean, var;
+
+	switch(type){
+	case AGGCNT:
+		fmtprint(f, "%20lld", a->cnt);
+		return;
+	case AGGSUM:
+	case AGGMIN:
+	case AGGMAX:
+		fmtprint(f, "%20lld", a->val);
+		return;
+	case AGGAVG:
+		fmtprint(f, "%20g", (double)a->val / a->cnt);
+		return;
+	case AGGSTD:
+		mean = (double)a->val / a->cnt;
+		var = variance(a);
+		if(var < 0){
+			fmtprint(f, "%20g %20s", mean, "NaN");
+			return;
+		}
+		fmtprint(f, "%20g %20g", mean, sqrt(var));
+		return;
+	}
+	abort();
+}
+
+/*
+ * write all aggregations to file descriptor 1, one line per key:
+ * aggregation name, tab, key, value. keys come out in tree order.
+ */
+void
+aggdump(void)
+{
+	Fmt f;
+	char buf[8192];
+	Agg *g;
+	ANode *a;
+
+	fmtfdinit(&f, 1, buf, sizeof(buf));
+	for(g = aggs; g < aggs + aggid; g++){
+		a = (ANode *) avlmin(trees[g - aggs]);
+		while(a != nil){
+			fmtprint(&f, "%s\t", g->name);
+			aggkeyprint(&f, g, a);
+			aggvalprint(&f, g->type, a);
+			fmtprint(&f, "\n");
+			a = (ANode *) avlnext(a);
+		}
+	}
+	fmtfdflush(&f);
+}
--- a/sys/src/cmd/dtracy/cgen.c
+++ b/sys/src/cmd/dtracy/cgen.c
@@ -296,10 +296,10 @@
case ORECORD:
switch(n->typ->type){
case TYPINT:
- actgradd(g, (DTAct){ACTTRACE, codegen(n->n1), n->typ->size});
+ actgradd(g, (DTAct){ACTTRACE, codegen(n->n1), n->typ->size, noagg});
break;
case TYPSTRING:
- actgradd(g, (DTAct){ACTTRACESTR, codegen(n->n1), n->typ->size});
+ actgradd(g, (DTAct){ACTTRACESTR, codegen(n->n1), n->typ->size, noagg});
break;
default:
sysfatal("tracegen: don't know how to record %τ", n->typ);
--- a/sys/src/cmd/dtracy/dat.h
+++ b/sys/src/cmd/dtracy/dat.h
@@ -5,6 +5,7 @@
typedef struct Enab Enab;
typedef struct Stat Stat;
typedef struct Type Type;
+typedef struct Agg Agg;
enum {
SYMHASH = 256,
@@ -89,10 +90,19 @@
STATEXPR,
STATPRINT,
STATPRINTF,
+ STATAGG,
} type;
+ /* STATEXPR */
Node *n;
+ /* STATPRINT, STATPRINTF */
int narg;
Node **arg;
+ /* STATAGG */
+ struct {
+ Symbol *name;
+ int type;
+ Node *key, *value;
+ } agg;
};
struct Clause {
@@ -112,6 +122,11 @@
Enab *next;
};
+struct Agg { /* an aggregation declared by the script: the library descriptor plus its name */
+ DTAgg;
+ char *name; /* name written after '@' in the script; "" when none was given */
+};
+
extern int errors;
#pragma varargck type "α" int
@@ -121,3 +136,6 @@
#pragma varargck argpos error 1
extern int dflag;
+extern DTAgg noagg;
+extern int aggid;
+extern Agg *aggs;
--- a/sys/src/cmd/dtracy/dtracy.c
+++ b/sys/src/cmd/dtracy/dtracy.c
@@ -5,6 +5,8 @@
#include "dat.h"
#include "fns.h"
+DTAgg noagg;
+
char *dtracyroot = "#Δ";
int dtracyno;
int ctlfd, buffd;
@@ -160,7 +162,7 @@
}
-void
+int
bufread(Biobuf *bp)
{
static uchar buf[65536];
@@ -167,16 +169,49 @@
int n;
n = read(buffd, buf, sizeof(buf));
- if(n < 0) sysfatal("bufread: %r");
+ if(n < 0)
+ sysfatal("bufread: %r");
if(parsebuf(buf, n, bp) < 0)
sysfatal("parsebuf: %r");
Bflush(bp);
+ return 0;
}
+/*
+ * split off the aggregation reader.
+ * the child of the rfork returns to the caller. the parent stays here:
+ * it opens the channel's aggbuf file, merges everything it reads into the
+ * trees until an interrupt note is seen, then prints the aggregations and
+ * exits.
+ */
+void
+aggproc(void)
+{
+	extern int interrupted;
+	char buf[65536];
+	int buffd, n, pid;
+
+	pid = rfork(RFPROC|RFMEM);
+	if(pid == -1)
+		sysfatal("rfork: %r");
+	if(pid == 0)
+		return;
+	snprint(buf, sizeof(buf), "%s/%d/aggbuf", dtracyroot, dtracyno);
+	buffd = open(buf, OREAD);
+	if(buffd < 0)
+		sysfatal("open: %r");
+	agginit();
+	atnotify(aggnote, 1);
+	for(;;){
+		if(interrupted)
+			break;
+		n = read(buffd, buf, sizeof(buf));
+		if(n < 0){
+			/* a read cut short by the interrupt note is the normal way out */
+			if(interrupted)
+				break;
+			sysfatal("aggbufread: %r");
+		}
+		if(aggparsebuf((uchar *) buf, n) < 0)
+			exits("error");
+	}
+	aggdump();
+	exits(nil);
+}
+
static void
usage(void)
{
- fprint(2, "usage: %s [ -cd ] script\n", argv0);
+ fprint(2, "usage: %s [ -d ] script\n", argv0);
exits("usage");
}
@@ -217,6 +252,8 @@
fprint(ctlfd, "go");
out = Bfdopen(1, OWRITE);
if(out == nil) sysfatal("Bfdopen: %r");
+ if(aggid > 0)
+ aggproc();
for(;;)
bufread(out);
}
--- a/sys/src/cmd/dtracy/fns.h
+++ b/sys/src/cmd/dtracy/fns.h
@@ -33,3 +33,7 @@
int min(int, int);
int max(int, int);
Node *addtype(Type *, Node *);
+int aggparsebuf(uchar *, int);
+int aggnote(void *, char *);
+void aggdump(void);
+void agginit(void);
--- a/sys/src/cmd/dtracy/mkfile
+++ b/sys/src/cmd/dtracy/mkfile
@@ -9,6 +9,7 @@
cgen.$O\
act.$O\
type.$O\
+ agg.$O\
YFILES=parse.y
--- a/sys/src/cmd/dtracy/parse.y
+++ b/sys/src/cmd/dtracy/parse.y
@@ -16,7 +16,8 @@
Type *t;
}
-%type <n> expr
+%type <n> expr optexpr
+%type <sym> optsym
%type <t> type
%token <sym> TSYM
@@ -63,7 +64,9 @@
stat: expr { addstat(STATEXPR, exprcheck($1, 0)); }
| TPRINT { addstat(STATPRINT); } pelist
| TPRINTF { addstat(STATPRINTF); } pelist
-
+| '@' optsym '[' expr ']' '=' TSYM '(' optexpr ')' { addstat(STATAGG, $2, $4, $7, $9); }
+optsym: TSYM | { $$ = nil; }
+optexpr: expr | { $$ = nil; }
pelist:
'(' ')'
--- /dev/null
+++ b/sys/src/libdtracy/agg.c
@@ -1,0 +1,138 @@
+#include <u.h>
+#include <libc.h>
+#include <dtracy.h>
+
+/*
+ * decode a->id into a->type, a->keysize (bytes) and a->recsize.
+ * a record is an 8-byte (link,id) header, the key and 8, 16 or 32 bytes
+ * of value depending on the type.
+ * returns 0, or -1 if the type field is not a known aggregation type
+ * (type and keysize are still written in that case).
+ */
+int
+dtaunpackid(DTAgg *a)
+{
+	int valsize;
+
+	a->type = a->id >> 28 & 15;
+	a->keysize = a->id >> 13 & 0x7ff8;
+	switch(a->type){
+	case AGGCNT:
+	case AGGSUM:
+	case AGGMIN:
+	case AGGMAX:
+		valsize = 8;
+		break;
+	case AGGAVG:
+		valsize = 16;
+		break;
+	case AGGSTD:
+		valsize = 32;
+		break;
+	default:
+		return -1;
+	}
+	a->recsize = 8 + a->keysize + valsize;
+	return 0;
+}
+
+/*
+ * hash the n bytes at s as if they were followed by zero bytes up to
+ * length m: each byte is xored in and the state multiplied by a fixed
+ * constant; the padding positions only take the multiplication.
+ */
+static u64int
+hash(uchar *s, int n, int m)
+{
+	u64int h;
+	int i;
+
+	h = 0xcbf29ce484222325ULL;
+	i = 0;
+	while(i < n){
+		h = (h ^ s[i]) * 0x100000001b3ULL;
+		i++;
+	}
+	while(i < m){
+		h *= 0x100000001b3ULL;
+		i++;
+	}
+	return h;
+}
+
+/*
+ * compare a stored key a of m bytes with a key b of n bytes.
+ * returns 1 if the first n bytes agree and bytes n..m-1 of a are zero,
+ * 0 otherwise.
+ */
+static int
+keyeq(uchar *a, uchar *b, int n, int m)
+{
+	int i;
+
+	i = 0;
+	while(i < n){
+		if(a[i] != b[i])
+			return 0;
+		i++;
+	}
+	while(i < m){
+		if(a[i] != 0)
+			return 0;
+		i++;
+	}
+	return 1;
+}
+
+/*
+ * calculate v*v with 128 bits precision and add it to the 128-bit word at q
+ * (q[0] is the low 64 bits, q[1] the high 64 bits).
+ * v is split as v0 + v1*2^32 with v0 the unsigned low half and v1 the signed
+ * high half, so v*v = s0 + s1*2^33 + s2*2^64 where s0 = v0*v0, s1 = v0*v1
+ * and s2 = v1*v1.
+ * NOTE(review): s0 can exceed the positive s64int range for large v0; the
+ * code appears to rely on wrap-around arithmetic there - confirm for the
+ * compilers in use.
+ */
+static void
+addsquare(u64int *q, s64int v)
+{
+ u32int v0;
+ s32int v1;
+ s64int s0, s1, s2;
+ u64int r;
+
+ v0 = v;
+ v1 = v>>32;
+ s0 = (s64int)v0 * (s64int)v0;
+ s1 = (s64int)v0 * (s64int)v1;
+ s2 = (s64int)v1 * (s64int)v1;
+ r = s0 + (s1<<33); /* low 64 bits of s0 + s1*2^33 */
+ if(r < (u64int)s0) q[1]++; /* carry out of that sum */
+ q[0] += r;
+ if(q[0] < r) q[1]++; /* carry out of the accumulation */
+ q[1] += s2 + (s1>>31); /* s2 plus the part of s1*2^33 above bit 63 */
+}
+
+/*
+ * fold val into an existing record; q points at the record's value words.
+ * q[0] is the value (count for AGGCNT), q[1] the count for avg and std,
+ * q[2..3] the 128-bit sum of squares for std.
+ * values are signed, so min and max compare the stored word as s64int;
+ * comparing against the raw u64int would order negative values above
+ * every positive one. unknown types are ignored.
+ */
+static void
+updaterecord(int type, u64int *q, s64int val)
+{
+	switch(type){
+	case AGGCNT: q[0] += 1; break;
+	case AGGSUM: q[0] += val; break;
+	case AGGAVG: q[0] += val; q[1]++; break;
+	case AGGMIN: if(val < (s64int)q[0]) q[0] = val; break;
+	case AGGMAX: if(val > (s64int)q[0]) q[0] = val; break;
+	case AGGSTD: q[0] += val; q[1]++; addsquare(&q[2], val); break;
+	}
+}
+
+/*
+ * initialise the value words q of a new record from its first value:
+ * count starts at 1, sum/min/max at val, and std additionally starts its
+ * sum of squares at val*val. unknown types leave q untouched.
+ */
+static void
+createrecord(int type, u64int *q, s64int val)
+{
+	switch(type){
+	case AGGCNT:
+		q[0] = 1;
+		break;
+	case AGGSTD:
+		q[2] = 0;
+		q[3] = 0;
+		addsquare(&q[2], val);
+		/* fall through: std also keeps count and sum */
+	case AGGAVG:
+		q[1] = 1;
+		/* fall through */
+	case AGGSUM:
+	case AGGMIN:
+	case AGGMAX:
+		q[0] = val;
+		break;
+	}
+}
+
+/*
+ * runs in probe context.
+ * record val under key in aggregation a, in the aggregation write buffer
+ * of processor mach. the buffer is a hash table: the bucket array sits at
+ * offset DTABUCKETS and every record starts with a link word chaining it
+ * to the next record of the same bucket (DTANIL ends a chain).
+ * a key shorter than a->keysize is zero-padded; a longer one is cut down
+ * to a->keysize so that it can never spill into the value words.
+ * if the key is already present its record is updated; otherwise a new
+ * record is appended, or the value is dropped when the buffer is full.
+ */
+void
+dtarecord(DTChan *ch, int mach, DTAgg *a, uchar *key, int nkey, s64int val)
+{
+	u64int h;
+	u32int *p, *q;
+	DTBuf *c;
+
+	c = ch->aggwrbufs[mach];
+	if(nkey > a->keysize)
+		nkey = a->keysize;
+	h = hash(key, nkey, a->keysize);
+	p = (u32int*)(c->data + DTABUCKETS + (h % DTANUMBUCKETS) * 4);
+	while(*p != DTANIL){
+		assert((uint)*p < DTABUCKETS);
+		q = (u32int*)(c->data + *p);
+		/* keyeq is true on a match */
+		if(q[1] == a->id && keyeq((uchar*)(q + 2), key, nkey, a->keysize)){
+			updaterecord(a->type, (u64int*)(q + 2 + a->keysize / 4), val);
+			return;
+		}
+		p = q;	/* the link word is the first word of the record */
+	}
+	if(c->wr + a->recsize > DTABUCKETS)
+		return;
+	*p = c->wr;
+	q = (u32int*)(c->data + c->wr);
+	q[0] = DTANIL;
+	q[1] = a->id;
+	memmove(&q[2], key, nkey);
+	if(nkey < a->keysize)
+		memset((uchar*)q + 8 + nkey, 0, a->keysize - nkey);
+	createrecord(a->type, (u64int*)(q + 2 + a->keysize / 4), val);
+	c->wr += a->recsize;
+}
--- a/sys/src/libdtracy/chan.c
+++ b/sys/src/libdtracy/chan.c
@@ -44,6 +44,14 @@
c->rdbufs[i] = dtmalloc(sizeof(DTBuf));
c->wrbufs[i] = dtmalloc(sizeof(DTBuf));
}
+ c->aggrdbufs = dtmalloc(sizeof(DTBuf *) * dtnmach);
+ c->aggwrbufs = dtmalloc(sizeof(DTBuf *) * dtnmach);
+ for(i = 0; i < dtnmach; i++){
+ c->aggrdbufs[i] = dtmalloc(sizeof(DTBuf));
+ c->aggwrbufs[i] = dtmalloc(sizeof(DTBuf));
+ memset(c->aggrdbufs[i]->data, -1, DTBUFSZ);
+ memset(c->aggwrbufs[i]->data, -1, DTBUFSZ);
+ }
return c;
}
@@ -63,6 +71,12 @@
}
free(ch->rdbufs);
free(ch->wrbufs);
+ for(i = 0; i < dtnmach; i++){
+ free(ch->aggrdbufs[i]);
+ free(ch->aggwrbufs[i]);
+ }
+ free(ch->aggrdbufs);
+ free(ch->aggwrbufs);
free(ch);
}
@@ -73,7 +87,7 @@
DTEnab *ep;
int i, nl, n;
- if(dtgverify(gr) < 0)
+ if(dtgverify(c, gr) < 0)
return -1;
gr->chan = c;
@@ -194,6 +208,47 @@
return 0;
}
+/* exchange the aggregation read and write buffers of processor n, under its mach lock */
+static void
+dtcaggbufswap(DTChan *c, int n)
+{
+	DTBuf *rd, *wr;
+
+	dtmachlock(n);
+	rd = c->aggrdbufs[n];
+	wr = c->aggwrbufs[n];
+	c->aggrdbufs[n] = wr;
+	c->aggwrbufs[n] = rd;
+	dtmachunlock(n);
+}
+
+/*
+ * copy the pending aggregation records of the first processor that has
+ * any into buf (at most n bytes).
+ * an empty read buffer is first swapped with the write buffer. after the
+ * copy the buffer is emptied and its bucket array reset; if no swap was
+ * done before the copy, one is done afterwards.
+ * returns the number of bytes copied, 0 if no processor has data, or -1
+ * with the error string set if the channel has faulted or buf is too small.
+ */
+int
+dtcaggread(DTChan *c, void *buf, int n)
+{
+	int i, swapped;
+	DTBuf *b;
+
+	if(c->state == DTCFAULT){
+		werrstr("%s", c->errstr);
+		return -1;
+	}
+	for(i = 0; i < dtnmach; i++){
+		swapped = c->aggrdbufs[i]->wr == 0;
+		if(swapped)
+			dtcaggbufswap(c, i);
+		b = c->aggrdbufs[i];	/* fetched after the swap, which may have replaced it */
+		if(b->wr == 0)
+			continue;
+		if(b->wr > n){
+			werrstr("short read");
+			return -1;
+		}
+		n = b->wr;
+		memmove(buf, b->data, n);
+		b->wr = 0;
+		memset(b->data + DTABUCKETS, -1, 4 * DTANUMBUCKETS);
+		if(!swapped)
+			dtcaggbufswap(c, i);
+		return n;
+	}
+	return 0;
+}
+
void
dtcreset(DTChan *c)
{
@@ -200,7 +255,7 @@
DTEnab *ep, *eq;
for(ep = c->enab; ep != nil; ep = ep->channext){
- /* careful! has to look atomic for etptrigger */
+ /* careful! has to look atomic for dtptrigger */
ep->probprev->probnext = ep->probnext;
ep->probnext->probprev = ep->probprev;
}
--- a/sys/src/libdtracy/mkfile
+++ b/sys/src/libdtracy/mkfile
@@ -8,6 +8,7 @@
dtefmt.$O\
pack.$O\
chan.$O\
+ agg.$O\
HFILES=\
/sys/include/dtracy.h\
--- a/sys/src/libdtracy/pack.c
+++ b/sys/src/libdtracy/pack.c
@@ -27,6 +27,12 @@
fmtprint(f, "t%d\n", g->acts[i].type);
fmtprint(f, "s%d\n", g->acts[i].size);
dtepack(f, g->acts[i].p);
+ switch(g->acts[i].type){
+ case ACTAGGKEY:
+ case ACTAGGVAL:
+ fmtprint(f, "A%#.8ux\n", g->acts[i].agg.id);
+ break;
+ }
}
fmtprint(f, "G");
}
@@ -132,6 +138,18 @@
case ACTTRACESTR:
g->reclen += g->acts[i].size;
break;
+ case ACTAGGKEY:
+ if(*s++ != 'A') goto fail;
+ s = u32unpack(s, (u32int *) &g->acts[i].agg.id);
+ if(s == nil) goto fail;
+ break;
+ case ACTAGGVAL:
+ if(*s++ != 'A') goto fail;
+ s = u32unpack(s, (u32int *) &g->acts[i].agg.id);
+ if(s == nil) goto fail;
+ break;
+ case ACTCANCEL:
+ break;
default:
goto fail;
}
@@ -182,7 +200,7 @@
int i;
if(c == nil) return;
- if(--c->gr->ref == 0)
+ if(c->gr != nil && --c->gr->ref == 0)
dtgfree(c->gr);
for(i = 0; i < c->nprob; i++)
free(c->probs[i]);
--- a/sys/src/libdtracy/prog.c
+++ b/sys/src/libdtracy/prog.c
@@ -80,7 +80,7 @@
}
int
-dtgverify(DTActGr *g)
+dtgverify(DTChan *, DTActGr *g)
{
int i;
@@ -96,6 +96,26 @@
if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0 || (uint)g->acts[i].size > DTRECMAX)
return -1;
break;
+ case ACTAGGKEY:
+ if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0 || (uint)g->acts[i].size > 8)
+ return -1;
+ if(i == g->nact - 1 || g->acts[i+1].type != ACTAGGVAL || g->acts[i+1].agg.id != g->acts[i].agg.id)
+ return -1;
+ break;
+ case ACTAGGVAL:
+ if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0 || (uint)g->acts[i].size > 8)
+ return -1;
+ if(i == 0 || g->acts[i-1].type != ACTAGGKEY)
+ return -1;
+ if(dtaunpackid(&g->acts[i].agg) < 0)
+ return -1;
+ break;
+ case ACTCANCEL:
+ if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0)
+ return -1;
+ if(i != g->nact - 1)
+ return -1;
+ break;
default:
return -1;
}
@@ -225,6 +245,7 @@
DTBuf *b;
u8int *bp;
s64int v;
+ uchar aggkey[8];
int i, j;
b = g->chan->wrbufs[info->machno];
@@ -240,6 +261,8 @@
PUT4(info->epid);
PUT8(info->ts);
for(i = 0; i < g->nact; i++){
+ if(g->acts[i].type == ACTCANCEL)
+ return 0;
if(dteexec(g->acts[i].p, info, &v) < 0)
return -1;
switch(g->acts[i].type){
@@ -255,6 +278,15 @@
return -1;
}
bp += g->acts[i].size;
+ break;
+ case ACTAGGKEY:
+ for(j = 0; j < g->acts[i].size; j++){
+ aggkey[j] = v;
+ v >>= 8;
+ }
+ break;
+ case ACTAGGVAL:
+ dtarecord(g->chan, info->machno, &g->acts[i].agg, aggkey, g->acts[i-1].size, v);
break;
}
}