ref: 2852fc75b203598e781db315ea6b2a7735d2db4a
parent: 3d91225b38e4e1a87b60e065c2f02c16ea0a8c1e
author: qwx <qwx@sciops.net>
date: Sat Mar 6 21:03:58 EST 2021
basic network protocol and communication - com.c: protocol/communication code - net.c now only deals with networking - make pause a request to the local server - fix not initializing buffer in kproc - give sim proc higher stack size, not main client communicates to the local server via a pipe, but transparently wrt the server. network data is read in separate procs and csp is used for communication and syncing. each connection has its own buffer which is blocked until parsed.
--- /dev/null
+++ b/com.c
@@ -1,0 +1,168 @@
+#include <u.h>
+#include <libc.h>
+#include <draw.h>
+#include <fcall.h>
+#include "dat.h"
+#include "fns.h"
+
+enum{
+ Hdrsz = 4,
+};
+typedef struct Header Header;
+struct Header{
+ int empty;
+};
+
+static int
+vpack(uchar *p, uchar *e, char *fmt, va_list a)
+{
+ int n, sz;
+ uchar u[8];
+ u32int v;
+ u64int w;
+
+ sz = 0;
+ for(;;){
+ n = 0;
+ switch(*fmt++){
+ default: sysfatal("unknown format %c", fmt[-1]);
+ copy: if(p + n > e) sysfatal("vpack: buffer overflow");
+ memcpy(p, u, n); p += n; break;
+ case 0: return sz;
+ case 'h': v = va_arg(a, int); PBIT8(u, v); n = sizeof(u8int); goto copy;
+ case 's': v = va_arg(a, int); PBIT16(u, v); n = sizeof(u16int); goto copy;
+ case 'l': v = va_arg(a, int); PBIT32(u, v); n = sizeof(u32int); goto copy;
+ case 'v': w = va_arg(a, vlong); PBIT64(u, w); n = sizeof(u64int); goto copy;
+ }
+ sz += n;
+ }
+}
+
+static int
+vunpack(uchar *p, uchar *e, char *fmt, va_list a)
+{
+ int n, sz;
+
+ sz = 0;
+ for(;;){
+ switch(*fmt++){
+ default: sysfatal("vunpack: unknown format %c", fmt[-1]);
+ error: werrstr("vunpack: truncated message"); return -1;
+ case 0: return sz;
+ case 'h': n = sizeof(u8int); if(p + n > e) goto error;
+ *va_arg(a, int*) = GBIT8(p); p += n;
+ break;
+ case 's': n = sizeof(u16int); if(p + n > e) goto error;
+ *va_arg(a, int*) = GBIT16(p); p += n;
+ break;
+ case 'l': n = sizeof(u32int); if(p + n > e) goto error;
+ *va_arg(a, int*) = GBIT32(p); p += n;
+ break;
+ case 'v': n = sizeof(u64int); if(p + n > e) goto error;
+ *va_arg(a, vlong*) = GBIT64(p); p += n;
+ break;
+ }
+ sz += n;
+ }
+}
+
+static int
+pack(uchar *p, uchar *e, char *fmt, ...)
+{
+ int n;
+ va_list a;
+
+ va_start(a, fmt);
+ n = vpack(p, e, fmt, a);
+ va_end(a);
+ return n;
+}
+
+static int
+unpack(uchar *p, uchar *e, char *fmt, ...)
+{
+ int n;
+ va_list a;
+
+ va_start(a, fmt);
+ n = vunpack(p, e, fmt, a);
+ va_end(a);
+ return n;
+}
+
+static int
+reqpause(uchar *p, uchar *e)
+{
+ int dicks;
+
+ if(unpack(p, e, "l", &dicks) < 0){
+ fprint(2, "reqpause: %r\n");
+ return -1;
+ }
+ pause ^= 1;
+ return 0;
+}
+
+static int
+readhdr(Msg *m, Header *h)
+{
+ USED(h);
+ if(m->sz <= Hdrsz)
+ return -1;
+ return 0;
+}
+
+int
+parsemsg(Msg *m)
+{
+ int n, type;
+ uchar *p, *e;
+ Header h;
+
+ if(readhdr(m, &h) < 0)
+ return -1;
+ p = m->buf + Hdrsz;
+ e = p + sizeof(m->buf) - Hdrsz;
+ while(p < e){
+ type = *p++;
+ switch(type){
+ case Tpause:
+ if((n = reqpause(p, e)) < 0){
+ dprint("parse: invalid Tpause: %r\n");
+ return -1;
+ }
+ break;
+ default:
+ dprint("parse: invalid message type %ux\n", type);
+ return -1;
+ }
+ p += n;
+ }
+ return 0;
+}
+
+static void
+newmsg(Msg *m)
+{
+ Header h;
+
+ USED(h);
+ m->sz += Hdrsz;
+}
+
+int
+sendpause(void)
+{
+ int n;
+ Msg *m;
+
+ m = getclbuf();
+ if(m->sz == 0)
+ newmsg(m);
+ if((n = pack(m->buf + m->sz, m->buf + sizeof m->buf, "hl", Tpause, 0)) < 0){
+ fprint(2, "sendpause: %r\n");
+ return -1;
+ }
+ m->sz += n;
+ return 0;
+}
--- a/dat.h
+++ b/dat.h
@@ -11,6 +11,8 @@
typedef struct Map Map;
typedef struct Resource Resource;
typedef struct Team Team;
+typedef struct Cbuf Cbuf;
+typedef struct Msg Msg;
enum{
Nresource = 3,
@@ -194,6 +196,17 @@
enum{
Tquit,
+ Tpause,
+
+ Nbuf = 4096,
+};
+struct Cbuf{
+ uchar buf[Nbuf];
+ int sz;
+};
+struct Msg{
+ Team *t;
+ Cbuf;
};
enum{
--- a/fns.h
+++ b/fns.h
@@ -1,3 +1,8 @@
+void clearmsg(Msg*);
+Msg* readnet(void);
+void initnet(char*);
+int parsemsg(Msg*);
+int sendpause(void);
void stepsnd(void);
void initsnd(void);
void linktomap(Mobj*);
@@ -7,9 +12,7 @@
void initsv(int, char*);
void flushcl(void);
void packcl(char*, ...);
-void stepnet(void);
-void joinnet(char*);
-void listennet(void);
+Msg* getclbuf(void);
void dopan(Point);
void select(Point);
void move(Point);
--- a/fs.c
+++ b/fs.c
@@ -111,7 +111,7 @@
continue;
}
if(pic0.h % Nteam != 0)
- sysfatal("loadobjpic: obj %s sprite sheet %d,%d: height not multiple of %d\n",
+ sysfatal("loadobjpic: obj %s sprite sheet %d,%d: height not multiple of %d",
pl->name, pic0.w, pic0.h, Nteam);
pic0.h /= Nteam;
n = pic0.w * pic0.h;
@@ -136,7 +136,7 @@
snprint(path, sizeof path, "%s.bit", tileset);
loadpic(path, &tilesetpic, 0);
if(tilesetpic.h % tilesetpic.w != 0)
- sysfatal("loadterpic: tiles not squares: tilepic %d,%d\n",
+ sysfatal("loadterpic: tiles not squares: tilepic %d,%d",
tilesetpic.w, tilesetpic.h);
}
id = pl->frm;
--- a/mkfile
+++ b/mkfile
@@ -3,6 +3,7 @@
TARG=sce
OFILES=\
bmap.$O\
+ com.$O\
drw.$O\
fs.$O\
map.$O\
--- a/net.c
+++ b/net.c
@@ -10,153 +10,119 @@
typedef struct Con Con;
enum{
- Ncon = Nteam * 4,
- Nbuf = 4096,
+ Ncon = Nteam * 2,
};
struct Con{
int fd;
- Team *t;
+ Msg;
+ Channel *waitc;
};
static Con con[Ncon];
-static Channel *lc;
-static uchar cbuf[Nbuf], *cbufp = cbuf;
+static Msg clbuf, sendbuf;
+Channel *conc;
+static int clpfd[2];
+
static void
closenet(Con *c)
{
close(c->fd);
c->fd = -1;
+ chanfree(c->waitc);
+ c->waitc = nil;
}
static void
-flushcmd(Con *c)
+flushreq(Con *c, Cbuf *cb)
{
- int n;
-
- if((n = cbufp - cbuf) == 0)
+ if(cb->sz == 0)
return;
- if(write(c->fd, cbuf, n) != n){
+ if(write(c->fd, cb->buf, cb->sz) != cb->sz){
fprint(2, "flushcmd: %r\n");
- closenet(c);
+ close(c->fd);
}
- cbufp = cbuf;
}
static void
-writecmd(Con *c, char *fmt, va_list a)
+writenet(void)
{
- union{
- uchar u[4];
- s32int l;
- } u;
+ Con *c;
- for(;;){
- if(cbufp - cbuf > sizeof(cbuf) - 4)
- flushcmd(c);
- switch(*fmt++){
- default: sysfatal("unknown format %c", fmt[-1]);
- case 0: return;
- case 'u':
- u.l = va_arg(a, s32int);
- memcpy(cbufp, u.u, sizeof u.u);
- cbufp += sizeof u.u;
- }
+ for(c=con; c<con+nelem(con); c++){
+ if(c->fd < 0)
+ continue;
+ flushreq(c, &sendbuf);
}
+ sendbuf.sz = 0;
}
-static void
-packcmd(Con *c, char *fmt, ...)
-{
- va_list a;
-
- va_start(a, fmt);
- writecmd(c, fmt, a);
- va_end(a);
-}
-
void
flushcl(void)
{
- flushcmd(con);
+ if(clbuf.sz == 0)
+ return;
+ write(clpfd[0], clbuf.buf, clbuf.sz);
+ clbuf.sz = 0;
}
void
-packcl(char *fmt, ...)
+clearmsg(Msg *m)
{
- va_list a;
+ Con *c;
- va_start(a, fmt);
- packcmd(con, fmt, a);
- va_end(a);
+ for(c=con; c<con+sizeof con; c++)
+ if(m == &c->Msg)
+ break;
+ assert(c < con + sizeof con);
+ m->sz = 0;
+ nbsendul(c->waitc, 0);
}
-static void
-writenet(void)
+Msg *
+readnet(void)
{
Con *c;
- for(c=con; c<con+nelem(con); c++){
- if(c->fd < 0)
- continue;
- }
+ if((c = nbrecvp(conc)) == nil)
+ return nil;
+ return &c->Msg;
}
-static void
-execbuf(Con *c, uchar *p, uchar *e)
+Msg *
+getclbuf(void)
{
- int n;
-
- while(p < e){
- n = *p++;
- switch(n){
- case Tquit: closenet(c); return;
- }
- }
+ return &clbuf;
}
static void
-readnet(void)
+conproc(void *cp)
{
- int n, m;
+ int n;
Con *c;
- uchar buf[Nbuf], *p;
- for(c=con; c<con+nelem(con); c++){
- if(c->fd < 0)
- continue;
- for(m=sizeof buf, p=buf; (n = flen(c->fd)) > 1 && n <= m; m-=n, p+=n){
- if(read(c->fd, p, n) != n){
- fprint(2, "readnet: %r\n");
- closenet(c);
- break;
- }
+ c = cp;
+ for(;;){
+ if((n = read(c->fd, c->buf, sizeof c->buf)) <= 0){
+ fprint(2, "cproc %zd: %r\n", c - con);
+ closenet(c);
+ return;
}
- execbuf(c, buf, p);
+ c->sz = n;
+ sendp(conc, c);
+ recvul(c->waitc);
+ c->sz = 0;
}
}
-void
-stepnet(void)
+static void
+initcon(Con *c)
{
- int fd;
-
- if(nbrecv(lc, &fd) > 0){
- /* FIXME: add to observers (team 0) */
- }
- readnet();
- writenet();
+ if((c->waitc = chancreate(sizeof(ulong), 0)) == nil)
+ sysfatal("chancreate: %r");
+ if(proccreate(conproc, c, 8192) < 0)
+ sysfatal("proccreate: %r");
}
-void
-joinnet(char *sys)
-{
- char s[128];
-
- snprint(s, sizeof s, "%s/tcp!%s!%d", netmtpt, sys != nil ? sys : sysname(), lport);
- if((con[0].fd = dial(s, nil, nil, nil)) < 0)
- sysfatal("dial: %r");
-}
-
static int
regnet(int lfd, char *ldir)
{
@@ -165,16 +131,19 @@
for(c=con; c<con+nelem(con); c++)
if(c->fd < 0)
break;
- if(c == con + nelem(con))
- return reject(lfd, ldir, "no more open slots");
+ if(c == con + nelem(con)){
+ reject(lfd, ldir, "no more open slots");
+ return -1;
+ }
c->fd = accept(lfd, ldir);
- return c->fd;
+ initcon(c);
+ return 0;
}
static void
-lproc(void *)
+listenproc(void *)
{
- int fd, lfd;
+ int lfd;
char adir[40], ldir[40], data[100];
snprint(data, sizeof data, "%s/tcp!*!%d", netmtpt, lport);
@@ -182,22 +151,46 @@
sysfatal("announce: %r");
for(;;){
if((lfd = listen(adir, ldir)) < 0
- || (fd = regnet(lfd, ldir)) < 0
- || close(lfd) < 0
- || send(lc, &fd) < 0)
+ || regnet(lfd, ldir) < 0
+ || close(lfd) < 0)
break;
}
}
-void
+static void
listennet(void)
{
+ if(proccreate(listenproc, nil, 8192) < 0)
+ sysfatal("proccreate: %r");
+}
+
+static void
+joinnet(char *sys)
+{
+ char s[128];
Con *c;
+ snprint(s, sizeof s, "%s/tcp!%s!%d", netmtpt, sys != nil ? sys : sysname(), lport);
+ c = &con[1];
+ if((c->fd = dial(s, nil, nil, nil)) < 0)
+ sysfatal("dial: %r");
+ initcon(c);
+}
+
+void
+initnet(char *sys)
+{
+ Con *c;
+
for(c=con; c<con+nelem(con); c++)
c->fd = -1;
- if((lc = chancreate(sizeof(int), 0)) == nil)
+ if((conc = chancreate(sizeof(uintptr), 1)) == nil)
sysfatal("chancreate: %r");
- if(proccreate(lproc, nil, 8192) < 0)
- sysfatal("proccreate: %r");
+ if(pipe(clpfd) < 0)
+ sysfatal("pipe: %r");
+ con[0].fd = clpfd[1];
+ initcon(&con[0]);
+ USED(sys);
+ //joinnet(sys);
+ //listennet();
}
--- a/sce.c
+++ b/sce.c
@@ -8,14 +8,12 @@
#include "dat.h"
#include "fns.h"
-mainstacksize = 16*1024;
-
enum{
Hz = 60,
};
char *progname = "sce", *dbname, *prefix, *mapname = "map1.db";
-int pause, debugmap;
+int debugmap;
QLock drawlock;
typedef struct Kev Kev;
@@ -80,6 +78,7 @@
if((fd = open("/dev/kbd", OREAD)) < 0)
sysfatal("kproc: %r");
memset(buf, 0, sizeof buf);
+ memset(down, 0, sizeof down);
for(;;){
if(buf[0] != 0){
n = strlen(buf)+1;
@@ -225,6 +224,7 @@
if(me.b & 4)
move(me);
qunlock(&drawlock);
+ flushcl();
break;
case Akbd:
if(ke.r == Kdel)
@@ -232,9 +232,10 @@
if(!ke.down)
continue;
switch(ke.r){
- case KF|1: debugmap ^= 1; pause ^= 1; break;
- case ' ': pause ^= 1; break;
+ case KF|1: debugmap ^= 1; break;
+ case ' ': sendpause(); break;
}
+ flushcl();
break;
case Atic:
updatefb();
--- a/sv.c
+++ b/sv.c
@@ -8,6 +8,7 @@
extern QLock drawlock;
vlong tc;
+int pause;
static int tdiv;
@@ -14,7 +15,13 @@
static void
step(vlong tics)
{
+ Msg *m;
+
qlock(&drawlock);
+ while((m = readnet()) != nil){
+ parsemsg(m);
+ clearmsg(m);
+ }
while(!pause && tics-- > 0)
stepsim();
qunlock(&drawlock);
@@ -25,7 +32,7 @@
{
vlong t, t0, dt, Δtc;
- USED(sys);
+ initnet(sys);
initsim();
Δtc = 1;
t0 = nsec();
@@ -47,6 +54,6 @@
initsv(int tv, char *sys)
{
tdiv = Te9 / (tv * 3);
- if(proccreate(simproc, sys, 8192) < 0)
+ if(proccreate(simproc, sys, 16*1024) < 0)
sysfatal("proccreate: %r");
}