shithub: rtmp

ref: 0ce01d66fb72ea51b7dd0e885a01e2bba1eda7ff
dir: /rtmp.c/

View raw version
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <bio.h>
#include <libsec.h>
#include "amf.h"
#include "ivf.h"
#include "rtmp.h"
#include "util.h"

/* Smaller of a and b. Each argument may be evaluated twice: do not pass expressions with side effects. */
#define min(a,b) ((a)<(b)?(a):(b))

enum {
	SzLarge,
	SzMedium,
	SzSmall,
	SzTiny,

	Port = 1935,
	Sigsz = 1536,
	Chunk = 128,

	ChanCtl = 3,

	CbWhat = 0,
	CbInvoke,
	CbData,
	CbStatus,
	NumCbA,

	CtlStreamBegin = 0,
	CtlStreamEnd,
	CtlStreamDry,
	CtlStreamRecorded,
	CtlPing = 6,
	CtlBufferEmpty = 31,
	CtlBufferReady,

	PktChunkSz = 1,
	PktBytesReadReport = 3,
	PktControl = 4,
	PktServerBW = 5,
	PktClientBW,
	PktAudio = 8,
	PktVideo,
	PktFlexStreamSend = 15,
	PktFlexSharedObj,
	PkgFlexMessage,
	PktFlexInfo,
	PktSharedObj = 19,
	PktInvoke = 20,
	PktFlashVideo = 22,

	Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
	Bufsz = 64*1024,
};

typedef struct Invoke Invoke;
typedef struct Packet Packet;

/*
 * One outstanding invoke (remote call) waiting for its reply.
 * rtmpsend() copies it onto the r->invokes.w list; loop() looks it
 * up by number, unlinks it, calls cb and frees it.
 */
struct Invoke {
	/* called from loop() with ok=1 for "_result", ok=0 for "_error" */
	void (*cb)(RTMP *r, int ok, Amf *a[NumCbA], void *aux);
	/* passed back to cb unchanged */
	void *aux;
	/* invoke number, assigned in newpacket() and matched against the reply */
	int n;

	/* doubly linked list of waiting invokes */
	Invoke *prev, *next;
};

/* The packet currently being built for sending or just received. */
struct Packet {
	int type;	/* one of the Pkt* values; rtmprecv sets -1 when the header carries no type */
	int ht;		/* header kind (SzLarge..SzTiny) */
	int chan;	/* low six bits of the first header byte */
	u32int ts;	/* printed by pkfmt; NOTE(review): never assigned outside memset in the code shown */
	u8int *data;	/* start of the body inside the RTMP buffer */
	int sz;		/* body size in bytes */
	Invoke invoke;	/* callback info, meaningful for PktInvoke packets being sent */
};

/* State of one RTMP connection. */
struct RTMP {
	/* unnamed member: lets an RTMP* be handed directly to Bwrite/Bputc/Bflush/Bterm */
	Biobufhdr;
	/* rendezvous channel: connected() and rtmpfree() send on it, rtmpdial() receives */
	Channel *c;
	char *app;
	/* NOTE(review): path is only stored and freed in the code shown */
	char *path;
	char *tcurl;
	/* packet being built or last received */
	Packet pk;
	/* packet buffer: b is its start, p the write cursor, e one past its end */
	u8int *b, *p, *e;
	/* current chunk size; starts at Chunk, updated by PktChunkSz packets */
	int chunk;
	/* tested against OWRITE in connect(); not assigned anywhere in the code shown */
	int mode;
	/* allocated size of b */
	int bsz;
	/* file descriptor used for reading (readn) and closed by rtmpclose() */
	int i;
	struct {
		int n;		/* last invoke number handed out */
		Invoke *w;	/* head of the list of invokes waiting for a reply */
	}invokes;
	/* backing store for the Bio output buffer */
	u8int biobuf[Biobufsz];
};

/*
 * Serialization shorthands.  Each expects a variable `RTMP *r' in scope,
 * hands the current cursor r->p and buffer end r->e to the matching amf*
 * encoder and stores the returned cursor back into r->p.
 * NOTE(review): the encoders presumably return nil when the buffer is too
 * small; rtmpsend() refuses to send when r->p is nil -- confirm in amf.c.
 */
