ref: f6d84677e9cd1cd4725391a577295ff561e09443
dir: /rtmp.c/
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <bio.h>
#include <libsec.h>
#include "amf.h"
#include "ivf.h"
#include "rtmp.h"
#define min(a,b) ((a)<(b)?(a):(b))
enum {
Port = 1935,
Sigsz = 1536,
Chunk = 128,
ChanCtl = 3,
SzTiny = 1,
SzSmall = 4,
SzMedium = 8,
SzLarge = 12,
PktInvoke = 20,
Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
Bufsz = 64*1024,
};
typedef struct Packet Packet;
struct Packet {
int type;
int hsz;
int chan;
u32int ts;
u8int *data;
int sz;
int left;
Packet *prev;
Packet *next;
};
struct RTMP {
Biobufhdr;
char *app;
char *path;
char *tcurl;
Packet pk;
Packet *ch;
u8int *b, *p, *e;
int mode;
int bsz;
int invokes;
int i;
u8int biobuf[Biobufsz];
};
#define puti16(i) do{ r->p = amfi16(r->p, r->e, i); }while(0)
#define puti24(i) do{ r->p = amfi24(r->p, r->e, i); }while(0)
#define puti32(i) do{ r->p = amfi32(r->p, r->e, i); }while(0)
#define putnum(v) do{ r->p = amfnum(r->p, r->e, v); }while(0)
#define putstr(s) do{ r->p = amfstr(r->p, r->e, s); }while(0)
#define putarr() do{ r->p = amfarr(r->p, r->e); }while(0)
#define putobj() do{ r->p = amfobj(r->p, r->e); }while(0)
#define putend() do{ r->p = amfend(r->p, r->e); }while(0)
#define putkvnum(name, v) do{ r->p = amfkvnum(r->p, r->e, name, v); }while(0)
#define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0)
#define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0)
int rtmpdump = 0;
extern int debug;
static Packet *
pk4chan(RTMP *r, int chan)
{
Packet *p;
for(p = r->ch; p != nil && p->chan != chan; p = p->next);
if(p == nil){
if((p = calloc(1, sizeof(*p))) == nil)
sysfatal("memory");
p->type = -1;
p->chan = chan;
if((p->next = r->ch) != nil)
r->ch->prev = p;
}
return p;
}
static void
pkfree(RTMP *r, Packet *p)
{
if(p->prev != nil)
p->prev->next = p->next;
if(p->next != nil)
p->next->prev = p->prev;
if(r->ch == p)
r->ch = p->next;
free(p->data);
}
static void
newpacket(RTMP *r, int type, int hsz, int chan)
{
memset(&r->pk, 0, sizeof(r->pk));
r->pk.type = type;
r->pk.hsz = hsz;
r->pk.chan = chan;
r->p = r->b + hsz;
r->b[0] = chan;
switch(hsz){
case SzTiny: r->b[0] |= 3<<6; break;
case SzSmall: r->b[0] |= 2<<6; break;
case SzMedium: r->b[0] |= 1<<6; break;
}
}
static void
bextend(RTMP *r, int bsz)
{
u8int *ob;
if(r->bsz >= bsz)
return;
ob = r->b;
if((r->b = realloc(r->b, bsz*2)) == nil)
sysfatal("memory");
if(ob != nil)
r->p = r->b + (intptr)(ob - r->p);
r->bsz *= 2;
r->e = r->b + r->bsz;
}
static int
handshake(int f)
{
u8int cl[1+Sigsz], sv[1+Sigsz];
cl[0] = 3; /* no encryption */
memset(cl+1, 0, 8);
prng(cl+1+8, Sigsz-8);
if(write(f, cl, sizeof(cl)) != sizeof(cl))
goto err;
if(readn(f, sv, sizeof(sv)) != sizeof(sv))
goto err;
if(cl[0] != sv[0]){
werrstr("expected %d (no encryption), got %d", cl[0], sv[0]);
goto err;
}
if(write(f, sv+1, Sigsz) != Sigsz)
goto err;
if(readn(f, sv+1, Sigsz) != Sigsz)
goto err;
if(memcmp(cl, sv, sizeof(cl)) != 0){
werrstr("signature mismatch");
goto err;
}
return 0;
err:
werrstr("handshake: %r");
return -1;
}
static int
rtmprecv(RTMP *r)
{
u8int *h, *e, type;
int hsz, chan, bodysz, dummy;
u32int ts;
r->p = r->b;
if(readn(r->i, r->p, 1) != 1)
goto err;
hsz = (r->p[0] & 0xc0)>>6;
chan = r->p[0] & 0x3f;
if(debug)
fprint(2, "hsz=%d chan=%d\n", hsz, chan);
if(readn(r->i, r->p+1, hsz-1) != hsz-1)
goto err;
memset(&r->pk, 0, sizeof(r->pk));
r->pk.hsz = hsz;
r->pk.chan = chan;
h = r->p + 1;
e = r->p + hsz;
r->pk.type = -1;
bodysz = 0;
if(hsz >= SzSmall){
h = amfi24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
if(hsz >= SzMedium){
h = amfi24get(h, e, &bodysz);
h = amfbyteget(h, e, &type);
r->pk.type = type;
if(hsz >= SzLarge)
h = amfi32get(h, e, &dummy); /* FIXME seems to be always 0? */
}
}
if(ts == 0xffffff){ /* exntended timestamp */
if(readn(r->i, h, 4) != 4)
goto err;
h = amfi32get(h, h+4, (s32int*)&ts);
}
bextend(r, bodysz);
if(readn(r->i, h, bodysz) != bodysz)
goto err;
h += bodysz;
return 0;
err:
werrstr("rtmprecv: %r");
return -1;
}
static int
rtmpsend(RTMP *r)
{
int bodysz, n, hsz;
u8int *p, *h, *e;
assert(r->p != nil);
bodysz = r->p - r->b - r->pk.hsz;
/* FIXME special case when bodysz is 0 */
h = r->b;
e = h + r->pk.hsz;
h++;
if(r->pk.hsz >= SzSmall){
h = amfi24(h, e, 0); /* FIXME proper timestamps? */
if(r->pk.hsz >= SzMedium){
h = amfi24(h, e, bodysz);
h = amfbyte(h, e, r->pk.type);
if(r->pk.hsz >= SzLarge)
h = amfi32(h, e, 0); /* FIXME seems to be always 0? */
}
}
if(h == nil)
goto err;
memset(h, 0, e-h);
p = r->b;
hsz = e - r->b;
for(; hsz+bodysz > 0;){
n = min(bodysz, Chunk);
if(Bwrite(r, p, hsz+n) < 0)
goto err;
if(rtmpdump)
write(1, p, hsz+n);
bodysz -= n;
p += hsz+n;
hsz = 0;
if(bodysz > 0){
*(--p) = 0xc0 | r->b[0];
hsz = 1;
}
}
r->p = nil;
Bflush(r);
return 0;
err:
werrstr("rtmpsend: %r");
return -1;
}
static int
connect(RTMP *r)
{
newpacket(r, PktInvoke, SzLarge, ChanCtl);
putstr("connect");
putnum(++r->invokes);
putobj();
putkvstr("app", r->app);
putkvstr("tcUrl", r->tcurl);
if(r->mode & OWRITE)
putkvstr("type", "nonprivate");
else{
putkvbool("fpad", 0);
putkvnum("capabilities", 15);
putkvnum("audioCodecs", 3191);
putkvnum("videoCodecs", 252);
putkvnum("videoFunction", 1);
}
putend();
return rtmpsend(r);
}
RTMP *
rtmpdial(char *url, int w, int h, int withaudio)
{
char *s, *e, *path, *app;
int f, port, ctl;
RTMP *r;
r = nil;
f = -1;
url = strdup(url); /* since we're changing it in-place */
if(memcmp(url, "rtmp://", 7) != 0){
werrstr("invalid url");
goto err;
}
s = url + 7;
if((e = strpbrk(s, ":/")) == nil){
werrstr("no path");
goto err;
}
port = 1935;
if(*e == ':'){
if((port = strtol(e+1, &path, 10)) < 1 || path == e+1 || *path != '/'){
werrstr("invalid port");
goto err;
}
}else{
path = e;
}
while(*(++path) == '/');
s = smprint("tcp!%.*s!%d", (int)(e-s), s, port);
f = dial(s, nil, nil, &ctl);
free(s);
if(f < 0)
goto err;
app = path;
if((s = strchr(path, '/')) == nil){
werrstr("no path");
goto err;
}
if((e = strchr(s+1, '/')) != nil){
/* at this point it can be app instance if there is another slash following */
if((s = strchr(e+1, '/')) == nil){
/* no, just path leftovers */
s = e;
}
*s = 0;
path = s+1;
}else{
path = nil;
}
if(handshake(f) != 0)
goto err;
if((r = calloc(1, sizeof(*r))) == nil)
sysfatal("memory");
if((r->app = strdup(app)) == nil || (path != nil && (r->path = strdup(path)) == nil))
sysfatal("memory");
bextend(r, Bufsz);
r->tcurl = url;
Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf));
r->i = f;
if(connect(r) != 0 || rtmprecv(r) != 0)
goto err;
return r;
err:
werrstr("rtmpdial: %r");
if(r != nil)
rtmpclose(r);
else if(f >= 0)
close(f);
free(url);
return nil;
}
void
rtmpclose(RTMP *r)
{
if(r == nil)
return;
free(r->path);
free(r->b);
close(r->i);
Bterm(r);
free(r);
}