ref: f68aad9c7e2b465cffecdd033e3a4c6612dfa40c
dir: /rtmp.c/
/*
 * RTMP publishing client: dials an rtmp:// URL, performs the handshake,
 * issues the "connect"/"createStream"/"publish" commands and sends
 * H.264 and AAC data as RTMP chunks.  Incoming messages are handled by
 * a separate proc (see loop()).
 */
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <bio.h>
#include <libsec.h>
#include "amf0.h"
#include "ivf.h"
#include "rtmp.h"
#include "util.h"

#define min(a,b) ((a)<(b)?(a):(b))

enum {
	Port = 1935,

	/* size of the C1/S1/C2/S2 handshake blobs */
	CSsz = 1536,

	ChunkDefault = 128,
	ChunkDesired = 65536,

	/* second byte of an audio/video payload */
	DataHdr = 0,
	DataFrame,

	/* chunk header formats, see szs[] for their sizes */
	Type0 = 0,
	Type1,
	Type2,
	Type3,

	/* chunk stream IDs used for outgoing messages */
	CSUserCtl = 2,
	CSCtl = 3,
	CSData = 4,

	/* positions of the values inside an AMF0 command message */
	CbCommand = 0,
	CbTransID,
	CbObject,
	CbResponse,
	NumCb,

	/* UserControl */
	CtlStreamBegin = 0,
	CtlStreamEOF,
	CtlStreamDry,
	CtlSetBufferLen,
	CtlStreamIsRecorded,
	CtlPingRequest = 6,
	CtlPingResponse,

	/* Message.type */
	SetChunkSize = 1,
	Abort,
	Ack,
	UserControl,
	WindowAckSize,
	SetBandwidth,
	Audio = 8,
	Video,
	AMF3Metadata = 15,
	AMF3SharedObject,
	AMF3Command,
	AMF0Metadata,
	AMF0SharedObject,
	AMF0Command,
	Aggregate = 22,

	/* RTMP.bwlimit */
	LimitHard = 0,
	LimitSoft,
	LimitDynamic,

	Biobufsz = 64*1024, /* FIXME don't know if it helps with anything */
	Bufsz = 64*1024,
};

typedef struct Buffer Buffer;
typedef struct Command Command;
typedef struct Message Message;

#pragma varargck type "T" int
#pragma varargck type "M" Message*

/*
 * A command that was sent and is waiting for its response.
 * The response is matched either by transaction ID (tid) or,
 * when the response carries no transaction, by its "code" string.
 */
struct Command {
	void (*cb)(RTMP *r, int ok, A₀ *a[NumCb], void *aux);
	char *code;
	void *aux;
	int tid;
	int cs;
	Command *prev, *next;
};

struct Message {
	int type;
	int fmt;
	int cs;
	int sid;
	u32int ts;
	u8int *data;
	int sz;
	Command cmd;
};

/* b is the start of the allocation, p the current position, e the end */
struct Buffer {
	Message msg;
	u8int *b, *p, *e;
	int bsz;
};

struct RTMP {
	Biobufhdr;
	QLock;
	Buffer i;
	Buffer o;
	Channel *c;
	char *tcurl;
	char *app;
	char *inst;
	char *path; /* FIXME no idea what this is for */
	int chunkin;
	int chunkout;
	int mode;
	int fd;
	int winacksz;
	int bw;
	u8int bwlimit;
	struct {
		int tid;
		Command *w;
	}cmds;
	int sps;
	int aacpd;
	u8int biobuf[Biobufsz];
};

/* all put* macros append to the output buffer of the RTMP named "r" in scope */
#define putnull() do{ r->o.p = a₀null(r->o.p, r->o.e); }while(0)
#define putbyte(b) do{ r->o.p = a₀byte(r->o.p, r->o.e, b); }while(0)
#define putdata(d, sz) do { r->o.p = a₀data(r->o.p, r->o.e, d, sz); }while(0)
#define puti16(i) do{ r->o.p = a₀i16(r->o.p, r->o.e, i); }while(0)
#define puti24(i) do{ r->o.p = a₀i24(r->o.p, r->o.e, i); }while(0)
#define puti32(i) do{ r->o.p = a₀i32(r->o.p, r->o.e, i); }while(0)
#define putnum(v) do{ r->o.p = a₀num(r->o.p, r->o.e, v); }while(0)
#define putstr(s) do{ r->o.p = a₀str(r->o.p, r->o.e, s); }while(0)
#define putarr() do{ r->o.p = a₀arr(r->o.p, r->o.e); }while(0)
#define putobj() do{ r->o.p = a₀obj(r->o.p, r->o.e); }while(0)
#define putend() do{ r->o.p = a₀end(r->o.p, r->o.e); }while(0)
#define putkvnum(name, v) do{ r->o.p = a₀kvnum(r->o.p, r->o.e, name, v); }while(0)
#define putkvstr(name, s) do{ r->o.p = a₀kvstr(r->o.p, r->o.e, name, s); }while(0)
#define putkvbool(name, s) do{ r->o.p = a₀kvbool(r->o.p, r->o.e, name, s); }while(0)
#define putcommand(name, cb_) do { \
	putstr(name); \
	putnum(r->o.msg.cmd.tid); \
	putobj(); \
	r->o.msg.cmd.cb = cb_; \
}while(0)
#define putcall(name) do { \
	putstr(name); \
	putnum(r->o.msg.cmd.tid); \
}while(0)

