shithub: riscv

Download patch

ref: 58fa29447b845f91dfc2a6734f525ed47375393b
parent: 03e60450c2acc20866867cc5d3649aaed07d0326
author: aiju <devnull@localhost>
date: Sat Dec 8 10:07:53 EST 2018

dtracy: add support for aggregations

--- a/sys/include/dtracy.h
+++ b/sys/include/dtracy.h
@@ -9,7 +9,14 @@
 enum {
 	DTSTRMAX = 256,
 	DTRECMAX = 1024,
+	
+	DTMAXAGGBUF = 16,
+	
+	DTBUFSZ = 65536, /* size in bytes of the data area of a DTBuf */
+	DTANUMBUCKETS = 1024, /* number of 4-byte hash buckets at the end of an aggregation buffer */
+	DTABUCKETS = DTBUFSZ - 4 * DTANUMBUCKETS, /* byte offset of the bucket table; entries are stored below it */
 };
+#define DTANIL ((u32int)-1) /* empty bucket / end of a hash chain */
 
 typedef struct DTName DTName;
 typedef struct DTProbe DTProbe;
@@ -21,6 +28,7 @@
 typedef struct DTChan DTChan;
 typedef struct DTExpr DTExpr;
 typedef struct DTProvider DTProvider;
+typedef struct DTAgg DTAgg;
 typedef struct DTBuf DTBuf;
 
 struct DTName {
@@ -30,7 +38,7 @@
 };
 
 /*
-	we assign all pairs (probe,action-group) (called an enabling or DTEnab) a unique ID.
+	we assign all pairs (probe,action-group) (called an enabling or DTEnab) a unique ID called EPID.
 	we could also use probe IDs and action group IDs but using a single 32-bit ID for both is more flexible/efficient.
 */
 struct DTEnab {
@@ -123,14 +131,52 @@
 	u32int *b;
 };
 
+/*
+	aggregation buffers are hashtables and use a different record format.
+	there are DTANUMBUCKETS 4-byte buckets at the end of the buffer.
+	each entry is (link,id,key,val) with a 4-byte link field for the hash chains and a 4-byte aggregation id.
+	
+	the aggregation id actually contains all the data in the DTAgg struct:
+	4-bit type
+	12-bit keysize in qwords
+	16-bit unique id
+	
+	the struct is just for kernel convenience
+*/
+
+enum { /* aggregation types; the type is stored in the top 4 bits of the aggregation id */
+	AGGCNT, /* value is the number of recorded values */
+	AGGSUM, /* value is the sum of the recorded values */
+	AGGAVG, /* value is (sum, count) */
+	AGGSTD, /* value is (sum, count, 128-bit sum of squares) */
+	AGGMIN, /* value is the smallest recorded value */
+	AGGMAX, /* value is the largest recorded value */
+};
+	
+struct DTAgg {
+	int id; /* packed id: 4-bit type, 12-bit key size in qwords, 16-bit unique id. the other fields are derived from it by dtaunpackid() */
+	u16int keysize; /* in bytes */
+	u16int recsize; /* size in bytes of one buffer entry: 4-byte link, 4-byte id, key, value */
+	uchar type; /* one of the AGG* constants */
+};
+
 /* an action is an expression, plus info about what to do with the result */
 struct DTAct {
 	enum {
 		ACTTRACE, /* record the result. size is the number of bytes used. 0 <= size <= 8 */
 		ACTTRACESTR, /* take the result to be a pointer to a null-terminated string. store it as zero-padded char[size]. */
+		/*
+			ACTAGGKEY and ACTAGGVAL together record a value in an aggregation.
+			they must occur as a pair and targ must point to an already allocated aggregation buffer.
+			currently 0 <= size <= 8.
+		*/
+		ACTAGGKEY, /* the low size bytes of the result of p form the key */
+		ACTAGGVAL, /* the result of p is recorded under the key of the preceding ACTAGGKEY */
+		ACTCANCEL, /* (must be last action) don't write anything into the main buffer. used to avoid pointless records when using aggregations. */
 	} type;
 	DTExpr *p;
 	int size;
+	DTAgg agg; /* aggregation descriptor; dtgpack() transmits only agg.id, and only for ACTAGGKEY and ACTAGGVAL */
 };
 
 /* an action group is an optional predicate and a set of actions. */
@@ -144,7 +190,7 @@
 	int reclen; /* record size, including 12-byte header */
 };
 
-/* a clause list probe wildcard expressions and an action group. only used during set-up. */
+/* a clause lists probe wildcard expressions and an action group. only used during set-up. */
 struct DTClause {
 	int nprob;
 	char **probs;
@@ -151,7 +197,6 @@
 	DTActGr *gr;
 };
 
