shithub: aubio

Download patch

ref: 45598638afa30afad96ad3b4a0a8399a775e06e1
parent: bdb249b81da3c1f73413348d727d25f5c88cbe8a
author: Paul Brossier <piem@piem.org>
date: Mon Oct 3 07:20:57 EDT 2016

src/effects/timestretch_rubberband.c: improve threading

--- a/src/effects/timestretch.h
+++ b/src/effects/timestretch.h
@@ -166,6 +166,30 @@
 */
 uint_t aubio_timestretch_seek(aubio_timestretch_t * o, uint_t pos);
 
+/** queue a new source to be opened
+
+  \param p time stretching object
+  \param uri path of the source to open; the pointer is kept, not copied
+  \param samplerate samplerate requested when opening the source
+
+  \return AUBIO_OK on success, AUBIO_FAIL otherwise
+
+  When built with threads, the source is opened in the background: poll
+  aubio_timestretch_get_opened() to find out whether it is ready.
+
+*/
+uint_t aubio_timestretch_queue (aubio_timestretch_t *p, const char_t *uri, uint_t samplerate);
+
+/** get the opening status of the last queued source
+
+  \param p time stretching object
+
+  \return 0 while no source is opened (or if `p` is NULL), 1 once the source
+  was opened, 2 if opening it failed
+
+*/
+uint_t aubio_timestretch_get_opened (aubio_timestretch_t *p);
+
 #ifdef __cplusplus
 }
 #endif
--- a/src/effects/timestretch_rubberband.c
+++ b/src/effects/timestretch_rubberband.c
@@ -58,11 +58,16 @@
   RubberBandState rb;
   RubberBandOptions rboptions;
 
+  uint_t opened;
+  const char_t *uri;
 #ifdef HAVE_THREADS
   pthread_t read_thread;
   pthread_mutex_t read_mutex;
   pthread_cond_t read_avail;
   pthread_cond_t read_request;
+  pthread_t open_thread;
+  pthread_mutex_t open_mutex;
+  uint_t open_thread_running;
   sint_t available;
   uint_t started;
   uint_t finish;
@@ -75,6 +80,7 @@
 static sint_t aubio_timestretch_fetch(aubio_timestretch_t *p, uint_t fetch);
 #ifdef HAVE_THREADS
 static void *aubio_timestretch_readfn(void *p);
+static void *aubio_timestretch_openfn(void *z);
 #endif
 
 aubio_timestretch_t *
@@ -82,19 +88,11 @@
     smpl_t stretchratio, uint_t hopsize, uint_t samplerate)
 {
   aubio_timestretch_t *p = AUBIO_NEW (aubio_timestretch_t);
-  p->samplerate = samplerate;
   p->hopsize = hopsize;
   //p->source_hopsize = 2048;
   p->source_hopsize = hopsize;
   p->pitchscale = 1.;
-  p->eof = 0;
 
-  p->source = new_aubio_source(uri, samplerate, p->source_hopsize);
-  if (!p->source) goto beach;
-  if (samplerate == 0 ) p->samplerate = aubio_source_get_samplerate(p->source);
-
-  p->in = new_fvec(p->source_hopsize);
-
   if (stretchratio <= MAX_STRETCH_RATIO && stretchratio >= MIN_STRETCH_RATIO) {
     p->stretchratio = stretchratio;
   } else {
@@ -109,13 +107,20 @@
     goto beach;
   }
 
-  p->rb = rubberband_new(p->samplerate, 1, p->rboptions, p->stretchratio, p->pitchscale);
-  rubberband_set_max_process_size(p->rb, p->source_hopsize);
-  //rubberband_set_debug_level(p->rb, 10);
+  p->in = new_fvec(p->source_hopsize);
 
-#ifdef HAVE_THREADS
+#ifndef HAVE_THREADS
+  if (aubio_timestretch_queue(p, uri, samplerate)) goto beach;
+  aubio_timestretch_warmup(p);
+#else
   p->started = 0;
   p->finish = 0;
+  p->open_thread_running = 0;
+  //p->uri = uri;
+  p->eof = 0;
+  //p->samplerate = samplerate;
+  //if (aubio_timestretch_open(p, uri, samplerate)) goto beach;
+  pthread_mutex_init(&p->open_mutex, 0);
   pthread_mutex_init(&p->read_mutex, 0);
   pthread_cond_init (&p->read_avail, 0);
   pthread_cond_init (&p->read_request, 0);
@@ -123,11 +128,15 @@
   pthread_create(&p->read_thread, 0, aubio_timestretch_readfn, p);
   //AUBIO_DBG("timestretch: new_ waiting for warmup, got %d available\n", p->available);
   pthread_mutex_lock(&p->read_mutex);
+  aubio_timestretch_queue(p, uri, samplerate);
+#if 0
   pthread_cond_wait(&p->read_avail, &p->read_mutex);
+  if (!p->opened) {
+    goto beach;
+  }
+#endif
   pthread_mutex_unlock(&p->read_mutex);
   //AUBIO_DBG("timestretch: new_ warm up success, got %d available\n", p->available);
-#else
-  aubio_timestretch_warmup(p);
 #endif
 
   return p;
@@ -137,18 +146,154 @@
   return NULL;
 }
 