/* number of header bytes following the first chunk byte, per header format */
static int szs[] = {
	[Type3] = 0,
	[Type2] = 3,
	[Type1] = 7,
	[Type0] = 11,
};

static char *msgtype2s[] = {
	[SetChunkSize] = "SetChunkSize",
	[Abort] = "Abort",
	[Ack] = "Ack",
	[UserControl] = "UserControl",
	[WindowAckSize] = "WindowAckSize",
	[SetBandwidth] = "SetBandwidth",
	[Audio] = "Audio",
	[Video] = "Video",
	[AMF3Metadata] = "AMF3Metadata",
	[AMF3SharedObject] = "AMF3SharedObject",
	[AMF3Command] = "AMF3Command",
	[AMF0Metadata] = "AMF0Metadata",
	[AMF0SharedObject] = "AMF0SharedObject",
	[AMF0Command] = "AMF0Command",
	[Aggregate] = "Aggregate",
};

static char *ctl2s[] = {
	[CtlStreamBegin] = "StreamBegin",
	[CtlStreamEOF] = "StreamEOF",
	[CtlStreamDry] = "StreamDry",
	[CtlSetBufferLen] = "SetBufferLen",
	[CtlStreamIsRecorded] = "StreamIsRecorded",
	[CtlPingRequest] = "PingRequest",
	[CtlPingResponse] = "PingResponse",
};

static char *bwlimit2s[] = {
	[LimitHard] = "hard",
	[LimitSoft] = "soft",
	[LimitDynamic] = "dynamic",
};

static char *pubtype2s[] = {
	[PubLive] = "live",
	[PubAppend] = "append",
	[PubRecord] = "record",
};

extern int debug;

/*
 * Start composing a new outgoing message: reset the message header,
 * rewind the output position to the start of the buffer and, for
 * AMF0 commands, allocate the next transaction ID.
 */
static void
newmsg(RTMP *r, int type, int fmt, int cs)
{
	memset(&r->o.msg, 0, sizeof(r->o.msg));
	r->o.msg.type = type;
	r->o.msg.fmt = fmt;
	r->o.msg.cs = cs;
	r->o.p = r->o.b;
	if(type == AMF0Command)
		r->o.msg.cmd.tid = ++r->cmds.tid;
	else
		r->o.msg.cmd.tid = 0;
	r->o.msg.cmd.cs = cs;
}

/* Mark the message being composed as having no transaction ID. */
static void
notransaction(RTMP *r)
{
	r->o.msg.cmd.tid = 0;
}

/*
 * Make sure the buffer can hold at least bsz bytes.  When it has to
 * grow it is reallocated to twice the requested size, and the current
 * position is rebased onto the new allocation.  Any other pointer
 * into the old buffer is invalid after this call.
 */
static void
bextend(Buffer *b, int bsz)
{
	u8int *ob;
	long off;

	if(b->bsz >= bsz)
		return;
	ob = b->b;
	/* remember the position as an offset before the old block goes away */
	off = (ob != nil && b->p != nil) ? b->p - ob : -1;
	b->b = erealloc(ob, bsz*2);
	if(off >= 0)
		b->p = b->b + off;
	b->bsz = bsz*2;
	b->e = b->b + b->bsz;
}

/*
 * Read one message from r->fd into r->i: the chunk basic header, the
 * message header (its size depends on the header format), an optional
 * extended timestamp and then the payload, which arrives in chunks of
 * at most r->chunkin bytes separated by one-byte Type3 headers.
 * Returns 0 on success, -1 (with errstr set) on error or EOF.
 */
