ref: 55fd4285c635d6e55c6fa440cd523c264be5bf4a
dir: /rtmp.c/
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <bio.h>
#include <libsec.h>
#include "amf0.h"
#include "ivf.h"
#include "rtmp.h"
#include "util.h"
/* smaller of a and b; note that each argument may be evaluated twice */
#define min(a,b) ((a)<(b)?(a):(b))
enum {
Port = 1935,
CSsz = 1536,
ChunkDefault = 128,
ChunkDesired = 65536,
DataHdr = 0,
DataFrame,
Type0 = 0,
Type1,
Type2,
Type3,
CSUserCtl = 2,
CSCtl = 3,
CSData = 4,
CbCommand = 0,
CbTransID,
CbObject,
CbResponse,
NumCb,
/* UserControl */
CtlStreamBegin = 0,
CtlStreamEOF,
CtlStreamDry,
CtlSetBufferLen,
CtlStreamIsRecorded,
CtlPingRequest = 6,
CtlPingResponse,
/* Message.type */
SetChunkSize = 1,
Abort,
Ack,
UserControl,
WindowAckSize,
SetBandwidth,
Audio = 8,
Video,
AMF3Metadata = 15,
AMF3SharedObject,
AMF3Command,
AMF0Metadata,
AMF0SharedObject,
AMF0Command,
Aggregate = 22,
/* RTMP.bwlimit */
LimitHard = 0,
LimitSoft,
LimitDynamic,
Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
Bufsz = 64*1024,
};
typedef struct Buffer Buffer;
typedef struct Command Command;
typedef struct Message Message;
/* %T prints a message type (msgtypefmt), %M a Message summary (msgfmt); both are installed in rtmpdial */
#pragma varargck type "T" int
#pragma varargck type "M" Message*
/* An AMF0 command that was sent and is waiting for its response (kept on RTMP.cmds.w). */
struct Command {
/* called from loop with the parsed response; ok is 0 when the response command is "_error" */
void (*cb)(RTMP *r, int ok, A₀ *a[NumCb], void *aux);
/* optional status code; matched against the "code" key of responses that carry transaction ID 0 */
char *code;
/* passed through to cb untouched */
void *aux;
/* transaction ID assigned by newmsg */
int tid;
/* chunk stream the command was sent on */
int cs;
/* links of the doubly-linked waiting list */
Command *prev, *next;
};
/* One RTMP message, either being built for sending or just received. */
struct Message {
int type; /* one of the Message.type enum values; rtmprecv uses -1 when the header carried none */
int fmt; /* chunk header format, Type0..Type3 */
int cs; /* chunk stream ID */
int sid; /* message stream ID written into Type0 headers */
u32int ts; /* timestamp field of the chunk header */
u8int *data; /* payload start */
int sz; /* payload size in bytes */
Command cmd; /* callback bookkeeping, used when type is AMF0Command */
};
/* Growable byte buffer plus the message currently held in it. */
struct Buffer {
Message msg;
/* b: start of the allocation, p: current position, e: end of the allocation */
u8int *b, *p, *e;
/* allocated size in bytes (see bextend) */
int bsz;
};
/* State of one RTMP connection. */
struct RTMP {
/* embedded bio header: output goes through Bwrite/Bflush on the RTMP itself (Binits in rtmpdial) */
Biobufhdr;
/* embedded lock, taken by loop and by the public entry points around message building/sending */
QLock;
Buffer i; /* incoming message buffer (rtmprecv) */
Buffer o; /* outgoing message buffer (newmsg, put* macros, rtmpsend) */
/* carries the result of "connect" to rtmpdial; closed by loop when it exits */
Channel *c;
char *tcurl; /* copy of the dialed URL, sent as "tcUrl" */
char *app; /* first path element of the URL, sent as "app" */
char *inst; /* second path element, default publish name */
char *path; /* FIXME no idea what this is for */
int chunkin; /* current incoming chunk size */
int chunkout; /* current outgoing chunk size */
int mode; /* NOTE(review): not referenced by the code visible in this file */
int fd; /* connection file descriptor */
int winacksz; /* last WindowAckSize value received */
int bw; /* last SetBandwidth value received */
u8int bwlimit; /* limit type received with SetBandwidth (Limit* values) */
struct {
int tid; /* last transaction ID handed out */
Command *w; /* commands waiting for a response */
}cmds;
int sps; /* set once the H.264 SPS/PPS header message has been sent */
int aacpd; /* set once the AAC configuration message has been sent */
u8int biobuf[Biobufsz];
};
/*
 * Output helpers. Each one appends an item to the outgoing buffer of the
 * RTMP named r in the calling scope by handing r->o.p and r->o.e to the
 * matching a₀* encoder and storing its return value back into r->o.p.
 * NOTE(review): the encoders presumably return nil when the item does not
 * fit (rtmpsend refuses to send when r->o.p is nil) — confirm in amf0.c.
 */
