shithub: rtmp

ref: c6821e9c478e8c1e4effdba02cfe7d56a78c3cd9
dir: /rtmp.c/

View raw version
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <bio.h>
#include <libsec.h>
#include "amf.h"
#include "ivf.h"
#include "rtmp.h"

/*
 * Smaller of a and b. Function-like macro: each argument may be
 * evaluated twice, so do not pass expressions with side effects.
 */
#define min(a,b) ((a)<(b)?(a):(b))

/*
 * Protocol and buffer constants used throughout this file.
 */
enum {
	Port = 1935,

	Sigsz = 1536,	/* size of the signature block exchanged by handshake() */
	Chunk = 128,	/* most body bytes sent/read between two chunk header bytes */

	ChanCtl = 3,	/* channel the connect invoke is sent on */

	/* header type: top two bits of the first header byte; index into szs[] */
	SzLarge = 0,
	SzMedium,
	SzSmall,
	SzTiny,

	/* packet types; printable names live in pktype2s[] */
	PktChunkSz = 1,
	PktBytesReadReport,
	PktControl,
	PktServerBW,
	PktClientBW,
	PktAudio = 8,
	PktVideo,
	PktFlexStreamSend = 15,
	PktFlexSharedObj,
	PktFlexInfo,
	PktSharedObj,
	PktInvoke = 20,
	PktFlashVideo = 22,

	Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
	Bufsz = 64*1024,	/* initial size requested for the packet buffer (see rtmpdial) */
};

typedef struct Packet Packet;

/*
 * One RTMP packet, either the one currently being built/received
 * (RTMP.pk) or a per-channel entry on the RTMP.ch list.
 */
struct Packet {
	int type;	/* Pkt* value, or -1 when not known */
	int ht;		/* header type (Sz*), selects the header length via szs[] */
	int chan;	/* channel number, low six bits of the first header byte */
	u32int ts;	/* timestamp; printed by pkfmt */
	u8int *data;	/* start of the body */
	int sz;		/* body size in bytes */
	int left;	/* NOTE(review): not referenced in the visible code */
	Packet *prev;	/* neighbours on the RTMP.ch doubly-linked list */
	Packet *next;
};

/*
 * Connection state. The unnamed Biobufhdr member comes first so that
 * an RTMP* can be handed directly to the bio routines (Binits, Bwrite,
 * Bputc, Bflush, Bterm), as the rest of this file does.
 */
struct RTMP {
	Biobufhdr;
	char *app;	/* application part of the URL (strdup'd in rtmpdial) */
	char *path;	/* remainder of the URL after app, or nil */
	char *tcurl;	/* rtmpdial's private copy of the URL, sent as "tcUrl" */
	Packet pk;	/* packet currently being built or just received */
	Packet *ch;	/* head of the per-channel packet list (see pk4chan) */
	u8int *b, *p, *e;	/* packet buffer: start, cursor, end */
	int mode;	/* tested against OWRITE in connect() */
	int bsz;	/* allocated size of b */
	int invokes;	/* incremented for every invoke sent; used as its number */
	int i;		/* connection fd, used for reads and as the bio output fd */
	u8int biobuf[Biobufsz];	/* storage handed to Binits for buffered output */
};

/*
 * Shorthands for appending AMF values to the packet being built.
 * Each one expects a variable `r` (RTMP*) in scope, calls the matching
 * amf* routine with the cursor r->p and the limit r->e, and stores the
 * returned pointer back into r->p.
 */
#define puti16(i) do{ r->p = amfi16(r->p, r->e, i); }while(0)
#define puti24(i) do{ r->p = amfi24(r->p, r->e, i); }while(0)
#define puti32(i) do{ r->p = amfi32(r->p, r->e, i); }while(0)
#define putnum(v) do{ r->p = amfnum(r->p, r->e, v); }while(0)
#define putstr(s) do{ r->p = amfstr(r->p, r->e, s); }while(0)
#define putarr() do{ r->p = amfarr(r->p, r->e); }while(0)
#define putobj() do{ r->p = amfobj(r->p, r->e); }while(0)
#define putend() do{ r->p = amfend(r->p, r->e); }while(0)
#define putkvnum(name, v) do{ r->p = amfkvnum(r->p, r->e, name, v); }while(0)
#define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0)
#define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0)

