shithub: threadpool

Download patch

ref: 877f2c0b88321839b939d1031ad7e5a386d89dc0
author: rodri <rgl@antares-labs.eu>
date: Mon Sep 2 09:13:03 EDT 2024

parallel experiments lain.

--- /dev/null
+++ b/main1.c
@@ -1,0 +1,132 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+
+typedef struct Ttask Ttask;
+typedef struct Tpool Tpool;
+
+struct Ttask
+{
+	void (*fn)(void*);	/* function a worker calls to carry out the task */
+	void *arg;	/* passed to fn as its only argument */
+};
+
+struct Tpool
+{
+	ulong nprocs;	/* number of worker procs started by mkthreadpool */
+	Ref nworking;	/* tasks being run right now; workers incref/decref around each call */
+
+	Channel *subq;	/* task submission queue */
+	Channel *done;	/* task completion signal */
+};
+
+void
+threadloop(void *arg)	/* worker proc body; arg is the Tpool to serve */
+{
+	Tpool *pool;
+	Ttask *task;
+
+	pool = arg;
+	while((task = recvp(pool->subq)) != nil){	/* a nil pointer from recvp ends the worker */
+		incref(&pool->nworking);
+		task->fn(task->arg);
+		free(task);	/* malloc'd by threadpoolexec; nobody else holds it once received */
+		decref(&pool->nworking);
+		nbsend(pool->done, nil);	/* non-blocking: dropped unless someone is receiving */
+	}
+}
+
+Tpool *
+mkthreadpool(ulong nprocs)	/* create a pool served by nprocs worker procs; aborts on allocation failure */
+{
+	Tpool *tp;
+
+	if((tp = mallocz(sizeof *tp, 1)) == nil)	/* zeroed: nworking starts at 0 */
+		sysfatal("mallocz: %r");
+	tp->nprocs = nprocs;
+	tp->subq = chancreate(sizeof(void*), nprocs);	/* buffers up to nprocs pending tasks */
+	tp->done = chancreate(sizeof(void*), 0);	/* unbuffered */
+	while(nprocs--)
+		proccreate(threadloop, tp, mainstacksize);
+	return tp;
+}
+
+void
+threadpoolexec(Tpool *tp, void (*fn)(void*), void *arg)	/* queue fn(arg) for a worker of tp */
+{
+	Ttask *t;
+
+	if((t = malloc(sizeof *t)) == nil)
+		sysfatal("malloc: %r");
+	t->fn = fn;
+	t->arg = arg;
+	sendp(tp->subq, t);	/* blocks while the submission queue is full */
+}
+
+typedef struct Tsum Tsum;
+struct Tsum
+{
+	int a;	/* accumulator, updated in place by sum */
+	int b;	/* value added to a on every iteration of sum */
+};
+void
+sum(void *arg)	/* arg is a Tsum*; adds b to a one hundred times */
+{
+	Tsum *s;
+	int i;
+
+	s = arg;
+	for(i = 0; i < 100; i++)
+		s->a += s->b;
+}
+
+void
+usage(void)	/* print the synopsis on fd 2 and exit with a failure status */
+{
+	fprint(2, "usage: %s [-t] [-n nprocs]\n", argv0);
+	exits("usage");
+}
+
+/* Run sum() over a W by H grid of Tsum cells: serially, or with -t through a pool of -n worker procs. */
+void
+threadmain(int argc, char *argv[])
+{
+	static int W = 1000, H = 1000;
+	Tpool *pool;
+	Tsum *t;
+	int i, j;
+	int threaded;
+	int nprocs;
+
+	threaded = 0;
+	nprocs = 8;
+	ARGBEGIN{
+	case 't': threaded++; break;
+	case 'n': nprocs = strtoul(EARGF(usage()), nil, 0); break;
+	default: usage();
+	}ARGEND;
+	if(argc != 0 || nprocs < 1)	/* with no workers the first submission would block forever */
+		usage();
+
+	if((t = malloc(W*H*sizeof(*t))) == nil)
+		sysfatal("malloc: %r");
+	if(threaded){
+		pool = mkthreadpool(nprocs);
+		for(i = 0; i < H; i++)
+		for(j = 0; j < W; j++){
+			t[i*W+j] = (Tsum){i, j};
+			threadpoolexec(pool, sum, &t[i*W+j]);
+		}
+		/* NOTE(review): nworking counts only running tasks and done is signalled with nbsend, so this wait can end early or block forever */
+		while(pool->nworking.ref > 0)
+			recvp(pool->done);
+		threadexitsall(nil);
+	}
+
+	for(i = 0; i < H; i++)
+	for(j = 0; j < W; j++){
+		t[i*W+j] = (Tsum){i, j};
+		sum(&t[i*W+j]);
+	}
+	exits(nil);
+}
--- /dev/null
+++ b/main2.c
@@ -1,0 +1,180 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+
+typedef struct Ttask Ttask;
+typedef struct Taskq Taskq;
+typedef struct Tpool Tpool;
+
+struct Ttask
+{
+	void (*fn)(void*);	/* function a worker calls to carry out the task */
+	void *arg;	/* passed to fn as its only argument */
+	Ttask *next;	/* following task in the Taskq list; nil at the tail */
+};
+
+struct Taskq
+{
+	Ttask *hd;	/* oldest queued task, the next one taskqget returns; nil when empty */
+	Ttask *tl;	/* most recently queued task; nil when empty */
+};
+
+struct Tpool
+{
+	QLock;	/* held by taskqput and taskqget while they touch subq */
+	Rendez empty;	/* workers sleep here while subq is empty */
+	ulong nprocs;	/* number of worker procs started by mkthreadpool */
+	Ref nworking;	/* tasks being run right now; workers incref/decref around each call */
+
+	Taskq subq;	/* task submission queue */
+	Channel *done;	/* task completion signal */
+};
+
+/*
+ * Append t to the tail of tp's submission queue and
+ * rwakeup tp->empty, all while holding tp's lock.
+ */
+void
+taskqput(Tpool *tp, Ttask *t)
+{
+	qlock(tp);
+	if(tp->subq.tl == nil)
+		tp->subq.hd = t;	/* empty queue: t is also the head */
+	else
+		tp->subq.tl->next = t;
+	tp->subq.tl = t;
+	rwakeup(&tp->empty);
+	qunlock(tp);
+}
+
+/*
+ * Remove and return the head of tp's queue, sleeping on tp->empty while the queue is empty.
+ */
+Ttask *
+taskqget(Tpool *tp)
+{
+	Ttask *head;
+
+	qlock(tp);
+	while((head = tp->subq.hd) == nil)
+		rsleep(&tp->empty);
+	if((tp->subq.hd = head->next) == nil)
+		tp->subq.tl = nil;	/* took the last task */
+	head->next = nil;
+	qunlock(tp);
+	return head;
+}
+
+void
+threadloop(void *arg)	/* worker proc body; arg is the Tpool to serve; never returns */
+{
+	Tpool *pool;
+	Ttask *task;
+
+	pool = arg;
+	for(;;){
+		task = taskqget(pool);
+		if(task == nil)
+			continue;
+		incref(&pool->nworking);
+		task->fn(task->arg);
+		free(task);	/* allocated by threadpoolexec; taskqget already unlinked it */
+		decref(&pool->nworking);
+		nbsend(pool->done, nil);	/* non-blocking: dropped unless someone is receiving */
+	}
+}
+
+Tpool *
+mkthreadpool(ulong nprocs)	/* create a pool served by nprocs worker procs; aborts on allocation failure */
+{
+	Tpool *tp;
+
+	if((tp = mallocz(sizeof *tp, 1)) == nil)	/* zeroed: empty queue, nworking at 0 */
+		sysfatal("mallocz: %r");
+	tp->empty.l = &tp->QLock;	/* rsleep/rwakeup on empty are done under the pool lock */
+	tp->nprocs = nprocs;
+	tp->done = chancreate(sizeof(void*), 0);	/* unbuffered */
+	while(nprocs--)
+		proccreate(threadloop, tp, mainstacksize);
+	return tp;
+}
+
+void
+threadpoolexec(Tpool *tp, void (*fn)(void*), void *arg)	/* queue fn(arg) for a worker of tp */
+{
+	Ttask *t;
+
+	if((t = mallocz(sizeof *t, 1)) == nil)	/* zeroed so that t->next starts out nil */
+		sysfatal("mallocz: %r");
+	t->fn = fn;
+	t->arg = arg;
+	taskqput(tp, t);
+}
+
+typedef struct Tsum Tsum;
+struct Tsum
+{
+	int a;	/* accumulator, updated in place by sum */
+	int b;	/* value added to a on every iteration of sum */
+};
+void
+sum(void *arg)	/* arg is a Tsum*; adds b to a one hundred times */
+{
+	Tsum *s;
+	int i;
+
+	s = arg;
+	for(i = 0; i < 100; i++)
+		s->a += s->b;
+}
+
+void
+usage(void)	/* print the synopsis on fd 2 and exit with a failure status */
+{
+	fprint(2, "usage: %s [-t] [-n nprocs]\n", argv0);
+	exits("usage");
+}
+
+/* Run sum() over a W by H grid of Tsum cells: serially, or with -t through a pool of -n worker procs. */
+void
+threadmain(int argc, char *argv[])
+{
+	static int W = 1000, H = 1000;
+	Tpool *pool;
+	Tsum *t;
+	int i, j;
+	int threaded;
+	int nprocs;
+
+	threaded = 0;
+	nprocs = 8;
+	ARGBEGIN{
+	case 't': threaded++; break;
+	case 'n': nprocs = strtoul(EARGF(usage()), nil, 0); break;
+	default: usage();
+	}ARGEND;
+	if(argc != 0 || nprocs < 1)	/* with no workers nothing would ever take tasks off the queue */
+		usage();
+
+	if((t = malloc(W*H*sizeof(*t))) == nil)
+		sysfatal("malloc: %r");
+	if(threaded){
+		pool = mkthreadpool(nprocs);
+		for(i = 0; i < H; i++)
+		for(j = 0; j < W; j++){
+			t[i*W+j] = (Tsum){i, j};
+			threadpoolexec(pool, sum, &t[i*W+j]);
+		}
+		/* NOTE(review): nworking counts only running tasks and done is signalled with nbsend, so this wait can end early or block forever */
+		while(pool->nworking.ref > 0)
+			recvp(pool->done);
+		threadexitsall(nil);
+	}
+
+	for(i = 0; i < H; i++)
+	for(j = 0; j < W; j++){
+		t[i*W+j] = (Tsum){i, j};
+		sum(&t[i*W+j]);
+	}
+	exits(nil);
+}
--- /dev/null
+++ b/main3.c
@@ -1,0 +1,178 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+
+typedef struct Ttask Ttask;
+typedef struct Taskq Taskq;
+typedef struct Tpool Tpool;
+
+struct Ttask
+{
+	void (*fn)(void*);	/* function a worker calls to carry out the task */
+	void *arg;	/* passed to fn as its only argument */
+	Ttask *next;	/* following task in the Taskq list; nil at the tail */
+};
+
+struct Taskq
+{
+	Ttask *hd;	/* oldest queued task, the next one taskqget returns; nil when empty */
+	Ttask *tl;	/* most recently queued task; nil when empty */
+};
+
+struct Tpool
+{
+	QLock;	/* held by taskqput and taskqget while they touch subq */
+	ulong nprocs;	/* number of worker procs started by mkthreadpool */
+	Ref nworking;	/* tasks being run right now; workers incref/decref around each call */
+
+	Taskq subq;	/* task submission queue */
+	Channel *done;	/* task completion signal */
+};
+
+/*
+ * Append t to the tail of tp's submission queue while holding tp's lock.
+ */
+void
+taskqput(Tpool *tp, Ttask *t)
+{
+	qlock(tp);
+	if(tp->subq.tl == nil)
+		tp->subq.hd = t;	/* empty queue: t is also the head */
+	else
+		tp->subq.tl->next = t;
+	tp->subq.tl = t;
+	qunlock(tp);
+}
+
+/*
+ * Remove and return the task at the head of tp's queue,
+ * or return nil right away when the queue is empty.
+ */
+Ttask *
+taskqget(Tpool *tp)
+{
+	Ttask *head;
+
+	qlock(tp);
+	head = tp->subq.hd;
+	if(head != nil){
+		if((tp->subq.hd = head->next) == nil)
+			tp->subq.tl = nil;	/* took the last task */
+		head->next = nil;
+	}
+	qunlock(tp);
+	return head;
+}
+
+void
+threadloop(void *arg)	/* worker proc body; arg is the Tpool to serve; never returns */
+{
+	Tpool *pool;
+	Ttask *task;
+
+	pool = arg;
+	for(;;){
+		task = taskqget(pool);
+		if(task == nil)
+			continue;	/* queue empty: poll again without sleeping */
+		incref(&pool->nworking);
+		task->fn(task->arg);
+		free(task);	/* allocated by threadpoolexec; taskqget already unlinked it */
+		decref(&pool->nworking);
+		nbsend(pool->done, nil);	/* non-blocking: dropped unless someone is receiving */
+	}
+}
+
+Tpool *
+mkthreadpool(ulong nprocs)	/* create a pool served by nprocs worker procs; aborts on allocation failure */
+{
+	Tpool *tp;
+
+	if((tp = mallocz(sizeof *tp, 1)) == nil)	/* zeroed: unlocked, empty queue, nworking at 0 */
+		sysfatal("mallocz: %r");
+	tp->nprocs = nprocs;
+	tp->done = chancreate(sizeof(void*), 0);	/* unbuffered */
+	while(nprocs--)
+		proccreate(threadloop, tp, mainstacksize);
+	return tp;
+}
+
+void
+threadpoolexec(Tpool *tp, void (*fn)(void*), void *arg)	/* queue fn(arg) for a worker of tp */
+{
+	Ttask *t;
+
+	if((t = mallocz(sizeof *t, 1)) == nil)	/* zeroed so that t->next starts out nil */
+		sysfatal("mallocz: %r");
+	t->fn = fn;
+	t->arg = arg;
+	taskqput(tp, t);
+}
+
+typedef struct Tsum Tsum;
+struct Tsum
+{
+	int a;	/* accumulator, updated in place by sum */
+	int b;	/* value added to a on every iteration of sum */
+};
+void
+sum(void *arg)	/* arg is a Tsum*; adds b to a one hundred times */
+{
+	Tsum *s;
+	int i;
+
+	s = arg;
+	for(i = 0; i < 100; i++)
+		s->a += s->b;
+}
+
+void
+usage(void)	/* print the synopsis on fd 2 and exit with a failure status */
+{
+	fprint(2, "usage: %s [-t] [-n nprocs]\n", argv0);
+	exits("usage");
+}
+
+/* Run sum() over a W by H grid of Tsum cells: serially, or with -t through a pool of -n worker procs. */
+void
+threadmain(int argc, char *argv[])
+{
+	static int W = 10, H = 10;
+	Tpool *pool;
+	Tsum *t;
+	int i, j;
+	int threaded;
+	int nprocs;
+
+	threaded = 0;
+	nprocs = 8;
+	ARGBEGIN{
+	case 't': threaded++; break;
+	case 'n': nprocs = strtoul(EARGF(usage()), nil, 0); break;
+	default: usage();
+	}ARGEND;
+	if(argc != 0 || nprocs < 1)	/* with no workers nothing would ever take tasks off the queue */
+		usage();
+
+	if((t = malloc(W*H*sizeof(*t))) == nil)
+		sysfatal("malloc: %r");
+	if(threaded){
+		pool = mkthreadpool(nprocs);
+		for(i = 0; i < H; i++)
+		for(j = 0; j < W; j++){
+			t[i*W+j] = (Tsum){i, j};
+			threadpoolexec(pool, sum, &t[i*W+j]);
+		}
+		/* NOTE(review): nworking counts only running tasks and done is signalled with nbsend, so this wait can end early or block forever */
+		while(pool->nworking.ref > 0)
+			recvp(pool->done);
+		threadexitsall(nil);
+	}
+
+	for(i = 0; i < H; i++)
+	for(j = 0; j < W; j++){
+		t[i*W+j] = (Tsum){i, j};
+		sum(&t[i*W+j]);
+	}
+	exits(nil);
+}
--- /dev/null
+++ b/main4.c
@@ -1,0 +1,148 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+#include <draw.h>
+#include <memdraw.h>
+
+typedef struct Ttask Ttask;
+typedef struct Tpool Tpool;
+
+struct Ttask
+{
+	void (*fn)(void*);	/* function a worker calls to carry out the task */
+	void *arg;	/* passed to fn as its only argument */
+};
+
+struct Tpool
+{
+	ulong nprocs;	/* number of worker procs started by mkthreadpool */
+	Ref issued;	/* incremented by threadpoolexec for every task submitted */
+	Ref complete;	/* incremented by a worker each time a task's fn has returned */
+
+	Channel *subq;	/* task submission queue */
+	Channel *done;	/* task completion signal */
+};
+
+void
+threadloop(void *arg)	/* worker proc body; arg is the Tpool to serve */
+{
+	Tpool *pool;
+	Ttask *task;
+
+	pool = arg;
+	while((task = recvp(pool->subq)) != nil){	/* a nil pointer from recvp ends the worker */
+		task->fn(task->arg);
+		free(task);	/* malloc'd by threadpoolexec; nobody else holds it once received */
+		incref(&pool->complete);
+		nbsend(pool->done, nil);	/* non-blocking: dropped unless someone is receiving */
+	}
+}
+
+Tpool *
+mkthreadpool(ulong nprocs)	/* create a pool served by nprocs worker procs; aborts on allocation failure */
+{
+	Tpool *tp;
+
+	if((tp = mallocz(sizeof *tp, 1)) == nil)	/* zeroed: issued and complete start at 0 */
+		sysfatal("mallocz: %r");
+	tp->nprocs = nprocs;
+	tp->subq = chancreate(sizeof(void*), nprocs);	/* buffers up to nprocs pending tasks */
+	tp->done = chancreate(sizeof(void*), 0);	/* unbuffered */
+	while(nprocs--)
+		proccreate(threadloop, tp, mainstacksize);
+	return tp;
+}
+
+void
+threadpoolexec(Tpool *tp, void (*fn)(void*), void *arg)	/* queue fn(arg) for a worker of tp */
+{
+	Ttask *t;
+
+	if((t = malloc(sizeof *t)) == nil)
+		sysfatal("malloc: %r");
+	t->fn = fn;
+	t->arg = arg;
+	incref(&tp->issued);	/* counted before a worker can see the task, so complete never runs ahead of issued */
+	sendp(tp->subq, t);	/* blocks while the submission queue is full */
+}
+
+typedef struct Targs Targs;
+struct Targs
+{
+	Memimage *i;	/* image fillpix writes into */
+	int y;	/* row of i that fillpix fills */
+};
+/* Fill row y of image i (both taken from the Targs in arg) with pixels derived from atan2(y, x). */
+void
+fillpix(void *arg)
+{
+	Targs *a;
+	Point pt;
+	ulong *dst, v;
+	double ang;
+
+	a = arg;
+	for(pt = Pt(0, a->y); pt.x < Dx(a->i->r); pt.x++){
+		ang = atan2(pt.y, pt.x);
+		v = ang*25523UL*25523UL;
+		dst = (ulong*)byteaddr(a->i, pt);
+		*dst = v|0xFF<<24;	/* force the top byte to 0xFF */
+	}
+}
+
+void
+usage(void)	/* print the synopsis on fd 2 and exit with a failure status */
+{
+	fprint(2, "usage: %s [-t] [-n nprocs]\n", argv0);
+	exits("usage");
+}
+
+void
+threadmain(int argc, char *argv[])	/* fill a W by H image row by row, serially or (-t) with -n workers, and write it to fd 1 */
+{
+	static int W = 1000, H = 1000;
+	Tpool *pool;
+	Targs *t;
+	Memimage *img;
+	int i;
+	int threaded;
+	int nprocs;
+
+	threaded = 0;
+	nprocs = 8;
+	ARGBEGIN{
+	case 't': threaded++; break;
+	case 'n': nprocs = strtoul(EARGF(usage()), nil, 0); break;
+	default: usage();
+	}ARGEND;
+	if(argc != 0 || nprocs < 1)	/* with no workers the first submission would block forever */
+		usage();
+
+	if(memimageinit() != 0)
+		sysfatal("memimageinit: %r");
+	if((img = allocmemimage(Rect(0,0,W,H), XRGB32)) == nil)
+		sysfatal("allocmemimage: %r");
+	if((t = malloc(H*sizeof(*t))) == nil)
+		sysfatal("malloc: %r");
+	if(threaded){
+		pool = mkthreadpool(nprocs);
+		for(i = 0; i < H; i++){
+			t[i] = (Targs){img, i};
+			threadpoolexec(pool, fillpix, &t[i]);
+		}
+		/* poll the counters: workers signal done with nbsend, so a blocking recvp could miss the last signal and hang */
+		while(pool->issued.ref != pool->complete.ref)
+			sleep(1);
+		if(writememimage(1, img) < 0)
+			sysfatal("writememimage: %r");
+		threadexitsall(nil);
+	}
+
+	for(i = 0; i < H; i++){
+		t[i] = (Targs){img, i};
+		fillpix(&t[i]);
+	}
+	if(writememimage(1, img) < 0)
+		sysfatal("writememimage: %r");
+	exits(nil);
+}
--- /dev/null
+++ b/mkfile
@@ -1,0 +1,9 @@
+</$objtype/mkfile
+
+BIN=/$objtype/bin
+TARG=main1\
+	main2\
+	main3\
+	main4\
+
+</sys/src/cmd/mkmany
--- /dev/null
+++ b/readme
@@ -1,0 +1,8 @@
+threadpool
+
+Thread pool experiments.
+
+	- main1: channel-based task queue
+	- main2: rendezvous point task queue
+	- main3: qlocked task queue
+	- main4: channel-based memimage line raster task