#define putnull() do{ r->o.p = a₀null(r->o.p, r->o.e); }while(0)
#define putbyte(b) do{ r->o.p = a₀byte(r->o.p, r->o.e, b); }while(0)
#define putdata(d, sz) do { r->o.p = a₀data(r->o.p, r->o.e, d, sz); }while(0)
#define puti16(i) do{ r->o.p = a₀i16(r->o.p, r->o.e, i); }while(0)
#define puti24(i) do{ r->o.p = a₀i24(r->o.p, r->o.e, i); }while(0)
#define puti32(i) do{ r->o.p = a₀i32(r->o.p, r->o.e, i); }while(0)
#define putnum(v) do{ r->o.p = a₀num(r->o.p, r->o.e, v); }while(0)
#define putstr(s) do{ r->o.p = a₀str(r->o.p, r->o.e, s); }while(0)
#define putarr() do{ r->o.p = a₀arr(r->o.p, r->o.e); }while(0)
#define putobj() do{ r->o.p = a₀obj(r->o.p, r->o.e); }while(0)
#define putend() do{ r->o.p = a₀end(r->o.p, r->o.e); }while(0)
#define putkvnum(name, v) do{ r->o.p = a₀kvnum(r->o.p, r->o.e, name, v); }while(0)
#define putkvstr(name, s) do{ r->o.p = a₀kvstr(r->o.p, r->o.e, name, s); }while(0)
#define putkvbool(name, s) do{ r->o.p = a₀kvbool(r->o.p, r->o.e, name, s); }while(0)
/* start a command: its name, the transaction ID and the opening of an object; cb_ becomes the response callback */
#define putcommand(name, cb_) do { \
putstr(name); \
putnum(r->o.msg.cmd.tid); \
putobj(); \
r->o.msg.cmd.cb = cb_; \
}while(0)
/* like putcommand, but neither opens an object nor sets the callback */
#define putcall(name) do { \
putstr(name); \
putnum(r->o.msg.cmd.tid); \
}while(0)
/* number of message header bytes that follow the basic header, per chunk format */
static int szs[] = {
[Type3] = 0,
[Type2] = 3,
[Type1] = 7,
[Type0] = 11,
};
/* printable names of message types for %T; unlisted indices stay nil */
static char *msgtype2s[] = {
[SetChunkSize] = "SetChunkSize",
[Abort] = "Abort",
[Ack] = "Ack",
[UserControl] = "UserControl",
[WindowAckSize] = "WindowAckSize",
[SetBandwidth] = "SetBandwidth",
[Audio] = "Audio",
[Video] = "Video",
[AMF3Metadata] = "AMF3Metadata",
[AMF3SharedObject] = "AMF3SharedObject",
[AMF3Command] = "AMF3Command",
[AMF0Metadata] = "AMF0Metadata",
[AMF0SharedObject] = "AMF0SharedObject",
[AMF0Command] = "AMF0Command",
[Aggregate] = "Aggregate",
};
/* printable names of UserControl event types, used by the debug output in loop; index 5 has no entry */
static char *ctl2s[] = {
[CtlStreamBegin] = "StreamBegin",
[CtlStreamEOF] = "StreamEOF",
[CtlStreamDry] = "StreamDry",
[CtlSetBufferLen] = "SetBufferLen",
[CtlStreamIsRecorded] = "StreamIsRecorded",
[CtlPingRequest] = "PingRequest",
[CtlPingResponse] = "PingResponse",
};
/* printable names of the SetBandwidth limit types, indexed by RTMP.bwlimit */
static char *bwlimit2s[] = {
[LimitHard] = "hard",
[LimitSoft] = "soft",
[LimitDynamic] = "dynamic",
};
/* publish type strings sent by rtmppublish, indexed by its type argument */
static char *pubtype2s[] = {
[PubLive] = "live",
[PubAppend] = "append",
[PubRecord] = "record",
};
/* defined elsewhere; when non-zero, sent and received messages are traced on fd 2 */
extern int debug;
/*
 * Start building a new outgoing message of the given type, chunk header
 * format and chunk stream: the message header is cleared, the write
 * position is rewound to the start of the output buffer, and AMF0 commands
 * get the next transaction ID (everything else gets 0).
 */
static void
newmsg(RTMP *r, int type, int fmt, int cs)
{
	Message *m;

	m = &r->o.msg;
	memset(m, 0, sizeof(*m));
	m->type = type;
	m->fmt = fmt;
	m->cs = cs;
	m->cmd.cs = cs;
	/* the memset above already left cmd.tid at 0 for non-commands */
	if(type == AMF0Command)
		m->cmd.tid = ++r->cmds.tid;
	r->o.p = r->o.b;
}
/* Mark the outgoing message as having no transaction (ID 0). */
static void
notransaction(RTMP *r)
{
	Command *cmd;

	cmd = &r->o.msg.cmd;
	cmd->tid = 0;
}
/*
 * Make sure buffer b can hold at least bsz bytes. When it is too small it
 * is reallocated to twice the requested size; b->e is updated and b->p
 * keeps its offset from the start of the buffer. A nil b->p (nothing
 * written yet, or an earlier encoding failure) is left nil.
 */
static void
bextend(Buffer *b, int bsz)
{
	u8int *ob;
	intptr off;

	if(b->bsz >= bsz)
		return;
	ob = b->b;
	/* take the offset before the old block is released; -1 means "no position to carry over" */
	off = (ob != nil && b->p != nil) ? b->p - ob : -1;
	b->b = erealloc(ob, bsz*2);
	if(off >= 0)
		b->p = b->b + off;
	b->bsz = bsz*2;
	b->e = b->b + b->bsz;
}
/*
 * Read one complete message from r->fd into r->i.
 *
 * The chunk header is parsed into r->i.msg (fmt, cs, type, ts, sid, sz) and
 * the payload, which may span several chunks of at most r->chunkin bytes,
 * is stored contiguously in the input buffer right after the header bytes;
 * r->i.msg.data points at it. Headers that carry no type (Type2/Type3)
 * leave the type at -1 and the size at 0.
 *
 * Returns 0 on success, -1 with errstr set on failure.
 */
