ref: c6821e9c478e8c1e4effdba02cfe7d56a78c3cd9
dir: /rtmp.c/
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <bio.h>
#include <libsec.h>
#include "amf.h"
#include "ivf.h"
#include "rtmp.h"

#define min(a,b) ((a)<(b)?(a):(b))

enum {
	Port = 1935,	/* default TCP port used when the URL does not name one */

	Sigsz = 1536,	/* handshake signature size, bytes */
	Chunk = 128,	/* body bytes sent/received between one-byte chunk headers */

	ChanCtl = 3,	/* channel used for the "connect" invoke */

	/* header types; the on-wire header length for each is in szs[] */
	SzLarge = 0,
	SzMedium,
	SzSmall,
	SzTiny,

	/* packet types */
	PktChunkSz = 1,
	PktBytesReadReport,
	PktControl,
	PktServerBW,
	PktClientBW,
	PktAudio = 8,
	PktVideo,
	PktFlexStreamSend = 15,
	PktFlexSharedObj,
	PktFlexInfo,
	PktSharedObj,
	PktInvoke = 20,
	PktFlashVideo = 22,

	Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
	Bufsz = 64*1024,
};

typedef struct Packet Packet;

struct Packet {
	int type;	/* Pkt* value, or -1 when the header carried none */
	int ht;		/* header type, Sz* */
	int chan;
	u32int ts;
	u8int *data;
	int sz;
	int left;
	Packet *prev;
	Packet *next;
};

struct RTMP {
	Biobufhdr;	/* buffered output; the RTMP* itself is passed to the B* calls */
	char *app;
	char *path;
	char *tcurl;
	Packet pk;	/* packet currently being built or most recently received */
	Packet *ch;	/* head of the per-channel packet list */
	u8int *b, *p, *e;	/* work buffer: start, cursor, end */
	int mode;
	int bsz;	/* allocated size of b */
	int invokes;	/* counter sent with each invoke */
	int i;		/* connection fd, used directly for reads */
	u8int biobuf[Biobufsz];
};

/* AMF writers that append at the cursor of the RTMP named `r' in the calling scope */
#define puti16(i) do{ r->p = amfi16(r->p, r->e, i); }while(0)
#define puti24(i) do{ r->p = amfi24(r->p, r->e, i); }while(0)
#define puti32(i) do{ r->p = amfi32(r->p, r->e, i); }while(0)
#define putnum(v) do{ r->p = amfnum(r->p, r->e, v); }while(0)
#define putstr(s) do{ r->p = amfstr(r->p, r->e, s); }while(0)
#define putarr() do{ r->p = amfarr(r->p, r->e); }while(0)
#define putobj() do{ r->p = amfobj(r->p, r->e); }while(0)
#define putend() do{ r->p = amfend(r->p, r->e); }while(0)
#define putkvnum(name, v) do{ r->p = amfkvnum(r->p, r->e, name, v); }while(0)
#define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0)
#define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0)

/* header length in bytes, including the leading type/channel byte */
static int szs[] = {
	[SzTiny] = 1,
	[SzSmall] = 4,
	[SzMedium] = 8,
	[SzLarge] = 12,
};

static char *pktype2s[] = {
	[PktChunkSz] = "ChunkSz",
	[PktBytesReadReport] = "BytesReadReport",
	[PktControl] = "Control",
	[PktServerBW] = "ServerBW",
	[PktClientBW] = "ClientBW",
	[PktAudio] = "Audio",
	[PktVideo] = "Video",
	[PktFlexStreamSend] = "FlexStreamSend",
	[PktFlexSharedObj] = "FlexSharedObj",
	[PktFlexInfo] = "FlexInfo",
	[PktSharedObj] = "SharedObj",
	[PktInvoke] = "Invoke",
	[PktFlashVideo] = "FlashVideo",
};

int rtmpdump = 0;	/* when set, raw packet bytes are copied to fd 1 */
extern int debug;

#pragma varargck type "T" int
/* %T: print a packet type by name, or as a number when it has no name */
static int
pktypefmt(Fmt *f)
{
	char *s;
	int t;

	if((t = va_arg(f->args, int)) >= 0 && t < nelem(pktype2s) && (s = pktype2s[t]) != nil)
		return fmtprint(f, "%s", s);
	return fmtprint(f, "%d", t);
}

