ref: 885679133f98b041dd358a7eb92550acaef5f325
parent: 9bd769a638c5e83ca90e71686a46763f90838ea8
author: Paul Brossier <piem@piem.org>
date: Thu Sep 29 21:54:50 EDT 2016
src/effects/timestretch_rubberband.c: add initial pthread support
--- a/src/effects/timestretch_rubberband.c
+++ b/src/effects/timestretch_rubberband.c
@@ -33,6 +33,15 @@
#define MIN_STRETCH_RATIO 0.025
#define MAX_STRETCH_RATIO 40.
+#define HAVE_THREADS 1
+#if 0
+#undef HAVE_THREADS
+#endif
+
+#ifdef HAVE_THREADS
+#include <pthread.h>
+#endif
+
/** generic time stretching structure */
struct _aubio_timestretch_t
{
@@ -48,6 +57,14 @@
RubberBandState rb;
RubberBandOptions rboptions;
+
+#ifdef HAVE_THREADS
+ pthread_t read_thread;
+ pthread_mutex_t read_mutex;
+ pthread_cond_t read_avail;
+ pthread_cond_t read_request;
+ sint_t available;
+#endif
};
extern RubberBandOptions aubio_get_rubberband_opts(const char_t *mode);
@@ -54,6 +71,9 @@
static void aubio_timestretch_warmup (aubio_timestretch_t * p);
static sint_t aubio_timestretch_fetch(aubio_timestretch_t *p, uint_t fetch);
+#ifdef HAVE_THREADS
+static void *aubio_timestretch_readfn(void *p);
+#endif
aubio_timestretch_t *
new_aubio_timestretch (const char_t * uri, const char_t * mode,
@@ -62,6 +82,7 @@
aubio_timestretch_t *p = AUBIO_NEW (aubio_timestretch_t);
p->samplerate = samplerate;
p->hopsize = hopsize;
+ //p->source_hopsize = 2048;
p->source_hopsize = hopsize;
p->pitchscale = 1.;
p->eof = 0;
@@ -90,7 +111,19 @@
rubberband_set_max_process_size(p->rb, p->source_hopsize);
//rubberband_set_debug_level(p->rb, 10);
+#ifdef HAVE_THREADS
+ pthread_mutex_init(&p->read_mutex, 0);
+ pthread_cond_init (&p->read_avail, 0);
+ pthread_cond_init (&p->read_request, 0);
+ pthread_create(&p->read_thread, 0, aubio_timestretch_readfn, p);
+ //AUBIO_DBG("timestretch: new_ waiting for warmup, got %d available\n", p->available);
+ pthread_mutex_lock(&p->read_mutex);
+ pthread_cond_wait(&p->read_avail, &p->read_mutex);
+ pthread_mutex_unlock(&p->read_mutex);
+ //AUBIO_DBG("timestretch: new_ warm up success, got %d available\n", p->available);
+#else
aubio_timestretch_warmup(p);
+#endif
return p;
@@ -99,17 +132,84 @@
return NULL;
}
+#ifdef HAVE_THREADS
+void *
+aubio_timestretch_readfn(void *z)
+{
+ aubio_timestretch_t *p = z;
+ // signal main-thread when we are done
+ //AUBIO_WRN("timestretch: read_thread locking, got %d available\n", p->available);
+ pthread_mutex_lock(&p->read_mutex);
+ aubio_timestretch_warmup(p);
+ //AUBIO_WRN("timestretch: signaling warmup\n");
+ pthread_cond_signal(&p->read_avail);
+ //AUBIO_WRN("timestretch: unlocking in readfn\n");
+ pthread_mutex_unlock(&p->read_mutex);
+ AUBIO_WRN("timestretch: entering readfn loop\n");
+ while(1) { //p->available < (int)p->hopsize && p->eof != 1) {
+ //AUBIO_WRN("timestretch: locking in readfn\n");
+ pthread_mutex_lock(&p->read_mutex);
+ p->available = aubio_timestretch_fetch(p, p->hopsize);
+ //AUBIO_WRN("timestretch: read_thread read %d\n", p->available);
+ // signal main-thread when we are done
+ //AUBIO_WRN("timestretch: signaling new read\n");
+ pthread_cond_signal(&p->read_avail);
+ if (p->eof != 1) {
+ pthread_cond_wait(&p->read_request, &p->read_mutex);
+ }
+ if (p->eof == 1) {
+ AUBIO_WRN("timestretch: read_thread eof reached %d, %d/%d\n", p->available,
+ p->hopsize, p->source_hopsize);
+ pthread_mutex_unlock(&p->read_mutex);
+ break;
+ }
+ //AUBIO_WRN("timestretch: unlocking in readfn\n");
+ pthread_mutex_unlock(&p->read_mutex);
+ }
+#if 1
+ pthread_mutex_lock(&p->read_mutex);
+ //AUBIO_WRN("timestretch: signaling end\n");
+ pthread_cond_signal(&p->read_avail);
+ pthread_mutex_unlock(&p->read_mutex);
+#endif
+ //AUBIO_WRN("timestretch: exiting readfn\n");
+ pthread_exit(NULL);
+}
+#endif
+
static void
aubio_timestretch_warmup (aubio_timestretch_t * p)
{
// warm up rubber band
unsigned int latency = MAX(p->hopsize, rubberband_get_latency(p->rb));
+#ifdef HAVE_THREADS
+ p->available = aubio_timestretch_fetch(p, latency);
+#else
aubio_timestretch_fetch(p, latency);
+#endif
}
void
del_aubio_timestretch (aubio_timestretch_t * p)
{
+#ifdef HAVE_THREADS
+ pthread_mutex_lock(&p->read_mutex);
+ pthread_cond_signal(&p->read_request);
+ //pthread_cond_wait(&p->read_avail, &p->read_mutex);
+ pthread_mutex_unlock(&p->read_mutex);
+#if 1
+ void *threadfn;
+ if ((p->eof == 0) && (pthread_cancel(p->read_thread))) {
+ AUBIO_WRN("timestretch: cancelling thread failed\n");
+ }
+ if (pthread_join(p->read_thread, &threadfn)) {
+ AUBIO_WRN("timestretch: joining thread failed\n");
+ }
+#endif
+ pthread_mutex_destroy(&p->read_mutex);
+ pthread_cond_destroy(&p->read_avail);
+ pthread_cond_destroy(&p->read_request);
+#endif
if (p->in) del_fvec(p->in);
if (p->source) del_aubio_source(p->source);
if (p->rb) {
@@ -204,7 +304,20 @@
void
aubio_timestretch_do (aubio_timestretch_t * p, fvec_t * out, uint_t * read)
{
+#ifndef HAVE_THREADS
int available = aubio_timestretch_fetch(p, p->hopsize);
+#else /* HAVE_THREADS */
+ int available;
+ pthread_mutex_lock(&p->read_mutex);
+ if (p->eof != 1) {
+ // signal a read request
+ pthread_cond_signal(&p->read_request);
+ // wait for an available signal
+ pthread_cond_wait(&p->read_avail, &p->read_mutex);
+ } else {
+ available = rubberband_available(p->rb);
+ }
+#endif /* HAVE_THREADS */
// now retrieve the samples and write them into out->data
if (available >= (int)p->hopsize) {
rubberband_retrieve(p->rb, (float* const*)&(out->data), p->hopsize);
@@ -216,14 +329,37 @@
fvec_zeros(out);
*read = 0;
}
+#ifdef HAVE_THREADS
+ pthread_mutex_unlock(&p->read_mutex);
+#endif
}
uint_t
aubio_timestretch_seek (aubio_timestretch_t *p, uint_t pos)
{
+ uint_t err = AUBIO_OK;
+#if HAVE_THREADS
+ AUBIO_WRN("timestretch: seek_ waiting for warmup, got %d available\n", p->available);
+ pthread_mutex_lock(&p->read_mutex);
+#endif
p->eof = 0;
rubberband_reset(p->rb);
- return aubio_source_seek(p->source, pos);
+ err = aubio_source_seek(p->source, pos);
+#if HAVE_THREADS
+ p->available = 0;
+ void *threadfn;
+ if ((p->eof == 0) && (pthread_cancel(p->read_thread) == 0)) {
+ AUBIO_WRN("timestretch: cancelling thread failed\n");
+ }
+ if (pthread_join(p->read_thread, &threadfn)) {
+ AUBIO_WRN("timestretch: joining thread failed\n");
+ }
+ pthread_create(&p->read_thread, 0, aubio_timestretch_readfn, p);
+ pthread_cond_wait(&p->read_avail, &p->read_mutex);
+ pthread_mutex_unlock(&p->read_mutex);
+ //AUBIO_WRN("timestretch: seek_ warm up success, got %d available\n", p->available);
+#endif
+ return err;
}
#endif