shithub: purgatorio

ref: a411870ee4640241e3c494367d922847da84f972
dir: /os/port/devpipe.c/

View raw version
#include	"u.h"
#include	"../port/lib.h"
#include	"mem.h"
#include	"dat.h"
#include	"fns.h"
#include	"../port/error.h"
#include "interp.h"

/*
 *  A qid path packs two fields: the low 5 bits hold the file type
 *  and the remaining high bits hold an id.
 *	NETTYPE	extracts the low 5 bits
 *	NETID	extracts everything above them
 *	NETQID	combines an id and a type into one value
 */
#define NETTYPE(x)	(((ulong)(x)) & 0x1f)
#define NETID(x)	(((ulong)(x)) >> 5)
#define NETQID(i,t)	(((i) << 5) | (t))

/*
 *  Per-pipe state, shared by every channel whose aux points at it.
 */
typedef struct Pipe	Pipe;
struct Pipe
{
	/* taken by pipewalk, pipeopen and pipeclose around updates of ref and qref[] */
	QLock;
	/* not used by the code visible in this file */
	Pipe*	next;
	/* number of channels referring to this pipe; pipeclose frees the pipe at 0 */
	int	ref;
	/* number assigned from pipealloc.path in pipeattach */
	ulong	path;
	/* q[0] is read through data and written through data1; q[1] is the reverse */
	Queue*	q[2];
	/* count of open channels on data (index 0) and data1 (index 1) */
	int	qref[2];
	/* this pipe's own copy of the pipedir table; pipewstat edits its names and modes */
	Dirtab	*pipedir;
	/* copied from up->env->user in pipeattach; checked by pipeopen and pipewstat */
	char*	user;
};

/*
 *  State shared by all pipes.
 */
static struct
{
	/* held by pipeattach while it takes the next path number */
	Lock;
	/* last path number handed out; pre-incremented for each new pipe */
	ulong	path;
	/* value passed as the first argument of qopen for each queue; set by pipeinit */
	int	pipeqsize;	
} pipealloc;

/*
 *  File types found in a pipe directory.  The value is what
 *  NETTYPE() recovers from a qid path.
 */
enum
{
	Qdir	= 0,
	Qdata0	= 1,
	Qdata1	= 2,
};

/*
 *  Template directory table.  pipeattach copies it into each new Pipe
 *  so that pipewstat can change an entry's name and mode for that pipe
 *  alone.  Other code indexes the copy by position: entry 1 is the
 *  Qdata0 file and entry 2 is the Qdata1 file.
 */
static 
Dirtab pipedir[] =
{
	".",		{Qdir,0,QTDIR},	0,		DMDIR|0500,
	"data",		{Qdata0},	0,			0660,
	"data1",	{Qdata1},	0,			0660,
};

/*
 *  Release a Pipe and everything it owns.  A nil pointer is ignored,
 *  and a partially built Pipe is accepted (pipeattach calls this from
 *  its error handler before both queues exist), so nil queues are
 *  skipped.
 *
 *  The queues are released with qfree rather than free so that any
 *  Blocks still queued are released too.  That can happen: if data is
 *  opened, written to and closed while data1 was never opened,
 *  pipeclose only hangs up and reopens q[1]; it never qcloses it, so
 *  the written Blocks are still on the queue when the Pipe is freed.
 */
static void
freepipe(Pipe *p)
{
	int i;

	if(p == nil)
		return;
	free(p->user);
	for(i = 0; i < nelem(p->q); i++)
		if(p->q[i] != nil)
			qfree(p->q[i]);
	free(p->pipedir);
	free(p);
}

/*
 *  Device initialisation: choose the size used for every pipe queue.
 */
static void
pipeinit(void)
{
	enum { Defqsize = 32*1024 };

	pipealloc.pipeqsize = Defqsize;
}

/*
 *  Attach to the pipe device.  Each attach creates a new pipe: a Pipe
 *  structure, its own copy of the directory table, the owner name and
 *  both queues.  The returned channel is the pipe's directory.
 *
 *  The Pipe is fully built before devattach creates the channel, and
 *  devattach is called inside the error region.  This way an
 *  allocation failure cannot leak a channel, and an error raised by
 *  devattach still frees the Pipe.
 *
 *  Raises Enomem if any allocation fails.
 *
 *  NOTE(review): the error handler calls freepipe on a Pipe whose
 *  fields may not have been set yet, so this relies on malloc
 *  returning zeroed memory -- confirm for this kernel's allocator.
 */
