shithub: riscv

ref: f70cf0b70eb2b9b6cdf7208bf775189c724eb446
dir: /sys/src/cmd/venti/srv/lumpqueue.c/

View raw version
#include "stdinc.h"
#include "dat.h"
#include "fns.h"

/* forward typedefs for the file-local queue types defined below */
typedef struct LumpQueue	LumpQueue;
typedef struct WLump		WLump;

enum
{
	/*
	 * Number of slots in each queue's ring buffer.  Must be a power
	 * of 2 because indices wrap with & (MaxLumpQ - 1).  queuewrite
	 * treats the ring as full while one slot is still unused, so at
	 * most MaxLumpQ - 1 lumps are pending on a queue at once.
	 */
	MaxLumpQ	= 1 << 3	/* ring size of a single write queue, must be pow 2 */
};

/*
 * One pending write: the arguments of a queuewrite call plus the
 * flush generation that was current when it was queued.
 */
struct WLump
{
	Lump	*u;		/* lump to write; queueproc calls putlump on it afterwards */
	Packet	*p;		/* packet data, handed to writeqlump */
	int	creator;	/* passed through to writeqlump unchanged */
	int	gen;		/* value of the global gen when the entry was queued */
	uint	ms;		/* passed through to writeqlump; presumably a time in ms - TODO confirm */
};

/*
 * A ring buffer of pending writes, drained by one queueproc.
 * All three Rendez use lock as their lock (set in initlumpqueues).
 */
struct LumpQueue
{
	QLock	lock;		/* held while reading or changing q, w and r */
	Rendez 	flush;		/* flushqueue sleeps here; queueproc wakes all after each dequeue */
	Rendez	full;		/* queuewrite sleeps here while the ring is full */
	Rendez	empty;		/* queueproc sleeps here while the ring is empty */
	WLump	q[MaxLumpQ];	/* the ring itself */
	int	w;		/* index of the next slot to fill */
	int	r;		/* index of the next slot to drain; r == w means empty */
};

static LumpQueue	*lumpqs;	/* array of nqs queues, allocated by initlumpqueues */
static int		nqs;		/* number of entries in lumpqs */

static QLock		glk;		/* held by flushqueue while it increments gen */
static int		gen;		/* flush generation; stamped on each entry by queuewrite (read there without glk) */

/* per-queue writer loop, started from initlumpqueues */
static void	queueproc(void *vq);

/*
 * Allocate nq write queues and hand each one to vtproc to run
 * queueproc on it.
 *
 * Returns 0 on success.  If vtproc fails for any queue, records the
 * error with seterr and returns -1; queues set up so far are left as is.
 */
int
initlumpqueues(int nq)
{
	LumpQueue *lq;
	int n;

	nqs = nq;
	lumpqs = MKNZ(LumpQueue, nq);

	for(n = 0; n < nq; n++){
		lq = lumpqs + n;

		/* every rendezvous point sleeps and wakes under the queue's own lock */
		lq->full.l = &lq->lock;
		lq->empty.l = &lq->lock;
		lq->flush.l = &lq->lock;

		if(vtproc(queueproc, lq) >= 0)
			continue;

		seterr(EOk, "can't start write queue slave: %r");
		return -1;
	}

	return 0;
}

/*
 * Queue a lump and its packet data for writing.
 *
 * The queue is selected by the index section of the lump's score.
 * Blocks while that queue's ring is full, then stores the entry
 * (stamped with the current flush generation) and wakes the queue's
 * consumer.  Returns 0 once queued, or -1 after seterr if the index
 * section is out of range.
 */
int
queuewrite(Lump *u, Packet *p, int creator, uint ms)
{
	LumpQueue *lq;
	WLump *slot;
	int sect, next;

	trace(TraceProc, "queuewrite");
	sect = indexsect(mainindex, u->score);
	if(sect < 0 || sect >= nqs){
		seterr(EBug, "internal error: illegal index section in queuewrite");
		return -1;
	}

	lq = lumpqs + sect;

	qlock(&lq->lock);

	/*
	 * The ring is full when advancing w would land on r.
	 * next is recomputed after every sleep because the lock is
	 * dropped while sleeping and w may have moved.
	 */
	for(;;){
		next = (lq->w + 1) & (MaxLumpQ - 1);
		if(next != lq->r)
			break;
		trace(TraceProc, "queuewrite sleep");
		rsleep(&lq->full);
	}

	slot = &lq->q[lq->w];
	slot->u = u;
	slot->p = p;
	slot->creator = creator;
	slot->ms = ms;
	slot->gen = gen;
	lq->w = next;

	trace(TraceProc, "queuewrite wakeup");
	rwakeup(&lq->empty);

	qunlock(&lq->lock);

	return 0;
}

void
flushqueue(void)
{
	int i;
	LumpQueue *q;

	if(!lumpqs)
		return;

	trace(TraceProc, "flushqueue");

	qlock(&glk);
	gen++;
	qunlock(&glk);

	for(i=0; i<mainindex->nsects; i++){
		q = &lumpqs[i];
		qlock(&q->lock);
		while(q->w != q->r && gen - q->q[q->r].gen > 0){
			trace(TraceProc, "flushqueue sleep q%d", i);
			rsleep(&q->flush);
		}
		qunlock(&q->lock);
	}
}
	
/*
 * Writer loop for a single queue; vq is the LumpQueue* passed to
 * vtproc by initlumpqueues.  Never returns.
 *
 * Each iteration sleeps until the ring is non-empty, copies out the
 * entry at the read index, advances the read index, wakes any
 * flushqueue waiters and one blocked queuewrite, and then drops the
 * lock before calling writeqlump.  Note that flush waiters are woken
 * when the entry is dequeued, before writeqlump has run.
 * A failed write is reported on fd 2 and the loop continues.
 */
static void
queueproc(void *vq)
{
	LumpQueue *q;
	Lump *u;
	Packet *p;
	int creator;
	uint ms;

	threadsetname("queueproc");

	q = vq;
	for(;;){
		qlock(&q->lock);
		while(q->w == q->r){
			trace(TraceProc, "queueproc sleep empty");
			rsleep(&q->empty);
		}

		/* copy the entry out so the slot can be reused once r advances */
		u = q->q[q->r].u;
		p = q->q[q->r].p;
		creator = q->q[q->r].creator;
		ms = q->q[q->r].ms;

		q->r = (q->r + 1) & (MaxLumpQ - 1);
		trace(TraceProc, "queueproc wakeup flush");
		rwakeupall(&q->flush);

		trace(TraceProc, "queueproc wakeup full");
		rwakeup(&q->full);

		qunlock(&q->lock);

		/* the write itself happens outside the queue lock */
		trace(TraceProc, "queueproc writelump %V", u->score);
		if(writeqlump(u, p, creator, ms) < 0)
			/* newline-terminate so successive failures don't run together */
			fprint(2, "failed to write lump for %V: %r\n", u->score);
		trace(TraceProc, "queueproc wrotelump %V", u->score);

		putlump(u);
	}
}