/*
 * Total header length in bytes (including the first byte that carries
 * the header type and channel) for each header type.
 */
static int szs[] = {
	[SzTiny] = 1,
	[SzSmall] = 4,
	[SzMedium] = 8,
	[SzLarge] = 12,
};

/*
 * Printable names for the Pkt* packet types, indexed by type.
 * Types without an entry are nil; pktypefmt prints those as numbers.
 */
static char *pktype2s[] = {
	[PktChunkSz] = "ChunkSz",
	[PktBytesReadReport] = "BytesReadReport",
	[PktControl] = "Control",
	[PktServerBW] = "ServerBW",
	[PktClientBW] = "ClientBW",
	[PktAudio] = "Audio",
	[PktVideo] = "Video",
	[PktFlexStreamSend] = "FlexStreamSend",
	[PktFlexSharedObj] = "FlexSharedObj",
	[PktFlexInfo] = "FlexInfo",
	[PktSharedObj] = "SharedObj",
	[PktInvoke] = "Invoke",
	[PktFlashVideo] = "FlashVideo",
};

/*
 * rtmpdump: when non-zero, rtmprecv copies received packet bodies and
 * rtmpsend copies everything it sends to file descriptor 1.
 * debug (defined elsewhere): when non-zero, each packet received or
 * sent is described on file descriptor 2.
 */
int rtmpdump = 0;
extern int debug;

#pragma varargck type "T" int
/*
 * %T: prints a packet type by its name from pktype2s[], falling back
 * to the decimal value when the type is out of range or unnamed.
 */
static int
pktypefmt(Fmt *f)
{
	int type;
	char *name;

	type = va_arg(f->args, int);
	name = nil;
	if(type >= 0 && type < nelem(pktype2s))
		name = pktype2s[type];
	if(name == nil)
		return fmtprint(f, "%d", type);

	return fmtprint(f, "%s", name);
}

#pragma varargck type "P" Packet*
/*
 * %P: prints a one-line summary (type, channel, timestamp, body size)
 * of the Packet passed by pointer.
 */
static int
pkfmt(Fmt *f)
{
	Packet *pk;

	pk = va_arg(f->args, Packet*);
	return fmtprint(f, "type=%T chan=%d ts=%ud sz=%d",
		pk->type, pk->chan, pk->ts, pk->sz);
}

/*
 * Returns the packet tracked for channel chan on r's channel list,
 * creating a zeroed one (type -1) and pushing it onto the head of the
 * list when the channel has not been seen before.
 * Never returns nil; allocation failure is fatal.
 */
static Packet *
pk4chan(RTMP *r, int chan)
{
	Packet *p;

	for(p = r->ch; p != nil; p = p->next){
		if(p->chan == chan)
			return p;
	}

	if((p = calloc(1, sizeof(*p))) == nil)
		sysfatal("memory");
	p->type = -1;
	p->chan = chan;
	if((p->next = r->ch) != nil)
		r->ch->prev = p;
	/* make the new entry the list head so later lookups can find it */
	r->ch = p;

	return p;
}

/*
 * Unlinks p from r's channel list and releases it together with its
 * body. p must have come from pk4chan (heap-allocated); it must not be
 * used after this call. A nil p is ignored.
 */
static void
pkfree(RTMP *r, Packet *p)
{
	if(p == nil)
		return;

	if(p->prev != nil)
		p->prev->next = p->next;
	if(p->next != nil)
		p->next->prev = p->prev;
	if(r->ch == p)
		r->ch = p->next;

	free(p->data);
	/* the node itself was calloc'd by pk4chan */
	free(p);
}

