ref: f36eceab2fa563ccd8158c47113206994840f77d
parent: d9459762732e2a3463fbe2c5a35f2bb63d1ac5d2
author: Paul Brossier <piem@piem.org>
date: Sat Jan 11 19:54:42 EST 2014
python/lib/aubio/slicing.py: use start and end stamps, make sure read > 0, improve tests
--- a/python/lib/aubio/slicing.py
+++ b/python/lib/aubio/slicing.py
@@ -1,6 +1,8 @@
from aubio import source, sink
import os
+max_timestamp = 1e120
+
def slice_source_at_stamps(source_file, timestamps, timestamps_end = None,
output_dir = None,
samplerate = 0,
@@ -9,8 +11,13 @@
if timestamps == None or len(timestamps) == 0:
raise ValueError ("no timestamps given")
+ if timestamps[0] != 0:
+ timestamps = [0] + timestamps
+
if timestamps_end != None and len(timestamps_end) != len(timestamps):
raise ValueError ("len(timestamps_end) != len(timestamps)")
+ elif timestamps_end is None: # only derive end stamps when the caller gave none, so valid ones are kept
+ timestamps_end = [t - 1 for t in timestamps[1:] ] + [ max_timestamp ]
source_base_name, source_ext = os.path.splitext(os.path.basename(source_file))
if output_dir != None:
@@ -18,22 +25,29 @@
os.makedirs(output_dir)
source_base_name = os.path.join(output_dir, source_base_name)
- def new_sink_name(source_base_name, timestamp):
- return source_base_name + '_%02.3f' % (timestamp) + '.wav'
+ def new_sink_name(source_base_name, timestamp, samplerate):
+ timestamp_seconds = timestamp / float(samplerate)
+ #print source_base_name + '_%02.3f' % (timestamp_seconds) + '.wav'
+ return source_base_name + '_%02.3f' % (timestamp_seconds) + '.wav'
# reopen source file
s = source(source_file, samplerate, hopsize)
if samplerate == 0: samplerate = s.get_samplerate()
- # create first sink at 0
- g = sink(new_sink_name(source_base_name, 0.), samplerate)
total_frames = 0
# get next region
- next_stamp = int(timestamps.pop(0))
+ start_stamp = int(timestamps.pop(0))
+ end_stamp = int(timestamps_end.pop(0))
+ # create first sink
+ new_sink_path = new_sink_name(source_base_name, start_stamp, samplerate)
+ #print "new slice", total_frames, "+", remaining, "=", end_stamp
+ g = sink(new_sink_path, samplerate)
+
while True:
# get hopsize new samples from source
vec, read = s()
- remaining = next_stamp - total_frames
+ # number of samples until end of region
+ remaining = end_stamp - total_frames
# not enough frames remaining, time to split
if remaining < read:
if remaining != 0:
@@ -41,17 +55,17 @@
g(vec[0:remaining], remaining)
# close this file
del g
+ # get the next region
+ start_stamp = int(timestamps.pop(0))
+ end_stamp = int(timestamps_end.pop(0))
# create a new file for the new region
- new_sink_path = new_sink_name(source_base_name, next_stamp / float(samplerate))
- #print "new slice", total_frames, "+", remaining, "=", next_stamp
+ new_sink_path = new_sink_name(source_base_name, start_stamp, samplerate)
+ #print "new slice", total_frames, "+", remaining, "=", end_stamp
g = sink(new_sink_path, samplerate)
# write the remaining samples in the new file
g(vec[remaining:read], read - remaining)
- if len(timestamps):
- next_stamp = int(timestamps.pop(0))
- else:
- next_stamp = 1e120
- else:
+ elif read > 0:
+ # write all the samples
g(vec[0:read], read)
total_frames += read
if read < hopsize: break
--- a/python/tests/test_slicing.py
+++ b/python/tests/test_slicing.py
@@ -4,8 +4,7 @@
from numpy.testing import assert_equal, assert_almost_equal
from aubio import slice_source_at_stamps
-from utils import count_samples_in_file, count_samples_in_directory
-from utils import get_default_test_sound
+from utils import *
import tempfile
import shutil
@@ -28,13 +27,22 @@
def test_slice_start_beyond_end(self):
regions_start = [i*1000 for i in range(1, n_slices)]
- regions_start += [count_samples_in_file(self.source_file)]
regions_start += [count_samples_in_file(self.source_file) + 1000]
slice_source_at_stamps(self.source_file, regions_start, output_dir = self.output_dir)
+    def test_slice_start_every_blocksize(self):
+        hopsize = 200  # region starts are multiples of the hop size passed below
+        regions_start = [i*hopsize for i in range(1, n_slices)]
+        regions_start += [count_samples_in_file(self.source_file) + 1000]
+        slice_source_at_stamps(self.source_file, regions_start, output_dir = self.output_dir,
+                hopsize = hopsize)
+
def tearDown(self):
original_samples = count_samples_in_file(self.source_file)
written_samples = count_samples_in_directory(self.output_dir)
+ total_files = count_files_in_directory(self.output_dir)
+ assert_equal(n_slices, total_files,
+ "number of slices created different from expected")
assert_equal(written_samples, original_samples,
"number of samples written different from number of original samples")
shutil.rmtree(self.output_dir)
@@ -67,7 +75,7 @@
self.output_dir = tempfile.mkdtemp(suffix = 'aubio_slicing_test_case')
def test_slice_wrong_ends(self):
- regions_start = [i*1000 for i in range(1, 100)]
+ regions_start = [i*1000 for i in range(1, n_slices)]
regions_end = []
self.assertRaises (ValueError,
slice_source_at_stamps, self.source_file, regions_start, regions_end,
@@ -74,7 +82,7 @@
output_dir = self.output_dir)
def test_slice_no_ends(self):
- regions_start = [i*1000 for i in range(1, 100)]
+ regions_start = [i*1000 for i in range(1, n_slices)]
regions_end = None
slice_source_at_stamps (self.source_file, regions_start, regions_end,
output_dir = self.output_dir)
--- a/python/tests/utils.py
+++ b/python/tests/utils.py
@@ -47,3 +47,14 @@
if file_path:
total_frames += count_samples_in_file(file_path)
return total_frames
+
+def count_files_in_directory(samples_dir):
+    """Return the number of files found in samples_dir and all its subdirectories."""
+    import os
+    total_files = 0
+    # os.walk yields one (dirpath, dirnames, filenames) tuple per directory
+    for _dirpath, _dirnames, filenames in os.walk(samples_dir):
+        # every name in filenames is a non-directory entry, so its length is
+        # the count for this directory; no per-path check is needed
+        total_files += len(filenames)
+    return total_files