static int
rtmprecv(RTMP *r)
{
	int hsz, len, n, msid, off;
	u8int *h, *e, byte;
	u32int ts;

	memset(&r->i.msg, 0, sizeof(r->i.msg));
	r->i.p = r->i.b;
	if((n = readn(r->fd, &byte, 1)) != 1){
		if(n == 0)
			werrstr("eof");
		goto err;
	}
	r->i.msg.fmt = (byte & 0xc0)>>6;
	r->i.msg.cs = byte & 0x3f;
	/* cs values 0 and 1 mean the real ID follows in 1 or 2 bytes */
	n = r->i.msg.cs + 1;
	if(n <= 2){
		if(readn(r->fd, r->i.p, n) != n)
			goto err;
		r->i.msg.cs = 64 + r->i.p[0];
		if(n == 2)
			r->i.msg.cs += 256 * r->i.p[1];
	}

	hsz = szs[r->i.msg.fmt];
	if(readn(r->fd, r->i.p, hsz) != hsz)
		goto err;
	h = r->i.p;
	e = r->i.p + hsz;
	r->i.msg.type = -1;
	msid = 0;
	ts = 0;
	len = 0;
	if(hsz >= szs[Type2]){
		h = a₀i24get(h, e, (s32int*)&ts); /* FIXME proper timestamps? */
		if(hsz >= szs[Type1]){
			h = a₀i24get(h, e, &len);
			h = a₀byteget(h, e, &byte);
			r->i.msg.type = byte;
			if(hsz >= szs[Type0])
				h = a₀i32leget(h, e, &msid);
		}
	}
	if(ts == 0xffffff){ /* extended timestamp */
		if(readn(r->fd, h, 4) != 4)
			goto err;
		h = a₀i32get(h, h+4, (s32int*)&ts);
	}
	r->i.msg.ts = ts;
	r->i.msg.sid = msid;

	/* FIXME do all consecutive chunks use Type3? */

	/*
	 * The payload is stored right after the header bytes already
	 * in the buffer, so those have to be counted in; the buffer
	 * may move when it grows, so h is recomputed from its offset.
	 */
	off = h - r->i.b;
	bextend(&r->i, off + len);
	h = r->i.b + off;
	r->i.msg.data = h;
	r->i.msg.sz = len;
	for(;;){
		n = min(len, r->chunkin);
		if(readn(r->fd, h, n) != n)
			goto err;
		len -= n;
		h += n;
		if(len < 1)
			break;
		/* the chunk header is read in place and overwritten by the next chunk */
		if(readn(r->fd, h, 1) != 1)
			goto err;
		if((r->i.msg.cs | Type3<<6) != *h){
			werrstr("cs/fmt does not match: %02x", *h);
			goto err;
		}
	}

	return 0;

err:
	werrstr("rtmprecv: %r");
	return -1;
}

/*
 * Send the message composed in r->o: write the chunk header, then the
 * payload split into chunks of at most r->chunkout bytes, each
 * continuation preceded by a one-byte Type3 header.  AMF0 commands
 * are additionally put on the list of commands waiting for a
 * response.  Returns 0 on success, -1 (with errstr set) on error,
 * including when composing the message overflowed (r->o.p is nil).
 */
static int
rtmpsend(RTMP *r)
{
	u8int *p, *h, *e, hdata[24];
	int len, n, hsz;
	Command *c;
	Message *m;

	if(r->o.p == nil)
		goto err;

	m = &r->o.msg;
	m->data = r->o.b;
	m->sz = r->o.p - r->o.b;
	h = hdata;
	*h++ = m->fmt<<6 | m->cs;
	hsz = szs[m->fmt];
	e = h + hsz;
	if(hsz >= szs[Type2]){
		h = a₀i24(h, e, m->ts);
		if(hsz >= szs[Type1]){
			h = a₀i24(h, e, m->sz);
			h = a₀byte(h, e, m->type);
			if(hsz >= szs[Type0])
				h = a₀i32le(h, e, m->sid);
		}
	}
	assert(h != nil);
	memset(h, 0, e-h);
	if(Bwrite(r, hdata, h-hdata) < 0)
		goto err;
	for(p = m->data, len = m->sz; len > 0;){
		n = min(len, r->chunkout);
		if(Bwrite(r, p, n) < 0)
			goto err;
		p += n;
		len -= n;
		if(len > 0){
			*h = m->cs | Type3<<6;
			if(Bputc(r, *h) < 0)
				goto err;
		}
	}
	if(Bflush(r) < 0)
		goto err;

	if(debug){
		fprint(2, "← %M", m);
		if(m->type == AMF0Command){
			A₀ *a;
			u8int *ds, *de;

			fprint(2, ":");
			ds = m->data;
			de = ds + m->sz;
			for(; ds != nil && ds != de;){
				/* keep a defined so a failed parse does not free garbage */
				a = nil;
				if((ds = a₀parse(&a, ds, de)) != nil)
					fprint(2, " %A", a);
				else
					fprint(2, " %r");
				a₀free(a);
			}
		}
		fprint(2, "\n");
	}

	if(m->type == AMF0Command){
		/* remember the command so loop() can dispatch its response */
		c = emalloc(sizeof(*c));
		*c = m->cmd;
		assert(c->cb != nil);
		if((c->next = r->cmds.w) != nil)
			c->next->prev = c;
		r->cmds.w = c;
	}

	return 0;

err:
	werrstr("rtmpsend: %r");
	return -1;
}

/* Answer a ping request by echoing its value n back to the peer. */
static int
pong(RTMP *r, s32int n)
{
	newmsg(r, UserControl, Type0, CSUserCtl);
	puti16(CtlPingResponse);
	puti32(n);
	return rtmpsend(r);
}

