ref: 0ce01d66fb72ea51b7dd0e885a01e2bba1eda7ff
dir: /rtmp.c/
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <bio.h>
#include <libsec.h>
#include "amf.h"
#include "ivf.h"
#include "rtmp.h"
#include "util.h"

#define min(a,b) ((a)<(b)?(a):(b))

enum {
	/* header type, taken from the top two bits of the first header byte; indexes szs[] */
	SzLarge,
	SzMedium,
	SzSmall,
	SzTiny,

	Port = 1935,
	Sigsz = 1536,
	Chunk = 128,
	ChanCtl = 3,

	/* indices into the array of values parsed out of an invoke reply */
	CbWhat = 0,
	CbInvoke,
	CbData,
	CbStatus,
	NumCbA,

	/* control packet subtypes */
	CtlStreamBegin = 0,
	CtlStreamEnd,
	CtlStreamDry,
	CtlStreamRecorded,
	CtlPing = 6,
	CtlBufferEmpty = 31,
	CtlBufferReady,

	/* packet types */
	PktChunkSz = 1,
	PktBytesReadReport = 3,
	PktControl = 4,
	PktServerBW = 5,
	PktClientBW,
	PktAudio = 8,
	PktVideo,
	PktFlexStreamSend = 15,
	PktFlexSharedObj,
	PkgFlexMessage,
	PktFlexInfo,
	PktSharedObj = 19,
	PktInvoke = 20,
	PktFlashVideo = 22,

	Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
	Bufsz = 64*1024,
};

typedef struct Invoke Invoke;
typedef struct Packet Packet;

/* An invoke that was sent and is waiting for its reply; kept in a doubly linked list. */
struct Invoke {
	void (*cb)(RTMP *r, int ok, Amf *a[NumCbA], void *aux);
	void *aux;
	int n;	/* invoke number, matched against the number in the reply */
	Invoke *prev, *next;
};

struct Packet {
	int type;
	int ht;	/* header type (Sz*) */
	int chan;
	u32int ts;
	u8int *data;
	int sz;
	Invoke invoke;
};

struct RTMP {
	Biobufhdr;	/* buffered output, set up with Binits in rtmpdial */
	Channel *c;
	char *app;
	char *path;
	char *tcurl;
	Packet pk;	/* the packet currently being built or received */
	u8int *b, *p, *e;	/* packet buffer: start, cursor, end */
	int chunk;
	int mode;
	int bsz;
	int i;	/* connection fd, used directly for reading */
	struct {
		int n;	/* number of the last invoke created */
		Invoke *w;	/* invokes waiting for a reply */
	}invokes;
	u8int biobuf[Biobufsz];
};

/* The put* helpers append to the packet buffer of the RTMP named "r" in the calling scope. */
#define puti16(i) do{ r->p = amfi16(r->p, r->e, i); }while(0)
#define puti24(i) do{ r->p = amfi24(r->p, r->e, i); }while(0)
#define puti32(i) do{ r->p = amfi32(r->p, r->e, i); }while(0)
#define putnum(v) do{ r->p = amfnum(r->p, r->e, v); }while(0)
#define putstr(s) do{ r->p = amfstr(r->p, r->e, s); }while(0)
#define putarr() do{ r->p = amfarr(r->p, r->e); }while(0)
#define putobj() do{ r->p = amfobj(r->p, r->e); }while(0)
#define putend() do{ r->p = amfend(r->p, r->e); }while(0)
#define putkvnum(name, v) do{ r->p = amfkvnum(r->p, r->e, name, v); }while(0)
#define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0)
#define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0)
#define putinvoke(name) do{ \
	putstr(name); \
	putnum(r->pk.invoke.n); \
	putobj(); \
}while(0)

/* header size in bytes for each header type */
static int szs[] = {
	[SzTiny] = 1,
	[SzSmall] = 4,
	[SzMedium] = 8,
	[SzLarge] = 12,
};

static char *pktype2s[] = {
	[PktChunkSz] = "ChunkSz",
	[PktBytesReadReport] = "BytesReadReport",
	[PktControl] = "Control",
	[PktServerBW] = "ServerBW",
	[PktClientBW] = "ClientBW",
	[PktAudio] = "Audio",
	[PktVideo] = "Video",
	[PktFlexStreamSend] = "FlexStreamSend",
	[PktFlexSharedObj] = "FlexSharedObj",
	[PktFlexInfo] = "FlexInfo",
	[PktSharedObj] = "SharedObj",
	[PktInvoke] = "Invoke",
	[PktFlashVideo] = "FlashVideo",
};

