ref: 3acc9f1c414b3032531e1db73e77bb4af0f8ed38
dir: /rtmp.c/
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <bio.h>
#include <libsec.h>
#include "amf.h"
#include "ivf.h"
#include "rtmp.h"
#include "util.h"
#define min(a,b) ((a)<(b)?(a):(b))
enum {
SzLarge,
SzMedium,
SzSmall,
SzTiny,
Port = 1935,
Sigsz = 1536,
Chunk = 128,
ChanCtl = 3,
CbWhat = 0,
CbInvoke,
CbData,
CbStatus,
NumCbA,
PktChunkSz = 1,
PktBytesReadReport = 3,
PktControl = 4,
PktServerBW = 5,
PktClientBW,
PktAudio = 8,
PktVideo,
PktFlexStreamSend = 15,
PktFlexSharedObj,
PkgFlexMessage,
PktFlexInfo,
PktSharedObj = 19,
PktInvoke = 20,
PktFlashVideo = 22,
Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
Bufsz = 64*1024,
};
typedef struct Invoke Invoke;
typedef struct Packet Packet;
struct Invoke {
void (*cb)(RTMP *r, int ok, Amf *a[NumCbA], void *aux);
void *aux;
int n;
Invoke *prev, *next;
};
struct Packet {
int type;
int ht;
int chan;
u32int ts;
u8int *data;
int sz;
Invoke invoke;
};
struct RTMP {
Biobufhdr;
Channel *c;
char *app;
char *path;
char *tcurl;
Packet pk;
u8int *b, *p, *e;
int chunk;
int mode;
int bsz;
int i;
struct {
int n;
Invoke *w;
}invokes;
u8int biobuf[Biobufsz];
};
#define puti16(i) do{ r->p = amfi16(r->p, r->e, i); }while(0)
#define puti24(i) do{ r->p = amfi24(r->p, r->e, i); }while(0)
#define puti32(i) do{ r->p = amfi32(r->p, r->e, i); }while(0)
#define putnum(v) do{ r->p = amfnum(r->p, r->e, v); }while(0)
#define putstr(s) do{ r->p = amfstr(r->p, r->e, s); }while(0)
#define putarr() do{ r->p = amfarr(r->p, r->e); }while(0)
#define putobj() do{ r->p = amfobj(r->p, r->e); }while(0)
#define putend() do{ r->p = amfend(r->p, r->e); }while(0)
#define putkvnum(name, v) do{ r->p = amfkvnum(r->p, r->e, name, v); }while(0)
#define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0)
#define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0)
#define putinvoke(name) do { \
putstr(name); \
putnum(r->pk.invoke.n); \
putobj(); \
}while(0)
static szs[] = {
[SzTiny] = 1,
[SzSmall] = 4,
[SzMedium] = 8,
[SzLarge] = 12,
};
static char *pktype2s[] = {
[PktChunkSz] = "ChunkSz",
[PktBytesReadReport] = "BytesReadReport",
[PktControl] = "Control",
[PktServerBW] = "ServerBW",
[PktClientBW] = "ClientBW",
[PktAudio] = "Audio",
[PktVideo] = "Video",
[PktFlexStreamSend] = "FlexStreamSend",
[PktFlexSharedObj] = "FlexSharedObj",
[PktFlexInfo] = "FlexInfo",
[PktSharedObj] = "SharedObj",
[PktInvoke] = "Invoke",
[PktFlashVideo] = "FlashVideo",
};
extern int debug;
#pragma varargck type "T" int
static int
pktypefmt(Fmt *f)
{
char *s;
int t;
if((t = va_arg(f->args, int)) >= 0 &&
t < nelem(pktype2s) &&
(s = pktype2s[t]) != nil)
return fmtprint(f, "%s", s);
return fmtprint(f, "%d", t);
}
#pragma varargck type "P" Packet*
static int
pkfmt(Fmt *f)
{
u8int *s, *e;
Packet *p;
Amf *a;
p = va_arg(f->args, Packet*);
fmtprint(f, "type=%T chan=%d ts=%ud sz=%d", p->type, p->chan, p->ts, p->sz);
if(p->type == PktInvoke){
fmtprint(f, ":");
for(s = p->data, e = s + p->sz; s != nil && s != e;){
if((s = amfparse(&a, s, e)) != nil)
fmtprint(f, " %A", a);
else
fmtprint(f, " %r");
amffree(a);
}
}
return 0;
}
static void
newpacket(RTMP *r, int type, int ht, int chan)
{
memset(&r->pk, 0, sizeof(r->pk));
r->pk.type = type;
r->pk.ht = ht;
r->pk.chan = chan;
r->p = r->b;
if(type == PktInvoke)
r->pk.invoke.n = ++r->invokes.n;
}
static void
bextend(RTMP *r, int bsz)
{
u8int *ob;
if(r->bsz >= bsz)
return;
ob = r->b;
r->b = erealloc(r->b, bsz*2);
if(ob != nil)
r->p = r->b + (intptr)(ob - r->p);
r->bsz = bsz*2;
r->e = r->b + r->bsz;
}
static int
handshake(int f)
{
u8int cl[1+Sigsz], sv[1+Sigsz];
cl[0] = 3; /* no encryption */
memset(cl+1, 0, 8);
prng(cl+1+8, Sigsz-8);
if(write(f, cl, sizeof(cl)) != sizeof(cl))
goto err;
if(readn(f, sv, sizeof(sv)) != sizeof(sv))
goto err;
if(cl[0] != sv[0]){
werrstr("expected %d (no encryption), got %d", cl[0], sv[0]);
goto err;
}
if(write(f, sv+1, Sigsz) != Sigsz)
goto err;
if(readn(f, sv+1, Sigsz) != Sigsz)
goto err;
if(memcmp(cl, sv, sizeof(cl)) != 0){
werrstr("signature mismatch");
goto err;
}
return 0;
err:
werrstr("handshake: %r");
return -1;
}
static int
rtmprecv(RTMP *r)
{
int hsz, bodysz, dummy, n;
u8int *h, *e, type;
u32int ts;
memset(&r->pk, 0, sizeof(r->pk));
r->p = r->b;
if(readn(r->i, r->p, 1) != 1)
goto err;
r->pk.ht = (r->p[0] & 0xc0)>>6;
r->pk.chan = r->p[0] & 0x3f;
hsz = szs[r->pk.ht];
if(readn(r->i, r->p+1, hsz-1) != hsz-1)
goto err;
h = r->p + 1;
e = r->p + hsz;
r->pk.type = -1;
bodysz = 0;
if(hsz >= szs[SzSmall]){
h = amfi24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
if(hsz >= szs[SzMedium]){
h = amfi24get(h, e, &bodysz);
h = amfbyteget(h, e, &type);
r->pk.type = type;
if(hsz >= szs[SzLarge]){
dummy = 0;
h = amfi32get(h, e, &dummy); /* FIXME seems to be always 0? */
}
}
}
if(ts == 0xffffff){ /* exntended timestamp */
if(readn(r->i, h, 4) != 4)
goto err;
h = amfi32get(h, h+4, (s32int*)&ts);
}
/* FIXME do all consecutive chunks use Tiny? */
bextend(r, bodysz);
r->pk.data = h;
r->pk.sz = bodysz;
for(;;){
n = min(bodysz, r->chunk);
if(readn(r->i, h, n) != n)
goto err;
bodysz -= n;
h += n;
if(bodysz < 1)
break;
if(readn(r->i, h, 1) != 1)
goto err;
if((r->pk.chan | SzTiny<<6) != *h){
werrstr("chan/size does not match: %02x", *h);
goto err;
}
}
if(debug)
fprint(2, "→ %P\n", &r->pk);
return 0;
err:
werrstr("rtmprecv: %r");
return -1;
}
static int
rtmpsend(RTMP *r)
{
u8int *p, *h, *e, hdata[32];
int bodysz, n, hsz;
Invoke *i;
if(r->p == nil)
goto err;
r->pk.data = r->b;
r->pk.sz = r->p - r->b;
/* FIXME special case when bodysz is 0 */
hsz = szs[r->pk.ht];
h = hdata;
e = h + hsz;
*h++ = r->pk.ht<<6 | r->pk.chan;
if(hsz >= szs[SzSmall]){
h = amfi24(h, e, 0); /* FIXME proper timestamps? */
if(hsz >= szs[SzMedium]){
h = amfi24(h, e, r->pk.sz);
h = amfbyte(h, e, r->pk.type);
if(hsz >= szs[SzLarge])
h = amfi32(h, e, 0); /* FIXME seems to be always 0? */
}
}
assert(h != nil);
memset(h, 0, e-h);
if(Bwrite(r, hdata, h-hdata) < 0)
goto err;
for(p = r->pk.data, bodysz = r->pk.sz; bodysz > 0;){
n = min(bodysz, r->chunk);
if(Bwrite(r, p, n) < 0)
goto err;
p += n;
bodysz -= n;
if(bodysz > 0){
*h = r->pk.chan | SzTiny<<6;
Bputc(r, *h);
}
}
Bflush(r);
if(debug)
fprint(2, "← %P\n", &r->pk);
if(r->pk.type == PktInvoke){
i = emalloc(sizeof(*i));
*i = r->pk.invoke;
assert(i->cb != nil);
if((i->next = r->invokes.w) != nil)
i->next->prev = i;
r->invokes.w = i;
}
return 0;
err:
werrstr("rtmpsend: %r");
return -1;
}
static void
connected(RTMP *r, int ok, Amf *a[NumCbA], void *)
{
sendp(r->c, ok ? nil : smprint("%A", a[CbStatus]));
}
static int
connect(RTMP *r)
{
newpacket(r, PktInvoke, SzLarge, ChanCtl);
putinvoke("connect");
putkvstr("app", r->app);
putkvstr("tcUrl", r->tcurl);
if(r->mode & OWRITE)
putkvstr("type", "nonprivate");
else{
putkvbool("fpad", 0);
putkvnum("capabilities", 15);
putkvnum("audioCodecs", 3191);
putkvnum("videoCodecs", 252);
putkvnum("videoFunction", 1);
}
putend();
r->pk.invoke.cb = connected;
return rtmpsend(r);
}
static void
rtmpfree(RTMP *r)
{
free(r->app);
free(r->b);
free(r->path);
free(r->tcurl);
if(r->c != nil){
sendp(r->c, "done");
chanfree(r->c);
}
Bterm(r);
free(r);
}
static void
loop(void *aux)
{
int res, n, ok;
Amf *a[NumCbA];
u8int *s, *e;
Packet *p;
Invoke *i;
RTMP *r;
r = aux;
p = &r->pk;
res = 0;
memset(a, 0, sizeof(a));
for(;;){
for(n = 0; n < nelem(a); n++)
amffree(a[n]);
memset(a, 0, sizeof(a));
if(res != 0 || (res = rtmprecv(r)) != 0){
if(debug)
fprint(2, "rtmp loop: %r\n");
for(n = 0; n < nelem(a); n++)
amffree(a[n]);
break;
}
switch(p->type){
case PktInvoke:
i = nil;
ok = 0;
s = r->pk.data;
e = s + r->pk.sz;
for(n = 0; n < NumCbA; n++){
if((s = amfparse(&a[n], s, e)) == nil)
goto err;
switch(n){
case CbWhat:
if(a[n]->type != Tstr){
werrstr("invoke a[%d] not a string", n);
goto err;
}
if(strcmp(a[n]->str, "_result") == 0)
ok = 1;
else if(strcmp(a[n]->str, "_error") == 0)
ok = 0;
else{
werrstr("unexpected a[%d]: %#q", n, a[n]->str);
goto err;
}
break;
case CbInvoke:
if(a[n]->type != Tnum){
werrstr("invoke a[%d] not a number", n);
goto err;
}
for(i = r->invokes.w; i != nil && i->n != a[n]->num; i = i->next);
if(i == nil){
werrstr("did not expect invoke %d result", (int)a[n]->num);
goto err;
}
break;
}
}
if(i->prev != nil)
i->prev->next = i->next;
if(i->next != nil)
i->next->prev = i->prev;
if(r->invokes.w == i)
r->invokes.w = i->next;
i->cb(r, ok, a, i->aux);
free(i);
break;
case PktChunkSz:
if(amfi32get(r->pk.data, r->pk.data+r->pk.sz, &r->chunk) == nil)
goto err;
if(r->chunk < 2){
werrstr("invalid chunk size: %d", r->chunk);
goto err;
}
if(debug)
fprint(2, "new chunk size: %d bytes\n", r->chunk);
break;
case PktBytesReadReport:
case PktControl:
case PktServerBW:
case PktClientBW:
case PktAudio:
case PktVideo:
case PktFlexStreamSend:
case PktFlexSharedObj:
case PktFlexInfo:
case PktSharedObj:
case PktFlashVideo:
break;
err:
res = -1;
break;
}
}
rtmpfree(r);
threadexitsall(res == 0 ? nil : "error");
}
RTMP *
rtmpdial(char *url, int w, int h, int withaudio)
{
char *s, *e, *path, *app;
int f, port, ctl;
RTMP *r;
fmtinstall('A', amffmt);
fmtinstall('T', pktypefmt);
fmtinstall('P', pkfmt);
quotefmtinstall();
r = nil;
f = -1;
url = estrdup(url); /* since we're changing it in-place */
if(memcmp(url, "rtmp://", 7) != 0){
werrstr("invalid url");
goto err;
}
s = url + 7;
if((e = strpbrk(s, ":/")) == nil){
werrstr("no path");
goto err;
}
port = 1935;
if(*e == ':'){
if((port = strtol(e+1, &path, 10)) < 1 || path == e+1 || *path != '/'){
werrstr("invalid port");
goto err;
}
}else{
path = e;
}
while(*(++path) == '/');
s = smprint("tcp!%.*s!%d", (int)(e-s), s, port);
f = dial(s, nil, nil, &ctl);
free(s);
if(f < 0)
goto err;
app = path;
if((s = strchr(path, '/')) == nil){
werrstr("no path");
goto err;
}
if((e = strchr(s+1, '/')) != nil){
/* at this point it can be app instance if there is another slash following */
if((s = strchr(e+1, '/')) == nil){
/* no, just path leftovers */
s = e;
}
*s = 0;
path = s+1;
}else{
path = nil;
}
if(handshake(f) != 0)
goto err;
r = ecalloc(1, sizeof(*r));
r->i = f;
r->chunk = Chunk;
r->tcurl = url;
url = nil;
r->c = chancreate(sizeof(void*), 0);
r->app = estrdup(app);
r->path = path == nil ? nil : estrdup(path);
bextend(r, Bufsz);
Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf));
if(connect(r) != 0)
goto err;
if(proccreate(loop, r, mainstacksize) < 0)
goto err;
/* wait for the connect call to finish */
if((s = recvp(r->c)) != nil){
rtmpclose(r);
werrstr("rtmpdial: %s", s);
free(s);
r = nil;
}
return r;
err:
werrstr("rtmpdial: %r");
if(r != nil)
rtmpfree(r);
if(f >= 0)
close(f);
free(url);
return nil;
}
void
rtmpclose(RTMP *r)
{
if(r == nil)
return;
if(r->i >= 0)
close(r->i);
if(r->c != nil)
chanclose(r->c);
}