/*
 * Starts a fresh outgoing packet: clears r->pk, records its type,
 * header type and channel, and rewinds the write cursor to the start
 * of the buffer.
 */
static void
newpacket(RTMP *r, int type, int ht, int chan)
{
	Packet *pk;

	pk = &r->pk;
	memset(pk, 0, sizeof(*pk));
	pk->chan = chan;
	pk->ht = ht;
	pk->type = type;

	r->p = r->b;
}

/*
 * Makes sure the packet buffer holds at least bsz bytes. When it has
 * to grow, twice the requested size is allocated; r->b and r->e are
 * updated and the cursor r->p keeps its offset from the start of the
 * buffer. Any other pointer into the old buffer held by the caller is
 * invalid afterwards. Allocation failure is fatal.
 */
static void
bextend(RTMP *r, int bsz)
{
	u8int *nb;
	intptr off;

	if(r->bsz >= bsz)
		return;

	/* remember the cursor offset before realloc may move the buffer */
	off = -1;
	if(r->b != nil && r->p != nil)
		off = r->p - r->b;

	if((nb = realloc(r->b, bsz*2)) == nil)
		sysfatal("memory");
	r->b = nb;
	if(off >= 0)
		r->p = r->b + off;
	r->bsz = bsz*2;
	r->e = r->b + r->bsz;
}

/*
 * Performs the client side of the handshake on fd f:
 * sends a version byte plus a Sigsz-byte signature, reads the peer's
 * version byte and signature, echoes the peer's signature back and
 * finally expects our own signature to be echoed by the peer.
 * Returns 0 on success, -1 with the error string set otherwise.
 */
static int
handshake(int f)
{
	u8int mine[1+Sigsz], theirs[1+Sigsz];
	u8int *sig, *peersig;

	sig = mine+1;
	peersig = theirs+1;

	mine[0] = 3; /* no encryption */
	memset(sig, 0, 8);
	prng(sig+8, Sigsz-8);

	/* our version byte and signature go out, the peer's come in */
	if(write(f, mine, sizeof(mine)) != sizeof(mine))
		goto err;
	if(readn(f, theirs, sizeof(theirs)) != sizeof(theirs))
		goto err;
	if(theirs[0] != mine[0]){
		werrstr("expected %d (no encryption), got %d", mine[0], theirs[0]);
		goto err;
	}

	/* echo the peer's signature, then read back the echo of ours */
	if(write(f, peersig, Sigsz) != Sigsz)
		goto err;
	if(readn(f, peersig, Sigsz) != Sigsz)
		goto err;
	if(memcmp(mine, theirs, sizeof(mine)) != 0){
		werrstr("signature mismatch");
		goto err;
	}

	return 0;

err:
	werrstr("handshake: %r");
	return -1;
}

/*
 * Reads one packet from r->i into the packet buffer and describes it
 * in r->pk (header type, channel, type, body pointer and size).
 * The header is read at the start of r->b and the body is stored
 * right after it; the one-byte headers separating consecutive chunks
 * are checked and dropped so the body ends up contiguous.
 * Returns 0 on success, -1 with the error string set otherwise.
 */