static int
rtmprecv(RTMP *r)
{
	int hsz, len, n, msid;
	intptr off;
	u8int *h, *e, byte;
	u32int ts;

	memset(&r->i.msg, 0, sizeof(r->i.msg));
	r->i.p = r->i.b;
	if((n = readn(r->fd, &byte, 1)) != 1){
		if(n == 0)
			werrstr("eof");
		goto err;
	}
	r->i.msg.fmt = (byte & 0xc0)>>6;
	r->i.msg.cs = byte & 0x3f;
	/* chunk stream IDs 0 and 1 announce one or two extra ID bytes */
	n = r->i.msg.cs + 1;
	if(n <= 2){
		if(readn(r->fd, r->i.p, n) != n)
			goto err;
		r->i.msg.cs = 64 + r->i.p[0];
		if(n == 2)
			r->i.msg.cs += 256 * r->i.p[1];
	}
	hsz = szs[r->i.msg.fmt];
	if(readn(r->fd, r->i.p, hsz) != hsz)
		goto err;
	h = r->i.p;
	e = r->i.p + hsz;
	r->i.msg.type = -1;
	msid = 0;
	ts = 0;
	len = 0;
	if(hsz >= szs[Type2]){
		h = a₀i24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
		if(hsz >= szs[Type1]){
			h = a₀i24get(h, e, &len);
			h = a₀byteget(h, e, &byte);
			r->i.msg.type = byte;
			if(hsz >= szs[Type0])
				h = a₀i32leget(h, e, &msid);
		}
	}
	if(ts == 0xffffff){ /* extended timestamp */
		if(readn(r->fd, h, 4) != 4)
			goto err;
		h = a₀i32get(h, h+4, (s32int*)&ts);
	}
	/* raw header values; for Type1/Type2 chunks the timestamp is a delta */
	r->i.msg.ts = ts;
	r->i.msg.sid = msid;

	/*
	 * The payload goes right after the header bytes, so the buffer has to
	 * hold both. Growing it may move it: keep the offset and recompute h
	 * instead of using a pointer into the old allocation.
	 */
	off = h - r->i.b;
	bextend(&r->i, (int)off + len);
	h = r->i.b + off;

	/* FIXME do all consecutive chunks use Type3? */
	r->i.msg.data = h;
	r->i.msg.sz = len;
	for(;;){
		n = min(len, r->chunkin);
		if(readn(r->fd, h, n) != n)
			goto err;
		len -= n;
		h += n;
		if(len < 1)
			break;
		/* basic header of the continuation chunk; overwritten by the next read */
		if(readn(r->fd, h, 1) != 1)
			goto err;
		if((r->i.msg.cs | Type3<<6) != *h){
			werrstr("cs/fmt does not match: %02x", *h);
			goto err;
		}
	}

	return 0;
err:
	werrstr("rtmprecv: %r");
	return -1;
}
/*
 * Send the message that was built in r->o since the last newmsg.
 *
 * The payload is everything between r->o.b and r->o.p; it is written
 * through the embedded bio buffer as a chunk header of format m->fmt
 * followed by chunks of at most r->chunkout bytes, each continuation chunk
 * being introduced by a one-byte Type3 header. A nil r->o.p (left behind by
 * a failed put*) makes the call fail without sending anything.
 *
 * After a successful flush, an AMF0Command is copied onto the head of
 * r->cmds.w so that loop can route the response to its callback.
 *
 * Returns 0 on success, -1 with errstr set on failure.
 */
static int
rtmpsend(RTMP *r)
{
	u8int *p, *h, *e, hdata[24];
	int len, n, hsz;
	Command *c;
	Message *m;

	if(r->o.p == nil)
		goto err;

	m = &r->o.msg;
	m->data = r->o.b;
	m->sz = r->o.p - r->o.b;

	h = hdata;
	*h++ = m->fmt<<6 | m->cs;
	hsz = szs[m->fmt];
	e = h + hsz;
	if(hsz >= szs[Type2]){
		h = a₀i24(h, e, m->ts);
		if(hsz >= szs[Type1]){
			h = a₀i24(h, e, m->sz);
			h = a₀byte(h, e, m->type);
			if(hsz >= szs[Type0])
				h = a₀i32le(h, e, m->sid);
		}
	}
	assert(h != nil);
	memset(h, 0, e-h);
	if(Bwrite(r, hdata, h-hdata) < 0)
		goto err;

	for(p = m->data, len = m->sz; len > 0;){
		n = min(len, r->chunkout);
		if(Bwrite(r, p, n) < 0)
			goto err;
		p += n;
		len -= n;
		/* more data to come: introduce the next chunk */
		if(len > 0 && Bputc(r, (u8int)(m->cs | Type3<<6)) < 0)
			goto err;
	}
	if(Bflush(r) < 0)
		goto err;

	if(debug){
		fprint(2, "← %M", m);
		if(m->type == AMF0Command){
			A₀ *a;
			u8int *s, *se;

			fprint(2, ":");
			s = m->data;
			se = s + m->sz;
			while(s != nil && s != se){
				/* do not hand a stale pointer to a₀free when parsing fails */
				a = nil;
				if((s = a₀parse(&a, s, se)) != nil)
					fprint(2, " %A", a);
				else
					fprint(2, " %r");
				a₀free(a);
			}
		}
		fprint(2, "\n");
	}

	if(m->type == AMF0Command){
		c = emalloc(sizeof(*c));
		*c = m->cmd;
		assert(c->cb != nil);
		if((c->next = r->cmds.w) != nil)
			c->next->prev = c;
		r->cmds.w = c;
	}

	return 0;
err:
	werrstr("rtmpsend: %r");
	return -1;
}
/* Answer a ping: send a UserControl PingResponse carrying the value n. */
static int
pong(RTMP *r, s32int n)
{
	newmsg(r, UserControl, Type0, CSUserCtl);
	r->o.p = a₀i16(r->o.p, r->o.e, CtlPingResponse);
	r->o.p = a₀i32(r->o.p, r->o.e, n);
	return rtmpsend(r);
}
/*
 * Announce a new outgoing chunk size with a SetChunkSize message and start
 * using it. r->chunkout is updated whether or not the send succeeded; the
 * result of rtmpsend is returned.
 */
