ref: 43206799c120845686eb1a6f4154199146dc1644
parent: f36eceab2fa563ccd8158c47113206994840f77d
author: Paul Brossier <piem@piem.org>
date: Sat Jan 11 21:58:06 EST 2014
lib/aubio/slicing.py: allow any regions, overlapping or not, add more tests
--- a/python/lib/aubio/slicing.py
+++ b/python/lib/aubio/slicing.py
@@ -14,11 +14,15 @@
if timestamps[0] != 0:
timestamps = [0] + timestamps
- if timestamps_end != None and len(timestamps_end) != len(timestamps):
- raise ValueError ("len(timestamps_end) != len(timestamps)")
+ if timestamps_end != None:
+ if len(timestamps_end) != len(timestamps):
+ raise ValueError ("len(timestamps_end) != len(timestamps)")
else:
timestamps_end = [t - 1 for t in timestamps[1:] ] + [ max_timestamp ]
+ regions = zip(timestamps, timestamps_end)
+ #print regions
+
source_base_name, source_ext = os.path.splitext(os.path.basename(source_file))
if output_dir != None:
if not os.path.isdir(output_dir):
@@ -32,41 +36,48 @@
# reopen source file
s = source(source_file, samplerate, hopsize)
- if samplerate == 0: samplerate = s.get_samplerate()
+ samplerate = s.get_samplerate()
+
total_frames = 0
- # get next region
- start_stamp = int(timestamps.pop(0))
- end_stamp = int(timestamps_end.pop(0))
+ slices = []
- # create first sink
- new_sink_path = new_sink_name(source_base_name, start_stamp, samplerate)
- #print "new slice", total_frames, "+", remaining, "=", end_stamp
- g = sink(new_sink_path, samplerate)
-
while True:
# get hopsize new samples from source
vec, read = s()
- # number of samples until end of region
- remaining = end_stamp - total_frames
- # not enough frames remaining, time to split
- if remaining < read:
- if remaining != 0:
- # write remaining samples from current region
- g(vec[0:remaining], remaining)
- # close this file
- del g
- # get the next region
- start_stamp = int(timestamps.pop(0))
- end_stamp = int(timestamps_end.pop(0))
- # create a new file for the new region
+ # if the total number of frames read will reach or exceed the next region start
+ if len(regions) and total_frames + read >= regions[0][0]:
+ #print "getting", regions[0], "at", total_frames
+ # get next region
+ start_stamp, end_stamp = regions.pop(0)
+ # create a name for the sink
new_sink_path = new_sink_name(source_base_name, start_stamp, samplerate)
- #print "new slice", total_frames, "+", remaining, "=", end_stamp
+ # create its sink
g = sink(new_sink_path, samplerate)
- # write the remaining samples in the new file
- g(vec[remaining:read], read - remaining)
- elif read > 0:
- # write all the samples
- g(vec[0:read], read)
+ # create a dictionary containing all this
+ new_slice = {'start_stamp': start_stamp, 'end_stamp': end_stamp, 'sink': g}
+ # append the dictionary to the current list of slices
+ slices.append(new_slice)
+
+ for current_slice in slices:
+ start_stamp = current_slice['start_stamp']
+ end_stamp = current_slice['end_stamp']
+ g = current_slice['sink']
+ # sample index to start writing from new source vector
+ start = max(start_stamp - total_frames, 0)
+ # number of samples yet to be written until end of region
+ remaining = end_stamp - total_frames + 1
+ #print current_slice, remaining, start
+ # not enough frames remaining, time to split
+ if remaining < read:
+ if remaining > start:
+ # write remaining samples from current region
+ g(vec[start:remaining], remaining - start)
+ #print "closing region", "remaining", remaining
+ # close this file
+ del g
+ elif read > start:
+ # write all the samples
+ g(vec[start:read], read - start)
total_frames += read
if read < hopsize: break
--- a/python/tests/test_slicing.py
+++ b/python/tests/test_slicing.py
@@ -9,7 +9,7 @@
import tempfile
import shutil
-n_slices = 8
+n_slices = 4
class aubio_slicing_test_case(TestCase):
@@ -33,7 +33,6 @@
def test_slice_start_every_blocksize(self):
hopsize = 200
regions_start = [i*hopsize for i in range(1, n_slices)]
- regions_start += [count_samples_in_file(self.source_file) + 1000]
slice_source_at_stamps(self.source_file, regions_start, output_dir = self.output_dir,
hopsize = 200)
@@ -47,6 +46,55 @@
"number of samples written different from number of original samples")
shutil.rmtree(self.output_dir)
+class aubio_slicing_with_ends_test_case(TestCase):
+
+ def setUp(self):
+ self.source_file = get_default_test_sound(self)
+ self.output_dir = tempfile.mkdtemp(suffix = 'aubio_slicing_test_case')
+
+ def test_slice_start_and_ends_no_gap(self):
+ regions_start = [i*1000 for i in range(n_slices)]
+ regions_ends = [start - 1 for start in regions_start[1:]] + [1e120]
+ slice_source_at_stamps(self.source_file, regions_start, regions_ends,
+ output_dir = self.output_dir)
+ original_samples = count_samples_in_file(self.source_file)
+ written_samples = count_samples_in_directory(self.output_dir)
+ total_files = count_files_in_directory(self.output_dir)
+ assert_equal(n_slices, total_files,
+ "number of slices created different from expected")
+ assert_equal(written_samples, original_samples,
+ "number of samples written different from number of original samples")
+
+ def test_slice_start_and_ends_200_gap(self):
+ regions_start = [i*1000 for i in range(n_slices)]
+ regions_ends = [start + 199 for start in regions_start]
+ slice_source_at_stamps(self.source_file, regions_start, regions_ends,
+ output_dir = self.output_dir)
+ expected_samples = 200 * n_slices
+ written_samples = count_samples_in_directory(self.output_dir)
+ total_files = count_files_in_directory(self.output_dir)
+ assert_equal(n_slices, total_files,
+ "number of slices created different from expected")
+ assert_equal(written_samples, expected_samples,
+ "number of samples written different from number of original samples")
+
+ def test_slice_start_and_ends_overlaping(self):
+ regions_start = [i*1000 for i in range(n_slices)]
+ regions_ends = [start + 1199 for start in regions_start]
+ slice_source_at_stamps(self.source_file, regions_start, regions_ends,
+ output_dir = self.output_dir)
+ expected_samples = 1200 * n_slices
+ written_samples = count_samples_in_directory(self.output_dir)
+ total_files = count_files_in_directory(self.output_dir)
+ assert_equal(n_slices, total_files,
+ "number of slices created different from expected")
+ assert_equal(written_samples, expected_samples,
+ "number of samples written different from number of original samples")
+
+ def tearDown(self):
+ shutil.rmtree(self.output_dir)
+
+
class aubio_slicing_wrong_starts_test_case(TestCase):
def setUp(self):
@@ -86,6 +134,9 @@
regions_end = None
slice_source_at_stamps (self.source_file, regions_start, regions_end,
output_dir = self.output_dir)
+ total_files = count_files_in_directory(self.output_dir)
+ assert_equal(n_slices, total_files,
+ "number of slices created different from expected")
original_samples = count_samples_in_file(self.source_file)
written_samples = count_samples_in_directory(self.output_dir)
assert_equal(written_samples, original_samples,