-enum { DTBUFSZ = 65536 };
 struct DTBuf {
 	int wr;
 	uchar data[DTBUFSZ];
@@ -170,6 +215,9 @@
 	/* we have 2 buffers per cpu, one for writing and one for reading. dtcread() swaps them if empty. */
 	DTBuf **wrbufs;
 	DTBuf **rdbufs;
+	/* aggregations use separate buffers */
+	DTBuf **aggwrbufs;
+	DTBuf **aggrdbufs;
 	
 	/* list of enablings. */
 	DTEnab *enab;
@@ -191,7 +239,7 @@
 /* action group functions */
 void dtgpack(Fmt *, DTActGr *);
 char *dtgunpack(char *, DTActGr **);
-int dtgverify(DTActGr *);
+int dtgverify(DTChan *, DTActGr *);
 void dtgfree(DTActGr *);
 
 /* clause functions */
@@ -205,8 +253,13 @@
 int dtcaddgr(DTChan *, DTName, DTActGr *);
 int dtcaddcl(DTChan *, DTClause *);
 int dtcread(DTChan *, void *, int);
+int dtcaggread(DTChan *, void *, int);
 void dtcreset(DTChan *);
 void dtcrun(DTChan *, int);
+
+/* aggbuf functions */
+int dtaunpackid(DTAgg *); /* derive type, keysize and recsize from id; returns -1 if the type is unknown */
+void dtarecord(DTChan *, int, DTAgg *, uchar *, int, s64int); /* (chan, machno, agg, key, key length, value); runs in probe context */
 
 extern DTProvider *dtproviders[];
 extern int dtnmach;
--- a/sys/src/9/port/devdtracy.c
+++ b/sys/src/9/port/devdtracy.c
@@ -38,7 +38,7 @@
 		dtclfree(c);
 		if(rc < 0){
 			dtcreset(p->ch);
-			error("failed to add clause");
+			error(up->syserrstr);
 		}
 	}
 }
@@ -54,6 +54,7 @@
 	Qprog,
 	Qbuf,
 	Qepid,
+	Qaggbuf,
 };
 
 static Dirtab dtracydir[] = {
@@ -61,6 +62,7 @@
 	"prog", { Qprog, 0, 0 }, 0,	0660,
 	"buf",	{ Qbuf, 0, 0, }, 0,	0440,
 	"epid",	{ Qepid, 0, 0 }, 0,	0440,
+	"aggbuf",	{ Qaggbuf, 0, 0 }, 0,	0440,
 };
 
 enum {
@@ -270,10 +272,49 @@
 }
 
 static long
+lockedread(DTChan *c, void *a, long n, int(*readf)(DTChan *, void *, int))
+{
+	long rc;
+	/* call readf(c, a, n) with dtracylock held and return its result; the lock is released on every path */
+	eqlock(&dtracylock); /* taken before the handler is armed: if eqlock itself raises an error the lock is not held and must not be unlocked */
+	if(waserror()){
+		qunlock(&dtracylock);
+		nexterror();
+	}
+	rc = readf(c, a, n);
+	qunlock(&dtracylock);
+	poperror();
+	return rc;
+}
+
+static long
+handleread(DTChan *c, void *a, long n, int(*readf)(DTChan *, void *, int))
+{
+	long rc, m;
+	int i;
+	/* poll readf until it delivers data, then briefly try to batch more data into the same reply. returns the byte count, or -1 if the first successful poll never happens */
+	for(;;){
+		rc = lockedread(c, a, n, readf);
+		if(rc < 0) return -1;
+		if(rc > 0) break;
+		tsleep(&up->sleep, return0, 0, 250); /* nothing available yet: try again in 250 ms */
+	}
+	m = rc;
+	for(i = 0; i < 3 && m < n/2; i++){ /* up to 3 more attempts while less than half of the caller's buffer is filled */
+		tsleep(&up->sleep, return0, 0, 50);
+		rc = lockedread(c, (uchar *)a + m, n - m, readf);
+		if(rc < 0) break; /* keep what has been read so far */
+		m += rc;
+	}
+	return m;
+}
+
+static long
 dtracyread(Chan *c, void *a, long n, vlong off)
 {
 	int rc;
 	DTKChan *p;
+	DTChan *ch;
 
 	eqlock(&dtracylock);
 	if(waserror()){
@@ -299,9 +340,15 @@
 		rc = readstr(off, a, n, up->genbuf);
 		break;
 	case Qbuf:
-		while(rc = dtcread(p->ch, a, n), rc == 0)
-			tsleep(&up->sleep, return0, 0, 250);
-		break;
+		ch = p->ch;
+		qunlock(&dtracylock);
+		poperror();
+		return handleread(ch, a, n, dtcread);
+	case Qaggbuf:
+		ch = p->ch;
+		qunlock(&dtracylock);
+		poperror();
+		return handleread(ch, a, n, dtcaggread);
 	case Qepid:
 		rc = epidread(c->aux, p->ch, a, n, off);
 		break;
@@ -460,8 +507,6 @@
 	switch(v){
 	case DTV_PID:
 		return up != nil ? up->pid : 0;
-	case DTV_MACHNO:
-		return m->machno;
 	default:
 		return 0;
 	}
--- a/sys/src/cmd/dtracy/act.c
+++ b/sys/src/cmd/dtracy/act.c
@@ -55,6 +55,27 @@
 	clause->probs[clause->nprob++] = strdup(s);
 }
 
+static char *aggtypes[] = { /* script-level names of the aggregation functions, indexed by AGG* type */
+	[AGGCNT] "count",
+	[AGGSUM] "sum",
+	[AGGAVG] "avg",
+	[AGGSTD] "std",
+	[AGGMIN] "min",
+	[AGGMAX] "max",
+};
+
+int
+aggtype(Symbol *s)
+{
+	char **t;
+	/* return the AGG* type whose name equals s->name; an unknown name is reported through error() and yields 0 */
+	for(t = aggtypes; t < aggtypes + nelem(aggtypes); t++)
+		if(strcmp(s->name, *t) == 0)
+			return t - aggtypes;
+	error("%s unknown aggregation type", s->name);
+	return 0;
+}
+
 void
 addstat(int type, ...)
 {
@@ -73,6 +94,19 @@
 	case STATPRINT:
 	case STATPRINTF:
 		break;
+	case STATAGG:
+		s->agg.name = va_arg(va, Symbol *);
+		s->agg.key = va_arg(va, Node *);
+		s->agg.type = aggtype(va_arg(va, Symbol *));
+		s->agg.value = va_arg(va, Node *);
+		if(s->agg.type == AGGCNT){
+			if(s->agg.value != nil)
+				error("too many arguments for count()");
+		}else{
+			if(s->agg.value == nil)
+				error("need argument for %s()", aggtypes[s->agg.type]);
+		}
+		break;
 	default:
 		sysfatal("addstat: unknown type %d", type);
 	}
@@ -158,6 +192,19 @@
 	(*arg)->str = fmtstrflush(&f);
 }
 
