shithub: aubio

Download patch

ref: 885679133f98b041dd358a7eb92550acaef5f325
parent: 9bd769a638c5e83ca90e71686a46763f90838ea8
author: Paul Brossier <piem@piem.org>
date: Thu Sep 29 21:54:50 EDT 2016

src/effects/timestretch_rubberband.c: add initial pthread support

--- a/src/effects/timestretch_rubberband.c
+++ b/src/effects/timestretch_rubberband.c
@@ -33,6 +33,15 @@
 #define MIN_STRETCH_RATIO 0.025
 #define MAX_STRETCH_RATIO 40.
 
+#define HAVE_THREADS 1 /* enable the pthread-based reader thread code paths */
+#if 0 /* change to 1 to undefine HAVE_THREADS and build without threads */
+#undef HAVE_THREADS
+#endif
+
+#ifdef HAVE_THREADS
+#include <pthread.h>
+#endif
+
 /** generic time stretching structure */
 struct _aubio_timestretch_t
 {
@@ -48,6 +57,14 @@
 
   RubberBandState rb;
   RubberBandOptions rboptions;
+
+#ifdef HAVE_THREADS
+  pthread_t read_thread;
+  pthread_mutex_t read_mutex;
+  pthread_cond_t read_avail;
+  pthread_cond_t read_request;
+  sint_t available;
+#endif
 };
 
 extern RubberBandOptions aubio_get_rubberband_opts(const char_t *mode);
@@ -54,6 +71,9 @@
 
 static void aubio_timestretch_warmup (aubio_timestretch_t * p);
 static sint_t aubio_timestretch_fetch(aubio_timestretch_t *p, uint_t fetch);
+#ifdef HAVE_THREADS
+static void *aubio_timestretch_readfn(void *p);
+#endif
 
 aubio_timestretch_t *
 new_aubio_timestretch (const char_t * uri, const char_t * mode,
@@ -62,6 +82,7 @@
   aubio_timestretch_t *p = AUBIO_NEW (aubio_timestretch_t);
   p->samplerate = samplerate;
   p->hopsize = hopsize;
+  //p->source_hopsize = 2048;
   p->source_hopsize = hopsize;
   p->pitchscale = 1.;
   p->eof = 0;
@@ -90,7 +111,19 @@
   rubberband_set_max_process_size(p->rb, p->source_hopsize);
   //rubberband_set_debug_level(p->rb, 10);
 
+#ifdef HAVE_THREADS
+  pthread_mutex_init(&p->read_mutex, 0);
+  pthread_cond_init (&p->read_avail, 0);
+  pthread_cond_init (&p->read_request, 0);
+  // hold read_mutex before starting the reader thread: it needs the mutex to
+  // warm up and signal read_avail, so the signal cannot fire before we wait
+  pthread_mutex_lock(&p->read_mutex);
+  pthread_create(&p->read_thread, 0, aubio_timestretch_readfn, p);
+  pthread_cond_wait(&p->read_avail, &p->read_mutex);
+  pthread_mutex_unlock(&p->read_mutex);
+#else
   aubio_timestretch_warmup(p);
+#endif
 
   return p;
 
@@ -99,17 +132,84 @@
   return NULL;
 }
 