#define puti16(i) do{ r->p = amfi16(r->p, r->e, i); }while(0)
#define puti24(i) do{ r->p = amfi24(r->p, r->e, i); }while(0)
#define puti32(i) do{ r->p = amfi32(r->p, r->e, i); }while(0)
#define putnum(v) do{ r->p = amfnum(r->p, r->e, v); }while(0)
#define putstr(s) do{ r->p = amfstr(r->p, r->e, s); }while(0)
#define putarr() do{ r->p = amfarr(r->p, r->e); }while(0)
#define putobj() do{ r->p = amfobj(r->p, r->e); }while(0)
#define putend() do{ r->p = amfend(r->p, r->e); }while(0)
#define putkvnum(name, v) do{ r->p = amfkvnum(r->p, r->e, name, v); }while(0)
#define putkvstr(name, s) do{ r->p = amfkvstr(r->p, r->e, name, s); }while(0)
#define putkvbool(name, s) do{ r->p = amfkvbool(r->p, r->e, name, s); }while(0)

/*
 * Start an invoke body: the call name, the invoke number assigned by
 * newpacket(), then an opened object.  The caller adds the object's
 * members and finishes with putend().  Needs `RTMP *r' in scope.
 */
#define putinvoke(name) do { \
	putstr(name); \
	putnum(r->pk.invoke.n); \
	putobj(); \
}while(0)

/* Chunk header length in bytes, including the first byte, for each header kind. */
static int szs[] = {
	[SzTiny] = 1,
	[SzSmall] = 4,
	[SzMedium] = 8,
	[SzLarge] = 12,
};

static char *pktype2s[] = {
	[PktChunkSz] = "ChunkSz",
	[PktBytesReadReport] = "BytesReadReport",
	[PktControl] = "Control",
	[PktServerBW] = "ServerBW",
	[PktClientBW] = "ClientBW",
	[PktAudio] = "Audio",
	[PktVideo] = "Video",
	[PktFlexStreamSend] = "FlexStreamSend",
	[PktFlexSharedObj] = "FlexSharedObj",
	[PktFlexInfo] = "FlexInfo",
	[PktSharedObj] = "SharedObj",
	[PktInvoke] = "Invoke",
	[PktFlashVideo] = "FlashVideo",
};

/*
 * Printable names for control events, indexed by the Ctl* value.
 * The table is sparse; loop() only indexes it with values it has
 * already matched against a known Ctl* case.
 */
static char *ctl2s[] = {
	[CtlStreamBegin] = "StreamBegin",
	[CtlStreamEnd] = "StreamEnd",
	[CtlStreamDry] = "StreamDry",
	[CtlStreamRecorded] = "StreamRecorded",
	[CtlPing] = "Ping",
	[CtlBufferEmpty] = "BufferEmpty",
	[CtlBufferReady] = "BufferReady",
};

/* Defined elsewhere.  When non-zero, packets and control events are traced to fd 2. */
extern int debug;

#pragma varargck type "T" int
/*
 * %T verb: print a packet type by name when pktype2s has one,
 * otherwise as a decimal number.
 */
static int
pktypefmt(Fmt *f)
{
	int t;

	t = va_arg(f->args, int);
	if(t < 0 || t >= nelem(pktype2s) || pktype2s[t] == nil)
		return fmtprint(f, "%d", t);

	return fmtprint(f, "%s", pktype2s[t]);
}

#pragma varargck type "P" Packet*
/*
 * %P verb: print a packet's type, channel, timestamp and size.
 * For invoke packets the body is additionally decoded and every AMF
 * value in it is printed; a decoding failure prints the error string
 * and ends the dump.
 */
static int
pkfmt(Fmt *f)
{
	u8int *s, *e;
	Packet *p;
	Amf *a;

	p = va_arg(f->args, Packet*);

	fmtprint(f, "type=%T chan=%d ts=%ud sz=%d", p->type, p->chan, p->ts, p->sz);

	if(p->type != PktInvoke)
		return 0;

	fmtprint(f, ":");
	s = p->data;
	e = s + p->sz;
	while(s != nil && s != e){
		/*
		 * Reset before every parse: if amfparse fails without
		 * storing a value, amffree must not see garbage or the
		 * value already freed on the previous round.
		 */
		a = nil;
		if((s = amfparse(&a, s, e)) != nil)
			fmtprint(f, " %A", a);
		else
			fmtprint(f, " %r");
		amffree(a);
	}

	return 0;
}