static int
setchunksz(RTMP *r, int sz)
{
	int res;

	newmsg(r, SetChunkSize, Type0, CSUserCtl);
	r->o.p = a₀i32(r->o.p, r->o.e, sz);
	res = rtmpsend(r);
	r->chunkout = sz;
	return res;
}
/*
 * Receiver proc body (started from rtmpdial with proccreate); aux is the RTMP.
 *
 * Reads messages with rtmprecv and dispatches on their type while holding
 * the RTMP lock. Responses to AMF0 commands are matched against the waiting
 * list r->cmds.w, first by transaction ID and, for transaction ID 0, by the
 * "code" string of the response object; the matching Command is unlinked,
 * its callback is run and it is freed.
 *
 * The loop only ends once res is non-zero (receive failure or a handler
 * jumping to err); it then closes r->c and calls threadexitsall.
 */
static void
loop(void *aux)
{
int res, n, ok, i;
A₀ *a[NumCb], *v;
u8int *s, *e;
s16int s16;
Message *m;
Command *c;
char *k;
RTMP *r;
r = aux;
m = &r->i.msg;
res = 0;
memset(a, 0, sizeof(a));
for(;;){
/* release the values parsed for the previous message */
for(n = 0; n < nelem(a); n++)
a₀free(a[n]);
memset(a, 0, sizeof(a));
/* a non-zero res left by the previous iteration's err path stops the loop here */
if(res != 0 || (res = rtmprecv(r)) != 0){
if(debug)
fprint(2, "rtmp loop: %r\n");
for(n = 0; n < nelem(a); n++)
a₀free(a[n]);
break;
}
s = m->data;
e = s + m->sz;
qlock(r);
if(debug)
fprint(2, "→ %M", m);
switch(m->type){
case AMF0Command:
/* parse NumCb AMF0 values and look for the waiting Command they answer */
c = nil;
ok = 1;
for(n = 0; n < NumCb; n++){
if((s = a₀parse(&a[n], s, e)) == nil)
goto err;
switch(n){
case CbCommand:
if(a[n]->type != Tstr){
werrstr("command name is not a string: %A", a[n]);
goto err;
}
if(strcmp(a[n]->str, "_error") == 0)
ok = 0;
/* other values: "_result", etc */
break;
case CbTransID:
if(a[n]->type != Tnum){
werrstr("transaction ID is not a number");
goto err;
}
if(a[n]->num == 0) /* no transaction, will try matching with response */
break;
for(c = r->cmds.w; c != nil; c = c->next){
/* transaction id match */
if(c->tid == a[n]->num)
break;
}
break;
case CbResponse:
if(a[CbTransID]->num != 0) /* should have matches with the transaction */
break;
if(a[n]->type != Tobj)
break;
/* no transaction ID: match on the "code" string of the response object instead */
for(i = 0; i < a[n]->obj.n; i++){
k = a[n]->obj.k[i];
v = a[n]->obj.v[i];
if(strcmp(k, "code") == 0 && v->type == Tstr){
for(c = r->cmds.w; c != nil; c = c->next){
if(c->code != nil && strcmp(c->code, v->str) == 0)
break;
}
break;
}
}
break;
}
}
if(debug)
fprint(2, " tid=%A: %A %A %A\n", a[CbTransID], a[CbCommand], a[CbObject], a[CbResponse]);
if(c != nil){
/* unlink the matched command from the waiting list, then run its callback */
if(c->prev != nil)
c->prev->next = c->next;
if(c->next != nil)
c->next->prev = c->prev;
if(r->cmds.w == c)
r->cmds.w = c->next;
c->cb(r, ok, a, c->aux);
free(c);
}else if(a[CbTransID] != nil){
fprint(2, "response/command with no handler (transaction %d)\n", (int)a[CbTransID]->num);
}
break;
case SetChunkSize:
if(a₀i32get(s, e, &r->chunkin) == nil)
goto err;
if(r->chunkin < 2){
werrstr("invalid chunk size: %d", r->chunkin);
goto err;
}
if(debug)
fprint(2, ": %d\n", r->chunkin);
break;
case UserControl:
/* 16-bit event type followed by an optional 32-bit value (n is -1 when absent) */
if((s = a₀i16get(s, e, &s16)) == nil)
goto err;
if(a₀i32get(s, e, &n) == nil)
n = -1;
switch(s16){
case CtlStreamBegin:
case CtlStreamEOF:
case CtlStreamDry:
case CtlSetBufferLen:
case CtlStreamIsRecorded:
/* the if(0) lets only PingRequest run pong() before sharing the debug print below */
if(0){
case CtlPingRequest:
if(pong(r, n) != 0)
goto err;
}
if(debug)
fprint(2, ": %s %d\n", ctl2s[s16], n);
break;
default:
if(debug)
fprint(2, ": ?%d? %d\n", s16, n);
break;
}
break;
case WindowAckSize: /* FIXME send acks too */
if(a₀i32get(s, e, &r->winacksz) == nil)
goto err;
if(debug)
fprint(2, ": %d\n", r->winacksz);
break;
case SetBandwidth:
if((s = a₀i32get(s, e, &r->bw)) == nil || a₀byteget(s, e, &r->bwlimit) == nil)
goto err;
if(debug)
fprint(2, ": %d (%s)\n", r->bw, r->bwlimit < nelem(bwlimit2s) ? bwlimit2s[r->bwlimit] : "???");
break;
/* FIXME */
case Aggregate:
case Abort:
case Ack:
case Audio:
case Video:
case AMF0Metadata:
case AMF0SharedObject:
break;
case AMF3Metadata:
case AMF3SharedObject:
case AMF3Command:
if(debug)
fprint(2, ": ignored\n");
break;
/* only reached by goto from the cases above; the next iteration then ends the loop */
err:
res = -1;
break;
}
qunlock(r);
}
chanclose(r->c);
threadexitsall(res == 0 ? nil : "error");
}
/*
 * Perform the plain RTMP handshake on file descriptor f.
 *
 * Sends C0 (version 3) and C1 (zero timestamp, zero field, random bytes),
 * reads S0+S1, echoes S1 back as C2 and reads S2, which has to echo C1.
 *
 * Only the timestamp and the random part of S2 are compared with C1: the
 * four bytes in between are "time2", which the server fills in with the
 * time at which it read C1, so they need not match the zeros that were sent.
 *
 * Returns 0 on success, -1 with errstr set on failure.
 */