static char *ctl2s[] = {
	[CtlStreamBegin] = "StreamBegin",
	[CtlStreamEnd] = "StreamEnd",
	[CtlStreamDry] = "StreamDry",
	[CtlStreamRecorded] = "StreamRecorded",
	[CtlPing] = "Ping",
	[CtlBufferEmpty] = "BufferEmpty",
	[CtlBufferReady] = "BufferReady",
};

extern int debug;

#pragma varargck type "T" int
/* %T: prints a packet type by name, or as a number if it has no name. */
static int
pktypefmt(Fmt *f)
{
	char *s;
	int t;

	if((t = va_arg(f->args, int)) >= 0 && t < nelem(pktype2s) && (s = pktype2s[t]) != nil)
		return fmtprint(f, "%s", s);
	return fmtprint(f, "%d", t);
}

#pragma varargck type "P" Packet*
/* %P: prints a packet summary; for invokes the parsed body values follow. */
static int
pkfmt(Fmt *f)
{
	u8int *s, *e;
	Packet *p;
	Amf *a;

	p = va_arg(f->args, Packet*);
	fmtprint(f, "type=%T chan=%d ts=%ud sz=%d", p->type, p->chan, p->ts, p->sz);
	s = p->data;
	e = s + p->sz;
	if(p->type == PktInvoke){
		fmtprint(f, ":");
		for(; s != nil && s != e;){
			/* don't hand amffree an uninitialized pointer if the parse fails */
			a = nil;
			if((s = amfparse(&a, s, e)) != nil)
				fmtprint(f, " %A", a);
			else
				fmtprint(f, " %r");
			amffree(a);
		}
	}

	return 0;
}

/* Starts a new outgoing packet at the beginning of the buffer; invokes get the next number. */
static void
newpacket(RTMP *r, int type, int ht, int chan)
{
	memset(&r->pk, 0, sizeof(r->pk));
	r->pk.type = type;
	r->pk.ht = ht;
	r->pk.chan = chan;
	r->p = r->b;
	if(type == PktInvoke)
		r->pk.invoke.n = ++r->invokes.n;
}

/*
 * Makes sure the packet buffer holds at least bsz bytes; when it has to
 * grow, twice the requested size is allocated.  The cursor r->p keeps
 * its offset, but any other pointer into the old buffer is invalid
 * after the call.
 */
static void
bextend(RTMP *r, int bsz)
{
	u8int *ob;

	if(r->bsz >= bsz)
		return;
	ob = r->b;
	r->b = erealloc(r->b, bsz*2);
	if(ob != nil)
		r->p = r->b + (intptr)(r->p - ob);
	r->bsz = bsz*2;
	r->e = r->b + r->bsz;
}

/*
 * Performs the handshake on fd f: sends a version byte followed by a
 * Sigsz bytes long signature, reads the same amount back, echoes the
 * server's signature and expects its own signature to be echoed back.
 * Returns 0 on success, -1 (with errstr set) on failure.
 */
static int
handshake(int f)
{
	u8int cl[1+Sigsz], sv[1+Sigsz];

	cl[0] = 3; /* no encryption */
	memset(cl+1, 0, 8);
	prng(cl+1+8, Sigsz-8);
	if(write(f, cl, sizeof(cl)) != sizeof(cl))
		goto err;
	if(readn(f, sv, sizeof(sv)) != sizeof(sv))
		goto err;
	if(cl[0] != sv[0]){
		werrstr("expected %d (no encryption), got %d", cl[0], sv[0]);
		goto err;
	}
	if(write(f, sv+1, Sigsz) != Sigsz)
		goto err;
	if(readn(f, sv+1, Sigsz) != Sigsz)
		goto err;
	if(memcmp(cl, sv, sizeof(cl)) != 0){
		werrstr("signature mismatch");
		goto err;
	}

	return 0;

err:
	werrstr("handshake: %r");
	return -1;
}

/*
 * Reads one packet from the connection into r->pk; the body ends up in
 * the packet buffer right after the header bytes, with r->pk.data and
 * r->pk.sz describing it.
 * Returns 0 on success, -1 (with errstr set) on failure.
 */