/*
 * Announce a new outgoing chunk size and start using it.
 * Returns the result of sending the announcement.
 */
static int
setchunksz(RTMP *r, int sz)
{
	int n;

	newmsg(r, SetChunkSize, Type0, CSUserCtl);
	puti32(sz);
	n = rtmpsend(r);
	r->chunkout = sz;

	return n;
}

/*
 * Receiver proc: reads messages until an error occurs and handles
 * them with r locked.  Command responses are matched against the
 * pending commands and passed to their callbacks; protocol control
 * messages update the connection state.  On exit the r->c channel is
 * closed and all threads are terminated.
 */
static void
loop(void *aux)
{
	int res, n, ok, i;
	A₀ *a[NumCb], *v;
	u8int *s, *e;
	s16int s16;
	Message *m;
	Command *c;
	char *k;
	RTMP *r;

	r = aux;
	m = &r->i.msg;
	res = 0;
	memset(a, 0, sizeof(a));
	for(;;){
		for(n = 0; n < nelem(a); n++)
			a₀free(a[n]);
		memset(a, 0, sizeof(a));

		/* an error from the previous iteration is reported here */
		if(res != 0 || (res = rtmprecv(r)) != 0){
			if(debug)
				fprint(2, "rtmp loop: %r\n");
			for(n = 0; n < nelem(a); n++)
				a₀free(a[n]);
			break;
		}

		s = m->data;
		e = s + m->sz;

		qlock(r);

		if(debug)
			fprint(2, "→ %M", m);

		switch(m->type){
		case AMF0Command:
			c = nil;
			ok = 1;
			for(n = 0; n < NumCb; n++){
				if((s = a₀parse(&a[n], s, e)) == nil)
					goto err;
				switch(n){
				case CbCommand:
					if(a[n]->type != Tstr){
						werrstr("command name is not a string: %A", a[n]);
						goto err;
					}
					if(strcmp(a[n]->str, "_error") == 0)
						ok = 0;
					/* other values: "_result", etc */
					break;
				case CbTransID:
					if(a[n]->type != Tnum){
						werrstr("transaction ID is not a number");
						goto err;
					}
					if(a[n]->num == 0) /* no transaction, will try matching with response */
						break;
					for(c = r->cmds.w; c != nil; c = c->next){
						/* transaction id match */
						if(c->tid == a[n]->num)
							break;
					}
					break;
				case CbResponse:
					if(a[CbTransID]->num != 0) /* should have matches with the transaction */
						break;
					if(a[n]->type != Tobj)
						break;
					/* no transaction: match on the "code" of the response */
					for(i = 0; i < a[n]->obj.n; i++){
						k = a[n]->obj.k[i];
						v = a[n]->obj.v[i];
						if(strcmp(k, "code") == 0 && v->type == Tstr){
							for(c = r->cmds.w; c != nil; c = c->next){
								if(c->code != nil && strcmp(c->code, v->str) == 0)
									break;
							}
							break;
						}
					}
					break;
				}
			}
			if(debug)
				fprint(2, " tid=%A: %A %A %A\n", a[CbTransID], a[CbCommand], a[CbObject], a[CbResponse]);
			if(c != nil){
				/* unlink the command before running its callback */
				if(c->prev != nil)
					c->prev->next = c->next;
				if(c->next != nil)
					c->next->prev = c->prev;
				if(r->cmds.w == c)
					r->cmds.w = c->next;
				c->cb(r, ok, a, c->aux);
				free(c);
			}else if(a[CbTransID] != nil){
				fprint(2, "response/command with no handler (transaction %d)\n", (int)a[CbTransID]->num);
			}
			break;

		case SetChunkSize:
			if(a₀i32get(s, e, &r->chunkin) == nil)
				goto err;
			if(r->chunkin < 2){
				werrstr("invalid chunk size: %d", r->chunkin);
				goto err;
			}
			if(debug)
				fprint(2, ": %d\n", r->chunkin);
			break;

		case UserControl:
			if((s = a₀i16get(s, e, &s16)) == nil)
				goto err;
			if(a₀i32get(s, e, &n) == nil)
				n = -1;
			switch(s16){
			case CtlPingRequest:
				if(pong(r, n) != 0)
					goto err;
				/* fallthrough */
			case CtlStreamBegin:
			case CtlStreamEOF:
			case CtlStreamDry:
			case CtlSetBufferLen:
			case CtlStreamIsRecorded:
				if(debug)
					fprint(2, ": %s %d\n", ctl2s[s16], n);
				break;
			default:
				if(debug)
					fprint(2, ": ?%d? %d\n", s16, n);
				break;
			}
			break;

		case WindowAckSize:
			/* FIXME send acks too */
			if(a₀i32get(s, e, &r->winacksz) == nil)
				goto err;
			if(debug)
				fprint(2, ": %d\n", r->winacksz);
			break;

		case SetBandwidth:
			if((s = a₀i32get(s, e, &r->bw)) == nil || a₀byteget(s, e, &r->bwlimit) == nil)
				goto err;
			if(debug)
				fprint(2, ": %d (%s)\n", r->bw, r->bwlimit < nelem(bwlimit2s) ? bwlimit2s[r->bwlimit] : "???");
			break;

		/* FIXME */
		case Aggregate:
		case Abort:
		case Ack:
		case Audio:
		case Video:
		case AMF0Metadata:
		case AMF0SharedObject:
			break;

		case AMF3Metadata:
		case AMF3SharedObject:
		case AMF3Command:
			if(debug)
				fprint(2, ": ignored\n");
			break;

		/* only reached by goto: stop the loop on the next iteration */
		err:
			res = -1;
			break;
		}

		qunlock(r);
	}

	chanclose(r->c);

	threadexitsall(res == 0 ? nil : "error");
}