+#define HAVE_OPENTHREAD 1
+//#undef HAVE_OPENTHREAD
+
+// Queue `uri` to be opened at `samplerate`; returns AUBIO_OK or AUBIO_FAIL.
+// With threads: cancels a still running opener, resets the playing state and
+// starts a new opener thread, which reports its outcome in p->opened.
+// Without threads: this signature is completed by the body after the #else
+// below, so the source is opened before returning.
+uint_t
+aubio_timestretch_queue(aubio_timestretch_t *p, const char_t* uri, uint_t samplerate)
+{
 #ifdef HAVE_THREADS
+#ifdef HAVE_OPENTHREAD
+  if (p->open_thread_running) {
+    if (pthread_cancel(p->open_thread)) {
+      AUBIO_WRN("timestretch: cancelling open thread failed\n");
+      return AUBIO_FAIL;
+    }
+    AUBIO_WRN("timestretch: previous open of '%s' cancelled\n", p->uri);
+    p->open_thread_running = 0;
+  }
+  // back to "pending" (0) until the opener publishes 1 (opened) or 2 (failed)
+  p->opened = 0;
+  p->started = 0;
+  p->available = 0;
+  p->uri = uri;
+  p->samplerate = samplerate;
+  if (pthread_create(&p->open_thread, 0, aubio_timestretch_openfn, p)) {
+    // no opener will ever run: report the failure instead of staying pending
+    AUBIO_ERR("timestretch: creating open thread failed\n");
+    p->opened = 2;
+    return AUBIO_FAIL;
+  }
+#endif
+  return AUBIO_OK;
+}
+
+// Open `uri`, then create the rubberband state or reset the existing one.
+// Sets p->opened to 1 on success, 2 on failure; run by aubio_timestretch_openfn().
+uint_t
+aubio_timestretch_open(aubio_timestretch_t *p, const char_t* uri, uint_t samplerate)
+{
+  uint_t err = AUBIO_FAIL;
+  p->available = 0;
+  pthread_mutex_lock(&p->open_mutex);
+  p->open_thread_running = 1;
+#else
+  // non-threaded build: aubio_timestretch_queue() continues here
+  uint_t err = AUBIO_FAIL;
+#endif
+  p->opened = 0;
+  if (p->source) del_aubio_source(p->source);
+  p->source = new_aubio_source(uri, samplerate, p->source_hopsize);
+  if (!p->source) goto fail;
+  p->uri = uri;
+  p->samplerate = aubio_source_get_samplerate(p->source);
+  p->eof = 0;
+
+  // the rubberband state is only created when none exists yet
+  if (p->rb == NULL) {
+    AUBIO_WRN("timestretch: creating with stretch: %.2f pitchscale: %.2f\n",
+        p->stretchratio, p->pitchscale);
+    p->rb = rubberband_new(p->samplerate, 1, p->rboptions, p->stretchratio, p->pitchscale);
+    rubberband_set_max_process_size(p->rb, p->source_hopsize);
+  } else {
+    if (samplerate != p->samplerate) {
+      AUBIO_WRN("timestretch: samplerate change requested, but not implemented\n");
+    }
+    rubberband_reset(p->rb);
+  }
+  p->opened = 1;
+  err = AUBIO_OK;
+  goto unlock;
+fail:
+  p->opened = 2;
+  AUBIO_ERR("timestretch: opening %s failed\n", uri);
+unlock:
+#ifdef HAVE_THREADS
+  p->open_thread_running = 0;
+  pthread_mutex_unlock(&p->open_mutex);
+#endif
+  return err;
+}
+
+#ifdef HAVE_THREADS
 void *