#pragma varargck type "P" Packet*
/* %P: one-line summary of a Packet */
static int
pkfmt(Fmt *f)
{
	Packet *p;

	p = va_arg(f->args, Packet*);
	return fmtprint(f, "type=%T chan=%d ts=%ud sz=%d", p->type, p->chan, p->ts, p->sz);
}

/*
 * Return the list node for channel `chan', allocating it and pushing it
 * at the head of r->ch when the channel has not been seen yet.
 */
static Packet *
pk4chan(RTMP *r, int chan)
{
	Packet *p;

	for(p = r->ch; p != nil && p->chan != chan; p = p->next);
	if(p == nil){
		if((p = calloc(1, sizeof(*p))) == nil)
			sysfatal("memory");
		p->type = -1;
		p->chan = chan;
		if((p->next = r->ch) != nil)
			r->ch->prev = p;
		/* make the new node the head, otherwise it is never found again */
		r->ch = p;
	}
	return p;
}

/*
 * Unlink a node obtained from pk4chan and release it together with its data.
 * NOTE(review): assumes p is heap-allocated (never &r->pk) and owns p->data.
 */
static void
pkfree(RTMP *r, Packet *p)
{
	if(p->prev != nil)
		p->prev->next = p->next;
	if(p->next != nil)
		p->next->prev = p->prev;
	if(r->ch == p)
		r->ch = p->next;
	free(p->data);
	free(p);
}

/* Reset r->pk for a new outgoing packet and rewind the write cursor. */
static void
newpacket(RTMP *r, int type, int ht, int chan)
{
	memset(&r->pk, 0, sizeof(r->pk));
	r->pk.type = type;
	r->pk.ht = ht;
	r->pk.chan = chan;
	r->p = r->b;
}

/*
 * Make sure the work buffer holds at least bsz bytes; grows to 2*bsz.
 * r->p keeps its offset across the move. Any other pointer into the old
 * buffer is invalid after this call.
 */
static void
bextend(RTMP *r, int bsz)
{
	u8int *ob;
	intptr off;

	if(r->bsz >= bsz)
		return;
	ob = r->b;
	/* take the offset before realloc, the old block may be gone afterwards */
	off = ob != nil ? r->p - ob : 0;
	if((r->b = realloc(r->b, bsz*2)) == nil)
		sysfatal("memory");
	if(ob != nil)
		r->p = r->b + off;
	r->bsz = bsz*2;
	r->e = r->b + r->bsz;
}

/*
 * Exchange signatures with the peer on fd f: send ours, read theirs,
 * echo theirs back, then check that the peer echoed ours.
 * Returns 0 on success, -1 with errstr set on failure.
 */
static int
handshake(int f)
{
	u8int cl[1+Sigsz], sv[1+Sigsz];

	cl[0] = 3; /* no encryption */
	memset(cl+1, 0, 8);
	prng(cl+1+8, Sigsz-8);
	if(write(f, cl, sizeof(cl)) != sizeof(cl))
		goto err;
	if(readn(f, sv, sizeof(sv)) != sizeof(sv))
		goto err;
	if(cl[0] != sv[0]){
		werrstr("expected %d (no encryption), got %d", cl[0], sv[0]);
		goto err;
	}
	if(write(f, sv+1, Sigsz) != Sigsz)
		goto err;
	/* sv[0] already matched, so comparing the whole array checks the echo */
	if(readn(f, sv+1, Sigsz) != Sigsz)
		goto err;
	if(memcmp(cl, sv, sizeof(cl)) != 0){
		werrstr("signature mismatch");
		goto err;
	}

	return 0;

err:
	werrstr("handshake: %r");
	return -1;
}

/*
 * Read one packet from the connection into the work buffer and describe
 * it in r->pk (data points into r->b). Returns 0, or -1 with errstr set.
 */