static int
rtmprecv(RTMP *r)
{
	int hsz, bodysz, dummy, n, off;
	u8int *h, *e, type;
	u32int ts;

	memset(&r->pk, 0, sizeof(r->pk));
	r->p = r->b;
	if(readn(r->i, r->p, 1) != 1)
		goto err;
	r->pk.ht = (r->p[0] & 0xc0)>>6;
	r->pk.chan = r->p[0] & 0x3f;
	hsz = szs[r->pk.ht];
	if(readn(r->i, r->p+1, hsz-1) != hsz-1)
		goto err;

	h = r->p + 1;
	e = r->p + hsz;
	r->pk.type = -1;
	bodysz = 0;
	ts = 0; /* tiny headers carry no timestamp */
	if(hsz >= szs[SzSmall]){
		h = amfi24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
		if(hsz >= szs[SzMedium]){
			h = amfi24get(h, e, &bodysz);
			h = amfbyteget(h, e, &type);
			r->pk.type = type;
			if(hsz >= szs[SzLarge]){
				dummy = 0;
				h = amfi32get(h, e, &dummy); /* FIXME seems to be always 0? */
			}
		}
	}
	if(h == nil)
		goto err;

	if(ts == 0xffffff){ /* extended timestamp */
		if(readn(r->i, h, 4) != 4)
			goto err;
		if((h = amfi32get(h, h+4, (s32int*)&ts)) == nil)
			goto err;
	}

	/* FIXME do all consecutive chunks use Tiny? */
	/*
	 * the body goes right after the header: account for the header
	 * bytes and recompute h, since bextend may move the buffer
	 */
	off = h - r->b;
	bextend(r, off + bodysz);
	h = r->b + off;
	r->pk.data = h;
	r->pk.sz = bodysz;
	for(;;){
		n = min(bodysz, r->chunk);
		if(readn(r->i, h, n) != n)
			goto err;
		bodysz -= n;
		h += n;
		if(bodysz < 1)
			break;
		/* the one byte header of the next chunk; overwritten by that chunk's data */
		if(readn(r->i, h, 1) != 1)
			goto err;
		if((r->pk.chan | SzTiny<<6) != *h){
			werrstr("chan/size does not match: %02x", *h);
			goto err;
		}
	}

	if(debug)
		fprint(2, "→ %P\n", &r->pk);

	return 0;

err:
	werrstr("rtmprecv: %r");
	return -1;
}

/*
 * Sends the packet built in the buffer since the last newpacket call,
 * splitting the body into chunks of at most r->chunk bytes.  A sent
 * invoke is put on the list of invokes waiting for a reply.
 * Returns 0 on success, -1 (with errstr set) on failure.
 */
static int
rtmpsend(RTMP *r)
{
	u8int *p, *h, *e, hdata[32];
	int bodysz, n, hsz;
	Invoke *i;

	if(r->p == nil)
		goto err;

	r->pk.data = r->b;
	r->pk.sz = r->p - r->b;
	/* FIXME special case when bodysz is 0 */
	hsz = szs[r->pk.ht];
	h = hdata;
	e = h + hsz;
	*h++ = r->pk.ht<<6 | r->pk.chan;
	if(hsz >= szs[SzSmall]){
		h = amfi24(h, e, 0); /* FIXME proper timestamps? */
		if(hsz >= szs[SzMedium]){
			h = amfi24(h, e, r->pk.sz);
			h = amfbyte(h, e, r->pk.type);
			if(hsz >= szs[SzLarge])
				h = amfi32(h, e, 0); /* FIXME seems to be always 0? */
		}
	}
	assert(h != nil);
	memset(h, 0, e-h);
	if(Bwrite(r, hdata, h-hdata) < 0)
		goto err;
	for(p = r->pk.data, bodysz = r->pk.sz; bodysz > 0;){
		n = min(bodysz, r->chunk);
		if(Bwrite(r, p, n) < 0)
			goto err;
		p += n;
		bodysz -= n;
		if(bodysz > 0){
			/* every following chunk starts with a one byte header */
			*h = r->pk.chan | SzTiny<<6;
			if(Bputc(r, *h) < 0)
				goto err;
		}
	}
	/* a failed flush means the packet did not go out */
	if(Bflush(r) < 0)
		goto err;

	if(debug)
		fprint(2, "← %P\n", &r->pk);

	if(r->pk.type == PktInvoke){
		i = emalloc(sizeof(*i));
		*i = r->pk.invoke;
		assert(i->cb != nil);
		if((i->next = r->invokes.w) != nil)
			i->next->prev = i;
		r->invokes.w = i;
	}

	return 0;

err:
	werrstr("rtmpsend: %r");
	return -1;
}

/*
 * Reply callback for the "connect" invoke: sends nil over r->c on
 * success, otherwise a malloc'ed string with the formatted status
 * value, which the receiver has to free.
 */
static void
connected(RTMP *r, int ok, Amf *a[NumCbA], void *)
{
	sendp(r->c, ok ? nil : smprint("%A", a[CbStatus]));
}