+aubio_timestretch_openfn(void *z) {
+  aubio_timestretch_t *p = z;
+  int oldstate;
+  // open takes open_mutex and allocates: never tear it down half-way
+  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
+  // the outcome is published in p->opened; returning the address of a
+  // local status variable would leave joiners with a dangling pointer
+  aubio_timestretch_open(p, p->uri, p->samplerate);
+  pthread_exit(NULL);
+}
+#endif
+
+uint_t
+aubio_timestretch_get_opened(aubio_timestretch_t *p)
+{
+  // 0: nothing opened yet (or no object), 1: opened, 2: opening failed
+  return (p != NULL) ? p->opened : 0;
+}
+
+#ifdef HAVE_THREADS
+void *
 aubio_timestretch_readfn(void *z)
 {
   aubio_timestretch_t *p = z;
+  //AUBIO_WRN("timestretch: entering thread with %s at %dHz\n", p->uri, p->samplerate);
   while(1) { //p->available < (int)p->hopsize && p->eof != 1) {
     //AUBIO_WRN("timestretch: locking in readfn\n");
     pthread_mutex_lock(&p->read_mutex);
+#if 1
+    if (p->opened == 2) {
+      pthread_cond_signal(&p->read_avail);
+    } else
+    if (p->opened == 0) {
+#ifdef HAVE_OPENTHREAD
+      //(!aubio_timestretch_open(p, p->uri, p->samplerate)) {
+      void * threadfn;
+      if (p->open_thread_running && pthread_join(p->open_thread, &threadfn)) {
+        AUBIO_WRN("timestretch: failed to join opening thread %s at %dHz in thread "
+            "(opened: %d, playing: %d, eof: %d)\n",
+            p->uri, p->samplerate, p->opened, p->started, p->eof);
+      }
+#else
+      //AUBIO_WRN("timestretch: opening source %s\n", p->uri);
+      if (!aubio_timestretch_open(p, p->uri, p->samplerate)) {
+        AUBIO_WRN("timestretch: opened %s at %dHz in thread "
+            "(opened: %d, playing: %d, eof: %d)\n",
+            p->uri, p->samplerate, p->opened, p->started, p->eof);
+        //pthread_cond_signal(&p->read_avail);
+      } else {
+        AUBIO_WRN("timestretch: failed opening %s, exiting thread\n", p->uri);
+        //pthread_cond_signal(&p->read_avail);
+        //pthread_mutex_unlock(&p->read_mutex);
+        //goto end;
+      }
+#endif
+    } else
     if (!p->started && !p->eof) {
+#endif
       // fetch the first few samples and mark as started
-      //AUBIO_WRN("timestretch: fetching first samples\n");
       aubio_timestretch_warmup(p);
+      pthread_cond_signal(&p->read_avail);
+      //pthread_cond_wait(&p->read_request, &p->read_mutex);
       p->started = 1;
     } else if (!p->eof) {
       // fetch at least p->hopsize stretched samples
@@ -171,6 +316,7 @@
     //AUBIO_WRN("timestretch: unlocking in readfn\n");
     pthread_mutex_unlock(&p->read_mutex);
   }
+end:
   //AUBIO_WRN("timestretch: exiting readfn\n");
   pthread_exit(NULL);
 }
@@ -180,6 +326,7 @@
 aubio_timestretch_warmup (aubio_timestretch_t * p)
 {
   // warm up rubber band
+  //AUBIO_WRN("timestretch: warming-up\n");
   unsigned int latency = MAX(p->hopsize, rubberband_get_latency(p->rb));
 #ifdef HAVE_THREADS
   p->available = aubio_timestretch_fetch(p, latency);
@@ -186,6 +333,7 @@
 #else
   aubio_timestretch_fetch(p, latency);
 #endif
+  //AUBIO_WRN("timestretch: warmup got %d\n", latency);
 }
 
 void
@@ -194,6 +342,15 @@
 #ifdef HAVE_THREADS
   void *threadfn;
   //AUBIO_WRN("timestretch: entering delete\n");
+  if (p->open_thread_running) {
+    if (pthread_cancel(p->open_thread)) {
+      AUBIO_WRN("timestretch: cancelling open thread failed\n");
+    }
+    if (pthread_join(p->open_thread, &threadfn)) {
+      AUBIO_WRN("timestretch: joining open thread failed\n");
+    }
+  }
+  if (!p->opened) goto cleanup;
   pthread_mutex_lock(&p->read_mutex);
   p->finish = 1;
   pthread_cond_signal(&p->read_request);
@@ -200,14 +357,15 @@
   //pthread_cond_wait(&p->read_avail, &p->read_mutex);
   pthread_mutex_unlock(&p->read_mutex);
   if ((p->eof == 0) && (pthread_cancel(p->read_thread))) {
-      AUBIO_WRN("timestretch: cancelling thread failed\n");
+    AUBIO_WRN("timestretch: cancelling thread failed\n");
   }
   if (pthread_join(p->read_thread, &threadfn)) {
-      AUBIO_WRN("timestretch: joining thread failed\n");
+    AUBIO_WRN("timestretch: joining thread failed\n");
   }
   pthread_mutex_destroy(&p->read_mutex);
   pthread_cond_destroy(&p->read_avail);
   pthread_cond_destroy(&p->read_request);
+cleanup:
 #endif
   if (p->in) del_fvec(p->in);
   if (p->source) del_aubio_source(p->source);
@@ -230,6 +388,10 @@
 uint_t
 aubio_timestretch_set_stretch (aubio_timestretch_t * p, smpl_t stretch)
 {
+  if (!p->rb) {
+    AUBIO_WRN("timestretch: could not set stretch ratio, rubberband not created\n");
+    return AUBIO_FAIL;
+  }
   if (stretch >= MIN_STRETCH_RATIO && stretch <= MAX_STRETCH_RATIO) {
     p->stretchratio = stretch;
     rubberband_set_time_ratio(p->rb, 1./p->stretchratio);
@@ -249,6 +411,10 @@
 uint_t
 aubio_timestretch_set_pitchscale (aubio_timestretch_t * p, smpl_t pitchscale)
 {
+  if (!p->rb) {
+    AUBIO_WRN("timestretch: could not set pitch scale, rubberband not created\n");
+    return AUBIO_FAIL;
+  }
   if (pitchscale >= 0.0625  && pitchscale <= 4.) {
     p->pitchscale = pitchscale;
     rubberband_set_pitch_scale(p->rb, p->pitchscale);
@@ -287,6 +453,10 @@
 aubio_timestretch_fetch(aubio_timestretch_t *p, uint_t length)
 {
   uint_t source_read = p->source_hopsize;
+  if (p->source == NULL) {
+    AUBIO_ERR("timestretch: trying to fetch on NULL source\n");
+    return 0;
+  }
   // read more samples from source until we have enough available or eof is reached
   int available = rubberband_available(p->rb);
   while ((available < (int)length) && (p->eof == 0)) {
@@ -308,14 +478,28 @@
 #else /* HAVE_THREADS */
   int available;
   pthread_mutex_lock(&p->read_mutex);
+#if 1
+  if (!p->opened) {
+    // this may occur if _do was called while the source is still being opened
+    //AUBIO_WRN("timestretch: calling _do before opening a file\n");
+    pthread_cond_signal(&p->read_request);
+    //available = 0;
+    //pthread_cond_wait(&p->read_avail, &p->read_mutex);
+    available = 0; //p->available;
+  } else
+#endif
   if (p->eof != 1) {
+    //AUBIO_WRN("timestretch: calling _do after opening a file\n");
     // signal a read request
     pthread_cond_signal(&p->read_request);
     // wait for an available signal
     pthread_cond_wait(&p->read_avail, &p->read_mutex);
+    available = p->available;
   } else {
     available = rubberband_available(p->rb);
+    //AUBIO_WRN("timestretch: reached eof (%d/%d)\n", p->hopsize, available);
   }
+  pthread_mutex_unlock(&p->read_mutex);
 #endif /* HAVE_THREADS */
   // now retrieve the samples and write them into out->data
   if (available >= (int)p->hopsize) {
@@ -322,14 +506,17 @@
     rubberband_retrieve(p->rb, (float* const*)&(out->data), p->hopsize);
     *read = p->hopsize;
   } else if (available > 0) {
+    // this occurs each time the end of file is reached
+    //AUBIO_WRN("timestretch: short read\n");
     rubberband_retrieve(p->rb, (float* const*)&(out->data), available);
     *read = available;
   } else {
+    // this may occur if the previous was a short read available == hopsize
     fvec_zeros(out);
     *read = 0;
   }
 #ifdef HAVE_THREADS
-  pthread_mutex_unlock(&p->read_mutex);
+  //pthread_mutex_unlock(&p->read_mutex);
 #endif
 }
 
@@ -338,15 +525,46 @@
 {
   uint_t err = AUBIO_OK;
 #if HAVE_THREADS
+  if (p == NULL) {
+    AUBIO_WRN("seeking but object not set yet (ignoring)\n");
+    return AUBIO_FAIL;
+  }
   pthread_mutex_lock(&p->read_mutex);
+  if (p->open_thread_running) {
+    //AUBIO_WRN("seeking but opening thread not completed yet (ignoring)\n");
+    err = AUBIO_OK;
+    goto beach;
+  }
+  if (!p->opened || !p->source) {
+    //AUBIO_WRN("timestretch: seeking but source not opened yet (ignoring)\n");
+    err = AUBIO_OK;
+    goto beach;
+  }
 #endif
   p->eof = 0;
-  rubberband_reset(p->rb);
-  err = aubio_source_seek(p->source, pos);
+  if (p->rb) {
+    rubberband_reset(p->rb);
+  }
+#ifdef HAVE_THREADS
+#ifdef HAVE_OPENTHREAD
+  pthread_mutex_lock(&p->open_mutex);
+#endif
+#endif
+  if (p->source) {
+    err = aubio_source_seek(p->source, pos);
+  } else {
+    AUBIO_WRN("timestretch: seeking but p->source not created?!\n");
+    err = AUBIO_FAIL;
+    goto beach;
+  }
 #if HAVE_THREADS
+  pthread_mutex_unlock(&p->open_mutex);
   p->available = 0;
   p->started = 1;
+beach:
   pthread_mutex_unlock(&p->read_mutex);
+#else
+beach:
 #endif
   return err;
 }
--- a/tests/src/effects/test-timestretch.c
+++ b/tests/src/effects/test-timestretch.c
@@ -51,13 +51,53 @@
 
   if (transpose != 0) aubio_timestretch_set_transpose(ps, transpose);
 
+#if 0
   do {
+    if (aubio_timestretch_get_opened(ps) == 0)
+      PRINT_MSG("not opened!\n");
+    aubio_timestretch_get_opened(ps);
     aubio_timestretch_set_stretch(ps, stretch);
     aubio_timestretch_set_transpose(ps, transpose);
     aubio_timestretch_do(ps, out, &read);
+    if (samplerate == 0) {
+      PRINT_MSG("setting samplerate now to %d\n", aubio_timestretch_get_samplerate(ps));
+      samplerate = aubio_timestretch_get_samplerate(ps);
+      aubio_sink_preset_samplerate(o, samplerate);
+      aubio_sink_preset_channels(o, 1);
+    }
     aubio_sink_do(o, out, read);
     n_frames += read;
   } while ( read == hop_size );
+#else
+
+  aubio_timestretch_queue(ps, source_path, samplerate);
+
+  do {
+    aubio_timestretch_get_opened(ps);
+    aubio_timestretch_set_stretch(ps, stretch);
+    aubio_timestretch_set_transpose(ps, transpose);
+    aubio_timestretch_do(ps, out, &read);
+    if (n_frames == 34999 * hop_size) {
+      PRINT_MSG("instant queuing?\n");
+      aubio_timestretch_queue(ps, source_path, samplerate);
+    }
+    if (n_frames == 64999 * hop_size) {
+      PRINT_MSG("instant queuing 2\n");
+      aubio_timestretch_queue(ps, "/dev/null", samplerate);
+    }
+    if (n_frames == 54999 * hop_size) {
+      PRINT_MSG("instant queuing?\n");
+      aubio_timestretch_queue(ps, source_path, samplerate);
+    }
+    if (n_frames == 74999 * hop_size) {
+      PRINT_MSG("instant queuing?\n");
+      aubio_timestretch_queue(ps, source_path, samplerate);
+    }
+    aubio_sink_do(o, out, read);
+  //} while ( read == hop_size );
+    n_frames += hop_size;
+  } while ( n_frames < 100000 * hop_size);
+#endif
 
   PRINT_MSG("wrote %d frames at %dHz (%d blocks) from %s written to %s\n",
       n_frames, samplerate, n_frames / hop_size,