static int
handshake(int f)
{
	u8int c[1+CSsz], s[1+CSsz];

	c[0] = 3; /* rtmp v3 */
	memset(c+1, 0, 4+4); /* timestamp + zero */
	prng(c+1+8, CSsz-4-4);
	if(write(f, c, sizeof(c)) != sizeof(c))
		goto err;
	if(readn(f, s, sizeof(s)) != sizeof(s))
		goto err;
	if(c[0] != s[0]){
		werrstr("expected version %d, got %d", c[0], s[0]);
		goto err;
	}
	/* C2: S1 echoed back */
	if(write(f, s+1, CSsz) != CSsz)
		goto err;
	/* S2 replaces S1 in s */
	if(readn(f, s+1, CSsz) != CSsz)
		goto err;
	if(memcmp(c+1, s+1, 4) != 0 || memcmp(c+1+8, s+1+8, CSsz-8) != 0){
		werrstr("C1 != S2");
		goto err;
	}

	return 0;
err:
	werrstr("handshake: %r");
	return -1;
}
/*
 * Response callback for "createStream"; aux is the Channel rtmpstream waits on.
 *
 * On a well-formed "_result" the stream ID is sent over the channel;
 * otherwise a diagnostic is printed. The channel is closed in every case,
 * which makes the waiting recv fail when no ID was sent.
 *
 * The error response is checked first: ok is 0 exactly when the command is
 * "_error", so testing the command name first would hide the server's
 * error object behind an "expected '_result'" message.
 */
static void
streamcreated(RTMP *, int ok, A₀ *a[NumCb], void *aux)
{
	Channel *sid;

	sid = aux;
	if(!ok)
		fprint(2, "createStream: %A\n", a[CbResponse]);
	else if(strcmp(a[CbCommand]->str, "_result") != 0)
		fprint(2, "createStream: expected '_result', got %#q\n", a[CbCommand]->str);
	else if(a[CbResponse]->type != Tnum)
		fprint(2, "createStream: expected stream ID, got NaN\n");
	else
		sendul(sid, (ulong)a[CbResponse]->num);
	chanclose(sid);
}
/*
 * Ask the server for a new stream with "createStream" and wait for the
 * answer. On success the stream ID is stored in *sid and 0 is returned;
 * -1 is returned when the request could not be sent or no ID came back.
 */
int
rtmpstream(RTMP *r, ulong *sid)
{
	Channel *reply;
	int sent, res;

	reply = chancreate(sizeof(ulong), 0);

	qlock(r);
	newmsg(r, AMF0Command, Type0, CSCtl);
	putcall("createStream");
	putnull();
	r->o.msg.cmd.aux = reply;
	r->o.msg.cmd.cb = streamcreated;
	sent = rtmpsend(r);
	qunlock(r);

	/* streamcreated either sends the ID or just closes the channel */
	res = -1;
	if(sent == 0 && recv(reply, sid) == 1)
		res = 0;
	chanfree(reply);

	return res;
}
/*
 * Response callback for "publish"; aux is the Channel rtmppublish waits on.
 * A nil pointer is sent over it when the response is an "onStatus" with an
 * object and ok is set; otherwise a diagnostic may be printed. The channel
 * is closed in every case.
 */
static void
streampublished(RTMP *, int ok, A₀ *a[NumCb], void *aux)
{
	Channel *reply;
	char *cmd;

	reply = aux;
	cmd = a[CbCommand]->str;
	if(strcmp(cmd, "onStatus") != 0){
		fprint(2, "streampublished: expected 'onStatus', got %#q\n", cmd);
	}else if(a[CbResponse]->type != Tobj){
		fprint(2, "streampublished: expected object, got something else\n");
	}else if(ok){
		sendp(reply, nil);
	}
	chanclose(reply);
}
/*
 * Start publishing on stream sid.
 *
 * type is one of the Pub* values indexing pubtype2s; name is the stream
 * name and defaults to the instance part of the dialed URL when nil.
 * Sends "publish" and waits for the status response, which is matched by
 * the "NetStream.Publish.Start" code.
 *
 * Returns 0 on success, -1 with errstr set for invalid arguments, and -1
 * when the request could not be sent or was not confirmed.
 */