+#ifdef HAVE_THREADS
+/* cancellation cleanup handler: never let the reader exit holding read_mutex */
+static void aubio_timestretch_readfn_unlock(void *z)
+{
+  pthread_mutex_unlock(&((aubio_timestretch_t *)z)->read_mutex);
+}
+
+/* Reader thread entry point, z is the aubio_timestretch_t to feed. Warms up,
+ * then repeatedly fetches hopsize samples into p->available, signals
+ * read_avail and waits for read_request; leaves the loop once p->eof is 1. */
+void *
+aubio_timestretch_readfn(void *z)
+{
+  aubio_timestretch_t *p = z;
+  int done = 0;
+  pthread_mutex_lock(&p->read_mutex);
+  pthread_cleanup_push(aubio_timestretch_readfn_unlock, p);
+  aubio_timestretch_warmup(p);
+  pthread_cond_signal(&p->read_avail); // warm-up done, wake the waiting thread
+  pthread_cleanup_pop(1); // runs the handler, i.e. unlocks read_mutex
+  AUBIO_WRN("timestretch: entering readfn loop\n");
+  while (!done) {
+    pthread_mutex_lock(&p->read_mutex);
+    pthread_cleanup_push(aubio_timestretch_readfn_unlock, p);
+    p->available = aubio_timestretch_fetch(p, p->hopsize);
+    pthread_cond_signal(&p->read_avail); // signal that the read is done
+    if (p->eof != 1) {
+      // cancellation point: read_mutex is re-acquired before the handler runs
+      pthread_cond_wait(&p->read_request, &p->read_mutex);
+    }
+    if (p->eof == 1) {
+      AUBIO_WRN("timestretch: read_thread eof reached %d, %d/%d\n", p->available,
+        p->hopsize, p->source_hopsize);
+      done = 1; // no break here: cleanup push/pop must pair in the same block
+    }
+    pthread_cleanup_pop(1); // unlock read_mutex
+  }
+  // signal read_avail one last time before leaving
+  pthread_mutex_lock(&p->read_mutex);
+  pthread_cond_signal(&p->read_avail);
+  pthread_mutex_unlock(&p->read_mutex);
+  pthread_exit(NULL);
+}
+#endif
+
 static void
 aubio_timestretch_warmup (aubio_timestretch_t * p)
 {
   // warm up rubber band
   unsigned int latency = MAX(p->hopsize, rubberband_get_latency(p->rb));
+#ifdef HAVE_THREADS /* threaded build: keep the fetch result in p->available */
+  p->available = aubio_timestretch_fetch(p, latency);
+#else /* non-threaded build: the fetch result is discarded */
   aubio_timestretch_fetch(p, latency);
+#endif
 }
 
 void
 del_aubio_timestretch (aubio_timestretch_t * p)
 {
+#ifdef HAVE_THREADS
+  pthread_mutex_lock(&p->read_mutex);
+  pthread_cond_signal(&p->read_request);
+  //pthread_cond_wait(&p->read_avail, &p->read_mutex);
+  pthread_mutex_unlock(&p->read_mutex);
+#if 1
+  void *threadfn;
+  if ((p->eof == 0) && (pthread_cancel(p->read_thread))) {
+      AUBIO_WRN("timestretch: cancelling thread failed\n");
+  }
+  if (pthread_join(p->read_thread, &threadfn)) {
+      AUBIO_WRN("timestretch: joining thread failed\n");
+  }
+#endif
+  pthread_mutex_destroy(&p->read_mutex);
+  pthread_cond_destroy(&p->read_avail);
+  pthread_cond_destroy(&p->read_request);
+#endif
   if (p->in) del_fvec(p->in);
   if (p->source) del_aubio_source(p->source);
   if (p->rb) {
@@ -204,7 +304,20 @@
 void
 aubio_timestretch_do (aubio_timestretch_t * p, fvec_t * out, uint_t * read)
 {
+#ifndef HAVE_THREADS
   int available = aubio_timestretch_fetch(p, p->hopsize);
+#else /* HAVE_THREADS */
+  int available;
+  pthread_mutex_lock(&p->read_mutex);
+  if (p->eof != 1) {
+    // request a read, wait for the reader's signal, then take its count
+    pthread_cond_signal(&p->read_request);
+    pthread_cond_wait(&p->read_avail, &p->read_mutex);
+    available = p->available;
+  } else {
+    available = rubberband_available(p->rb);
+  }
+#endif /* HAVE_THREADS */
   // now retrieve the samples and write them into out->data
   if (available >= (int)p->hopsize) {
     rubberband_retrieve(p->rb, (float* const*)&(out->data), p->hopsize);
@@ -216,14 +329,37 @@
     fvec_zeros(out);
     *read = 0;
   }
+#ifdef HAVE_THREADS
+  pthread_mutex_unlock(&p->read_mutex);
+#endif
 }
 
 uint_t
 aubio_timestretch_seek (aubio_timestretch_t *p, uint_t pos)
 {
+  uint_t err = AUBIO_OK;
+#if HAVE_THREADS
+  AUBIO_WRN("timestretch: seek_ waiting for warmup, got %d available\n", p->available);
+  pthread_mutex_lock(&p->read_mutex);
+#endif
   p->eof = 0;
   rubberband_reset(p->rb);
-  return aubio_source_seek(p->source, pos);
+  err = aubio_source_seek(p->source, pos);
+#if HAVE_THREADS
+  p->available = 0;
+  void *threadfn;
+  if ((p->eof == 0) && (pthread_cancel(p->read_thread) == 0)) {
+      AUBIO_WRN("timestretch: cancelling thread failed\n");
+  }
+  if (pthread_join(p->read_thread, &threadfn)) {
+      AUBIO_WRN("timestretch: joining thread failed\n");
+  }
+  pthread_create(&p->read_thread, 0, aubio_timestretch_readfn, p);
+  pthread_cond_wait(&p->read_avail, &p->read_mutex);
+  pthread_mutex_unlock(&p->read_mutex);
+  //AUBIO_WRN("timestretch: seek_ warm up success, got %d available\n", p->available);
+#endif
+  return err;
 }
 
 #endif