/*
 * Perform the RTMP handshake on fd f: send C0+C1, read S0+S1, echo
 * S1 back as C2 and check that the S2 received equals the C1 sent.
 * Returns 0 on success, -1 (with errstr set) on error.
 */
static int
handshake(int f)
{
	u8int c[1+CSsz], s[1+CSsz];

	c[0] = 3; /* rtmp v3 */
	memset(c+1, 0, 4+4); /* timestamp + zero */
	prng(c+1+8, CSsz-4-4);
	if(write(f, c, sizeof(c)) != sizeof(c))
		goto err;
	if(readn(f, s, sizeof(s)) != sizeof(s))
		goto err;
	if(c[0] != s[0]){
		werrstr("expected version %d, got %d", c[0], s[0]);
		goto err;
	}
	if(write(f, s+1, CSsz) != CSsz)
		goto err;
	/* s[0] still holds the version, so the whole of c can be compared */
	if(readn(f, s+1, CSsz) != CSsz)
		goto err;
	if(memcmp(c, s, sizeof(c)) != 0){
		werrstr("C1 != S2");
		goto err;
	}

	return 0;

err:
	werrstr("handshake: %r");
	return -1;
}

/*
 * Callback for "createStream": on success sends the stream ID over
 * the channel passed as aux.  The channel is closed in either case,
 * which makes the receiver fail when nothing was sent.
 */
static void
streamcreated(RTMP *, int ok, A₀ *a[NumCb], void *aux)
{
	Channel *sid;

	sid = aux;
	if(strcmp(a[CbCommand]->str, "_result") != 0)
		fprint(2, "createStream: expected '_result', got %#q\n", a[CbCommand]->str);
	else if(a[CbResponse]->type != Tnum)
		fprint(2, "createStream: expected stream ID, got NaN\n");
	else if(!ok)
		fprint(2, "createStream: %A\n", a[CbResponse]);
	else
		sendul(sid, (ulong)a[CbResponse]->num);

	chanclose(sid);
}

/*
 * Create a new stream and store its ID in *sid.
 * Returns 0 on success, -1 on error.
 */
int
rtmpstream(RTMP *r, ulong *sid)
{
	Channel *c;
	int n;

	c = chancreate(sizeof(ulong), 0);

	qlock(r);
	newmsg(r, AMF0Command, Type0, CSCtl);
	putcall("createStream");
	putnull();
	r->o.msg.cmd.cb = streamcreated;
	r->o.msg.cmd.aux = c;
	n = rtmpsend(r);
	qunlock(r);

	/* wait for the callback only when the command was actually sent */
	n = (n == 0 && recv(c, sid) == 1) ? 0 : -1;
	chanfree(c);

	return n;
}

/*
 * Callback for "publish": on success sends nil over the channel
 * passed as aux.  The channel is closed in either case.
 */
static void
streampublished(RTMP *, int ok, A₀ *a[NumCb], void *aux)
{
	Channel *err;

	err = aux;
	if(strcmp(a[CbCommand]->str, "onStatus") != 0)
		fprint(2, "streampublished: expected 'onStatus', got %#q\n", a[CbCommand]->str);
	else if(a[CbResponse]->type != Tobj)
		fprint(2, "streampublished: expected object, got something else\n");
	else if(ok)
		sendp(err, nil);

	chanclose(err);
}

/*
 * Start publishing to stream sid.  type is one of the Pub* values,
 * name defaults to the app instance taken from the URL when nil.
 * Returns 0 on success, -1 (errstr set for argument errors) on error.
 */