/* Builds and sends the "connect" invoke.  Returns whatever rtmpsend returns. */
static int
connect(RTMP *r)
{
	newpacket(r, PktInvoke, SzLarge, ChanCtl);
	putinvoke("connect");
	putkvstr("app", r->app);
	putkvstr("tcUrl", r->tcurl);
	if(r->mode & OWRITE)
		putkvstr("type", "nonprivate");
	else{
		putkvbool("fpad", 0);
		putkvnum("capabilities", 15);
		putkvnum("audioCodecs", 3191);
		putkvnum("videoCodecs", 252);
		putkvnum("videoFunction", 1);
	}
	putend();
	r->pk.invoke.cb = connected;

	return rtmpsend(r);
}

/*
 * Releases everything owned by r, then r itself.  If r->c is set, a
 * malloc'ed "done" string is sent over it first: sendp blocks until
 * someone receives it (the receiver frees it) or the channel is closed.
 */
static void
rtmpfree(RTMP *r)
{
	Invoke *i, *next;
	char *s;

	free(r->app);
	free(r->b);
	free(r->path);
	free(r->tcurl);
	/* invokes that never got their reply */
	for(i = r->invokes.w; i != nil; i = next){
		next = i->next;
		free(i);
	}
	if(r->c != nil){
		/* rtmpdial frees what it receives, so this can't be a string literal */
		s = estrdup("done");
		if(sendp(r->c, s) != 1)
			free(s);
		chanfree(r->c);
	}
	Bterm(r);
	free(r);
}

/*
 * Receiving proc: reads packets until an error occurs and handles
 * them.  Replies to invokes are matched by number against the waiting
 * list and passed to the registered callback.  On exit r is freed and
 * all the threads are terminated.
 * NOTE(review): rtmpdial may still touch r (rtmpclose) after receiving
 * the "done" message sent by rtmpfree — confirm that's harmless given
 * that threadexitsall follows.
 */
static void
loop(void *aux)
{
	int res, n, ok;
	Amf *a[NumCbA];
	u8int *s, *e;
	s16int s16;
	Packet *p;
	Invoke *i;
	RTMP *r;

	r = aux;
	p = &r->pk;
	res = 0;
	memset(a, 0, sizeof(a));
	for(;;){
		/* drop the values left over from the previous packet */
		for(n = 0; n < nelem(a); n++)
			amffree(a[n]);
		memset(a, 0, sizeof(a));

		if(res != 0 || (res = rtmprecv(r)) != 0){
			if(debug)
				fprint(2, "rtmp loop: %r\n");
			for(n = 0; n < nelem(a); n++)
				amffree(a[n]);
			break;
		}

		s = r->pk.data;
		e = s + r->pk.sz;
		switch(p->type){
		case PktInvoke:
			i = nil;
			ok = 0;
			for(n = 0; n < NumCbA; n++){
				if((s = amfparse(&a[n], s, e)) == nil)
					goto err;
				switch(n){
				case CbWhat:
					if(a[n]->type != Tstr){
						werrstr("invoke a[%d] not a string", n);
						goto err;
					}
					if(strcmp(a[n]->str, "_result") == 0)
						ok = 1;
					else if(strcmp(a[n]->str, "_error") == 0)
						ok = 0;
					else{
						werrstr("unexpected a[%d]: %#q", n, a[n]->str);
						goto err;
					}
					break;

				case CbInvoke:
					if(a[n]->type != Tnum){
						werrstr("invoke a[%d] not a number", n);
						goto err;
					}
					for(i = r->invokes.w; i != nil && i->n != a[n]->num; i = i->next);
					if(i == nil){
						werrstr("did not expect invoke %d result", (int)a[n]->num);
						goto err;
					}
					break;
				}
			}
			/* unlink the invoke before running its callback */
			if(i->prev != nil)
				i->prev->next = i->next;
			if(i->next != nil)
				i->next->prev = i->prev;
			if(r->invokes.w == i)
				r->invokes.w = i->next;
			i->cb(r, ok, a, i->aux);
			free(i);
			break;

		case PktChunkSz:
			if(amfi32get(s, e, &r->chunk) == nil)
				goto err;
			if(r->chunk < 2){
				werrstr("invalid chunk size: %d", r->chunk);
				goto err;
			}
			if(debug)
				fprint(2, "new chunk size: %d bytes\n", r->chunk);
			break;

		case PktBytesReadReport:
		case PktControl:
			if((s = amfi16get(s, e, &s16)) == nil)
				goto err;
			if((s = amfi32get(s, e, &n)) == nil)
				n = -1;
			switch(s16){
			case CtlStreamBegin:
			case CtlStreamEnd:
			case CtlStreamDry:
			case CtlStreamRecorded:
			case CtlBufferEmpty:
			case CtlBufferReady:
				/* only CtlPing gets into the if(0) body; everything ends up at the debug print */
				if(0){
			case CtlPing:
					/* FIXME pong */
					USED(n);
				}
				if(debug)
					fprint(2, "control packet: %s %d\n", ctl2s[s16], n);
				break;
			default:
				if(debug)
					fprint(2, "unknown control packet %d (value %d)\n", s16, n);
				break;
			}
			break;

		case PktServerBW:
		case PktClientBW:
		case PktAudio:
		case PktVideo:
		case PktFlexStreamSend:
		case PktFlexSharedObj:
		case PktFlexInfo:
		case PktSharedObj:
		case PktFlashVideo:
			break;

		err:
			/* the check at the top of the loop reports the error and breaks out */
			res = -1;
			break;
		}
	}

	rtmpfree(r);

	threadexitsall(res == 0 ? nil : "error");
}

