shithub: rtmp

Download patch

ref: 996f6e5c036260a18fd4c46405027ef4a23a7923
parent: f867fbf66e9843d74161ec57fe12980feb9aedce
author: Sigrid Solveig Haflínudóttir <ftrvxmtrx@gmail.com>
date: Thu Jul 29 09:18:50 EDT 2021

createStream

--- a/amf0.c
+++ b/amf0.c
@@ -228,6 +228,7 @@
 	case Tarr:
 		for(i = 0; i < a->arr.n; i++)
 			amf0free(a->arr.v[i]);
+	case Tnull:
 	case Tnum:
 	case Tbool:
 		break;
@@ -258,7 +259,6 @@
 	case Anull:
 		a->type = Tnull;
 		break;
-
 	case Anum:
 		atleast("num", 8);
 		for(n = 0, x.u = 0; n < 8; n++)
@@ -376,6 +376,9 @@
 		break;
 	case Tbool:
 		fmtprint(f, a->bool ? "true" : "false");
+		break;
+	case Tnull:
+		fmtprint(f, "null");
 		break;
 	default:
 		sysfatal("unknown amf0 type %d", a->type);
--- a/amf0.h
+++ b/amf0.h
@@ -1,10 +1,10 @@
 enum {
-	Tnull,
 	Tstr,
 	Tnum,
 	Tbool,
 	Tarr,
 	Tobj,
+	Tnull,
 };
 
 typedef struct Amf0 Amf0;
--- a/main.c
+++ b/main.c
@@ -57,6 +57,9 @@
 	srand(time(nil));
 	if((r = rtmpdial(argv[0])) == nil)
 		sysfatal("%r");
+	ulong sid;
+	if(rtmpstream(r, &sid) == 0)
+		fprint(2, "stream: %lud\n", sid);
 
 	while(1)
 		sleep(100);
--- a/rtmp.c
+++ b/rtmp.c
@@ -83,6 +83,7 @@
 	int type;
 	int fmt;
 	int cs;
+	int sid;
 	u32int ts;
 	u8int *data;
 	int sz;
@@ -91,6 +92,7 @@
 
 struct RTMP {
 	Biobufhdr;
+	QLock;
 	Channel *c;
 	char *app;
 	char *path;
@@ -112,6 +114,7 @@
 	u8int biobuf[Biobufsz];
 };
 
+#define putnull() do{ r->p = amf0null(r->p, r->e); }while(0)
 #define puti16(i) do{ r->p = amf0i16(r->p, r->e, i); }while(0)
 #define puti24(i) do{ r->p = amf0i24(r->p, r->e, i); }while(0)
 #define puti32(i) do{ r->p = amf0i32(r->p, r->e, i); }while(0)
@@ -213,13 +216,13 @@
 
 	r->p = r->b;
 	if(readn(r->i, r->p, 1) != 1)
-		goto err;
+		goto eof;
 	r->msg.fmt = (r->p[0] & 0xc0)>>6;
 	r->msg.cs = r->p[0] & 0x3f;
 	n = r->msg.cs + 1;
 	if(n <= 2){
 		if(readn(r->i, r->p, n) != n)
-			goto err;
+			goto eof;
 		r->msg.cs = 64 + r->p[0];
 		if(n == 2)
 			r->msg.cs += 256 * r->p[1];
@@ -227,7 +230,7 @@
 
 	hsz = szs[r->msg.fmt];
 	if(readn(r->i, r->p, hsz) != hsz)
-		goto err;
+		goto eof;
 
 	h = r->p;
 	e = r->p + hsz;
@@ -260,7 +263,7 @@
 	for(;;){
 		n = min(len, r->chunkin);
 		if(readn(r->i, h, n) != n)
-			goto err;
+			goto eof;
 		len -= n;
 		h += n;
 		if(len < 1)
@@ -274,7 +277,8 @@
 	}
 
 	return 0;
-
+eof:
+	werrstr("eof");
 err:
 	werrstr("rtmprecv: %r");
 	return -1;
@@ -303,7 +307,7 @@
 			h = amf0i24(h, e, r->msg.sz);
 			h = amf0byte(h, e, r->msg.type);
 			if(hsz >= szs[Type0])
-				h = amf0i32(h, e, 0); /* FIXME message stream id */
+				h = amf0i32(h, e, r->msg.sid);
 		}
 	}
 	assert(h != nil);
@@ -418,6 +422,8 @@
 			amf0free(a[n]);
 		memset(a, 0, sizeof(a));
 
+		qlock(r);
+
 		if(res != 0 || (res = rtmprecv(r)) != 0){
 			if(debug)
 				fprint(2, "rtmp loop: %r\n");
@@ -442,7 +448,7 @@
 				switch(n){
 				case CbCommand:
 					if(a[n]->type != Tstr){
-						werrstr("command name is not a string");
+						werrstr("command name is not a string: %A", a[n]);
 						goto err;
 					}
 					if(strcmp(a[n]->str, "_error") == 0)
@@ -545,9 +551,13 @@
 			res = -1;
 			break;
 		}
+
+		qunlock(r);
 	}
 
+	qunlock(r);
 	rtmpfree(r);
+
 	threadexitsall(res == 0 ? nil : "error");
 }
 
@@ -581,6 +591,50 @@
 err:
 	werrstr("handshake: %r");
 	return -1;
+}
+
+/* createStream reply callback: forwards the stream ID from the response to the Channel passed as aux. */
+static void
+streamcreated(RTMP *, int ok, Amf0 *a[NumCb], void *aux)
+{
+	Channel *sid = aux;
+
+	if(strcmp(a[CbCommand]->str, "_result") != 0)
+		fprint(2, "createStream: expected '_result', got %#q\n", a[CbCommand]->str);
+	else if(a[CbResponse]->type != Tnum)
+		fprint(2, "createStream: expected stream ID, got NaN\n");
+	else if(!ok)
+		fprint(2, "createStream: %A\n", a[CbResponse]);
+	else if(sendul(sid, (ulong)a[CbResponse]->num) == 1)
+		return;	/* delivered: rtmpstream may chanfree() right after its recv(), so sid must not be touched again */
+	/* nothing was delivered: close the channel so the waiting recv() fails instead of blocking */
+	chanclose(sid);
+}
+
+/* Send a createStream command and wait for its reply; returns 0 with *sid set, or -1 on failure. */
+int
+rtmpstream(RTMP *r, ulong *sid)
+{
+	Channel *c;
+	int n;
+
+	/* allocate the reply channel before locking, so a failure leaves no half-built message and no held lock */
+	if((c = chancreate(sizeof(ulong), 0)) == nil)
+		return -1;
+	qlock(r);	/* the receive loop takes the same lock around its use of r->msg and r->p */
+	newmsg(r, AMF0Command, Type0, CSCtl);
+	putstr("createStream");
+	putnum(r->msg.cmd.tid);
+	putnull();
+	r->msg.cmd.cb = streamcreated;
+	r->msg.cmd.aux = c;
+	n = rtmpsend(r);
+	qunlock(r);
+
+	/* streamcreated sends the ID on success; when it closes c without sending, recv() fails */
+	n = (n == 0 && recv(c, sid) == 1) ? 0 : -1;
+	chanfree(c);
+	return n;
 }
 
 static void
--- a/rtmp.h
+++ b/rtmp.h
@@ -2,5 +2,7 @@
 
 #pragma incomplete RTMP
 
+int rtmpstream(RTMP *r, ulong *sid);
+
 RTMP *rtmpdial(char *url);
 void rtmpclose(RTMP *r);