int
rtmppublish(RTMP *r, ulong sid, int type, char *name)
{
	Channel *c;
	char *e;
	int n;

	if(type < 0 || type >= nelem(pubtype2s)){
		werrstr("invalid publish type %d", type);
		return -1;
	}
	if(name == nil && (name = r->inst) == nil){
		werrstr("no name to publish to");
		return -1;
	}

	c = chancreate(sizeof(char*), 0);

	qlock(r);
	newmsg(r, AMF0Command, Type0, CSData);
	putcall("publish");
	putnull();
	putstr(name);
	putstr(pubtype2s[type]);
	r->o.msg.cmd.cb = streampublished;
	r->o.msg.cmd.aux = c;
	/* the response has no transaction ID and is matched by this code */
	r->o.msg.cmd.code = "NetStream.Publish.Start";
	r->o.msg.sid = sid;
	n = rtmpsend(r);
	qunlock(r);

	e = nil;
	n = (n == 0 && recv(c, &e) == 1) ? 0 : -1;
	chanfree(c);
	if(e != nil){
		werrstr("%s", e);
		/* the channel is already gone; it is the message that has to be freed */
		free(e);
		return -1;
	}

	return n;
}

/*
 * Send the "onMetaData" data frame for stream sid.  vcodec and
 * acodec are negative when there is no video or audio respectively;
 * w and h are the video dimensions.  Returns the result of rtmpsend.
 */
int
rtmpmeta(RTMP *r, ulong sid, int vcodec, int w, int h, int acodec)
{
	int res;

	assert(vcodec < 0 || vcodec == VcodecH264);
	assert(acodec < 0 || acodec == AcodecAAC);

	qlock(r);
	newmsg(r, AMF0Metadata, Type0, CSData);
	putstr("@setDataFrame");
	putstr("onMetaData");
	putarr();
	/* number of key/value pairs that follow */
	puti32(2 + (vcodec < 0 ? 0 : 3) + (acodec < 0 ? 0 : 1));
	putkvnum("duration", 0);
	putkvnum("fileSize", 0);
	if(vcodec >= 0){
		putkvnum("videocodecid", vcodec);
		putkvnum("width", w);
		putkvnum("height", h);
	}
	if(acodec >= 0)
		putkvnum("audiocodecid", acodec);
	putend();
	r->o.msg.sid = sid;
	res = rtmpsend(r);
	qunlock(r);

	return res;
}

/*
 * Skip the start codes (00 00 01 or 00 00 00 01) at the beginning of
 * p, storing their total length in *csz, and return the offset from
 * p of the next start code, or sz when there is none.
 */
static int
nalsz(u8int *p, int sz, int *csz)
{
	int n;

	*csz = 0;
	for(n = 0; n < sz-3;){
		if(p[n] == 0 && p[n+1] == 0){
			if(p[n+2] == 1){
				*csz += 3;
				n += 3;
			}else if(p[n+2] == 0 && p[n+3] == 1){
				*csz += 4;
				n += 4;
			}else
				break;
		}else
			break;
	}
	for(; n < sz-3; n++){
		if(p[n] == 0 && p[n+1] == 0){
			if(p[n+2] == 1)
				return n;
			else if(p[n+2] == 0 && p[n+3] == 1)
				return n;
		}
	}

	return sz;
}

/*
 * Send sz bytes of start-code-delimited H.264 NAL units as one video
 * message, with every unit prefixed by its 32-bit length.  The first
 * time both an SPS and a PPS are seen, a header message built from
 * them is sent first.  Returns 0 on success, -1 on error.
 */
static int
h264data(RTMP *r, ulong sid, u32int ts, u8int *p, int sz)
{
	u8int *p₀, sps[128], pps[128], ps[16+sizeof(sps)+sizeof(pps)];
	int sz₀, csz, nsz, ntype, spssz, ppssz, key, total;

	sz₀ = sz;
	p₀ = p;
	spssz = 0;
	ppssz = 0;
	key = 0;
	/* first pass: compute the output size, pick up SPS/PPS and keyframes */
	for(total = 0; sz > 0; total += 4+nsz, p += nsz, sz -= nsz){
		nsz = nalsz(p, sz, &csz);
		p += csz;
		sz -= csz;
		nsz -= csz;
		if(nsz < 1) /* trailing start code, nothing to look at */
			continue;
		ntype = *p & 0x1f;
		if(ntype == 7){
			/* bytes 1-3 are used for the header below */
			if(nsz < 4 || nsz > (int)sizeof(sps)){
				werrstr("invalid sps size: %d", nsz);
				return -1;
			}
			memmove(sps, p, nsz);
			spssz = nsz;
		}
		if(ntype == 8){
			if(nsz > (int)sizeof(pps)){
				werrstr("invalid pps size: %d", nsz);
				return -1;
			}
			memmove(pps, p, nsz);
			ppssz = nsz;
		}
		if(ntype == 5)
			key = 1;
	}

	if(spssz > 0 && ppssz > 0 && !r->sps){
		newmsg(r, Video, Type0, CSData);
		r->o.msg.ts = 0;
		r->o.msg.sid = sid;
		putbyte(0x10 | VcodecH264);
		putbyte(DataHdr);
		puti24(0);

		p = ps;
		*p++ = 1; /* version */
		*p++ = sps[1]; /* profile */
		*p++ = sps[2]; /* compatibility */
		*p++ = sps[3]; /* level */
		*p++ = 0xfc | 3; /* reserved (6 bits), NULA length size - 1 (2 bits) */
		*p++ = 0xe0 | 1; /* reserved (3 bits), num of SPS (5 bits) */
		*p++ = spssz >> 8;
		*p++ = spssz;
		p = (u8int*)memmove(p, sps, spssz) + spssz;
		*p++ = 1;
		*p++ = ppssz >> 8;
		*p++ = ppssz;
		p = (u8int*)memmove(p, pps, ppssz) + ppssz;
		putdata(ps, p-ps);

		if(rtmpsend(r) < 0)
			return -1;
		r->sps = 1;
	}

	bextend(&r->o, 64+total);
	newmsg(r, Video, Type0, CSData);
	r->o.msg.ts = ts;
	r->o.msg.sid = sid;
	putbyte((key ? 0x10 : 0x20) | VcodecH264);
	putbyte(DataFrame);
	puti24(0);

	/* second pass: replace the start codes with 32-bit lengths */
	for(p = p₀, sz = sz₀; sz > 0; p += nsz, sz -= nsz){
		nsz = nalsz(p, sz, &csz);
		p += csz;
		sz -= csz;
		nsz -= csz;
		puti32(nsz);
		putdata(p, nsz);
	}

	return rtmpsend(r);
}