static Chan*
pipeattach(char *spec)
{
	Pipe *p;
	Chan *c;

	p = malloc(sizeof(Pipe));
	if(p == 0)
		error(Enomem);
	if(waserror()){
		freepipe(p);
		nexterror();
	}
	p->pipedir = malloc(sizeof(pipedir));
	if(p->pipedir == 0)
		error(Enomem);
	memmove(p->pipedir, pipedir, sizeof(pipedir));
	kstrdup(&p->user, up->env->user);
	p->ref = 1;

	p->q[0] = qopen(pipealloc.pipeqsize, 0, 0, 0);
	if(p->q[0] == 0)
		error(Enomem);
	p->q[1] = qopen(pipealloc.pipeqsize, 0, 0, 0);
	if(p->q[1] == 0)
		error(Enomem);

	/* last step that can fail: nothing after this point raises an error */
	c = devattach('|', spec);
	poperror();

	lock(&pipealloc);
	p->path = ++pipealloc.path;
	unlock(&pipealloc);

	c->qid.path = NETQID(2*p->path, Qdir);
	c->qid.vers = 0;
	c->qid.type = QTDIR;
	c->aux = p;
	c->dev = 0;
	return c;
}

/*
 *  Directory generator passed to devwalk and devdirread.
 *
 *  i selects the entry to describe: DEVDOTDOT yields the "#|"
 *  directory itself, otherwise i counts the entries of tab after the
 *  leading ".".  Fills in *dp and returns 1, or returns -1 when there
 *  is no table or i is past its end.  The length reported for a data
 *  file is the current length of the queue it reads from.
 */
static int
pipegen(Chan *c, char *, Dirtab *tab, int ntab, int i, Dir *dp)
{
	Pipe *pp;
	Dirtab *ent;
	Qid q;
	int id, length;
	ulong kind;

	if(i == DEVDOTDOT){
		devdir(c, c->qid, "#|", 0, eve, 0555, dp);
		return 1;
	}

	/* entry 0 of the table is "." and is never generated */
	i += 1;
	if(tab == 0 || i >= ntab)
		return -1;

	ent = &tab[i];
	pp = c->aux;
	kind = NETTYPE(ent->qid.path);
	if(kind == Qdata0)
		length = qlen(pp->q[0]);
	else if(kind == Qdata1)
		length = qlen(pp->q[1]);
	else
		length = ent->length;

	/* same pipe id as the directory, file type from the table entry */
	id = NETID(c->qid.path);
	q.path = NETQID(id, ent->qid.path);
	q.vers = 0;
	q.type = QTFILE;
	devdir(c, q, ent->name, length, eve, ent->perm, dp);
	return 1;
}


/*
 *  Walk within a pipe directory using the pipe's own directory table.
 *
 *  When the walk produced a channel distinct from c, that channel
 *  refers to the same Pipe, so the pipe's reference count goes up.
 *  If c is an open data channel, the open count of its side goes up
 *  as well.
 */
static Walkqid*
pipewalk(Chan *c, Chan *nc, char **name, int nname)
{
	Pipe *pp;
	Walkqid *res;
	ulong kind;

	pp = c->aux;
	res = devwalk(c, nc, name, nname, pp->pipedir, nelem(pipedir), pipegen);
	if(res == nil || res->clone == nil || res->clone == c)
		return res;

	qlock(pp);
	pp->ref++;
	if(c->flag & COPEN){
		kind = NETTYPE(c->qid.path);
		if(kind == Qdata0)
			pp->qref[0]++;
		else if(kind == Qdata1)
			pp->qref[1]++;
	}
	qunlock(pp);
	return res;
}

/*
 *  Stat the file c refers to and encode the result into db (n bytes).
 *
 *  A data file reports the name and mode from the pipe's own
 *  directory table and, as its length, the length of the queue it
 *  reads from.  Returns the value of convD2M; raises Eshortstat when
 *  that is smaller than BIT16SZ.  Panics on an unknown file type.
 */
static int
pipestat(Chan *c, uchar *db, int n)
{
	Pipe *pp;
	Dir d;
	Dirtab *ent;
	int side;

	pp = c->aux;
	switch(NETTYPE(c->qid.path)){
	case Qdir:
		devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &d);
		break;
	case Qdata0:
	case Qdata1:
		/* side 0 is data (table entry 1), side 1 is data1 (table entry 2) */
		side = NETTYPE(c->qid.path) == Qdata1;
		ent = &pp->pipedir[1+side];
		devdir(c, c->qid, ent->name, qlen(pp->q[side]), eve, ent->perm, &d);
		break;
	default:
		panic("pipestat");
	}

	n = convD2M(&d, db, n);
	if(n < BIT16SZ)
		error(Eshortstat);
	return n;
}