+int aggid; /* number of aggregations defined so far; the next one gets this value as its unique id */
+
+int
+allagg(Clause *c)
+{
+	int i;
+	/* 1 if every statement of the clause is an aggregation (also for a clause without statements), otherwise 0 */
+	for(i = 0; i < c->nstats; i++)
+		if(c->stats[i].type != STATAGG)
+			return 0;
+	return 1;
+}
+
 DTClause *
 mkdtclause(Clause *c)
 {
@@ -164,6 +211,7 @@
 	DTClause *d;
 	Stat *s;
 	int recoff, i;
+	Node *n;
 	
 	d = emalloc(sizeof(DTClause));
 	d->nprob = c->nprob;
@@ -175,7 +223,7 @@
 	for(s = c->stats; s < c->stats + c->nstats; s++)
 		switch(s->type){
 		case STATEXPR:
-			actgradd(d->gr, (DTAct){ACTTRACE, codegen(s->n), 0});
+			actgradd(d->gr, (DTAct){ACTTRACE, codegen(s->n), 0, noagg});
 			break;
 		case STATPRINT:
 			for(i = 0; i < s->narg; i++)
@@ -184,7 +232,22 @@
 		case STATPRINTF:
 			prepprintf(s->arg, s->narg, d->gr, &recoff);
 			break;
+		case STATAGG: {
+			DTAgg agg = {.id = s->agg.type << 28 | 1 << 16 | aggid++};
+			assert(dtaunpackid(&agg) >= 0);
+			aggs = realloc(aggs, sizeof(Agg) * aggid);
+			memset(&aggs[aggid-1], 0, sizeof(Agg));
+			aggs[aggid-1].DTAgg = agg;
+			aggs[aggid-1].name = strdup(s->agg.name == nil ? "" : s->agg.name->name);
+			actgradd(d->gr, (DTAct){ACTAGGKEY, codegen(s->agg.key), 8, agg});
+			n = s->agg.value;
+			if(n == nil) n = node(ONUM, 0ULL);
+			actgradd(d->gr, (DTAct){ACTAGGVAL, codegen(n), 8, agg});
+			break;
 		}
+		}
+	if(allagg(c))
+		actgradd(d->gr, (DTAct){ACTCANCEL, codegen(node(ONUM, 0)), 0, noagg});
 	return d;
 }
 
@@ -392,6 +455,7 @@
 		case STATPRINTF:
 			execprintf(s->arg, s->narg, p, e, en);
 			break;
+		case STATAGG: break;
 		default:
 			sysfatal("parseclause: unknown type %d", s->type);
 		}
@@ -546,6 +610,17 @@
 				print("\t\ttrace string (%d bytes)\n", a->size);
 				dumpexpr(a->p, "\t\t\t");
 				break;
+			case ACTAGGKEY:
+				print("\t\taggregation key (%s,%d,%d)\n", a->agg.type >= nelem(aggtypes) ? "???" : aggtypes[a->agg.type], a->agg.keysize, (u16int)a->agg.id);
+				dumpexpr(a->p, "\t\t\t");
+				break;
+			case ACTAGGVAL:
+				print("\t\taggregation value (%s,%d,%d)\n", a->agg.type >= nelem(aggtypes) ? "???" : aggtypes[a->agg.type], a->agg.keysize, (u16int)a->agg.id);
+				dumpexpr(a->p, "\t\t\t");
+				break;
+			case ACTCANCEL:
+				print("\t\tcancel record\n");
+				break;
 			default:
 				print("\t\t??? %d\n", a->type);
 			}
@@ -563,6 +638,8 @@
 				print("\t\tprintf\n");
 				for(j = 0; j < s->narg; j++)
 					print("\t\t\targ %ε\n", s->arg[j]);
+				break;
+			case STATAGG:
 				break;
 			default:
 				print("\t\t??? %d\n", s->type);