/*
 * Send one AAC frame that starts with a 7-byte ADTS header.  The
 * first time, a header message derived from the ADTS fields (object
 * type, sampling frequency index, channel configuration) is sent
 * first.  The ADTS header itself is stripped from the data sent.
 * Returns 0 on success, -1 on error.
 */
static int
aacdata(RTMP *r, ulong sid, u32int ts, u8int *p, int sz)
{
	int chanc, ratei, objt;
	u16int x;

	if(sz < 7){
		werrstr("aac frame too small");
		return -1;
	}

	if(!r->aacpd){
		/*
		 * byte 2 of ADTS: profile (2 bits), sampling frequency
		 * index (4), private bit (1), channel configuration msb (1);
		 * the other two channel bits are at the top of byte 3.
		 */
		objt = (p[2]>>6) + 1;
		ratei = (p[2]>>2) & 0xf;
		chanc = (p[2]&1)<<2 | p[3]>>6;
		if(ratei > 12){
			werrstr("invalid rate config: %d", ratei);
			return -1;
		}

		newmsg(r, Audio, Type0, CSData);
		r->o.msg.ts = 0;
		r->o.msg.sid = sid;
		putbyte(AcodecAAC<<4 | 0xf);
		putbyte(DataHdr);
		x = chanc<<3 | ratei<<7 | objt<<11;
		putbyte(x>>8);
		putbyte(x);
		/* FIXME wtf */
		putbyte(0x56);
		putbyte(0xe5);
		putbyte(0x00);
		if(rtmpsend(r) < 0)
			return -1;
		r->aacpd = 1;
	}

	bextend(&r->o, sz);
	newmsg(r, Audio, Type0, CSData);
	r->o.msg.ts = ts;
	r->o.msg.sid = sid;
	putbyte(AcodecAAC<<4 | 0xf);
	putbyte(DataFrame);
	putdata(p+7, sz-7);

	return rtmpsend(r);
}

/*
 * Send audio (Taudio, AAC with ADTS headers) or video (Tvideo, H.264
 * with start codes) data on stream sid with timestamp ts.
 * Returns 0 on success, -1 on error.
 */
int
rtmpdata(RTMP *r, ulong sid, u32int ts, int type, void *p, int sz)
{
	int res;

	assert(type == Taudio || type == Tvideo);

	qlock(r);
	res = (type == Tvideo ? h264data : aacdata)(r, sid, ts, p, sz);
	qunlock(r);

	return res;
}

/*
 * Callback for "connect": requests the desired chunk size on success
 * and reports the outcome over r->c, nil meaning success and anything
 * else being an allocated error string.
 */
static void
connected(RTMP *r, int ok, A₀ *a[NumCb], void *)
{
	char *s;

	s = nil;
	if(ok){
		if(strcmp(a[CbCommand]->str, "_result") != 0)
			s = smprint("expected '_result', got %#q", a[CbCommand]->str);
		else
			setchunksz(r, ChunkDesired);
	}else{
		s = smprint("%A", a[CbResponse]);
	}

	sendp(r->c, s);
}

/* Send the "connect" command; the response is handled by connected(). */
static int
connect(RTMP *r)
{
	newmsg(r, AMF0Command, Type0, CSCtl);
	putcommand("connect", connected);
	putkvstr("app", r->app);
	putkvstr("tcUrl", r->tcurl);
	putkvstr("type", "nonprivate");
	putkvbool("fpad", 0); /* no proxy */
	putkvnum("audioCodecs", 0x4 | 0x400); /* mp3 + aac */
	putkvnum("videoCodecs", 0x80); /* h.264 */
	putkvnum("videoFunction", 0); /* no frame-accurate seek */
	putkvnum("objectEncoding", 0); /* AMF0 */
	putend();

	return rtmpsend(r);
}