/*
 * Start building a fresh outgoing packet: clear r->pk, record its
 * type, header kind and channel, and rewind the write cursor to the
 * start of the buffer.  Invoke packets get the next invoke number.
 */
static void
newpacket(RTMP *r, int type, int ht, int chan)
{
	Packet *pk;

	pk = &r->pk;
	memset(pk, 0, sizeof(*pk));
	pk->chan = chan;
	pk->ht = ht;
	pk->type = type;
	if(type == PktInvoke)
		pk->invoke.n = ++r->invokes.n;

	r->p = r->b;
}

/*
 * Make sure the packet buffer holds at least bsz bytes.  When it has
 * to grow it is reallocated to twice the requested size, r->e is moved
 * to the new end and the write cursor r->p keeps its offset from the
 * start of the buffer.  A nil cursor is left nil.
 */
static void
bextend(RTMP *r, int bsz)
{
	intptr off;
	int rebase;

	if(r->bsz >= bsz)
		return;

	/* take the offset before reallocating: the old block may move */
	rebase = r->b != nil && r->p != nil;
	off = rebase ? r->p - r->b : 0;

	r->bsz = bsz*2;
	r->b = erealloc(r->b, r->bsz);
	r->e = r->b + r->bsz;
	if(rebase)
		r->p = r->b + off;
}

/*
 * Perform the client side of the handshake on descriptor f:
 * send the version byte plus our signature block, read the server's
 * version byte and block, echo the server's block back, then read the
 * server's echo and require it to equal what we sent.
 * Returns 0 on success; on failure sets the error string (prefixed
 * with "handshake: ") and returns -1.
 */
static int
handshake(int f)
{
	u8int cl[1+Sigsz], sv[1+Sigsz];

	/* version byte, eight zero bytes, then random filler */
	cl[0] = 3; /* no encryption */
	memset(cl+1, 0, 8);
	prng(cl+1+8, Sigsz-8);

	if(write(f, cl, sizeof(cl)) == sizeof(cl) && readn(f, sv, sizeof(sv)) == sizeof(sv)){
		if(cl[0] != sv[0]){
			werrstr("expected %d (no encryption), got %d", cl[0], sv[0]);
		}else if(write(f, sv+1, Sigsz) == Sigsz && readn(f, sv+1, Sigsz) == Sigsz){
			/* sv[0] already matched, so this compares the echoed block with ours */
			if(memcmp(cl, sv, sizeof(cl)) == 0)
				return 0;
			werrstr("signature mismatch");
		}
	}

	werrstr("handshake: %r");
	return -1;
}

/*
 * Read one complete packet from r->i into the packet buffer and
 * describe it in r->pk (header kind, channel, type, data, sz).
 * The body is reassembled from chunks of at most r->chunk bytes; each
 * continuation chunk must start with a one-byte SzTiny header for the
 * same channel, which is dropped from the reassembled body.
 * Returns 0 on success; on failure sets the error string (prefixed
 * with "rtmprecv: ") and returns -1.
 */
static int
rtmprecv(RTMP *r)
{
	int hsz, bodysz, dummy, n;
	u8int *h, *e, type;
	intptr off;
	u32int ts;

	memset(&r->pk, 0, sizeof(r->pk));

	r->p = r->b;
	if(readn(r->i, r->p, 1) != 1)
		goto err;
	r->pk.ht = (r->p[0] & 0xc0)>>6;
	r->pk.chan = r->p[0] & 0x3f;
	hsz = szs[r->pk.ht];
	if(readn(r->i, r->p+1, hsz-1) != hsz-1)
		goto err;

	h = r->p + 1;
	e = r->p + hsz;
	r->pk.type = -1;
	bodysz = 0;
	/* one-byte headers carry no timestamp; do not test an indeterminate value below */
	ts = 0;
	if(hsz >= szs[SzSmall]){
		h = amfi24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
		if(hsz >= szs[SzMedium]){
			h = amfi24get(h, e, &bodysz);
			h = amfbyteget(h, e, &type);
			r->pk.type = type;
			if(hsz >= szs[SzLarge]){
				dummy = 0;
				h = amfi32get(h, e, &dummy); /* FIXME seems to be always 0? */
			}
		}
	}
	if(h == nil)
		goto err;

	if(ts == 0xffffff){ /* extended timestamp */
		if(readn(r->i, h, 4) != 4)
			goto err;
		if((h = amfi32get(h, h+4, (s32int*)&ts)) == nil)
			goto err;
	}

	/* FIXME do all consecutive chunks use Tiny? */
	/*
	 * The body is stored right after the header, so the buffer has
	 * to hold both.  bextend may move the buffer: carry h across
	 * as an offset instead of keeping a pointer into the old block.
	 */
	off = h - r->b;
	bextend(r, (int)off + bodysz);
	h = r->b + off;
	r->pk.data = h;
	r->pk.sz = bodysz;
	for(;;){
		n = min(bodysz, r->chunk);
		if(readn(r->i, h, n) != n)
			goto err;
		bodysz -= n;
		h += n;
		if(bodysz < 1)
			break;
		/* continuation header; h is not advanced, the next chunk overwrites it */
		if(readn(r->i, h, 1) != 1)
			goto err;
		if((r->pk.chan | SzTiny<<6) != *h){
			werrstr("chan/size does not match: %02x", *h);
			goto err;
		}
	}

	if(debug)
		fprint(2, "→ %P\n", &r->pk);

	return 0;

err:
	werrstr("rtmprecv: %r");
	return -1;
}