int
rtmppublish(RTMP *r, ulong sid, int type, char *name)
{
	Channel *c;
	char *e;
	int n;

	if(type < 0 || type >= nelem(pubtype2s)){
		werrstr("invalid publish type %d", type);
		return -1;
	}
	if(name == nil && (name = r->inst) == nil){
		werrstr("no name to publish to");
		return -1;
	}

	c = chancreate(sizeof(char*), 0);

	qlock(r);
	newmsg(r, AMF0Command, Type0, CSData);
	putcall("publish");
	putnull();
	putstr(name);
	putstr(pubtype2s[type]);
	r->o.msg.cmd.cb = streampublished;
	r->o.msg.cmd.aux = c;
	r->o.msg.cmd.code = "NetStream.Publish.Start";
	r->o.msg.sid = sid;
	n = rtmpsend(r);
	qunlock(r);

	e = nil;
	n = (n == 0 && recv(c, &e) == 1) ? 0 : -1;
	chanfree(c);

	/* a non-nil string received over the channel is an error message owned by us */
	if(e != nil){
		werrstr("%s", e);
		free(e); /* the channel itself was already released by chanfree */
		return -1;
	}

	return n;
}
/*
 * Send the "onMetaData" data frame for stream sid.
 *
 * A negative vcodec or acodec leaves out the video (codec id, width,
 * height) or audio (codec id) entries; the only codecs accepted are
 * VcodecH264 and AcodecAAC. Returns the result of rtmpsend.
 */
int
rtmpmeta(RTMP *r, ulong sid, int vcodec, int w, int h, int acodec)
{
	int res, nkeys;

	assert(vcodec < 0 || vcodec == VcodecH264);
	assert(acodec < 0 || acodec == AcodecAAC);

	/* "duration" and "filesize" are always present */
	nkeys = 2;
	if(vcodec >= 0)
		nkeys += 3;
	if(acodec >= 0)
		nkeys += 1;

	qlock(r);
	newmsg(r, AMF0Metadata, Type0, CSData);
	putstr("@setDataFrame");
	putstr("onMetaData");
	putarr();
	puti32(nkeys);
	putkvnum("duration", 0);
	putkvnum("filesize", 0);
	if(vcodec >= 0){
		putkvnum("videocodecid", vcodec);
		putkvnum("width", w);
		putkvnum("height", h);
	}
	if(acodec >= 0)
		putkvnum("audiocodecid", acodec);
	putend();
	r->o.msg.sid = sid;
	res = rtmpsend(r);
	qunlock(r);

	return res;
}
/*
 * Measure the first NAL unit of an Annex B byte stream.
 *
 * *csz receives the total length of the start codes (00 00 01 or
 * 00 00 00 01) found at the very beginning of p. The return value is the
 * offset of the next start code after those, or sz when there is none; it
 * therefore includes *csz. Both scans stop 3 bytes before the end, so that
 * p[off+3] is always inside the buffer.
 */
static int
nalsz(u8int *p, int sz, int *csz)
{
	int off, lim, skip;

	*csz = 0;
	lim = sz - 3;

	/* leading start code(s) */
	off = 0;
	while(off < lim){
		if(p[off] != 0 || p[off+1] != 0)
			break;
		if(p[off+2] == 1)
			skip = 3;
		else if(p[off+2] == 0 && p[off+3] == 1)
			skip = 4;
		else
			break;
		*csz += skip;
		off += skip;
	}

	/* start code of the following unit */
	while(off < lim){
		if(p[off] == 0 && p[off+1] == 0
		&& (p[off+2] == 1 || (p[off+2] == 0 && p[off+3] == 1)))
			return off;
		off++;
	}

	return sz;
}
/*
 * Send one H.264 access unit (Annex B byte stream, sz bytes at p) on
 * stream sid with timestamp dt. The caller holds the RTMP lock.
 *
 * Until the codec header has been sent (r->sps), the data is searched for
 * an SPS and a PPS; once both are seen in the same call they are sent first
 * as an AVC decoder configuration record. The NAL units themselves are sent
 * as one Video message, each unit prefixed with its 32-bit length; the
 * frame is flagged as a key frame when it contains an IDR slice (type 5).
 *
 * Parameter sets that do not fit the local buffers (or an SPS too short to
 * hold profile/compatibility/level) are rejected rather than copied.
 *
 * Returns 0 on success, -1 with errstr set on failure.
 */