static int
rtmprecv(RTMP *r)
{
	int hsz, bodysz, dummy, n, off;
	u8int *h, *e, type;
	u32int ts;

	memset(&r->pk, 0, sizeof(r->pk));

	r->p = r->b;
	if(readn(r->i, r->p, 1) != 1)
		goto err;
	r->pk.ht = (r->p[0] & 0xc0)>>6;
	r->pk.chan = r->p[0] & 0x3f;
	hsz = szs[r->pk.ht];
	if(readn(r->i, r->p+1, hsz-1) != hsz-1)
		goto err;

	h = r->p + 1;
	e = r->p + hsz;
	r->pk.type = -1;
	bodysz = 0;
	ts = 0; /* one-byte headers carry no timestamp field */
	if(hsz >= szs[SzSmall]){
		h = amfi24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
		if(hsz >= szs[SzMedium]){
			h = amfi24get(h, e, &bodysz);
			h = amfbyteget(h, e, &type);
			r->pk.type = type;
			if(hsz >= szs[SzLarge]){
				dummy = 0;
				h = amfi32get(h, e, &dummy); /* FIXME seems to be always 0? */
			}
		}
	}

	if(ts == 0xffffff){ /* extended timestamp */
		if(readn(r->i, h, 4) != 4)
			goto err;
		h = amfi32get(h, h+4, (s32int*)&ts);
	}

	/*
	 * The body is stored after the header, so the buffer has to hold
	 * both. bextend may move the buffer: carry h across as an offset.
	 */
	off = h - r->b;
	bextend(r, off + bodysz);
	h = r->b + off;

	/* FIXME do all consecutive chunks use Tiny? */
	r->pk.data = h;
	r->pk.sz = bodysz;
	for(;;){
		n = min(bodysz, Chunk);
		if(readn(r->i, h, n) != n)
			goto err;
		bodysz -= n;
		h += n;
		if(bodysz < 1)
			break;
		/* chunk separator: read in place, overwritten by the next chunk */
		if(readn(r->i, h, 1) != 1)
			goto err;
		if((r->pk.chan | SzTiny<<6) != *h){
			werrstr("chan/size does not match: %02x", *h);
			goto err;
		}
	}
	if(rtmpdump)
		write(1, r->pk.data, r->pk.sz);

	if(debug)
		fprint(2, "→ %P\n", &r->pk);

	return 0;

err:
	werrstr("rtmprecv: %r");
	return -1;
}

/*
 * Sends the packet built in r->b (from the buffer start up to the
 * cursor r->p) using the type, header type and channel in r->pk.
 * The body goes out in pieces of at most Chunk bytes separated by a
 * one-byte SzTiny header for the same channel, and the buffered
 * output is flushed at the end.
 * Returns 0 on success, -1 with the error string set otherwise.
 */
static int
rtmpsend(RTMP *r)
{
	int bodysz, n, hsz;
	u8int *p, *h, *e, hdata[32];

	r->pk.data = r->b;
	r->pk.sz = r->p - r->b;
	/* FIXME special case when bodysz is 0 */

	hsz = szs[r->pk.ht];
	h = hdata;
	e = h + hsz;
	*h++ = r->pk.ht<<6 | r->pk.chan;
	if(hsz >= szs[SzSmall]){
		h = amfi24(h, e, 0); /* FIXME proper timestamps? */
		if(hsz >= szs[SzMedium]){
			h = amfi24(h, e, r->pk.sz);
			h = amfbyte(h, e, r->pk.type);
			if(hsz >= szs[SzLarge])
				h = amfi32(h, e, 0); /* FIXME seems to be always 0? */
		}
	}
	assert(h != nil);
	memset(h, 0, e-h);
	if(Bwrite(r, hdata, h-hdata) < 0)
		goto err;
	if(rtmpdump)
		write(1, hdata, h-hdata);

	for(p = r->pk.data, bodysz = r->pk.sz; bodysz > 0;){
		n = min(bodysz, Chunk);
		if(Bwrite(r, p, n) < 0)
			goto err;
		if(rtmpdump)
			write(1, p, n);
		p += n;
		bodysz -= n;
		if(bodysz > 0){
			/* the byte after the header in hdata doubles as scratch */
			*h = r->pk.chan | SzTiny<<6;
			if(Bputc(r, *h) < 0)
				goto err;
			if(rtmpdump)
				write(1, h, 1);
		}
	}

	/* a failed flush means the packet did not reach the peer */
	if(Bflush(r) < 0)
		goto err;

	if(debug)
		fprint(2, "← %P\n", &r->pk);

	return 0;
err:
	werrstr("rtmpsend: %r");
	return -1;
}

