ref: 45598638afa30afad96ad3b4a0a8399a775e06e1
parent: bdb249b81da3c1f73413348d727d25f5c88cbe8a
author: Paul Brossier <piem@piem.org>
date: Mon Oct 3 07:20:57 EDT 2016
src/effects/timestretch_rubberband.c: improve threading
--- a/src/effects/timestretch.h
+++ b/src/effects/timestretch.h
@@ -166,6 +166,10 @@
*/
uint_t aubio_timestretch_seek(aubio_timestretch_t * o, uint_t pos);
+uint_t aubio_timestretch_queue (aubio_timestretch_t *p, const char_t *uri, uint_t samplerate);
+
+uint_t aubio_timestretch_get_opened (aubio_timestretch_t *p);
+
#ifdef __cplusplus
}
#endif
--- a/src/effects/timestretch_rubberband.c
+++ b/src/effects/timestretch_rubberband.c
@@ -58,11 +58,16 @@
RubberBandState rb;
RubberBandOptions rboptions;
+ uint_t opened;
+ const char_t *uri;
#ifdef HAVE_THREADS
pthread_t read_thread;
pthread_mutex_t read_mutex;
pthread_cond_t read_avail;
pthread_cond_t read_request;
+ pthread_t open_thread;
+ pthread_mutex_t open_mutex;
+ uint_t open_thread_running;
sint_t available;
uint_t started;
uint_t finish;
@@ -75,6 +80,7 @@
static sint_t aubio_timestretch_fetch(aubio_timestretch_t *p, uint_t fetch);
#ifdef HAVE_THREADS
static void *aubio_timestretch_readfn(void *p);
+static void *aubio_timestretch_openfn(void *z);
#endif
aubio_timestretch_t *
@@ -82,19 +88,11 @@
smpl_t stretchratio, uint_t hopsize, uint_t samplerate)
{
aubio_timestretch_t *p = AUBIO_NEW (aubio_timestretch_t);
- p->samplerate = samplerate;
p->hopsize = hopsize;
//p->source_hopsize = 2048;
p->source_hopsize = hopsize;
p->pitchscale = 1.;
- p->eof = 0;
- p->source = new_aubio_source(uri, samplerate, p->source_hopsize);
- if (!p->source) goto beach;
- if (samplerate == 0 ) p->samplerate = aubio_source_get_samplerate(p->source);
-
- p->in = new_fvec(p->source_hopsize);
-
if (stretchratio <= MAX_STRETCH_RATIO && stretchratio >= MIN_STRETCH_RATIO) {
p->stretchratio = stretchratio;
} else {
@@ -109,13 +107,20 @@
goto beach;
}
- p->rb = rubberband_new(p->samplerate, 1, p->rboptions, p->stretchratio, p->pitchscale);
- rubberband_set_max_process_size(p->rb, p->source_hopsize);
- //rubberband_set_debug_level(p->rb, 10);
+ p->in = new_fvec(p->source_hopsize);
-#ifdef HAVE_THREADS
+#ifndef HAVE_THREADS
+ if (aubio_timestretch_queue(p, uri, samplerate)) goto beach;
+ aubio_timestretch_warmup(p);
+#else
p->started = 0;
p->finish = 0;
+ p->open_thread_running = 0;
+ //p->uri = uri;
+ p->eof = 0;
+ //p->samplerate = samplerate;
+ //if (aubio_timestretch_open(p, uri, samplerate)) goto beach;
+ pthread_mutex_init(&p->open_mutex, 0);
pthread_mutex_init(&p->read_mutex, 0);
pthread_cond_init (&p->read_avail, 0);
pthread_cond_init (&p->read_request, 0);
@@ -123,11 +128,15 @@
pthread_create(&p->read_thread, 0, aubio_timestretch_readfn, p);
//AUBIO_DBG("timestretch: new_ waiting for warmup, got %d available\n", p->available);
pthread_mutex_lock(&p->read_mutex);
+ aubio_timestretch_queue(p, uri, samplerate);
+#if 0
pthread_cond_wait(&p->read_avail, &p->read_mutex);
+ if (!p->opened) {
+ goto beach;
+ }
+#endif
pthread_mutex_unlock(&p->read_mutex);
//AUBIO_DBG("timestretch: new_ warm up success, got %d available\n", p->available);
-#else
- aubio_timestretch_warmup(p);
#endif
return p;
@@ -137,18 +146,154 @@
return NULL;
}
+#define HAVE_OPENTHREAD 1
+//#undef HAVE_OPENTHREAD
+
+uint_t
+aubio_timestretch_queue(aubio_timestretch_t *p, const char_t* uri, uint_t samplerate)
+{
#ifdef HAVE_THREADS
+#ifdef HAVE_OPENTHREAD
+ if (p->open_thread_running) {
+#if 1
+ if (pthread_cancel(p->open_thread)) {
+ AUBIO_WRN("timestretch: cancelling open thread failed\n");
+ return AUBIO_FAIL;
+ } else {
+ AUBIO_WRN("timestretch: previous open of '%s' cancelled\n", p->uri);
+ }
+ p->open_thread_running = 0;
+#else
+ void *threadfn;
+ if (pthread_join(p->open_thread, &threadfn)) {
+ AUBIO_WRN("timestretch: failed joining existing open thread\n");
+ return AUBIO_FAIL;
+ }
+#endif
+ }
+ //AUBIO_WRN("timestretch: queueing %s\n", uri);
+ //pthread_mutex_lock(&p->read_mutex);
+ p->opened = 0;
+ p->started = 0;
+ p->available = 0;
+ p->uri = uri;
+ p->samplerate = samplerate;
+ //AUBIO_WRN("timestretch: creating thread\n");
+ pthread_create(&p->open_thread, 0, aubio_timestretch_openfn, p);
+#endif
+ //pthread_mutex_unlock(&p->read_mutex);
+ return AUBIO_OK;
+}
+
+uint_t
+aubio_timestretch_open(aubio_timestretch_t *p, const char_t* uri, uint_t samplerate)
+{
+ uint_t err = AUBIO_FAIL;
+ p->available = 0;
+ pthread_mutex_lock(&p->open_mutex);
+ p->open_thread_running = 1;
+#else
+ uint_t err = AUBIO_FAIL;
+#endif
+ p->opened = 0;
+ if (p->source) del_aubio_source(p->source);
+ p->source = new_aubio_source(uri, samplerate, p->source_hopsize);
+ if (!p->source) goto fail;
+ p->uri = uri;
+ p->samplerate = aubio_source_get_samplerate(p->source);
+ p->eof = 0;
+
+ if (p->rb == NULL) {
+ AUBIO_WRN("timestretch: creating with stretch: %.2f pitchscale: %.2f\n",
+ p->stretchratio, p->pitchscale);
+ p->rb = rubberband_new(p->samplerate, 1, p->rboptions, p->stretchratio, p->pitchscale);
+ //rubberband_set_debug_level(p->rb, 10);
+ rubberband_set_max_process_size(p->rb, p->source_hopsize);
+ } else {
+ if (samplerate != p->samplerate) {
+ AUBIO_WRN("timestretch: samplerate change requested, but not implemented\n");
+ }
+ rubberband_reset(p->rb);
+ }
+ p->opened = 1;
+ err = AUBIO_OK;
+ goto unlock;
+fail:
+ p->opened = 2;
+ AUBIO_ERR("timestretch: opening %s failed\n", uri);
+unlock:
+#ifdef HAVE_THREADS
+ p->open_thread_running = 0;
+ pthread_mutex_unlock(&p->open_mutex);
+ //AUBIO_WRN("timestretch: failed opening %s at %dHz\n", uri, samplerate);
+#endif
+ return err;
+}
+
+#ifdef HAVE_THREADS
void *
+aubio_timestretch_openfn(void *z) {
+ aubio_timestretch_t *p = z;
+ int oldtype;
+ pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype);
+ //AUBIO_WRN("timestretch: creating thread\n");
+ void *ret;
+ uint_t err = aubio_timestretch_open(p, p->uri, p->samplerate);
+ ret = &err;
+ pthread_exit(ret);
+}
+#endif
+
+uint_t
+aubio_timestretch_get_opened(aubio_timestretch_t *p)
+{
+ if (p == NULL) return 0;
+ else return p->opened;
+}
+
+#ifdef HAVE_THREADS
+void *
aubio_timestretch_readfn(void *z)
{
aubio_timestretch_t *p = z;
+ //AUBIO_WRN("timestretch: entering thread with %s at %dHz\n", p->uri, p->samplerate);
while(1) { //p->available < (int)p->hopsize && p->eof != 1) {
//AUBIO_WRN("timestretch: locking in readfn\n");
pthread_mutex_lock(&p->read_mutex);
+#if 1
+ if (p->opened == 2) {
+ pthread_cond_signal(&p->read_avail);
+ } else
+ if (p->opened == 0) {
+#ifdef HAVE_OPENTHREAD
+ //(!aubio_timestretch_open(p, p->uri, p->samplerate)) {
+ void * threadfn;
+ if (p->open_thread_running && pthread_join(p->open_thread, &threadfn)) {
+ AUBIO_WRN("timestretch: failed to join opening thread %s at %dHz in thread "
+ "(opened: %d, playing: %d, eof: %d)\n",
+ p->uri, p->samplerate, p->opened, p->started, p->eof);
+ }
+#else
+ //AUBIO_WRN("timestretch: opening source %s\n", p->uri);
+ if (!aubio_timestretch_open(p, p->uri, p->samplerate)) {
+ AUBIO_WRN("timestretch: opened %s at %dHz in thread "
+ "(opened: %d, playing: %d, eof: %d)\n",
+ p->uri, p->samplerate, p->opened, p->started, p->eof);
+ //pthread_cond_signal(&p->read_avail);
+ } else {
+ AUBIO_WRN("timestretch: failed opening %s, exiting thread\n", p->uri);
+ //pthread_cond_signal(&p->read_avail);
+ //pthread_mutex_unlock(&p->read_mutex);
+ //goto end;
+ }
+#endif
+ } else
if (!p->started && !p->eof) {
+#endif
// fetch the first few samples and mark as started
- //AUBIO_WRN("timestretch: fetching first samples\n");
aubio_timestretch_warmup(p);
+ pthread_cond_signal(&p->read_avail);
+ //pthread_cond_wait(&p->read_request, &p->read_mutex);
p->started = 1;
} else if (!p->eof) {
// fetch at least p->hopsize stretched samples
@@ -171,6 +316,7 @@
//AUBIO_WRN("timestretch: unlocking in readfn\n");
pthread_mutex_unlock(&p->read_mutex);
}
+end:
//AUBIO_WRN("timestretch: exiting readfn\n");
pthread_exit(NULL);
}
@@ -180,6 +326,7 @@
aubio_timestretch_warmup (aubio_timestretch_t * p)
{
// warm up rubber band
+ //AUBIO_WRN("timestretch: warming-up\n");
unsigned int latency = MAX(p->hopsize, rubberband_get_latency(p->rb));
#ifdef HAVE_THREADS
p->available = aubio_timestretch_fetch(p, latency);
@@ -186,6 +333,7 @@
#else
aubio_timestretch_fetch(p, latency);
#endif
+ //AUBIO_WRN("timestretch: warmup got %d\n", latency);
}
void
@@ -194,6 +342,15 @@
#ifdef HAVE_THREADS
void *threadfn;
//AUBIO_WRN("timestretch: entering delete\n");
+ if (p->open_thread_running) {
+ if (pthread_cancel(p->open_thread)) {
+ AUBIO_WRN("timestretch: cancelling open thread failed\n");
+ }
+ if (pthread_join(p->open_thread, &threadfn)) {
+ AUBIO_WRN("timestretch: joining open thread failed\n");
+ }
+ }
+ if (!p->opened) goto cleanup;
pthread_mutex_lock(&p->read_mutex);
p->finish = 1;
pthread_cond_signal(&p->read_request);
@@ -200,14 +357,15 @@
//pthread_cond_wait(&p->read_avail, &p->read_mutex);
pthread_mutex_unlock(&p->read_mutex);
if ((p->eof == 0) && (pthread_cancel(p->read_thread))) {
- AUBIO_WRN("timestretch: cancelling thread failed\n");
+ AUBIO_WRN("timestretch: cancelling thread failed\n");
}
if (pthread_join(p->read_thread, &threadfn)) {
- AUBIO_WRN("timestretch: joining thread failed\n");
+ AUBIO_WRN("timestretch: joining thread failed\n");
}
pthread_mutex_destroy(&p->read_mutex);
pthread_cond_destroy(&p->read_avail);
pthread_cond_destroy(&p->read_request);
+cleanup:
#endif
if (p->in) del_fvec(p->in);
if (p->source) del_aubio_source(p->source);
@@ -230,6 +388,10 @@
uint_t
aubio_timestretch_set_stretch (aubio_timestretch_t * p, smpl_t stretch)
{
+ if (!p->rb) {
+ AUBIO_WRN("timestretch: could not set stretch ratio, rubberband not created\n");
+ return AUBIO_FAIL;
+ }
if (stretch >= MIN_STRETCH_RATIO && stretch <= MAX_STRETCH_RATIO) {
p->stretchratio = stretch;
rubberband_set_time_ratio(p->rb, 1./p->stretchratio);
@@ -249,6 +411,10 @@
uint_t
aubio_timestretch_set_pitchscale (aubio_timestretch_t * p, smpl_t pitchscale)
{
+ if (!p->rb) {
+ AUBIO_WRN("timestretch: could not set pitch scale, rubberband not created\n");
+ return AUBIO_FAIL;
+ }
if (pitchscale >= 0.0625 && pitchscale <= 4.) {
p->pitchscale = pitchscale;
rubberband_set_pitch_scale(p->rb, p->pitchscale);
@@ -287,6 +453,10 @@
aubio_timestretch_fetch(aubio_timestretch_t *p, uint_t length)
{
uint_t source_read = p->source_hopsize;
+ if (p->source == NULL) {
+ AUBIO_ERR("timestretch: trying to fetch on NULL source\n");
+ return 0;
+ }
// read more samples from source until we have enough available or eof is reached
int available = rubberband_available(p->rb);
while ((available < (int)length) && (p->eof == 0)) {
@@ -308,14 +478,28 @@
#else /* HAVE_THREADS */
int available;
pthread_mutex_lock(&p->read_mutex);
+#if 1
+ if (!p->opened) {
+ // this may occur if _do was was called while being opened
+ //AUBIO_WRN("timestretch: calling _do before opening a file\n");
+ pthread_cond_signal(&p->read_request);
+ //available = 0;
+ //pthread_cond_wait(&p->read_avail, &p->read_mutex);
+ available = 0; //p->available;
+ } else
+#endif
if (p->eof != 1) {
+ //AUBIO_WRN("timestretch: calling _do after opening a file\n");
// signal a read request
pthread_cond_signal(&p->read_request);
// wait for an available signal
pthread_cond_wait(&p->read_avail, &p->read_mutex);
+ available = p->available;
} else {
available = rubberband_available(p->rb);
+ //AUBIO_WRN("timestretch: reached eof (%d/%d)\n", p->hopsize, available);
}
+ pthread_mutex_unlock(&p->read_mutex);
#endif /* HAVE_THREADS */
// now retrieve the samples and write them into out->data
if (available >= (int)p->hopsize) {
@@ -322,14 +506,17 @@
rubberband_retrieve(p->rb, (float* const*)&(out->data), p->hopsize);
*read = p->hopsize;
} else if (available > 0) {
+ // this occurs each time the end of file is reached
+ //AUBIO_WRN("timestretch: short read\n");
rubberband_retrieve(p->rb, (float* const*)&(out->data), available);
*read = available;
} else {
+ // this may occur if the previous was a short read available == hopsize
fvec_zeros(out);
*read = 0;
}
#ifdef HAVE_THREADS
- pthread_mutex_unlock(&p->read_mutex);
+ //pthread_mutex_unlock(&p->read_mutex);
#endif
}
@@ -338,15 +525,46 @@
{
uint_t err = AUBIO_OK;
#if HAVE_THREADS
+ if (p == NULL) {
+ AUBIO_WRN("seeking but object not set yet (ignoring)\n");
+ return AUBIO_FAIL;
+ }
pthread_mutex_lock(&p->read_mutex);
+ if (p->open_thread_running) {
+ //AUBIO_WRN("seeking but opening thread not completed yet (ignoring)\n");
+ err = AUBIO_OK;
+ goto beach;
+ }
+ if (!p->opened || !p->source) {
+ //AUBIO_WRN("timestretch: seeking but source not opened yet (ignoring)\n");
+ err = AUBIO_OK;
+ goto beach;
+ }
#endif
p->eof = 0;
- rubberband_reset(p->rb);
- err = aubio_source_seek(p->source, pos);
+ if (p->rb) {
+ rubberband_reset(p->rb);
+ }
+#ifdef HAVE_THREADS
+#ifdef HAVE_OPENTHREAD
+ pthread_mutex_lock(&p->open_mutex);
+#endif
+#endif
+ if (p->source) {
+ err = aubio_source_seek(p->source, pos);
+ } else {
+ AUBIO_WRN("timestretch: seeking but p->source not created?!\n");
+ err = AUBIO_FAIL;
+ goto beach;
+ }
#if HAVE_THREADS
+ pthread_mutex_unlock(&p->open_mutex);
p->available = 0;
p->started = 1;
+beach:
pthread_mutex_unlock(&p->read_mutex);
+#else
+beach:
#endif
return err;
}
--- a/tests/src/effects/test-timestretch.c
+++ b/tests/src/effects/test-timestretch.c
@@ -51,13 +51,53 @@
if (transpose != 0) aubio_timestretch_set_transpose(ps, transpose);
+#if 0
do {
+ if (aubio_timestretch_get_opened(ps) == 0)
+ PRINT_MSG("not opened!\n");
+ aubio_timestretch_get_opened(ps);
aubio_timestretch_set_stretch(ps, stretch);
aubio_timestretch_set_transpose(ps, transpose);
aubio_timestretch_do(ps, out, &read);
+ if (samplerate == 0) {
+ PRINT_MSG("setting samplerate now to %d\n", aubio_timestretch_get_samplerate(ps));
+ samplerate = aubio_timestretch_get_samplerate(ps);
+ aubio_sink_preset_samplerate(o, samplerate);
+ aubio_sink_preset_channels(o, 1);
+ }
aubio_sink_do(o, out, read);
n_frames += read;
} while ( read == hop_size );
+#else
+
+ aubio_timestretch_queue(ps, source_path, samplerate);
+
+ do {
+ aubio_timestretch_get_opened(ps);
+ aubio_timestretch_set_stretch(ps, stretch);
+ aubio_timestretch_set_transpose(ps, transpose);
+ aubio_timestretch_do(ps, out, &read);
+ if (n_frames == 34999 * hop_size) {
+ PRINT_MSG("instant queuing?\n");
+ aubio_timestretch_queue(ps, source_path, samplerate);
+ }
+ if (n_frames == 64999 * hop_size) {
+ PRINT_MSG("instant queuing 2\n");
+ aubio_timestretch_queue(ps, "/dev/null", samplerate);
+ }
+ if (n_frames == 54999 * hop_size) {
+ PRINT_MSG("instant queuing?\n");
+ aubio_timestretch_queue(ps, source_path, samplerate);
+ }
+ if (n_frames == 74999 * hop_size) {
+ PRINT_MSG("instant queuing?\n");
+ aubio_timestretch_queue(ps, source_path, samplerate);
+ }
+ aubio_sink_do(o, out, read);
+ //} while ( read == hop_size );
+ n_frames += hop_size;
+ } while ( n_frames < 100000 * hop_size);
+#endif
PRINT_MSG("wrote %d frames at %dHz (%d blocks) from %s written to %s\n",
n_frames, samplerate, n_frames / hop_size,