/*
 * Send the packet that was built in the buffer since newpacket():
 * write the chunk header for r->pk, then the body split into chunks
 * of at most r->chunk bytes separated by one-byte SzTiny headers,
 * and flush the output buffer.  For invoke packets a copy of
 * r->pk.invoke (whose cb must be set) is then put on the list of
 * invokes waiting for a reply.
 * Returns 0 on success; on failure sets the error string (prefixed
 * with "rtmpsend: ") and returns -1.
 */
static int
rtmpsend(RTMP *r)
{
	u8int *p, *h, *e, hdata[32];
	int bodysz, n, hsz;
	Invoke *i;

	/* NOTE(review): presumably a put* encoder left the cursor nil on overflow -- confirm in amf.c */
	if(r->p == nil)
		goto err;

	r->pk.data = r->b;
	r->pk.sz = r->p - r->b;
	/* FIXME special case when bodysz is 0 */

	hsz = szs[r->pk.ht];
	h = hdata;
	e = h + hsz;
	*h++ = r->pk.ht<<6 | r->pk.chan;
	if(hsz >= szs[SzSmall]){
		h = amfi24(h, e, 0); /* FIXME proper timestamps? */
		if(hsz >= szs[SzMedium]){
			h = amfi24(h, e, r->pk.sz);
			h = amfbyte(h, e, r->pk.type);
			if(hsz >= szs[SzLarge])
				h = amfi32(h, e, 0); /* FIXME seems to be always 0? */
		}
	}
	assert(h != nil);
	memset(h, 0, e-h);
	if(Bwrite(r, hdata, h-hdata) < 0)
		goto err;

	for(p = r->pk.data, bodysz = r->pk.sz; bodysz > 0;){
		n = min(bodysz, r->chunk);
		if(Bwrite(r, p, n) < 0)
			goto err;
		p += n;
		bodysz -= n;
		/* more body to come: separate the chunks with a one-byte header */
		if(bodysz > 0 && Bputc(r, r->pk.chan | SzTiny<<6) < 0)
			goto err;
	}

	/*
	 * Output is buffered, so this is where the data really goes
	 * out and where a write error shows up.
	 */
	if(Bflush(r) < 0)
		goto err;

	if(debug)
		fprint(2, "← %P\n", &r->pk);

	if(r->pk.type == PktInvoke){
		i = emalloc(sizeof(*i));
		*i = r->pk.invoke;
		assert(i->cb != nil);
		/* push on the front of the waiting list */
		if((i->next = r->invokes.w) != nil)
			i->next->prev = i;
		r->invokes.w = i;
	}

	return 0;
err:
	werrstr("rtmpsend: %r");
	return -1;
}

/*
 * Invoke callback for "connect": report the outcome on r->c.
 * Sends nil on success, otherwise a freshly allocated string holding
 * the printed status value of the reply.
 */
static void
connected(RTMP *r, int ok, Amf *a[NumCbA], void *)
{
	char *msg;

	msg = nil;
	if(!ok)
		msg = smprint("%A", a[CbStatus]);
	sendp(r->c, msg);
}

/*
 * Build and send the "connect" invoke on the control channel.
 * The reply is delivered to connected().  Returns what rtmpsend returns.
 */
