ref: 51319cc5b56924b03d9c6188f6a1e189a497d42c
parent: 41f85d46f866c57a618b8df85c0e59cebc04f095
author: Ori Bernstein <ori@eigenstate.org>
date: Sat Jan 23 11:05:21 EST 2021
upas/runq: bring back -a Turns out -a is useful in crontab, so bring back a simplified version of it. This only iterates through directories one at a time.
--- a/sys/man/8/qer
+++ b/sys/man/8/qer
@@ -15,7 +15,7 @@
.br
.B runq
[
-.B -dER
+.B -adER
]
[
.B -f
@@ -50,13 +50,7 @@
The data file contains the standard input to
.IR qer .
The files are created in the directory
-.IR root / subdir ,
-where
-.I subdir
-is the argument to
-.B -q
-if present, else the contents of
-.BR /dev/user .
+.IR root / subdir .
The names of the control and data files differ only
in the first character which is `C' and `D' respectively.
.IR Mktemp (2)
@@ -77,6 +71,18 @@
only in the first character. The first one
starts with 'F', the second 'G', etc.
.P
+Qer takes the following arguments:
+.TP
+.B -q subdir
+Specifies the queue subdirectory to use. If
+unspecified, the contents of
+.B /dev/user
+are used.
+.TP
+.B -f file
+Specifies the files to copy into the queue
+directory, in the manner described above.
+.P
.I Runq
processes the files queued by
.IR qer .
@@ -124,32 +130,34 @@
error file exists and was last modified within the preceding 10 minutes.
A data file older than one hour will not be processed if its error
file exists and was last modified within the preceding hour.
-The
+.PP
+The following flags are accepted by runq:
+.TP
+.B -a
+Causes runq to process all user directories in sequence, instead
+of only the one named by \fB-q\fP (by default, the current user's).
+.TP
.B -E
-flag causes all files to be reprocessed regardless of
+Causes all files to be reprocessed regardless of
the file times.
-.P
-The
+.TP
.B -R
-flag instructs
+Instructs
.I runq
never to give up on a failed queue job, instead leaving
it in the queue to be retried.
-.P
-The
+.TP
.B -d
-option causes debugging output on standard error
+Causes debugging output on standard error
describing the progress through the queues.
-.P
-The
+.TP
.B -t
-flags specifies the number of hours
+Specifies the number of hours
that retries will continue after a send
failure. The default is 48 hours.
-.P
-The
+.TP
.BR -r
-flag limits the number of files that are processed in a single pass of a queue.
+Limits the number of files that are processed in a single pass of a queue.
.I Runq
accumulates the entire directory containing a queue before processing any
files. When a queue contains many files and the system does not
@@ -161,10 +169,9 @@
be drained incrementally. It is most useful in combination with the
.I -q
flag.
-.P
-The argument following the
+.TP
.B -n
-flag specifies the number of queued jobs that are processed
+Specifies the number of queued jobs that are processed
in parallel from the queue; the default is 1.
This is useful for a large queue to be processed with a bounded
amount of parallelism.
--- a/sys/src/cmd/upas/q/runq.c
+++ b/sys/src/cmd/upas/q/runq.c
@@ -2,7 +2,14 @@
#include <ctype.h>
typedef struct Job Job;
+typedef struct Wdir Wdir;
+struct Wdir {	/* one queue directory, as read by rundir */
+	Dir *d;	/* entries returned by dirreadall */
+	int nd;	/* entry count from dirreadall (may be negative on error) */
+	char *name;	/* directory name, used in log/syslog messages */
+};
+
struct Job {
Job *next;
int pid;
@@ -10,6 +17,7 @@
int dfd;
char **av;
char *buf; /* backing for av */
+ Wdir *wdir; /* work dir */
Dir *dp; /* not owned */
Mlock *l;
Biobuf *b;
@@ -17,7 +25,7 @@
void doalldirs(void);
void dodir(char*);
-Job* dofile(Dir*);
+Job* dofile(Wdir*, Dir*);
Job* donefile(Job*, Waitmsg*);
void freejob(Job*);
void rundir(char*);
@@ -24,11 +32,10 @@
char* file(char*, char);
void warning(char*, void*);
void error(char*, void*);
-int returnmail(char**, char*, char*);
-void logit(char*, char*, char**);
+int returnmail(char**, Wdir*, char*, char*);
+void logit(char*, Wdir*, char*, char**);
void doload(int);
-#define HUNK 32
char *cmd;
char *root;
int debug;
@@ -35,12 +42,6 @@
int giveup = 2*24*60*60;
int limit;
-/* the current directory */
-Dir *dirbuf;
-long ndirbuf = 0;
-int nfiles;
-char *curdir;
-
char *runqlog = "runq";
char **badsys; /* array of recalcitrant systems */
@@ -48,6 +49,7 @@
int njob = 1; /* number of concurrent jobs to invoke */
int Eflag; /* ignore E.xxxxxx dates */
int Rflag; /* no giving up, ever */
+int aflag; /* do all dirs */
void
usage(void)
@@ -82,20 +84,27 @@
case 'q':
qdir = EARGF(usage());
break;
+ case 'a':
+ aflag++;
+ break;
case 'n':
njob = atoi(EARGF(usage()));
if(njob == 0)
usage();
break;
+ default:
+ usage();
+ break;
}ARGEND;
if(argc != 2)
usage();
- if(qdir == nil)
+ if(!aflag && qdir == nil){
qdir = getuser();
- if(qdir == nil)
- error("unknown user", 0);
+ if(qdir == nil)
+ error("unknown user", 0);
+ }
root = argv[0];
cmd = argv[1];
@@ -102,7 +111,10 @@
if(chdir(root) < 0)
error("can't cd to %s", root);
- dodir(qdir);
+ if(aflag)
+ doalldirs();
+ else
+ dodir(qdir);
exits(0);
}
@@ -129,13 +141,41 @@
}
/*
+ * run all user directories, must be bootes (or root on unix) to do this
+ */
+void
+doalldirs(void)
+{
+	Dir *db;
+	int fd;
+	long i, n;
+
+	if((fd = open(".", OREAD)) == -1){
+		warning("opening %s", root);
+		return;
+	}
+	if((n = dirreadall(fd, &db)) == -1){
+		warning("reading %s", root);
+		close(fd);
+		return;
+	}
+	close(fd);	/* entries are in db; don't hold the fd across dodir */
+	for(i=0; i<n; i++){
+		if((db[i].qid.type & QTDIR) == 0)	/* queues are subdirectories */
+			continue;
+		if(emptydir(db[i].name))
+			continue;
+		dodir(db[i].name);
+	}
+	free(db);
+}
+
+/*
* cd to a user directory and run it
*/
void
dodir(char *name)
{
- curdir = name;
-
if(chdir(name) < 0){
warning("cd to %s", name);
return;
@@ -152,9 +192,10 @@
void
rundir(char *name)
{
- Job *hd, *j, **p;
int nlive, fidx, fd, found;
+ Job *hd, *j, **p;
Waitmsg *w;
+ Wdir wd;
fd = open(".", OREAD);
if(fd == -1){
@@ -164,14 +205,15 @@
fidx= 0;
hd = nil;
nlive = 0;
- nfiles = dirreadall(fd, &dirbuf);
- while(nlive > 0 || fidx< nfiles){
- while(fidx< nfiles && nlive < njob){
- if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
+ wd.name = name;
+ wd.nd = dirreadall(fd, &wd.d);
+ while(nlive > 0 || fidx< wd.nd){
+ while(fidx< wd.nd && nlive < njob){
+ if(strncmp(wd.d[fidx].name, "C.", 2) != 0){
fidx++;
continue;
}
- if((j = dofile(&dirbuf[fidx])) != nil){
+ if((j = dofile(&wd, &wd.d[fidx])) != nil){
nlive++;
j->next = hd;
hd = j;
@@ -201,7 +243,7 @@
goto rescan;
}
assert(hd == nil);
- free(dirbuf);
+ free(wd.d);
close(fd);
}
@@ -209,15 +251,15 @@
* free files matching name in the current directory
*/
void
-remmatch(char *name)
+remmatch(Wdir *w, char *name)
{
long i;
- syslog(0, runqlog, "removing %s/%s", curdir, name);
+ syslog(0, runqlog, "removing %s/%s", w->name, name);
- for(i=0; i<nfiles; i++){
- if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
- remove(dirbuf[i].name);
+ for(i=0; i<w->nd; i++){
+ if(strcmp(&w->d[i].name[1], &name[1]) == 0)
+ remove(w->d[i].name);
}
/* error file (may have) appeared after we read the directory */
@@ -263,7 +305,7 @@
* tracks the running pid.
*/
Job*
-dofile(Dir *dp)
+dofile(Wdir *w, Dir *dp)
{
int dtime, efd, i, etime;
Job *j;
@@ -280,13 +322,13 @@
d = dirstat(file(dp->name, 'D'));
if(d == nil){
syslog(0, runqlog, "no data file for %s", dp->name);
- remmatch(dp->name);
+ remmatch(w, dp->name);
return nil;
}
if(dp->length == 0){
if(time(0)-dp->mtime > 15*60){
syslog(0, runqlog, "empty ctl file for %s", dp->name);
- remmatch(dp->name);
+ remmatch(w, dp->name);
}
return nil;
}
@@ -338,7 +380,7 @@
* - read args into (malloc'd) buffer
* - malloc a vector and copy pointers to args into it
*/
-
+ j->wdir = w;
j->buf = malloc(dp->length+1);
if(j->buf == nil){
warning("buffer allocation", 0);
@@ -381,9 +423,9 @@
j->av[j->ac] = 0;
if(!Eflag &&time(0) - dtime > giveup){
- if(returnmail(j->av, dp->name, "Giveup") != 0)
- logit("returnmail failed", dp->name, j->av);
- remmatch(dp->name);
+ if(returnmail(j->av, w, dp->name, "Giveup") != 0)
+ logit("returnmail failed", w, dp->name, j->av);
+ remmatch(w, dp->name);
goto done;
}
@@ -415,7 +457,7 @@
fprint(2, " %s", j->av[i]);
fprint(2, "\n");
}
- logit("execing", dp->name, j->av);
+ logit("execing", w, dp->name, j->av);
close(0);
dup(j->dfd, 0);
close(j->dfd);
@@ -461,9 +503,9 @@
fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
if(!Rflag && strstr(wm->msg, "Retry")==0){
/* return the message and remove it */
- if(returnmail(j->av, j->dp->name, wm->msg) != 0)
- logit("returnmail failed", j->dp->name, j->av);
- remmatch(j->dp->name);
+ if(returnmail(j->av, j->wdir, j->dp->name, wm->msg) != 0)
+ logit("returnmail failed", j->wdir, j->dp->name, j->av);
+ remmatch(j->wdir, j->dp->name);
} else {
/* add sys to bad list and try again later */
nbad++;
@@ -472,7 +514,7 @@
}
} else {
/* it worked remove the message */
- remmatch(j->dp->name);
+ remmatch(j->wdir, j->dp->name);
}
n = j->next;
freejob(j);
@@ -520,7 +562,7 @@
* return 0 if successful
*/
int
-returnmail(char **av, char *name, char *msg)
+returnmail(char **av, Wdir *w, char *name, char *msg)
{
char buf[256], attachment[Pathlen], *sender;
int i, fd, pfd[2];
@@ -529,7 +571,7 @@
String *s;
if(av[1] == 0 || av[2] == 0){
- logit("runq - dumping bad file", name, av);
+ logit("runq - dumping bad file", w, name, av);
return 0;
}
@@ -537,21 +579,21 @@
sender = s_to_c(s);
if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
- logit("runq - dumping p to p mail", name, av);
+ logit("runq - dumping p to p mail", w, name, av);
return 0;
}
if(pipe(pfd) < 0){
- logit("runq - pipe failed", name, av);
+ logit("runq - pipe failed", w, name, av);
return -1;
}
switch(rfork(RFFDG|RFPROC|RFENVG)){
case -1:
- logit("runq - fork failed", name, av);
+ logit("runq - fork failed", w, name, av);
return -1;
case 0:
- logit("returning", name, av);
+ logit("returning", w, name, av);
close(pfd[1]);
close(0);
dup(pfd[0], 0);
@@ -592,7 +634,7 @@
wm = wait();
if(wm == nil){
syslog(0, "runq", "wait: %r");
- logit("wait failed", name, av);
+ logit("wait failed", w, name, av);
return -1;
}
i = 0;
@@ -599,7 +641,7 @@
if(wm->msg[0]){
i = -1;
syslog(0, "runq", "returnmail child: %s", wm->msg);
- logit("returnmail child failed", name, av);
+ logit("returnmail child failed", w, name, av);
}
free(wm);
return i;
@@ -635,12 +677,12 @@
}
void
-logit(char *msg, char *file, char **av)
+logit(char *msg, Wdir *w, char *file, char **av)
{
int n, m;
char buf[256];
- n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
+ n = snprint(buf, sizeof(buf), "%s/%s: %s", w->name, file, msg);
for(; *av; av++){
m = strlen(*av);
if(n + m + 4 > sizeof(buf))