shithub: riscv

Download patch

ref: 81274ea0cf5b205d18f7345d5cf883bbd1fdac30
parent: 99ed9623b59b4aa6aa27215785b05f17118f8855
author: cinap_lenrek <cinap_lenrek@felloff.net>
date: Thu Jun 22 16:46:04 EDT 2017

upas/fs: handle plumbing for new messages for concurrent index updates

when multiple upas/fs instances are running on the same index,
another upas/fs could have written the index, but we still want
to plumb the message.

so we introduce another cstate flag "Cnew" that is set by rdidx()
when it reads a message that we haven't seen before.

--- a/sys/src/cmd/upas/fs/cache.c
+++ b/sys/src/cmd/upas/fs/cache.c
@@ -375,7 +375,8 @@
 	"idx",
 	"stale",
 	"header",
-	"body"
+	"body",
+	"new",
 };
 
 char*
--- a/sys/src/cmd/upas/fs/dat.h
+++ b/sys/src/cmd/upas/fs/dat.h
@@ -6,6 +6,7 @@
 	Cidxstale	= 1<<1,
 	Cheader 	= 1<<2,
 	Cbody		= 1<<3,
+	Cnew		= 1<<4,
 
 	/* encodings */
 	Enone=	0,
@@ -65,8 +66,8 @@
 	char	*idxaux;		/* mailbox specific */
 
 	char	*type;			/* mime info */
-	char	disposition;
 	char	*filename;
+	char	disposition;
 
 	int	nparts;
 };
@@ -117,9 +118,9 @@
 
 	/* mime info */
 	char	*charset;		
-	char	encoding;
 	char	*boundary;
 	char	converted;
+	char	encoding;
 	char	decoded;
 	char	mimeflag;
 
@@ -208,7 +209,7 @@
 long		cachefree(Mailbox*, Message*, int);
 
 Message*	gettopmsg(Mailbox*, Message*);
-char*		syncmbox(Mailbox*);
+char*		syncmbox(Mailbox*, int);
 void*		emalloc(ulong);
 void*		erealloc(void*, ulong);
 Message*	newmessage(Message*);
--- a/sys/src/cmd/upas/fs/fs.c
+++ b/sys/src/cmd/upas/fs/fs.c
@@ -1008,7 +1008,7 @@
 	Message *msg;
 
 	if(off == 0)
-		syncmbox(f->mb);
+		syncmbox(f->mb, 1);
 
 	n = 0;
 	if(f->mb->ctl){
@@ -1343,7 +1343,7 @@
 	Dir d;
 
 	if(FILE(f->qid.path) == Qmbox)
-		syncmbox(f->mb);
+		syncmbox(f->mb, 1);
 	mkstat(&d, f->mb, f->m, FILE(f->qid.path));
 	rhdr.nstat = convD2M(&d, mbuf, messagesize - IOHDRSZ);
 	rhdr.stat = mbuf;
@@ -1465,7 +1465,7 @@
 			}
 		}
 		if(mb != nil) {
-			syncmbox(mb);
+			syncmbox(mb, 1);
 			qunlock(&synclock);
 		} else {
 			qunlock(&synclock);
--- a/sys/src/cmd/upas/fs/idx.c
+++ b/sys/src/cmd/upas/fs/idx.c
@@ -383,7 +383,6 @@
 			m->cstate |= Cidx;
 			idprint("→%.2ux\n", m->cstate);
 			free(s);
-			// s = 0;
 			continue;
 		}
 		m = newmessage(parent);
@@ -412,7 +411,7 @@
 		m->nparts = strtoul(f[21], 0, 0);
 
 		m->cstate &= ~Cidxstale;
-		m->cstate |= Cidx;
+		m->cstate |= Cidx|Cnew;
 		m->str = s;
 		s = 0;
 
--- a/sys/src/cmd/upas/fs/mbox.c
+++ b/sys/src/cmd/upas/fs/mbox.c
@@ -62,7 +62,7 @@
  * do we want to plumb flag changes?
  */
 char*
-syncmbox(Mailbox *mb)
+syncmbox(Mailbox *mb, int doplumb)
 {
 	char *s;
 	int n, d, y, a;
@@ -83,13 +83,20 @@
 	y = 0;
 	for(m = mb->root->part; m; m = next){
 		next = m->next;
-		if((m->cstate & Cidx) == 0 && m->deleted == 0){
-			cachehash(mb, m);
-			if(insurecache(mb, m) == 0){
-				mailplumb(mb, m);
-				msgdecref(mb, m);
+		if(m->deleted == 0){
+			if((m->cstate & Cidx) == 0){
+				cachehash(mb, m);
+				m->cstate |= Cnew;
+				n++;
+			} else if(!doplumb)
+				m->cstate &= ~Cnew;
+			if(m->cstate & Cnew){
+				if(insurecache(mb, m) == 0){
+					mailplumb(mb, m);
+					msgdecref(mb, m);
+				}
+				m->cstate &= ~Cnew;
 			}
-			n++;
 		}
 		if(m->cstate & Cidxstale)
 			y++;
@@ -98,7 +105,8 @@
 		if(mb->delete && m->inmbox && m->deleted & Deleted)
 			mb->delete(mb, m);
 		if(!m->inmbox){
-			mailplumb(mb, m);
+			if(doplumb)
+				mailplumb(mb, m);
 			delmessage(mb, m);
 			d++;
 		}
@@ -253,7 +261,7 @@
 	if(r)
 		*r = mb;
 
-	return syncmbox(mb);
+	return syncmbox(mb, 0);
 }
 
 /* close the named mailbox */
@@ -279,7 +287,7 @@
 	Mailbox *m;
 
 	for(m = mbl; m != nil; m = m->next)
-		if(err = syncmbox(m))
+		if(err = syncmbox(m, 0))
 			eprint("syncmbox: %s\n", err);
 }
 
@@ -1070,7 +1078,7 @@
 				break;
 			}
 	if(needwrite)
-		syncmbox(mb);
+		syncmbox(mb, 1);
 	return 0;
 }
 
@@ -1100,7 +1108,7 @@
 					needwrite = 1;
 			}
 	if(needwrite)
-		syncmbox(mb);
+		syncmbox(mb, 1);
 	return rerr;
 }
 
@@ -1117,7 +1125,7 @@
 	m->refs--;
 	if(m->refs == 0){
 		if(m->deleted)
-			syncmbox(mb);
+			syncmbox(mb, 1);
 		else
 			putcache(mb, m);
 	}
@@ -1149,7 +1157,7 @@
 	assert(mb->refs > 0);
 	mb->refs--;
 	if(mb->refs == 0){
-		syncmbox(mb);
+		syncmbox(mb, 1);
 		delmessage(mb, mb->root);
 		if(mb->ctl)
 			hfree(PATH(mb->id, Qmbox), "ctl");