static int
h264data(RTMP *r, ulong sid, u32int dt, u8int *p, int sz)
{
	u8int *p₀, sps[128], pps[128], ps[16+sizeof(sps)+sizeof(pps)];
	int sz₀, csz, nsz, ntype, spssz, ppssz, key, total;

	sz₀ = sz;
	p₀ = p;
	spssz = 0;
	ppssz = 0;
	key = 0;
	/* first pass: total payload size, key frame flag, parameter sets */
	for(total = 0; sz > 0; total += 4+nsz, p += nsz, sz -= nsz){
		nsz = nalsz(p, sz, &csz);
		p += csz;
		sz -= csz;
		nsz -= csz;
		/* nothing after the start code: p is at the end of the input, don't read it */
		if(nsz < 1)
			continue;
		ntype = *p & 0x1f;
		if(ntype == 5)
			key = 1;
		/* parameter sets are only needed until the header has been sent */
		if(r->sps)
			continue;
		if(ntype == 7){
			if(nsz < 4 || nsz > (int)sizeof(sps)){
				werrstr("h264: unsupported SPS size: %d", nsz);
				return -1;
			}
			memmove(sps, p, nsz);
			spssz = nsz;
		}else if(ntype == 8){
			if(nsz > (int)sizeof(pps)){
				werrstr("h264: unsupported PPS size: %d", nsz);
				return -1;
			}
			memmove(pps, p, nsz);
			ppssz = nsz;
		}
	}

	if(spssz > 0 && ppssz > 0 && !r->sps){
		newmsg(r, Video, Type0, CSData);
		r->o.msg.ts = 0;
		r->o.msg.sid = sid;
		putbyte(0x10 | VcodecH264);
		putbyte(DataHdr);
		puti24(0);
		p = ps;
		*p++ = 1; /* version */
		*p++ = sps[1]; /* profile */
		*p++ = sps[2]; /* compatibility */
		*p++ = sps[3]; /* level */
		*p++ = 0xfc | 3; /* reserved (6 bits), NALU length size - 1 (2 bits) */
		*p++ = 0xe0 | 1; /* reserved (3 bits), num of SPS (5 bits) */
		*p++ = spssz >> 8;
		*p++ = spssz;
		p = (u8int*)memmove(p, sps, spssz) + spssz;
		*p++ = 1; /* num of PPS */
		*p++ = ppssz >> 8;
		*p++ = ppssz;
		p = (u8int*)memmove(p, pps, ppssz) + ppssz;
		putdata(ps, p-ps);
		if(rtmpsend(r) < 0)
			return -1;
		r->sps = 1;
	}

	/* second pass: the frame itself, with length-prefixed NAL units */
	bextend(&r->o, 64+total);
	newmsg(r, Video, Type0, CSData);
	r->o.msg.ts = dt;
	r->o.msg.sid = sid;
	putbyte((key ? 0x10 : 0x20) | VcodecH264);
	putbyte(DataFrame);
	puti24(0);
	for(p = p₀, sz = sz₀; sz > 0; p += nsz, sz -= nsz){
		nsz = nalsz(p, sz, &csz);
		p += csz;
		sz -= csz;
		nsz -= csz;
		puti32(nsz);
		putdata(p, nsz);
	}

	return rtmpsend(r);
}
/*
 * Send one AAC frame (ADTS header included, sz bytes at p) on stream sid
 * with timestamp dt. The caller holds the RTMP lock.
 *
 * The first frame is preceded by an Audio header message whose
 * configuration is derived from the ADTS header: object type = profile + 1,
 * the 4-bit sampling frequency index and the 3-bit channel configuration.
 * The frame is then sent without its ADTS header, which is 7 bytes long,
 * or 9 when a CRC is present (protection_absent clear).
 *
 * Returns 0 on success, -1 with errstr set on failure.
 */
static int
aacdata(RTMP *r, ulong sid, u32int dt, u8int *p, int sz)
{
	int chanc, ratei, objt, hdr;
	u16int x;

	if(sz < 7){
		werrstr("aac frame too small");
		return -1;
	}
	/* bit 0 of byte 1 is protection_absent; when clear a 16-bit CRC follows the header */
	hdr = (p[1] & 1) ? 7 : 9;
	if(sz < hdr){
		werrstr("aac frame too small");
		return -1;
	}

	if(!r->aacpd){
		/* byte 2: profile (2 bits), sampling frequency index (4), private (1), channel config msb (1) */
		objt = (p[2]>>6) + 1;
		ratei = (p[2]>>2) & 0xf;
		chanc = (p[2]&1)<<2 | p[3]>>6;
		if(ratei > 12){
			werrstr("invalid rate config: %d", ratei);
			return -1;
		}

		newmsg(r, Audio, Type0, CSData);
		r->o.msg.ts = 0;
		r->o.msg.sid = sid;
		putbyte(AcodecAAC<<4 | 0xf);
		putbyte(DataHdr);
		/* object type (5 bits), frequency index (4), channel config (4), 3 zero flag bits */
		x = chanc<<3 | ratei<<7 | objt<<11;
		putbyte(x>>8);
		putbyte(x);
		/*
		 * NOTE(review): read as bits this is 0x2b7 (11 bits), 5 (5 bits) and a
		 * zero bit, which looks like the sync extension that explicitly signals
		 * "no SBR" — confirm against ISO/IEC 14496-3.
		 */
		putbyte(0x56);
		putbyte(0xe5);
		putbyte(0x00);
		if(rtmpsend(r) < 0)
			return -1;
		r->aacpd = 1;
	}

	bextend(&r->o, sz);
	newmsg(r, Audio, Type0, CSData);
	r->o.msg.ts = dt;
	r->o.msg.sid = sid;
	putbyte(AcodecAAC<<4 | 0xf);
	putbyte(DataFrame);
	putdata(p+hdr, sz-hdr);

	return rtmpsend(r);
}
/*
 * Send one frame of audio (Taudio, AAC) or video (Tvideo, H.264) data on
 * stream sid with timestamp dt, under the RTMP lock. Returns the result of
 * the codec-specific sender.
 */
int
rtmpdata(RTMP *r, ulong sid, u32int dt, int type, void *p, int sz)
{
	int (*put)(RTMP*, ulong, u32int, u8int*, int);
	int res;

	assert(type == Taudio || type == Tvideo);
	put = type == Tvideo ? h264data : aacdata;

	qlock(r);
	res = put(r, sid, dt, p, sz);
	qunlock(r);

	return res;
}
/*
 * Response callback for "connect". On "_result" the desired outgoing chunk
 * size is announced and nil is sent over r->c; otherwise an allocated error
 * string describing the problem is sent, which the receiver has to free.
 */