/*
 *  Open a pipe file.
 *
 *  The directory may only be opened OREAD (Ebadarg otherwise).  For a
 *  data file the mode is validated, the pipe owner's permissions are
 *  checked against the file's mode in the pipe's directory table, and
 *  the open count of that side is incremented.  The queues already
 *  exist (pipeattach made them); nothing is created here.
 */
static Chan*
pipeopen(Chan *c, int omode)
{
	Pipe *pp;
	int side;

	if(c->qid.type & QTDIR){
		if(omode != OREAD)
			error(Ebadarg);
		c->mode = omode;
		c->flag |= COPEN;
		c->offset = 0;
		return c;
	}

	/* called only for its validity check; the result is taken further down */
	openmode(omode);

	/* which side is being opened: 0 for data, 1 for data1, -1 for neither */
	switch(NETTYPE(c->qid.path)){
	case Qdata0:
		side = 0;
		break;
	case Qdata1:
		side = 1;
		break;
	default:
		side = -1;
		break;
	}

	pp = c->aux;
	qlock(pp);
	if(waserror()){
		qunlock(pp);
		nexterror();
	}
	if(side >= 0){
		devpermcheck(pp->user, pp->pipedir[1+side].perm, omode);
		pp->qref[side]++;
	}
	poperror();
	qunlock(pp);

	c->mode = openmode(omode);
	c->flag |= COPEN;
	c->offset = 0;
	c->iounit = qiomaxatomic;
	return c;
}

/*
 *  Close a channel on a pipe.
 *
 *  If the channel was an open data file, its side's open count drops;
 *  when the last opener of a side goes away, the other side's queue
 *  is hung up and this side's queue is closed.  Once neither side has
 *  any opener, both queues are reopened so the pipe can be used
 *  again.  Finally the pipe's reference count drops and the Pipe is
 *  freed when no channel refers to it any more.
 */
static void
pipeclose(Chan *c)
{
	Pipe *pp;
	int side, last;

	pp = c->aux;
	qlock(pp);

	/* side is 0 for an open data, 1 for an open data1, -1 otherwise */
	side = -1;
	if(c->flag & COPEN){
		if(NETTYPE(c->qid.path) == Qdata0)
			side = 0;
		else if(NETTYPE(c->qid.path) == Qdata1)
			side = 1;
	}
	if(side >= 0){
		pp->qref[side]--;
		if(pp->qref[side] == 0){
			qhangup(pp->q[1-side], 0);
			qclose(pp->q[side]);
		}
	}

	if(pp->qref[0] == 0 && pp->qref[1] == 0){
		qreopen(pp->q[0]);
		qreopen(pp->q[1]);
	}

	/* decide under the lock, free after releasing it */
	pp->ref--;
	last = pp->ref == 0;
	qunlock(pp);
	if(last)
		freepipe(pp);
}

/*
 *  Read up to n bytes into va.  The directory is read through
 *  devdirread with the pipe's own table; data reads q[0] and data1
 *  reads q[1].  Panics on an unknown file type.
 */
static long
piperead(Chan *c, void *va, long n, vlong)
{
	Pipe *pp;
	ulong kind;

	pp = c->aux;
	kind = NETTYPE(c->qid.path);

	if(kind == Qdir)
		return devdirread(c, va, n, pp->pipedir, nelem(pipedir), pipegen);
	if(kind == Qdata0)
		return qread(pp->q[0], va, n);
	if(kind == Qdata1)
		return qread(pp->q[1], va, n);

	panic("piperead");
	return -1;	/* not reached */
}

/*
 *  Block read: data takes a Block from q[0], data1 from q[1];
 *  anything else is passed to devbread.
 */
static Block*
pipebread(Chan *c, long n, ulong offset)
{
	Pipe *pp;
	ulong kind;

	pp = c->aux;
	kind = NETTYPE(c->qid.path);

	if(kind == Qdata0)
		return qbread(pp->q[0], n);
	if(kind == Qdata1)
		return qbread(pp->q[1], n);
	return devbread(c, n, offset);
}

/*
 *  Write n bytes from va to the opposite side's queue: data writes
 *  q[1], data1 writes q[0].  Returns the result of qwrite.
 *
 *  If the write raises an error and the channel is not flagged CMSG,
 *  the current prog (if any, and if it has no kill string yet) is
 *  marked with "write on closed pipe" before the error is passed on.
 *  Panics on any other file type.
 */