--- /dev/null
+++ b/sys/src/cmd/dtracy/agg.c
@@ -1,0 +1,199 @@
+#include <u.h>
+#include <libc.h>
+#include <dtracy.h>
+#include <bio.h>
+#include <avl.h>
+#include <mp.h>
+#include "dat.h"
+#include "fns.h"
+
+typedef struct ANode ANode;
+
+struct ANode { /* one merged aggregation entry; lives in the AVL tree of its aggregation */
+	Avl;
+	s64int val, cnt; /* accumulated value (sum, min or max) and count */
+	u64int sq[2]; /* 128-bit sum of squares for AGGSTD: sq[0] is the low qword, sq[1] the high qword */
+	int keysize; /* number of key bytes */
+	uchar key[1]; /* really keysize bytes: nodes are allocated as sizeof(ANode) - 1 + keysize */
+};
+
+Agg *aggs; /* all aggregations, indexed by their 16-bit unique id */
+static Avltree **trees; /* trees[i] holds the entries of aggs[i] */
+static ANode *key; /* scratch node for lookups, sized for the largest key */
+int interrupted; /* set by aggnote() on the first interrupt note */
+
+static int
+aggcmp(Avl *ap, Avl *bp)
+{
+	/* tree order is the bytewise order of the keys; the length is taken from the first node only */
+	ANode *x = (ANode *) ap;
+	ANode *y = (ANode *) bp;
+
+	return memcmp(x->key, y->key, x->keysize);
+}
+
+static void
+createrecord(int type, ANode *n, s64int *q) /* initialise the new node n from the value words q of a buffer record; unknown types abort */
+{
+	switch(type){
+	case AGGCNT: n->cnt = q[0]; break;
+	case AGGSUM: case AGGMIN: case AGGMAX: n->val = q[0]; break;
+	case AGGAVG: n->cnt = q[1]; n->val = q[0]; break; /* record holds (sum, count) */
+	case AGGSTD: n->cnt = q[1]; n->val = q[0]; n->sq[0] = q[2]; n->sq[1] = q[3]; break; /* (sum, count, squares low, squares high) */
+	default: abort();
+	}
+}
+
+static void
+updaterecord(int type, ANode *n, s64int *q)
+{
+	u64int r;
+	/* merge the value words q of a further buffer record for the same key into node n; unknown types abort */
+	switch(type){
+	case AGGCNT: n->cnt += q[0]; break;
+	case AGGSUM: n->val += q[0]; break;
+	case AGGAVG: n->cnt += q[1]; n->val += q[0]; break;
+	case AGGMIN: if(q[0] < n->val) n->val = q[0]; break; /* a key recurs across reads, so min and max need merging too */
+	case AGGMAX: if(q[0] > n->val) n->val = q[0]; break;
+	case AGGSTD:
+		n->cnt += q[1]; n->val += q[0];
+		r = n->sq[0] + q[2];
+		if(r < q[2]) n->sq[1]++; /* carry out of the low qword */
+		n->sq[0] = r; n->sq[1] += q[3];
+		break;
+	default: abort();
+	}
+}
+
+
+int
+aggparsebuf(uchar *p, int n)
+{
+	uchar *e;
+	Agg *a;
+	u32int id;
+	Avltree *tp;
+	ANode *np;
+	/* merge the n bytes of aggregation records at p into the trees. returns 0, or -1 after reporting a malformed record */
+	e = p + n;
+	for(; p + 8 < e; p += a->recsize){
+		id = *(u32int*)&p[4]; /* p[0..3] is the hash chain link, which is not needed here */
+		if((u16int)id >= aggid){
+		inval:
+			fprint(2, "invalid record in aggregation buffer\n");
+			return -1;
+		}
+		a = &aggs[(u16int)id];
+		if(a->type != id>>28) goto inval;
+		if(a->keysize != (id>>13&0x7ff8)) goto inval;
+		if(p + a->recsize > e) goto inval;
+		tp = trees[(u16int)id];
+		key->keysize = a->keysize;
+		memcpy(key->key, &p[8], a->keysize);
+		np = (ANode *) avllookup(tp, key, 0);
+		if(np == nil){
+			np = emalloc(sizeof(ANode) - 1 + a->keysize);
+			memcpy(np, key, sizeof(ANode) - 1 + a->keysize); /* a struct assignment would stop at sizeof(ANode) and cut off the key */
+			createrecord(a->type, np, (s64int*)&p[8+a->keysize]);
+			avlinsert(tp, np);
+		}else
+			updaterecord(a->type, np, (s64int*)&p[8+a->keysize]);
+	}
+	return 0;
+}
+
+void
+agginit(void)
+{
+	Agg *a;
+	int maxkey;
+	/* create one empty tree per aggregation and a lookup node large enough for the longest key */
+	maxkey = 0;
+	trees = emalloc(sizeof(Avltree *) * aggid);
+	for(a = aggs; a < aggs + aggid; a++){
+		trees[a - aggs] = avlcreate(aggcmp);
+		maxkey = a->keysize > maxkey ? a->keysize : maxkey;
+	}
+	key = emalloc(sizeof(ANode) - 1 + maxkey);
+}
+
+int
+aggnote(void *, char *note)
+{
+	/* claim the first "interrupt" note by setting interrupted; decline any other note and repeated interrupts */
+	if(interrupted || strcmp(note, "interrupt") != 0)
+		return 0;
+	return interrupted = 1;
+}
+
+void
+aggkeyprint(Fmt *f, Agg *, ANode *a) /* print the first 8 key bytes as one right-aligned decimal number followed by a space */
+{
+	fmtprint(f, "%20lld ", *(u64int*)a->key);
+}
+
+static double
+variance(ANode *a)
+{
+	mpint *x, *y, *z;
+	double r;
+	/* population variance (n*sum(v*v) - sum(v)^2) / n^2 of an AGGSTD entry; the numerator is computed exactly */
+	x = vtomp(a->val, nil);
+	y = uvtomp(a->sq[0], nil);
+	z = vtomp(a->sq[1], nil);
+	mpleft(z, 64, z);
+	mpadd(z, y, y); /* y = sum of squares */
+	vtomp(a->cnt, z); /* z = n */
+	mpmul(x, x, x); /* x = square of the sum */
+	mpmul(y, z, y); /* y = n * sum of squares */
+	mpsub(y, x, x);
+	r = mptod(x) / ((double)a->cnt * (double)a->cnt); /* dividing by n only once would leave n times the variance */
+	mpfree(x);
+	mpfree(y);
+	mpfree(z);
+	return r;
+}
+
+void
+aggvalprint(Fmt *f, int type, ANode *a)
+{
+	double x, s;
+	/* print the value column(s) of entry a according to the aggregation type; unknown types abort */
+	switch(type){
+	case AGGCNT: fmtprint(f, "%20lld", a->cnt); break;
+	case AGGSUM: case AGGMIN: case AGGMAX: fmtprint(f, "%20lld", a->val); break;
+	case AGGAVG: fmtprint(f, "%20g", (double)a->val / a->cnt); break;
+	case AGGSTD: /* prints the mean followed by the square root of variance(a) */
+		x = (double)a->val / a->cnt;
+		s = variance(a);
+		if(s < 0) /* no real square root: print a placeholder instead */
+			fmtprint(f, "%20g %20s", x, "NaN");
+		else{
+			fmtprint(f, "%20g %20g", x, sqrt(s));
+		}
+		break;
+	default:
+		abort();
+	}
+}
+
+void
+aggdump(void)
+{
+	char out[8192];
+	Fmt fmt;
+	Agg *ag;
+	ANode *nd;
+
+	/* write every entry of every aggregation to fd 1, one "name key value" line each, in tree order */
+	fmtfdinit(&fmt, 1, out, sizeof(out));
+	for(ag = aggs; ag < aggs + aggid; ag++)
+		for(nd = (ANode *) avlmin(trees[ag - aggs]); nd != nil; nd = (ANode *) avlnext(nd)){
+			fmtprint(&fmt, "%s\t", ag->name);
+			aggkeyprint(&fmt, ag, nd);
+			aggvalprint(&fmt, ag->type, nd);
+			fmtprint(&fmt, "\n");
+		}
+	fmtfdflush(&fmt);
+}
--- a/sys/src/cmd/dtracy/cgen.c
+++ b/sys/src/cmd/dtracy/cgen.c
@@ -296,10 +296,10 @@
 	case ORECORD:
 		switch(n->typ->type){
 		case TYPINT:
-			actgradd(g, (DTAct){ACTTRACE, codegen(n->n1), n->typ->size});
+			actgradd(g, (DTAct){ACTTRACE, codegen(n->n1), n->typ->size, noagg});
 			break;
 		case TYPSTRING:
-			actgradd(g, (DTAct){ACTTRACESTR, codegen(n->n1), n->typ->size});
+			actgradd(g, (DTAct){ACTTRACESTR, codegen(n->n1), n->typ->size, noagg});
 			break;
 		default:
 			sysfatal("tracegen: don't know how to record %τ", n->typ);
--- a/sys/src/cmd/dtracy/dat.h
+++ b/sys/src/cmd/dtracy/dat.h
@@ -5,6 +5,7 @@
 typedef struct Enab Enab;
 typedef struct Stat Stat;
 typedef struct Type Type;
+typedef struct Agg Agg;
 
 enum {
 	SYMHASH = 256,
@@ -89,10 +90,19 @@
 		STATEXPR,
 		STATPRINT,
 		STATPRINTF,
+		STATAGG,
 	} type;
+	/* STATEXPR */
 	Node *n;
+	/* STATPRINT, STATPRINTF */
 	int narg;
 	Node **arg;
+	/* STATAGG */
+	struct {
+		Symbol *name;
+		int type;
+		Node *key, *value;
+	} agg;
 };
 
 struct Clause {
@@ -112,6 +122,11 @@
 	Enab *next;
 };
 