/* %T: print a message type by name, or as a number when it has none. */
static int
msgtypefmt(Fmt *f)
{
	char *s;
	int t;

	if((t = va_arg(f->args, int)) >= 0 && t < nelem(msgtype2s) && (s = msgtype2s[t]) != nil)
		return fmtprint(f, "%s", s);

	return fmtprint(f, "%d", t);
}

/* %M: print a summary of a Message. */
static int
msgfmt(Fmt *f)
{
	Message *m;

	m = va_arg(f->args, Message*);

	return fmtprint(f, "type=%T cs=%d ts=%ud sz=%d", m->type, m->cs, m->ts, m->sz);
}

/*
 * Release everything owned by r, including the commands still
 * waiting for a response.  The file descriptor is not closed here.
 */
static void
rtmpfree(RTMP *r)
{
	Command *c, *next;

	for(c = r->cmds.w; c != nil; c = next){
		next = c->next;
		free(c);
	}
	free(r->i.b);
	free(r->o.b);
	free(r->tcurl);
	free(r->app);
	free(r->inst);
	free(r->path);
	if(r->c != nil)
		chanfree(r->c);
	Bterm(r);
	free(r);
}

/*
 * Connect to an "rtmp://host[:port]/app[/inst[/path]]" URL: dial,
 * handshake, send "connect", start the receiver proc and wait for
 * the result of "connect".  Returns nil (with errstr set) on error.
 */
RTMP *
rtmpdial(char *url)
{
	char *s, *e, *p, *app, *inst, *path;
	int f, port;
	RTMP *r;

	fmtinstall('A', a₀fmt);
	fmtinstall('T', msgtypefmt);
	fmtinstall('M', msgfmt);
	quotefmtinstall();

	r = nil;
	f = -1;
	url = estrdup(url); /* since we're changing it in-place */
	/* strncmp stops at the end of a url shorter than the prefix */
	if(strncmp(url, "rtmp://", 7) != 0){
		werrstr("invalid url");
		goto err;
	}
	s = url + 7;
	if((e = strpbrk(s, ":/")) == nil){
		werrstr("no path");
		goto err;
	}
	port = Port;
	if(*e == ':'){
		if((port = strtol(e+1, &p, 10)) < 1 || p == e+1 || *p != '/'){
			werrstr("invalid port");
			goto err;
		}
	}else{
		p = e;
	}
	while(*(++p) == '/');

	s = smprint("tcp!%.*s!%d", (int)(e-s), s, port);
	/* the control file is of no use here, so do not ask for (and leak) it */
	f = dial(s, nil, nil, nil);
	free(s);
	if(f < 0)
		goto err;

	/* rtmp://host:port/app[/inst[/path]] */
	app = p;
	inst = nil;
	path = nil;
	if((s = strchr(p, '/')) != nil){
		/* app instance */
		*s = 0;
		inst = s+1;
		if((s = strchr(s+1, '/')) != nil){
			/* path */
			*s = 0;
			path = s+1;
		}
	}

	if(handshake(f) != 0)
		goto err;

	r = ecalloc(1, sizeof(*r));
	r->fd = f;
	r->chunkin = ChunkDefault;
	r->chunkout = ChunkDefault;
	/* url was cut at the end of the app above; r owns it from now on */
	r->tcurl = url;
	url = nil;
	r->c = chancreate(sizeof(void*), 0);
	r->app = estrdup(app);
	r->inst = inst == nil ? nil : estrdup(inst);
	r->path = path == nil ? nil : estrdup(path);
	bextend(&r->i, Bufsz);
	bextend(&r->o, Bufsz);
	Binits(r, f, OWRITE, r->biobuf, sizeof(r->biobuf));

	if(connect(r) != 0)
		goto err;
	if(proccreate(loop, r, mainstacksize) < 0)
		goto err;

	/* wait for the connect call to finish */
	if((s = recvp(r->c)) != nil){
		rtmpclose(r);
		werrstr("rtmpdial: %s", s);
		free(s);
		r = nil;
	}

	return r;

err:
	werrstr("rtmpdial: %r");
	if(r != nil)
		rtmpfree(r);
	if(f >= 0)
		close(f);
	free(url);
	return nil;
}

/*
 * Close the connection: close the file descriptor, wait on r->c
 * (which loop() closes when it stops) and free r.  nil is ignored.
 */
void
rtmpclose(RTMP *r)
{
	if(r == nil)
		return;
	if(r->fd >= 0)
		close(r->fd);
	if(r->c != nil)
		recvp(r->c);
	rtmpfree(r);
}