static int
connect(RTMP *r)
{
	newpacket(r, PktInvoke, SzLarge, ChanCtl);
	r->pk.invoke.cb = connected;

	putinvoke("connect");
	putkvstr("app", r->app);
	putkvstr("tcUrl", r->tcurl);
	if((r->mode & OWRITE) == 0){
		putkvbool("fpad", 0);
		putkvnum("capabilities", 15);
		putkvnum("audioCodecs", 3191);
		putkvnum("videoCodecs", 252);
		putkvnum("videoFunction", 1);
	}else{
		putkvstr("type", "nonprivate");
	}
	putend();

	return rtmpsend(r);
}

/*
 * Release everything owned by r, then r itself.
 * If the channel exists, a "done" notice is sent on it first and the
 * channel is freed.  The send blocks until somebody receives or the
 * channel has been closed with chanclose.
 */
static void
rtmpfree(RTMP *r)
{
	char *s;

	free(r->app);
	free(r->b);
	free(r->path);
	free(r->tcurl);
	if(r->c != nil){
		/*
		 * The receiver in rtmpdial() frees whatever string it
		 * gets, so hand over a heap copy rather than a literal.
		 * If the send fails nobody took it: free it here.
		 */
		s = estrdup("done");
		if(sendp(r->c, s) < 0)
			free(s);
		chanfree(r->c);
	}
	Bterm(r);
	free(r);
}

/*
 * Receiver proc body (started by rtmpdial with the RTMP as aux).
 * Reads packets with rtmprecv() and dispatches on their type until a
 * read or parse error occurs; then frees r and terminates all procs
 * with status "error" (or nil if res is still 0).
 */
static void
loop(void *aux)
{
	int res, n, ok;
	Amf *a[NumCbA];
	u8int *s, *e;
	s16int s16;
	Packet *p;
	Invoke *i;
	RTMP *r;

	r = aux;
	p = &r->pk;
	res = 0;
	memset(a, 0, sizeof(a));
	for(;;){
		/* drop the values parsed for the previous packet */
		for(n = 0; n < nelem(a); n++)
			amffree(a[n]);
		memset(a, 0, sizeof(a));

		/* res is sticky: an error flagged below (label err) ends the loop here */
		if(res != 0 || (res = rtmprecv(r)) != 0){
			if(debug)
				fprint(2, "rtmp loop: %r\n");
			for(n = 0; n < nelem(a); n++)
				amffree(a[n]);
			break;
		}

		s = r->pk.data;
		e = s + r->pk.sz;

		/* packet types without a case below are silently ignored */
		switch(p->type){
		case PktInvoke:
			/*
			 * Reply to one of our invokes: NumCbA values are parsed.
			 * The first must be the string "_result" or "_error",
			 * the second the number of an invoke on the waiting list.
			 */
			i = nil;
			ok = 0;
			for(n = 0; n < NumCbA; n++){
				if((s = amfparse(&a[n], s, e)) == nil)
					goto err;
				switch(n){
				case CbWhat:
					if(a[n]->type != Tstr){
						werrstr("invoke a[%d] not a string", n);
						goto err;
					}
					if(strcmp(a[n]->str, "_result") == 0)
						ok = 1;
					else if(strcmp(a[n]->str, "_error") == 0)
						ok = 0;
					else{
						werrstr("unexpected a[%d]: %#q", n, a[n]->str);
						goto err;
					}
					break;
				case CbInvoke:
					if(a[n]->type != Tnum){
						werrstr("invoke a[%d] not a number", n);
						goto err;
					}
					/* find the waiting invoke with this number */
					for(i = r->invokes.w; i != nil && i->n != a[n]->num; i = i->next);
					if(i == nil){
						werrstr("did not expect invoke %d result", (int)a[n]->num);
						goto err;
					}
					break;
				}
			}
			/* i is non-nil here: the CbInvoke step either found it or bailed out */
			if(i->prev != nil)
				i->prev->next = i->next;
			if(i->next != nil)
				i->next->prev = i->prev;
			if(r->invokes.w == i)
				r->invokes.w = i->next;
			i->cb(r, ok, a, i->aux);
			free(i);
			break;

		case PktChunkSz:
			/* the peer changes the chunk size used by rtmprecv/rtmpsend */
			if(amfi32get(s, e, &r->chunk) == nil)
				goto err;
			if(r->chunk < 2){
				werrstr("invalid chunk size: %d", r->chunk);
				goto err;
			}
			if(debug)
				fprint(2, "new chunk size: %d bytes\n", r->chunk);
			break;

		case PktBytesReadReport:
		case PktControl:
			/* 16-bit event code, then a 32-bit value; n is -1 if the value is missing */
			if((s = amfi16get(s, e, &s16)) == nil)
				goto err;
			if((s = amfi32get(s, e, &n)) == nil)
				n = -1;
			switch(s16){
			case CtlStreamBegin:
			case CtlStreamEnd:
			case CtlStreamDry:
			case CtlStreamRecorded:
			case CtlBufferEmpty:
			case CtlBufferReady:
				/*
				 * The if(0) body is skipped by the cases above and
				 * entered only through the CtlPing label; both then
				 * share the debug print that follows.
				 */
				if(0){
			case CtlPing:
					/* FIXME pong */
					USED(n);
				}
				if(debug)
					fprint(2, "control packet: %s %d\n", ctl2s[s16], n);
				break;
			default:
				if(debug)
					fprint(2, "unknown control packet %d (value %d)\n", s16, n);
				break;
			}
			break;

		case PktServerBW:
		case PktClientBW:
		case PktAudio:
		case PktVideo:
		case PktFlexStreamSend:
		case PktFlexSharedObj:
		case PktFlexInfo:
		case PktSharedObj:
		case PktFlashVideo:
			break;
		/*
		 * Error exit for the gotos above.  Placed inside the switch so
		 * normal cases never reach it; the break leaves the switch and
		 * the check at the top of the loop then terminates it.
		 */
err:
			res = -1;
			break;
		}
	}

	rtmpfree(r);
	threadexitsall(res == 0 ? nil : "error");
}