+struct Agg {
+	DTAgg;
+	char *name;
+};
+
 extern int errors;
 
 #pragma	varargck	type	"α"	int
@@ -121,3 +136,6 @@
 #pragma varargck	argpos error 1
 
 extern int dflag;
+extern DTAgg noagg;
+extern int aggid;
+extern Agg *aggs;
--- a/sys/src/cmd/dtracy/dtracy.c
+++ b/sys/src/cmd/dtracy/dtracy.c
@@ -5,6 +5,8 @@
 #include "dat.h"
 #include "fns.h"
 
+DTAgg noagg;
+
 char *dtracyroot = "#Δ";
 int dtracyno;
 int ctlfd, buffd;
@@ -160,7 +162,7 @@
 	
 }
 
-void
+int
 bufread(Biobuf *bp)
 {
 	static uchar buf[65536];
@@ -167,16 +169,49 @@
 	int n;
 	
 	n = read(buffd, buf, sizeof(buf));
-	if(n < 0) sysfatal("bufread: %r");
+	if(n < 0)
+		sysfatal("bufread: %r");
 	if(parsebuf(buf, n, bp) < 0)
 		sysfatal("parsebuf: %r");
 	Bflush(bp);
+	return 0;
 }
 
+void
+aggproc(void)
+{
+	char buf[65536]; /* first holds the file name, then the data read from it */
+	int buffd, n; /* this buffd is local and hides the global one used for the main buffer */
+	extern int interrupted;
+	/* fork: the process in which rfork returns 0 goes back to the caller, the other one stays here collecting aggregation data */
+	switch(rfork(RFPROC|RFMEM)){
+	case -1: sysfatal("rfork: %r");
+	case 0: return;
+	default: break;
+	}
+	snprint(buf, sizeof(buf), "%s/%d/aggbuf", dtracyroot, dtracyno);
+	buffd = open(buf, OREAD);
+	if(buffd < 0) sysfatal("open: %r");
+	agginit();
+	atnotify(aggnote, 1);
+	while(!interrupted){ /* aggnote() sets interrupted on the first interrupt note */
+		n = read(buffd, buf, sizeof(buf));
+		if(n < 0){
+			if(interrupted) /* a failed read after an interrupt just ends the loop */
+				break;
+			sysfatal("aggbufread: %r");
+		}
+		if(aggparsebuf((uchar *) buf, n) < 0)
+			exits("error");
+	}
+	aggdump(); /* print everything collected, then exit */
+	exits(nil);
+}
+
 static void
 usage(void)
 {
-	fprint(2, "usage: %s [ -cd ] script\n", argv0);
+	fprint(2, "usage: %s [ -d ] script\n", argv0);
 	exits("usage");
 }
 