/*
 * Builds and sends the "connect" invoke on the control channel.
 * The argument object always carries "app" and "tcUrl"; which of the
 * remaining keys follow depends on whether OWRITE is set in r->mode.
 * Returns whatever rtmpsend returns.
 */
static int
connect(RTMP *r)
{
	int writing;

	writing = r->mode & OWRITE;

	newpacket(r, PktInvoke, SzLarge, ChanCtl);

	/* invoke name and its sequence number */
	putstr("connect");
	putnum(++r->invokes);

	/* argument object */
	putobj();
	putkvstr("app", r->app);
	putkvstr("tcUrl", r->tcurl);
	if(!writing){
		putkvbool("fpad", 0);
		putkvnum("capabilities", 15);
		putkvnum("audioCodecs", 3191);
		putkvnum("videoCodecs", 252);
		putkvnum("videoFunction", 1);
	}else{
		putkvstr("type", "nonprivate");
	}
	putend();

	return rtmpsend(r);
}

RTMP *
rtmpdial(char *url, int w, int h, int withaudio)
{
	char *s, *e, *path, *app;
	int f, port, ctl;
	RTMP *r;

	fmtinstall('T', pktypefmt);
	fmtinstall('P', pkfmt);

	r = nil;
	f = -1;
	url = strdup(url); /* since we're changing it in-place */
	if(memcmp(url, "rtmp://", 7) != 0){
		werrstr("invalid url");
		goto err;
	}
	s = url + 7;
	if((e = strpbrk(s, ":/")) == nil){
		werrstr("no path");
		goto err;
	}
	port = 1935;
	if(*e == ':'){
		if((port = strtol(e+1, &path, 10)) < 1 || path == e+1 || *path != '/'){
			werrstr("invalid port");
			goto err;
		}
	}else{
		path = e;
	}
	while(*(++path) == '/');

	s = smprint("tcp!%.*s!%d", (int)(e-s), s, port);
	f = dial(s, nil, nil, &ctl);
	free(s);
	if(f < 0)
		goto err;

	app = path;
	if((s = strchr(path, '/')) == nil){
		werrstr("no path");
		goto err;
	}
	if((e = strchr(s+1, '/')) != nil){
		/* at this point it can be app instance if there is another slash following */
		if((s = strchr(e+1, '/')) == nil){
			/* no, just path leftovers */
			s = e;
		}
		*s = 0;
		path = s+1;
	}else{
		path = nil;
	}

	if(handshake(f) != 0)
		goto err;
	if((r = calloc(1, sizeof(*r))) == nil)
		sysfatal("memory");
	if((r->app = strdup(app)) == nil || (path != nil && (r->path = strdup(path)) == nil))
		sysfatal("memory");
	bextend(r, Bufsz);
	r->tcurl = url;
	Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf));
	r->i = f;
	if(connect(r) != 0 || rtmprecv(r) != 0)
		goto err;

	return r;

err:
	werrstr("rtmpdial: %r");
	if(r != nil)
		rtmpclose(r);
	else if(f >= 0)
		close(f);
	free(url);
	return nil;
}

/*
 * Shuts down and releases a connection created by rtmpdial.
 * Buffered output is flushed before the descriptor is closed, then the
 * channel list, the packet buffer and the strings owned by r are freed.
 * A nil r is ignored.
 *
 * r->tcurl is deliberately not freed here: rtmpdial's error path frees
 * that buffer itself right after calling rtmpclose.
 */
void
rtmpclose(RTMP *r)
{
	Packet *p, *next;

	if(r == nil)
		return;

	/* flush while the descriptor is still open */
	Bterm(r);
	close(r->i);

	for(p = r->ch; p != nil; p = next){
		next = p->next;
		free(p->data);
		free(p);
	}

	free(r->app);
	free(r->path);
	free(r->b);
	free(r);
}