RTMP *
rtmpdial(char *url, int w, int h, int withaudio)
{
	char *s, *e, *path, *app;
	int f, port, ctl;
	RTMP *r;

	fmtinstall('A', amffmt);
	fmtinstall('T', pktypefmt);
	fmtinstall('P', pkfmt);
	quotefmtinstall();

	r = nil;
	f = -1;
	url = estrdup(url); /* since we're changing it in-place */
	if(memcmp(url, "rtmp://", 7) != 0){
		werrstr("invalid url");
		goto err;
	}
	s = url + 7;
	if((e = strpbrk(s, ":/")) == nil){
		werrstr("no path");
		goto err;
	}
	port = 1935;
	if(*e == ':'){
		if((port = strtol(e+1, &path, 10)) < 1 || path == e+1 || *path != '/'){
			werrstr("invalid port");
			goto err;
		}
	}else{
		path = e;
	}
	while(*(++path) == '/');

	s = smprint("tcp!%.*s!%d", (int)(e-s), s, port);
	f = dial(s, nil, nil, &ctl);
	free(s);
	if(f < 0)
		goto err;

	app = path;
	if((s = strchr(path, '/')) == nil){
		werrstr("no path");
		goto err;
	}
	if((e = strchr(s+1, '/')) != nil){
		/* at this point it can be app instance if there is another slash following */
		if((s = strchr(e+1, '/')) == nil){
			/* no, just path leftovers */
			s = e;
		}
		*s = 0;
		path = s+1;
	}else{
		path = nil;
	}

	if(handshake(f) != 0)
		goto err;

	r = ecalloc(1, sizeof(*r));
	r->i = f;
	r->chunk = Chunk;
	r->tcurl = url;
	url = nil;
	r->c = chancreate(sizeof(void*), 0);
	r->app = estrdup(app);
	r->path = path == nil ? nil : estrdup(path);
	bextend(r, Bufsz);
	Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf));

	if(connect(r) != 0)
		goto err;
	if(proccreate(loop, r, mainstacksize) < 0)
		goto err;

	/* wait for the connect call to finish */
	if((s = recvp(r->c)) != nil){
		rtmpclose(r);
		werrstr("rtmpdial: %s", s);
		free(s);
		r = nil;
	}

	return r;

err:
	werrstr("rtmpdial: %r");
	if(r != nil)
		rtmpfree(r);
	if(f >= 0)
		close(f);
	free(url);
	return nil;
}

/*
 * Shut a connection down: close its descriptor and close its channel.
 * Does nothing for nil.  The RTMP itself is not freed here.
 */
void
rtmpclose(RTMP *r)
{
	if(r != nil){
		if(r->i >= 0)
			close(r->i);
		if(r->c != nil)
			chanclose(r->c);
	}
}