@@ -217,6 +252,8 @@
 		fprint(ctlfd, "go");
 		out = Bfdopen(1, OWRITE);
 		if(out == nil) sysfatal("Bfdopen: %r");
+		if(aggid > 0)
+			aggproc();
 		for(;;)
 			bufread(out);
 	}
--- a/sys/src/cmd/dtracy/fns.h
+++ b/sys/src/cmd/dtracy/fns.h
@@ -33,3 +33,7 @@
 int min(int, int);
 int max(int, int);
 Node *addtype(Type *, Node *);
+int aggparsebuf(uchar *, int);
+int aggnote(void *, char *);
+void aggdump(void);
+void agginit(void);
--- a/sys/src/cmd/dtracy/mkfile
+++ b/sys/src/cmd/dtracy/mkfile
@@ -9,6 +9,7 @@
 	cgen.$O\
 	act.$O\
 	type.$O\
+	agg.$O\
 
 YFILES=parse.y
 
--- a/sys/src/cmd/dtracy/parse.y
+++ b/sys/src/cmd/dtracy/parse.y
@@ -16,7 +16,8 @@
 	Type *t;
 }
 
-%type <n> expr
+%type <n> expr optexpr
+%type <sym> optsym
 %type <t> type
 
 %token <sym> TSYM
@@ -63,7 +64,9 @@
 stat: expr { addstat(STATEXPR, exprcheck($1, 0)); }
 | TPRINT { addstat(STATPRINT); } pelist
 | TPRINTF { addstat(STATPRINTF); } pelist
-
+| '@' optsym '[' expr ']' '=' TSYM '(' optexpr ')' { addstat(STATAGG, $2, $4, $7, $9); }
+optsym: TSYM | { $$ = nil; }
+optexpr: expr | { $$ = nil; }
 
 pelist:
 	'(' ')'
