ref: f321298c551e4333fcf2819eabf7ce67ea443e20
parent: 5e20e8f963482a2008ed70cc8fa5973078248aed
author: Ori Bernstein <ori@eigenstate.org>
date: Sat Jan 23 06:03:05 EST 2021
upas/runq: support parallel queue processing, drop -a When running a mail queue, it's useful to run it with limited parallelism. This helps mailing lists process messages in a reasonable time. At the same time, we can remove the load balancing from runq, since the kinds of systems that this matters on no longer exist, and running multiple queues at once can be better done through xargs.
--- a/sys/man/8/qer
+++ b/sys/man/8/qer
@@ -15,7 +15,7 @@
.br
.B runq
[
-.B -adsER
+.B -dER
]
[
.B -f
@@ -26,10 +26,6 @@
.I subdir
]
[
-.B -l
-.I load
-]
-[
.B -t
.I time
]
@@ -39,7 +35,7 @@
]
[
.B -n
-.I nprocs
+.I njobs
]
.I root cmd
.SH DESCRIPTION
@@ -84,10 +80,7 @@
.I Runq
processes the files queued by
.IR qer .
-Without the
-.B -a
-option,
-.I runq
+.I Runq
processes all requests in the directory
.IR root / subdir ,
where
@@ -96,9 +89,6 @@
.B -q
if present, else the contents of
.BR /dev/user .
-With the
-.B -a
-it processes all requests.
Each request is processed by executing the command
.I cmd
with the contents of the control file as its arguments,
@@ -172,31 +162,12 @@
.I -q
flag.
.P
-The
-.BR -s ,
-.BR -n ,
-and
-.B -l
-flags are only meaningful with the
-.B -a
-flag. They control amount of parallelism that
-is used when sweeping all of the queues. The argument following the
+The argument following the
.B -n
-flag specifies the number of queues that are swept
-in parallel; the default is 50. The argument following the
-.B -l
-flag specifies the total number of queues that are being swept.
-By default, there is no limit. The number of active sweeps
-is cumulative over all active executions of
-.IR runq .
-The
-.B -s
-flag forces each queue directory to be processed by exactly
-one instance of
-.IR runq .
-This is useful on systems that connect to slow
-external systems and prevents all the queue sweeps from
-piling up trying to process a few slow systems.
+flag specifies the number of queued jobs that are processed
+in parallel from the queue; the default is 1.
+This is useful for a large queue to be processed with a bounded
+amount of parallelism.
.PP
.I Runq
is often called from
--- a/sys/src/cmd/upas/q/runq.c
+++ b/sys/src/cmd/upas/q/runq.c
@@ -1,9 +1,25 @@
#include "common.h"
#include <ctype.h>
+typedef struct Job Job;
+
+struct Job {
+ Job *next;
+ int pid;
+ int ac;
+ int dfd;
+ char **av;
+ char *buf; /* backing for av */
+ Dir *dp; /* not owned */
+ Mlock *l;
+ Biobuf *b;
+};
+
void doalldirs(void);
void dodir(char*);
-void dofile(Dir*);
+Job* dofile(Dir*);
+Job* donefile(Job*, Waitmsg*);
+void freejob(Job*);
void rundir(char*);
char* file(char*, char);
void warning(char*, void*);
@@ -17,7 +33,6 @@
char *root;
int debug;
int giveup = 2*24*60*60;
-int load;
int limit;
/* the current directory */
@@ -28,12 +43,9 @@
char *runqlog = "runq";
-int *pidlist;
char **badsys; /* array of recalcitrant systems */
int nbad;
-int npid = 50;
-int sflag; /* single thread per directory */
-int aflag; /* all directories */
+int njob = 1; /* number of concurrent jobs to invoke */
int Eflag; /* ignore E.xxxxxx dates */
int Rflag; /* no giving up, ever */
@@ -40,26 +52,18 @@
void
usage(void)
{
- fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
- exits("");
+ fprint(2, "usage: runq [-dE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
+ exits("usage");
}
void
main(int argc, char **argv)
{
- char *qdir, *x;
+ char *qdir;
qdir = 0;
ARGBEGIN{
- case 'l':
- x = ARGF();
- if(x == 0)
- usage();
- load = atoi(x);
- if(load < 0)
- load = 0;
- break;
case 'E':
Eflag++;
break;
@@ -66,29 +70,21 @@
case 'R': /* no giving up -- just leave stuff in the queue */
Rflag++;
break;
- case 'a':
- aflag++;
- break;
case 'd':
debug++;
break;
case 'r':
- limit = atoi(ARGF());
+ limit = atoi(EARGF(usage()));
break;
- case 's':
- sflag++;
- break;
case 't':
- giveup = 60*60*atoi(ARGF());
+ giveup = 60*60*atoi(EARGF(usage()));
break;
case 'q':
- qdir = ARGF();
- if(qdir == 0)
- usage();
+ qdir = EARGF(usage());
break;
case 'n':
- npid = atoi(ARGF());
- if(npid == 0)
+ njob = atoi(EARGF(usage()));
+ if(njob == 0)
usage();
break;
}ARGEND;
@@ -96,15 +92,10 @@
if(argc != 2)
usage();
- pidlist = malloc(npid*sizeof(*pidlist));
- if(pidlist == 0)
- error("can't malloc", 0);
-
- if(aflag == 0 && qdir == 0) {
+ if(qdir == nil)
qdir = getuser();
- if(qdir == 0)
- error("unknown user", 0);
- }
+ if(qdir == nil)
+ error("unknown user", 0);
root = argv[0];
cmd = argv[1];
@@ -111,12 +102,7 @@
if(chdir(root) < 0)
error("can't cd to %s", root);
- doload(1);
- if(aflag)
- doalldirs();
- else
- dodir(qdir);
- doload(0);
+ dodir(qdir);
exits(0);
}
@@ -142,75 +128,7 @@
return 0;
}
-int
-forkltd(void)
-{
- int i;
- int pid;
-
- for(i = 0; i < npid; i++){
- if(pidlist[i] <= 0)
- break;
- }
-
- while(i >= npid){
- pid = waitpid();
- if(pid < 0){
- syslog(0, runqlog, "forkltd confused");
- exits(0);
- }
-
- for(i = 0; i < npid; i++)
- if(pidlist[i] == pid)
- break;
- }
- pidlist[i] = fork();
- return pidlist[i];
-}
-
/*
- * run all user directories, must be bootes (or root on unix) to do this
- */
-void
-doalldirs(void)
-{
- Dir *db;
- int fd;
- long i, n;
-
-
- fd = open(".", OREAD);
- if(fd == -1){
- warning("reading %s", root);
- return;
- }
- n = dirreadall(fd, &db);
- if(n > 0){
- for(i=0; i<n; i++){
- if(db[i].qid.type & QTDIR){
- if(emptydir(db[i].name))
- continue;
- switch(forkltd()){
- case -1:
- syslog(0, runqlog, "out of procs");
- doload(0);
- exits(0);
- case 0:
- if(sysdetach() < 0)
- error("%r", 0);
- dodir(db[i].name);
- exits(0);
- default:
- break;
- }
- }
- }
- free(db);
- }
- close(fd);
-}
-
-/*
* cd to a user directory and run it
*/
void
@@ -234,30 +152,57 @@
void
rundir(char *name)
{
- int fd;
- long i;
+ Job *hd, *j, **p;
+ int nlive, fidx, fd, found;
+ Waitmsg *w;
- if(aflag && sflag)
- fd = sysopenlocked(".", OREAD);
- else
- fd = open(".", OREAD);
+ fd = open(".", OREAD);
if(fd == -1){
warning("reading %s", name);
return;
}
+ fidx= 0;
+ hd = nil;
+ nlive = 0;
nfiles = dirreadall(fd, &dirbuf);
- if(nfiles > 0){
- for(i=0; i<nfiles; i++){
- if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
+ while(nlive > 0 || fidx< nfiles){
+ while(fidx< nfiles && nlive < njob){
+ if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
+ fidx++;
continue;
- dofile(&dirbuf[i]);
+ }
+ if((j = dofile(&dirbuf[fidx])) != nil){
+ nlive++;
+ j->next = hd;
+ hd = j;
+ }
+ fidx++;
}
- free(dirbuf);
+ if(nlive == 0){
+ fprint(2, "nothing live\n");
+ break;
+ }
+rescan:
+ if((w = wait()) == nil){
+ syslog(0, "runq", "wait error: %r");
+ break;
+ }
+ found = 0;
+ for(p = &hd; *p != nil; p = &(*p)->next){
+ if(w->pid == (*p)->pid){
+ *p = donefile(*p, w);
+ found++;
+ nlive--;
+ break;
+ }
+ }
+ free(w);
+ if(!found)
+ goto rescan;
}
- if(aflag && sflag)
- sysunlockfile(fd);
- else
- close(fd);
+ assert(hd == nil);
+ free(dirbuf);
+ close(fd);
}
/*
@@ -314,17 +259,16 @@
}
/*
- * try a message
+ * Launch trying a message, returning a job
+ * tracks the running pid.
*/
-void
+Job*
dofile(Dir *dp)
{
+ int dtime, efd, i, etime;
+ Job *j;
Dir *d;
- int dfd, ac, dtime, efd, pid, i, etime;
- char *buf, *cp, **av;
- Waitmsg *wm;
- Biobuf *b;
- Mlock *l = nil;
+ char *cp;
if(debug)
fprint(2, "dofile %s\n", dp->name);
@@ -337,7 +281,7 @@
if(d == nil){
syslog(0, runqlog, "no data file for %s", dp->name);
remmatch(dp->name);
- return;
+ return nil;
}
if(dp->length == 0){
if(time(0)-dp->mtime > 15*60){
@@ -344,7 +288,7 @@
syslog(0, runqlog, "empty ctl file for %s", dp->name);
remmatch(dp->name);
}
- return;
+ return nil;
}
dtime = d->mtime;
free(d);
@@ -358,31 +302,35 @@
if(etime - dtime < 60*60){
/* up to the first hour, try every 15 minutes */
if(time(0) - etime < 15*60)
- return;
+ return nil;
} else {
/* after the first hour, try once an hour */
if(time(0) - etime < 60*60)
- return;
+ return nil;
}
-
}
/*
* open control and data
*/
- b = sysopen(file(dp->name, 'C'), "rl", 0660);
- if(b == 0) {
+ j = malloc(sizeof(Job));
+ if(j == nil)
+ return nil;
+ memset(j, 0, sizeof(Job));
+ j->dp = dp;
+ j->dfd = -1;
+ j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
+ if(j->b == 0) {
if(debug)
fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
- return;
+ return nil;
}
- dfd = open(file(dp->name, 'D'), OREAD);
- if(dfd < 0){
+ j->dfd = open(file(dp->name, 'D'), OREAD);
+ if(j->dfd < 0){
if(debug)
fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
- Bterm(b);
- sysunlockfile(Bfildes(b));
- return;
+ freejob(j);
+ return nil;
}
/*
@@ -390,48 +338,36 @@
* - read args into (malloc'd) buffer
* - malloc a vector and copy pointers to args into it
*/
- buf = malloc(dp->length+1);
- if(buf == 0){
+
+ j->buf = malloc(dp->length+1);
+ if(j->buf == nil){
warning("buffer allocation", 0);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- close(dfd);
- return;
+ freejob(j);
+ return nil;
}
- if(Bread(b, buf, dp->length) != dp->length){
+ if(Bread(j->b, j->buf, dp->length) != dp->length){
warning("reading control file %s\n", dp->name);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- close(dfd);
- free(buf);
- return;
+ freejob(j);
+ return nil;
}
- buf[dp->length] = 0;
- av = malloc(2*sizeof(char*));
- if(av == 0){
+ j->buf[dp->length] = 0;
+ j->av = malloc(2*sizeof(char*));
+ if(j->av == 0){
warning("argv allocation", 0);
- close(dfd);
- free(buf);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- return;
+ freejob(j);
+ return nil;
}
- for(ac = 1, cp = buf; *cp; ac++){
+ for(j->ac = 1, cp = j->buf; *cp; j->ac++){
while(isspace(*cp))
*cp++ = 0;
if(*cp == 0)
break;
- av = realloc(av, (ac+2)*sizeof(char*));
- if(av == 0){
+ j->av = realloc(j->av, (j->ac+2)*sizeof(char*));
+ if(j->av == 0){
warning("argv allocation", 0);
- close(dfd);
- free(buf);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- return;
}
- av[ac] = cp;
+ j->av[j->ac] = cp;
while(*cp && !isspace(*cp)){
if(*cp++ == '"'){
while(*cp && *cp != '"')
@@ -441,18 +377,18 @@
}
}
}
- av[0] = cmd;
- av[ac] = 0;
+ j->av[0] = cmd;
+ j->av[j->ac] = 0;
if(!Eflag &&time(0) - dtime > giveup){
- if(returnmail(av, dp->name, "Giveup") != 0)
- logit("returnmail failed", dp->name, av);
+ if(returnmail(j->av, dp->name, "Giveup") != 0)
+ logit("returnmail failed", dp->name, j->av);
remmatch(dp->name);
goto done;
}
for(i = 0; i < nbad; i++){
- if(strcmp(av[3], badsys[i]) == 0)
+ if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
goto done;
}
@@ -460,33 +396,34 @@
* Ken's fs, for example, gives us 5 minutes of inactivity before
* the lock goes stale, so we have to keep reading it.
*/
- l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
+ j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
/*
* transfer
*/
- pid = fork();
- switch(pid){
+ j->pid = fork();
+ switch(j->pid){
case -1:
- sysunlock(l);
- sysunlockfile(Bfildes(b));
+ sysunlock(j->l);
+ sysunlockfile(Bfildes(j->b));
syslog(0, runqlog, "out of procs");
exits(0);
case 0:
if(debug) {
- fprint(2, "Starting %s", cmd);
- for(ac = 0; av[ac]; ac++)
- fprint(2, " %s", av[ac]);
+ fprint(2, "Starting %s\n", cmd);
+ for(i = 0; j->av[i]; i++)
+ fprint(2, " %s", j->av[i]);
fprint(2, "\n");
}
- logit("execing", dp->name, av);
+ logit("execing", dp->name, j->av);
close(0);
- dup(dfd, 0);
- close(dfd);
+ dup(j->dfd, 0);
+ close(j->dfd);
close(2);
efd = open(file(dp->name, 'E'), OWRITE);
if(efd < 0){
- if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
+ if(debug)
+ syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
efd = create(file(dp->name, 'E'), OWRITE, 0666);
if(efd < 0){
if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
@@ -494,50 +431,72 @@
}
}
seek(efd, 0, 2);
- exec(cmd, av);
+ exec(cmd, j->av);
error("can't exec %s", cmd);
break;
default:
- for(;;){
- wm = wait();
- if(wm == nil)
- error("wait failed: %r", "");
- if(wm->pid == pid)
- break;
- free(wm);
- }
- if(debug)
- fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
+ return j;
+ }
+done:
+ freejob(j);
+ return nil;
+}
- if(wm->msg[0]){
- if(debug)
- fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
- if(!Rflag && strstr(wm->msg, "Retry")==0){
- /* return the message and remove it */
- if(returnmail(av, dp->name, wm->msg) != 0)
- logit("returnmail failed", dp->name, av);
- remmatch(dp->name);
- } else {
- /* add sys to bad list and try again later */
- nbad++;
- badsys = realloc(badsys, nbad*sizeof(char*));
- badsys[nbad-1] = strdup(av[3]);
- }
+/*
+ * Handle the completion of a job.
+ * Wait for the pid, check its status,
+ * and then pop the job off the list.
+ * Return the next running job.
+ */
+Job*
+donefile(Job *j, Waitmsg *wm)
+{
+ Job *n;
+
+ if(debug)
+ fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
+
+ if(wm->msg[0]){
+ if(debug)
+ fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
+ if(!Rflag && strstr(wm->msg, "Retry")==0){
+ /* return the message and remove it */
+ if(returnmail(j->av, j->dp->name, wm->msg) != 0)
+ logit("returnmail failed", j->dp->name, j->av);
+ remmatch(j->dp->name);
} else {
- /* it worked remove the message */
- remmatch(dp->name);
+ /* add sys to bad list and try again later */
+ nbad++;
+ badsys = realloc(badsys, nbad*sizeof(char*));
+ badsys[nbad-1] = strdup(j->av[3]);
}
- free(wm);
+ } else {
+ /* it worked remove the message */
+ remmatch(j->dp->name);
+ }
+ n = j->next;
+ freejob(j);
+ return n;
+}
+/*
+ * Release resources associated with
+ * a job.
+ */
+void
+freejob(Job *j)
+{
+ if(j->b != nil){
+ sysunlockfile(Bfildes(j->b));
+ Bterm(j->b);
}
-done:
- if (l)
- sysunlock(l);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- free(buf);
- free(av);
- close(dfd);
+ if(j->dfd != -1)
+ close(j->dfd);
+ if(j->l != nil)
+ sysunlock(j->l);
+ free(j->buf);
+ free(j->av);
+ free(j);
}
@@ -690,72 +649,4 @@
n += m + 3;
}
syslog(0, runqlog, "%s", buf);
-}
-
-char *loadfile = ".runqload";
-
-/*
- * load balancing
- */
-void
-doload(int start)
-{
- int fd;
- char buf[32];
- int i, n;
- Mlock *l;
- Dir *d;
-
- if(load <= 0)
- return;
-
- if(chdir(root) < 0){
- load = 0;
- return;
- }
-
- l = syslock(loadfile);
- fd = open(loadfile, ORDWR);
- if(fd < 0){
- fd = create(loadfile, 0666, ORDWR);
- if(fd < 0){
- load = 0;
- sysunlock(l);
- return;
- }
- }
-
- /* get current load */
- i = 0;
- n = read(fd, buf, sizeof(buf)-1);
- if(n >= 0){
- buf[n] = 0;
- i = atoi(buf);
- }
- if(i < 0)
- i = 0;
-
- /* ignore load if file hasn't been changed in 30 minutes */
- d = dirfstat(fd);
- if(d != nil){
- if(d->mtime + 30*60 < time(0))
- i = 0;
- free(d);
- }
-
- /* if load already too high, give up */
- if(start && i >= load){
- sysunlock(l);
- exits(0);
- }
-
- /* increment/decrement load */
- if(start)
- i++;
- else
- i--;
- seek(fd, 0, 0);
- fprint(fd, "%d\n", i);
- sysunlock(l);
- close(fd);
}