static int
rtmprecv(RTMP *r)
{
	int hsz, bodysz, dummy, n;
	u8int *h, *e, type;
	u32int ts;
	intptr off;

	memset(&r->pk, 0, sizeof(r->pk));
	r->p = r->b;
	if(readn(r->i, r->p, 1) != 1)
		goto err;
	r->pk.ht = (r->p[0] & 0xc0)>>6;
	r->pk.chan = r->p[0] & 0x3f;
	hsz = szs[r->pk.ht];
	if(readn(r->i, r->p+1, hsz-1) != hsz-1)
		goto err;

	h = r->p + 1;
	e = r->p + hsz;
	r->pk.type = -1;
	bodysz = 0;
	ts = 0;	/* a Tiny header has no timestamp field */
	if(hsz >= szs[SzSmall]){
		h = amfi24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
		if(hsz >= szs[SzMedium]){
			h = amfi24get(h, e, &bodysz);
			h = amfbyteget(h, e, &type);
			r->pk.type = type;
			if(hsz >= szs[SzLarge]){
				dummy = 0;
				h = amfi32get(h, e, &dummy); /* FIXME seems to be always 0? */
			}
		}
	}

	if(ts == 0xffffff){ /* extended timestamp */
		if(readn(r->i, h, 4) != 4)
			goto err;
		h = amfi32get(h, h+4, (s32int*)&ts);
	}
	r->pk.ts = ts;	/* raw value from the header, not interpreted */

	/* FIXME do all consecutive chunks use Tiny? */

	/*
	 * The body is stored right after the header bytes, so the buffer has
	 * to fit both; h is re-derived because bextend may move the buffer.
	 */
	off = h - r->b;
	bextend(r, off + bodysz);
	h = r->b + off;
	r->pk.data = h;
	r->pk.sz = bodysz;
	for(;;){
		n = min(bodysz, Chunk);
		if(readn(r->i, h, n) != n)
			goto err;
		bodysz -= n;
		h += n;
		if(bodysz < 1)
			break;
		/* the one-byte chunk header is overwritten by the next chunk's data */
		if(readn(r->i, h, 1) != 1)
			goto err;
		if((r->pk.chan | SzTiny<<6) != *h){
			werrstr("chan/size does not match: %02x", *h);
			goto err;
		}
	}

	if(rtmpdump)
		write(1, r->pk.data, r->pk.sz);
	if(debug)
		fprint(2, "→ %P\n", &r->pk);

	return 0;

err:
	werrstr("rtmprecv: %r");
	return -1;
}

/*
 * Send the packet built in the work buffer (r->b up to r->p) with the
 * header described by r->pk, inserting a one-byte Tiny header after every
 * Chunk bytes of body. Returns 0, or -1 with errstr set.
 */
static int
rtmpsend(RTMP *r)
{
	int bodysz, n, hsz;
	u8int *p, *h, *e, hdata[32];

	r->pk.data = r->b;
	r->pk.sz = r->p - r->b;

	/* FIXME special case when bodysz is 0 */
	hsz = szs[r->pk.ht];
	h = hdata;
	e = h + hsz;
	*h++ = r->pk.ht<<6 | r->pk.chan;
	if(hsz >= szs[SzSmall]){
		h = amfi24(h, e, 0); /* FIXME proper timestamps? */
		if(hsz >= szs[SzMedium]){
			h = amfi24(h, e, r->pk.sz);
			h = amfbyte(h, e, r->pk.type);
			if(hsz >= szs[SzLarge])
				h = amfi32(h, e, 0); /* FIXME seems to be always 0? */
		}
	}
	assert(h != nil);
	memset(h, 0, e-h);
	if(Bwrite(r, hdata, h-hdata) < 0)
		goto err;
	if(rtmpdump)
		write(1, hdata, h-hdata);
	for(p = r->pk.data, bodysz = r->pk.sz; bodysz > 0;){
		n = min(bodysz, Chunk);
		if(Bwrite(r, p, n) < 0)
			goto err;
		if(rtmpdump)
			write(1, p, n);
		p += n;
		bodysz -= n;
		if(bodysz > 0){
			/* hdata has room past the header; reuse it for the chunk marker */
			*h = r->pk.chan | SzTiny<<6;
			if(Bputc(r, *h) < 0)
				goto err;
			if(rtmpdump)
				write(1, h, 1);
		}
	}
	/* a failed flush means the packet never reached the peer */
	if(Bflush(r) < 0)
		goto err;

	if(debug)
		fprint(2, "← %P\n", &r->pk);

	return 0;

err:
	werrstr("rtmpsend: %r");
	return -1;
}