--- /dev/null
+++ b/sys/src/libdtracy/agg.c
@@ -1,0 +1,138 @@
+#include <u.h>
+#include <libc.h>
+#include <dtracy.h>
+
+int
+dtaunpackid(DTAgg *a)
+{
+	int vsize;
+
+	/*
+		decode the packed id (type in bits 28-31, key size in qwords in bits 16-27)
+		and derive the size of one buffer entry: 8 bytes of link and id, the key, the value.
+		returns -1 and leaves recsize alone if the type is unknown.
+	*/
+	a->type = a->id >> 28 & 15;
+	a->keysize = a->id >> 13 & 0x7ff8;
+	switch(a->type){
+	case AGGCNT: case AGGSUM: case AGGMIN: case AGGMAX: vsize = 8; break;
+	case AGGAVG: vsize = 16; break;
+	case AGGSTD: vsize = 32; break;
+	default: return -1;
+	}
+
+	a->recsize = 8 + a->keysize + vsize;
+	return 0;
+}
+
+static u64int
+hash(uchar *s, int n, int m)
+{
+	u64int h;
+	int i;
+
+	/* 64-bit FNV-1a over the n bytes of s, continued as if s were zero-padded to m bytes */
+	h = 0xcbf29ce484222325ULL;
+	for(i = 0; i < n || i < m; i++){
+		if(i < n)
+			h ^= s[i];
+		h *= 0x100000001b3ULL;
+	}
+	return h;
+}
+
+static int
+keyeq(uchar *a, uchar *b, int n, int m)
+{
+	int i;
+
+	/*
+		1 if the first n bytes of a and b agree and any further bytes of a up to m are zero; otherwise 0.
+	*/
+	for(i = 0; i < n || i < m; i++)
+		if(a[i] != (i < n ? b[i] : 0))
+			return 0;
+	return 1;
+}
+
+/* calculate v*v with 128 bits precision and add it to the 128-bit word at q */
+static void
+addsquare(u64int *q, s64int v)
+{
+	u32int v0;
+	s32int v1;
+	s64int s0, s1, s2;
+	u64int r;
+	/* with v = v1*2^32 + v0: v*v = v0*v0 + 2*v0*v1*2^32 + v1*v1*2^64. q[0] is the low qword, q[1] the high qword */
+	v0 = v; /* low 32 bits, unsigned */
+	v1 = v>>32; /* high 32 bits, signed */
+	s0 = (s64int)v0 * (s64int)v0;
+	s1 = (s64int)v0 * (s64int)v1;
+	s2 = (s64int)v1 * (s64int)v1;
+	r = s0 + (s1<<33); /* low qword: v0*v0 plus the low bits of the cross term */
+	if(r < (u64int)s0) q[1]++; /* carry out of that addition */
+	q[0] += r;
+	if(q[0] < r) q[1]++; /* carry out of the accumulation */
+	q[1] += s2 + (s1>>31); /* high qword: v1*v1 plus the high bits of the cross term */
+}
+
+static void
+updaterecord(int type, u64int *q, s64int val) /* fold val into the value words q of an existing entry */
+{
+	switch(type){
+	case AGGCNT: q[0] += 1; break;
+	case AGGSUM: q[0] += val; break;
+	case AGGAVG: q[0] += val; q[1]++; break;
+	case AGGMIN: if(val < (s64int)q[0]) q[0] = val; break; /* values are signed: comparing against the raw u64int would rank negative values as huge */
+	case AGGMAX: if(val > (s64int)q[0]) q[0] = val; break;
+	case AGGSTD: q[0] += val; q[1]++; addsquare(&q[2], val); break;
+	}
+}
+
+static void
+createrecord(int type, u64int *q, s64int val) /* initialise the value words q of a new entry with its first value */
+{
+	switch(type){
+	case AGGCNT: q[0] = 1; break;
+	case AGGSUM: case AGGMIN: case AGGMAX: q[0] = val; break;
+	case AGGAVG: q[0] = val; q[1] = 1; break; /* (sum, count) */
+	case AGGSTD: q[0] = val; q[1] = 1; q[2] = 0; q[3] = 0; addsquare(&q[2], val); break; /* (sum, count, 128-bit sum of squares) */
+	}
+}
+
+/*
+	runs in probe context.
+	add val to the entry for (a->id, key) in the aggregation write buffer of cpu mach,
+	creating the entry if it does not exist. if the buffer is full the value is dropped.
+*/
+void
+dtarecord(DTChan *ch, int mach, DTAgg *a, uchar *key, int nkey, s64int val)
+{
+	u64int h;
+	u32int *p, *q;
+	DTBuf *c;
+	
+	if(nkey > a->keysize) nkey = a->keysize; /* stored keys are exactly keysize bytes; shorter ones are zero-padded */
+	c = ch->aggwrbufs[mach];
+	h = hash(key, nkey, a->keysize);
+	p = (u32int*)(c->data + DTABUCKETS + (h % DTANUMBUCKETS) * 4);
+	while(*p != DTANIL){
+		assert((uint)*p < DTABUCKETS);
+		q = (u32int*)(c->data + *p);
+		if(q[1] == a->id && keyeq((uchar*)(q + 2), key, nkey, a->keysize)){ /* keyeq returns 1 on a match */
+			updaterecord(a->type, (u64int*)(q + 2 + a->keysize / 4), val);
+			return;
+		}
+		p = q; /* the link is the first word of an entry */
+	}
+	if(c->wr + a->recsize > DTABUCKETS)
+		return;
+	*p = c->wr;
+	q = (u32int*)(c->data + c->wr);
+	q[0] = DTANIL;
+	q[1] = a->id;
+	memmove(&q[2], key, nkey);
+	memset((uchar*)q + 8 + nkey, 0, a->keysize - nkey);
+	createrecord(a->type, (u64int*)(q + 2 + a->keysize / 4), val);
+	c->wr += a->recsize;
+}
--- a/sys/src/libdtracy/chan.c
+++ b/sys/src/libdtracy/chan.c
@@ -44,6 +44,14 @@
 		c->rdbufs[i] = dtmalloc(sizeof(DTBuf));
 		c->wrbufs[i] = dtmalloc(sizeof(DTBuf));
 	}
+	c->aggrdbufs = dtmalloc(sizeof(DTBuf *) * dtnmach);
+	c->aggwrbufs = dtmalloc(sizeof(DTBuf *) * dtnmach);
+	for(i = 0; i < dtnmach; i++){
+		c->aggrdbufs[i] = dtmalloc(sizeof(DTBuf));
+		c->aggwrbufs[i] = dtmalloc(sizeof(DTBuf));
+		memset(c->aggrdbufs[i]->data, -1, DTBUFSZ);
+		memset(c->aggwrbufs[i]->data, -1, DTBUFSZ);
+	}
 	return c;
 }
 
@@ -63,6 +71,12 @@
 	}
 	free(ch->rdbufs);
 	free(ch->wrbufs);
+	for(i = 0; i < dtnmach; i++){
+		free(ch->aggrdbufs[i]);
+		free(ch->aggwrbufs[i]);
+	}
+	free(ch->aggrdbufs);
+	free(ch->aggwrbufs);
 	free(ch);
 }
 
@@ -73,7 +87,7 @@
 	DTEnab *ep;
 	int i, nl, n;
 	
-	if(dtgverify(gr) < 0)
+	if(dtgverify(c, gr) < 0)
 		return -1;
 	gr->chan = c;
 	
@@ -194,6 +208,47 @@
 	return 0;
 }
 