static long
pipewrite(Chan *c, void *va, long n, vlong)
{
	Pipe *pp;
	Prog *prog;
	Queue *dst;

	if(waserror()){
		/* no exception when the pipe is a mounted queue */
		if(!(c->flag & CMSG)){
			prog = up->iprog;
			if(prog != nil && prog->kill == nil)
				prog->kill = "write on closed pipe";
		}
		nexterror();
	}

	pp = c->aux;
	dst = nil;
	switch(NETTYPE(c->qid.path)){
	case Qdata0:
		dst = pp->q[1];
		break;
	case Qdata1:
		dst = pp->q[0];
		break;
	default:
		panic("pipewrite");
	}
	n = qwrite(dst, va, n);

	poperror();
	return n;
}

/*
 *  Block write: hand bp to the opposite side's queue (data writes
 *  q[1], data1 writes q[0]) and return the result of qbwrite.
 *
 *  Errors are treated as in pipewrite: unless the channel is flagged
 *  CMSG, the current prog is marked with "write on closed pipe"
 *  before the error is passed on.  Panics on any other file type.
 */
static long
pipebwrite(Chan *c, Block *bp, ulong junk)
{
	Pipe *pp;
	Prog *prog;
	Queue *dst;
	long n;

	USED(junk);
	if(waserror()){
		/* no exception when the pipe is a mounted queue */
		if(!(c->flag & CMSG)){
			prog = up->iprog;
			if(prog != nil && prog->kill == nil)
				prog->kill = "write on closed pipe";
		}
		nexterror();
	}

	pp = c->aux;
	dst = nil;
	switch(NETTYPE(c->qid.path)){
	case Qdata0:
		dst = pp->q[1];
		break;
	case Qdata1:
		dst = pp->q[0];
		break;
	default:
		panic("pipebwrite");
	}
	n = qbwrite(dst, bp);

	poperror();
	return n;
}

/*
 *  Change the name and/or mode of one of a pipe's data files.
 *
 *  Only the pipe's owner may do this, and never on the directory
 *  (Eperm).  A non-empty new name must be a valid name shorter than
 *  KNAMELEN (Efilename) and must differ from the other data file's
 *  name (Eexist).  A mode other than ~0 replaces the file's
 *  permission bits (low 9 bits only).  Returns the value of convM2D;
 *  raises Eshortstat when that is 0.
 */
static int
pipewstat(Chan *c, uchar *dp, int n)
{
	Pipe *pp;
	Dir *d;
	Dirtab *self, *peer;

	if(c->qid.type & QTDIR)
		error(Eperm);
	pp = c->aux;
	if(strcmp(up->env->user, pp->user) != 0)
		error(Eperm);

	/* the strings unpacked by convM2D are stored directly after the Dir */
	d = smalloc(sizeof(*d)+n);
	if(waserror()){
		free(d);
		nexterror();
	}
	n = convM2D(dp, n, d, (char*)&d[1]);
	if(n == 0)
		error(Eshortstat);

	/* self is the table entry being changed, peer the other data file */
	if(NETTYPE(c->qid.path) == Qdata1){
		self = &pp->pipedir[2];
		peer = &pp->pipedir[1];
	}else{
		self = &pp->pipedir[1];
		peer = &pp->pipedir[2];
	}

	if(!emptystr(d->name)){
		validwstatname(d->name);
		if(strlen(d->name) >= KNAMELEN)
			error(Efilename);
		if(strcmp(peer->name, d->name) == 0)
			error(Eexist);
		kstrcpy(self->name, d->name, KNAMELEN);
	}
	if(d->mode != ~0UL)
		self->perm = d->mode & 0777;

	poperror();
	free(d);
	return n;
}

/*
 *  Device table entry for the pipe device: character '|', name "pipe".
 *  The pipe* entries are the functions defined in this file; the dev*
 *  entries (devreset, devshutdown, devcreate, devremove) are not
 *  defined here -- presumably the generic defaults for operations this
 *  device does not implement itself.  The initialiser is positional,
 *  so the order must match the field order of Dev.
 */
Dev pipedevtab = {
	'|',
	"pipe",

	devreset,
	pipeinit,
	devshutdown,
	pipeattach,
	pipewalk,
	pipestat,
	pipeopen,
	devcreate,
	pipeclose,
	piperead,
	pipebread,
	pipewrite,
	pipebwrite,
	devremove,
	pipewstat,
};