/* Build and send the "connect" invoke for r->app / r->tcurl. */
static int
connect(RTMP *r)
{
	newpacket(r, PktInvoke, SzLarge, ChanCtl);
	putstr("connect");
	putnum(++r->invokes);
	putobj();
		putkvstr("app", r->app);
		putkvstr("tcUrl", r->tcurl);
		if(r->mode & OWRITE)
			putkvstr("type", "nonprivate");
		else{
			putkvbool("fpad", 0);
			putkvnum("capabilities", 15);
			putkvnum("audioCodecs", 3191);
			putkvnum("videoCodecs", 252);
			putkvnum("videoFunction", 1);
		}
	putend();

	return rtmpsend(r);
}

/*
 * Dial an rtmp://host[:port]/app[/instance]/path URL, perform the
 * handshake, send "connect" and read one reply packet.
 * Returns a connection to be released with rtmpclose, or nil with
 * errstr set. w, h and withaudio are not used here.
 */
RTMP *
rtmpdial(char *url, int w, int h, int withaudio)
{
	char *s, *e, *path, *app;
	int f, port;
	RTMP *r;

	fmtinstall('T', pktypefmt);
	fmtinstall('P', pkfmt);

	r = nil;
	f = -1;
	/* private copy, since the url is cut in place below */
	if((url = strdup(url)) == nil)
		sysfatal("memory");
	/* strncmp stops at the terminator, so a short string is not over-read */
	if(strncmp(url, "rtmp://", 7) != 0){
		werrstr("invalid url");
		goto err;
	}
	s = url + 7;
	if((e = strpbrk(s, ":/")) == nil){
		werrstr("no path");
		goto err;
	}
	port = Port;
	if(*e == ':'){
		if((port = strtol(e+1, &path, 10)) < 1 || path == e+1 || *path != '/'){
			werrstr("invalid port");
			goto err;
		}
	}else{
		path = e;
	}
	while(*(++path) == '/');

	if((s = smprint("tcp!%.*s!%d", (int)(e-s), s, port)) == nil)
		sysfatal("memory");
	/* the control fd is not needed; asking for it would leave it open */
	f = dial(s, nil, nil, nil);
	free(s);
	if(f < 0)
		goto err;

	app = path;
	if((s = strchr(path, '/')) == nil){
		werrstr("no path");
		goto err;
	}
	/* NOTE(review): with a single slash after the app, path stays nil and app keeps the tail — confirm intended */
	if((e = strchr(s+1, '/')) != nil){
		/* at this point it can be app instance if there is another slash following */
		if((s = strchr(e+1, '/')) == nil){
			/* no, just path leftovers */
			s = e;
		}
		/* cutting here also truncates url, which becomes tcurl */
		*s = 0;
		path = s+1;
	}else{
		path = nil;
	}

	if(handshake(f) != 0)
		goto err;

	if((r = calloc(1, sizeof(*r))) == nil)
		sysfatal("memory");
	/* from here on r owns both the fd and the url copy */
	r->i = f;
	r->tcurl = url;
	if((r->app = strdup(app)) == nil || (path != nil && (r->path = strdup(path)) == nil))
		sysfatal("memory");
	bextend(r, Bufsz);
	Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf));

	if(connect(r) != 0 || rtmprecv(r) != 0)
		goto err;

	return r;

err:
	werrstr("rtmpdial: %r");
	if(r != nil){
		/* releases the fd and url through r->i and r->tcurl */
		rtmpclose(r);
	}else{
		if(f >= 0)
			close(f);
		free(url);
	}
	return nil;
}

/* Flush and close the connection and release everything rtmpdial allocated. */
void
rtmpclose(RTMP *r)
{
	if(r == nil)
		return;
	/* flush buffered output while the fd is still open */
	Bterm(r);
	close(r->i);
	free(r->app);
	free(r->path);
	free(r->tcurl);
	free(r->b);
	free(r);
}