+static void
+dtcaggbufswap(DTChan *c, int n)
+{
+	DTBuf *z;
+	/* exchange the aggregation read and write buffers of cpu n, between dtmachlock(n) and dtmachunlock(n) */
+	dtmachlock(n);
+	z = c->aggrdbufs[n];
+	c->aggrdbufs[n] = c->aggwrbufs[n];
+	c->aggwrbufs[n] = z;
+	dtmachunlock(n);
+}
+
+int
+dtcaggread(DTChan *c, void *buf, int n)
+{
+	int i, swapped;
+	/* copy the used part of the first non-empty per-cpu aggregation buffer to buf. returns its size, 0 if all buffers are empty, -1 with an error string otherwise */
+	if(c->state == DTCFAULT){
+		werrstr("%s", c->errstr);
+		return -1;
+	}
+	for(i = 0; i < dtnmach; i++){
+		if(swapped = c->aggrdbufs[i]->wr == 0) /* read buffer is empty: exchange it for the write buffer */
+			dtcaggbufswap(c, i);
+		if(c->aggrdbufs[i]->wr != 0){
+			if(c->aggrdbufs[i]->wr > n){ /* a buffer is only ever returned whole */
+				werrstr("short read");
+				return -1;
+			}
+			n = c->aggrdbufs[i]->wr;
+			memmove(buf, c->aggrdbufs[i]->data, n);
+			c->aggrdbufs[i]->wr = 0;
+			memset(c->aggrdbufs[i]->data + DTABUCKETS, -1, 4 * DTANUMBUCKETS); /* reset every bucket to DTANIL */
+			if(!swapped) /* no swap happened above, so swap now: the emptied buffer becomes the write buffer */
+				dtcaggbufswap(c, i);
+			return n;
+		}
+	}
+	return 0;
+}
+
 void
 dtcreset(DTChan *c)
 {
@@ -200,7 +255,7 @@
 	DTEnab *ep, *eq;
 	
 	for(ep = c->enab; ep != nil; ep = ep->channext){
-		/* careful! has to look atomic for etptrigger */
+		/* careful! has to look atomic for dtptrigger */
 		ep->probprev->probnext = ep->probnext;
 		ep->probnext->probprev = ep->probprev;
 	}
--- a/sys/src/libdtracy/mkfile
+++ b/sys/src/libdtracy/mkfile
@@ -8,6 +8,7 @@
 	dtefmt.$O\
 	pack.$O\
 	chan.$O\
+	agg.$O\
 
 HFILES=\
 	/sys/include/dtracy.h\
--- a/sys/src/libdtracy/pack.c
+++ b/sys/src/libdtracy/pack.c
@@ -27,6 +27,12 @@
 		fmtprint(f, "t%d\n", g->acts[i].type);
 		fmtprint(f, "s%d\n", g->acts[i].size);
 		dtepack(f, g->acts[i].p);
+		switch(g->acts[i].type){
+		case ACTAGGKEY:
+		case ACTAGGVAL:
+			fmtprint(f, "A%#.8ux\n", g->acts[i].agg.id);
+			break;
+		}
 	}
 	fmtprint(f, "G");
 }
@@ -132,6 +138,18 @@
 				case ACTTRACESTR:
 					g->reclen += g->acts[i].size;
 					break;
+				case ACTAGGKEY:
+					if(*s++ != 'A') goto fail;
+					s = u32unpack(s, (u32int *) &g->acts[i].agg.id);
+					if(s == nil) goto fail;
+					break;
+				case ACTAGGVAL:
+					if(*s++ != 'A') goto fail;
+					s = u32unpack(s, (u32int *) &g->acts[i].agg.id);
+					if(s == nil) goto fail;
+					break;
+				case ACTCANCEL:
+					break;
 				default:
 					goto fail;
 				}
@@ -182,7 +200,7 @@
 	int i;
 
 	if(c == nil) return;
-	if(--c->gr->ref == 0)
+	if(c->gr != nil && --c->gr->ref == 0)
 		dtgfree(c->gr);
 	for(i = 0; i < c->nprob; i++)
 		free(c->probs[i]);
--- a/sys/src/libdtracy/prog.c
+++ b/sys/src/libdtracy/prog.c
@@ -80,7 +80,7 @@
 }
 
 int
-dtgverify(DTActGr *g)
+dtgverify(DTChan *, DTActGr *g)
 {
 	int i;
 
@@ -96,6 +96,26 @@
 			if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0 || (uint)g->acts[i].size > DTRECMAX)
 				return -1;
 			break;
+		case ACTAGGKEY:
+			if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0 || (uint)g->acts[i].size > 8)
+				return -1;
+			if(i == g->nact - 1 || g->acts[i+1].type != ACTAGGVAL || g->acts[i+1].agg.id != g->acts[i].agg.id)
+				return -1;
+			break;
+		case ACTAGGVAL:
+			if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0 || (uint)g->acts[i].size > 8)
+				return -1;
+			if(i == 0 || g->acts[i-1].type != ACTAGGKEY)
+				return -1;
+			if(dtaunpackid(&g->acts[i].agg) < 0)
+				return -1;
+			break;
+		case ACTCANCEL:
+			if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0)
+				return -1;
+			if(i != g->nact - 1)
+				return -1;
+			break;
 		default:
 			return -1;
 		}
@@ -225,6 +245,7 @@
 	DTBuf *b;
 	u8int *bp;
 	s64int v;
+	uchar aggkey[8];
 	int i, j;
 	
 	b = g->chan->wrbufs[info->machno];
@@ -240,6 +261,8 @@
 	PUT4(info->epid);
 	PUT8(info->ts);
 	for(i = 0; i < g->nact; i++){
+		if(g->acts[i].type == ACTCANCEL)
+			return 0;
 		if(dteexec(g->acts[i].p, info, &v) < 0)
 			return -1;
 		switch(g->acts[i].type){
@@ -255,6 +278,15 @@
 				return -1;
 			}
 			bp += g->acts[i].size;
+			break;
+		case ACTAGGKEY:
+			for(j = 0; j < g->acts[i].size; j++){
+				aggkey[j] = v;
+				v >>= 8;
+			}
+			break;
+		case ACTAGGVAL:
+			dtarecord(g->chan, info->machno, &g->acts[i].agg, aggkey, g->acts[i-1].size, v);
 			break;
 		}
 	}