ref: 5745445bdf695dc69d3c15189e083432f2c1f42c
dir: /main.c/
#include <u.h> #include <libc.h> #include <thread.h> #include <bio.h> #include "adts.h" #include "ivf.h" #include "rtmp.h" #include "util.h" typedef struct Conn Conn; struct Conn { char *url; RTMP *r; ulong sid; }; enum { Abufsz = 441*2*2, /* 1/100s */ }; int mainstacksize = 65536; int debug = 0; static Conn *cs; static int ncs; static uvlong ns₀, vms; static int afd; static Channel *ans₀; static uvlong ns2ms(uvlong z, uvlong ns) { if(z != 0 && z != Zns₀) ns = z - ns₀ + ns; return ns / 1000000ULL; } static void audioenc(void *aux) { int nssent, fd, n; u8int *buf; uvlong ns; buf = emalloc(Abufsz); fd = *(int*)aux; nssent = 0; for(;;){ if((n = readn(afd, buf, Abufsz)) < 1) break; if(nssent == 0){ ns = nsec() - 10000000ULL; if(send(ans₀, &ns) != 1) break; nssent = 1; } if(write(fd, buf, n) != n) break; } chanclose(ans₀); close(afd); close(fd); threadexits(nil); } static void audiosend(void *aux) { ADTSFrame af; Biobuf *a; u64int ms; Conn *c; int i; if((a = Bfdopen(*(int*)aux, OREAD)) == nil) sysfatal("%r"); memset(&af, 0, sizeof(af)); if(recv(ans₀, &af.ns₀) != 1) sysfatal("no audio timestamp"); for(;;){ if(adtsread(a, &af) != 0) sysfatal("%r"); if(af.sz == 0) /* eof */ break; ms = ns2ms(af.ns₀, af.ns); for(c = cs, i = 0; i < ncs; i++, c++){ if(rtmpdata(c->r, c->sid, ms, Taudio, af.buf, af.sz) != 0){ fprint(2, "%s: %r\n", c->url); goto out; } } /* protect against overruns */ if(vms+200 < ms) sleep(100); } /* FIXME properly close RTMP connection */ out: threadexitsall(nil); } static void audio(void) { static int p[4], pid; Dir d; nulldir(&d); d.length = Abufsz; pipe(p); pipe(p+2); dirfwstat(p[0], &d); if((pid = fork()) < 0) sysfatal("%r"); if(pid == 0){ close(afd); dup(p[0], 0); close(p[0]); dup(p[2], 1); close(p[2]); execl("/bin/audio/aacenc", "audio/aacenc", nil); sysfatal("aacenc: %r"); } close(p[0]); close(p[2]); ans₀ = chancreate(sizeof(uvlong), 1); proccreate(audioenc, &p[1], mainstacksize); proccreate(audiosend, &p[3], mainstacksize); } static void usage(void) { fprint(2, "usage: %s [-a AUDIO] URL [URL...]\n", argv0); threadexitsall("usage"); } void threadmain(int argc, char **argv) { IVFrame vf; u64int ms; Biobuf v; IVF ivf; Conn *c; int i; afd = open("/dev/zero", OREAD|OCEXEC); ARGBEGIN{ case 'd': debug++; break; case 'a': if((afd = open(EARGF(usage()), OREAD|OCEXEC)) < 0) sysfatal("%r"); break; default: usage(); }ARGEND ncs = argc; if(ncs < 1) usage(); ns₀ = nsec() - 10ULL*1000000000ULL; /* base, -10s */ srand(time(nil)); if(Binit(&v, 0, OREAD) != 0 || ivfopen(&v, &ivf) != 0) sysfatal("%r"); if(strcmp(ivf.type, "AVC1") != 0) sysfatal("not H.264"); cs = ecalloc(argc, sizeof(*cs)); for(c = cs, i = 0; i < ncs; i++, c++){ c->url = "rtmp://REDACTED";//FIXME the key has to be redacted argv[i]; if((c->r = rtmpdial(argv[i])) == nil) sysfatal("%r"); if(rtmpstream(c->r, &c->sid) != 0 || rtmppublish(c->r, c->sid, PubLive, nil) != 0 || rtmpmeta(c->r, c->sid, VcodecH264, ivf.w, ivf.h, afd >= 0 ? AcodecAAC : -1) != 0){ sysfatal("%r"); } } if(afd >= 0) audio(); memset(&vf, 0, sizeof(vf)); for(;;){ if(ivfread(&ivf, &vf) != 0) sysfatal("%r"); if(vf.sz == 0) break; ms = ns2ms(ivf.ns₀, vf.ns); vms = ms; for(c = cs, i = 0; i < ncs; i++, c++){ if(rtmpdata(c->r, c->sid, ms, Tvideo, vf.buf, vf.sz) != 0){ fprint(2, "%s: %r\n", c->url); goto out; } } } /* FIXME properly close RTMP connection */ out: close(afd); threadexitsall(nil); }