/*
 * Connects to an "rtmp://host[:port]/app/..." URL: dials, does the
 * handshake, sends the "connect" invoke, starts the receiving proc and
 * waits for the result of the connect call.
 * w, h and withaudio are not used yet.
 * Returns the new connection, or nil (with errstr set) on failure.
 */
RTMP *
rtmpdial(char *url, int w, int h, int withaudio)
{
	char *s, *e, *path, *app;
	int f, port;
	RTMP *r;

	USED(w);
	USED(h);
	USED(withaudio);

	fmtinstall('A', amffmt);
	fmtinstall('T', pktypefmt);
	fmtinstall('P', pkfmt);
	quotefmtinstall();

	r = nil;
	f = -1;
	url = estrdup(url); /* since we're changing it in-place */
	if(memcmp(url, "rtmp://", 7) != 0){
		werrstr("invalid url");
		goto err;
	}
	s = url + 7;
	if((e = strpbrk(s, ":/")) == nil){
		werrstr("no path");
		goto err;
	}
	port = Port;
	if(*e == ':'){
		if((port = strtol(e+1, &path, 10)) < 1 || path == e+1 || *path != '/'){
			werrstr("invalid port");
			goto err;
		}
	}else{
		path = e;
	}
	while(*(++path) == '/');

	s = smprint("tcp!%.*s!%d", (int)(e-s), s, port);
	/* the control file isn't needed: with a nil cfdp dial doesn't leave it open */
	f = dial(s, nil, nil, nil);
	free(s);
	if(f < 0)
		goto err;

	app = path;
	if((s = strchr(path, '/')) == nil){
		werrstr("no path");
		goto err;
	}
	if((e = strchr(s+1, '/')) != nil){
		/* at this point it can be app instance if there is another slash following */
		if((s = strchr(e+1, '/')) == nil){
			/* no, just path leftovers */
			s = e;
		}
		*s = 0;
		path = s+1;
	}else{
		path = nil;
	}

	if(handshake(f) != 0)
		goto err;

	r = ecalloc(1, sizeof(*r));
	r->i = f;
	r->chunk = Chunk;
	r->tcurl = url;
	url = nil; /* owned by r from now on */
	r->c = chancreate(sizeof(void*), 0);
	r->app = estrdup(app);
	r->path = path == nil ? nil : estrdup(path);
	bextend(r, Bufsz);
	Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf));

	if(connect(r) != 0)
		goto err;
	if(proccreate(loop, r, mainstacksize) < 0)
		goto err;

	/* wait for the connect call to finish */
	if((s = recvp(r->c)) != nil){
		rtmpclose(r);
		werrstr("rtmpdial: %s", s);
		free(s);
		r = nil;
	}

	return r;

err:
	werrstr("rtmpdial: %r");
	if(r != nil){
		/*
		 * the receiving proc isn't running at this point, so nobody
		 * would ever receive the message rtmpfree sends over r->c
		 */
		if(r->c != nil){
			chanfree(r->c);
			r->c = nil;
		}
		rtmpfree(r);
	}
	if(f >= 0)
		close(f);
	free(url);
	return nil;
}

/*
 * Closes the connection fd and the notification channel.  r itself is
 * not freed here: the receiving proc calls rtmpfree when its loop ends.
 */
void
rtmpclose(RTMP *r)
{
	if(r == nil)
		return;
	if(r->i >= 0)
		close(r->i);
	if(r->c != nil)
		chanclose(r->c);
}