static void
connected(RTMP *r, int ok, A₀ *a[NumCb], void *)
{
	char *err;

	if(!ok){
		err = smprint("%A", a[CbResponse]);
	}else if(strcmp(a[CbCommand]->str, "_result") != 0){
		err = smprint("expected '_result', got %#q", a[CbCommand]->str);
	}else{
		err = nil;
		setchunksz(r, ChunkDesired);
	}
	sendp(r->c, err);
}
/*
 * Send the "connect" command describing the application, URL and supported
 * codecs; the response is handled by connected. Returns the result of
 * rtmpsend.
 */
static int
connect(RTMP *r)
{
	newmsg(r, AMF0Command, Type0, CSCtl);

	/* command name, transaction ID, start of the command object */
	putstr("connect");
	putnum(r->o.msg.cmd.tid);
	putobj();
	r->o.msg.cmd.cb = connected;

	putkvstr("app", r->app);
	putkvstr("tcUrl", r->tcurl);
	putkvstr("type", "nonprivate");
	putkvbool("fpad", 0); /* no proxy */
	putkvnum("audioCodecs", 0x4 | 0x400); /* mp3 + aac */
	putkvnum("videoCodecs", 0x80); /* h.264 */
	putkvnum("videoFunction", 0); /* no frame-accurate seek */
	putkvnum("objectEncoding", 0); /* AMF0 */
	putend();

	return rtmpsend(r);
}
/* %T: print a message type by name, or as a number when it has no name. */
static int
msgtypefmt(Fmt *f)
{
	char *name;
	int t;

	t = va_arg(f->args, int);
	name = nil;
	if(t >= 0 && t < nelem(msgtype2s))
		name = msgtype2s[t];
	if(name == nil)
		return fmtprint(f, "%d", t);
	return fmtprint(f, "%s", name);
}
/* %M: print a one-line summary of a Message (type, chunk stream, timestamp, size); always returns 0. */
static int
msgfmt(Fmt *f)
{
	Message *m = va_arg(f->args, Message*);

	fmtprint(f, "type=%T cs=%d ts=%ud sz=%d", m->type, m->cs, m->ts, m->sz);

	return 0;
}
/*
 * Release everything owned by r: commands still waiting for a response,
 * both message buffers, the strings, the result channel, the bio state and
 * r itself. The file descriptor is not closed here.
 *
 * Only the Command nodes are freed; their code strings are literals and
 * their aux pointers belong to whoever issued the command.
 */
static void
rtmpfree(RTMP *r)
{
	Command *c, *next;

	for(c = r->cmds.w; c != nil; c = next){
		next = c->next;
		free(c);
	}
	r->cmds.w = nil;
	free(r->i.b);
	free(r->o.b);
	free(r->tcurl);
	free(r->app);
	free(r->inst);
	free(r->path);
	if(r->c != nil)
		chanfree(r->c);
	Bterm(r);
	free(r);
}
RTMP *
rtmpdial(char *url)
{
char *s, *e, *p, *app, *inst, *path;
int f, port, ctl;
RTMP *r;
fmtinstall('A', a₀fmt);
fmtinstall('T', msgtypefmt);
fmtinstall('M', msgfmt);
quotefmtinstall();
r = nil;
f = -1;
url = estrdup(url); /* since we're changing it in-place */
if(memcmp(url, "rtmp://", 7) != 0){
werrstr("invalid url");
goto err;
}
s = url + 7;
if((e = strpbrk(s, ":/")) == nil){
werrstr("no path");
goto err;
}
port = 1935;
if(*e == ':'){
if((port = strtol(e+1, &p, 10)) < 1 || p == e+1 || *p != '/'){
werrstr("invalid port");
goto err;
}
}else{
p = e;
}
while(*(++p) == '/');
s = smprint("tcp!%.*s!%d", (int)(e-s), s, port);
f = dial(s, nil, nil, &ctl);
free(s);
if(f < 0)
goto err;
/* rtmp://host:port/app[/inst[/path]] */
app = p;
inst = nil;
path = nil;
if((s = strchr(p, '/')) != nil){ /* app instance */
*s = 0;
inst = s+1;
if((s = strchr(s+1, '/')) != nil){ /* path */
*s = 0;
path = s+1;
}
}
if(handshake(f) != 0)
goto err;
r = ecalloc(1, sizeof(*r));
r->fd = f;
r->chunkin = ChunkDefault;
r->chunkout = ChunkDefault;
r->tcurl = url;
url = nil;
r->c = chancreate(sizeof(void*), 0);
r->app = estrdup(app);
r->inst = inst == nil ? nil : estrdup(inst);
r->path = path == nil ? nil : estrdup(path);
bextend(&r->i, Bufsz);
bextend(&r->o, Bufsz);
Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf));
if(connect(r) != 0)
goto err;
if(proccreate(loop, r, mainstacksize) < 0)
goto err;
/* wait for the connect call to finish */
if((s = recvp(r->c)) != nil){
rtmpclose(r);
werrstr("rtmpdial: %s", s);
free(s);
r = nil;
}
return r;
err:
werrstr("rtmpdial: %r");
if(r != nil)
rtmpfree(r);
if(f >= 0)
close(f);
free(url);
return nil;
}
/*
 * Close a connection: the file descriptor is closed, a receive on r->c
 * waits for the receiver proc to close the channel, and the connection is
 * freed. A nil r is ignored.
 */
void
rtmpclose(RTMP *r)
{
	if(r != nil){
		if(r->fd >= 0)
			close(r->fd);
		if(r->c != nil)
			recvp(r->c);
		rtmpfree(r);
	}
}