ref: 7c9ad741f6f9584821ab660d827d8ed2d8aa528d
parent: 50e99cc1567a617ea03de844a3928637e38a8e39
author: Paul Brossier <piem@altern.org>
date: Mon Dec 19 16:24:51 EST 2005
added tasks fot onset, silence and cut added tasks fot onset, silence and cut
--- a/python/aubio/tasks.py
+++ b/python/aubio/tasks.py
@@ -201,6 +201,7 @@
self.silence = -70
self.derivate = False
self.localmin = False
+ self.storefunc = False
self.bufsize = 512
self.hopsize = 256
self.samplerate = 44100
@@ -207,26 +208,37 @@
self.tol = 0.05
self.step = float(self.hopsize)/float(self.samplerate)
self.threshold = 0.1
- self.mode = 'yin'
+ self.onsetmode = 'dual'
+ self.pitchmode = 'yin'
self.omode = aubio_pitchm_freq
class task(taskparams):
+ """ default template class to apply tasks on a stream """
def __init__(self,input,output=None,params=None):
- """ open the input file and initialize default argument """
+ """ open the input file and initialize default argument
+ parameters should be set *before* calling this method.
+ """
if params == None: self.params = taskparams()
else: self.params = params
+ self.frameread = 0
+ self.readsize = self.params.hopsize
self.input = input
self.filei = sndfile(self.input)
self.srate = self.filei.samplerate()
self.channels = self.filei.channels()
+ self.step = float(self.srate)/float(self.params.hopsize)
+ self.myvec = fvec(self.params.hopsize,self.channels)
self.output = output
- def compute_step(self):
- pass
+ def __call__(self):
+ self.readsize = self.filei.read(self.params.hopsize,self.myvec)
+ self.frameread += 1
+
def compute_all(self):
""" Compute data """
mylist = []
while(self.readsize==self.params.hopsize):
- mylist.append(self())
+ tmp = self()
+ if tmp: mylist.append(tmp)
return mylist
def eval(self,results):
@@ -237,17 +249,28 @@
""" Plot data """
pass
+class tasksilence(task):
+ wassilence = 1
+ issilence = 1
+ def __call__(self):
+ task.__call__(self)
+ if (aubio_silence_detection(self.myvec(),self.params.silence)==1):
+ if self.wassilence == 1: self.issilence = 1
+ else: self.issilence = 2
+ self.wassilence = 1
+ else:
+ if self.wassilence <= 0: self.issilence = 0
+ else: self.issilence = -1
+ self.wassilence = 0
+ if self.issilence == -1:
+ return -1, self.frameread
+ elif self.issilence == 2:
+ return 2, self.frameread
+
class taskpitch(task):
- #def __init__(self,input,output):
- # pass
- # task.__init__(self,input)
- # #taskparams.__init__(self)
def __init__(self,input,params=None):
task.__init__(self,input,params=params)
- self.myvec = fvec(self.params.hopsize,self.channels)
- self.frameread = 0
- self.readsize = self.params.hopsize
- self.pitchdet = pitchdetection(mode=get_pitch_mode(self.params.mode),
+ self.pitchdet = pitchdetection(mode=get_pitch_mode(self.params.pitchmode),
bufsize=self.params.bufsize,
hopsize=self.params.hopsize,
channels=self.channels,
@@ -255,10 +278,9 @@
omode=self.params.omode)
def __call__(self):
- self.readsize = self.filei.read(self.params.hopsize,self.myvec)
- freq = self.pitchdet(self.myvec)
#print "%.3f %.2f" % (now,freq)
- self.frameread += 1
+ task.__call__(self)
+ freq = self.pitchdet(self.myvec)
if (aubio_silence_detection(self.myvec(),self.params.silence)!=1):
return freq
else:
@@ -265,6 +287,7 @@
return -1.
def gettruth(self):
+ """ big hack to extract midi note from /path/to/file.<midinote>.wav """
return float(self.input.split('.')[-2])
@@ -280,9 +303,12 @@
res.append(i)
sum += i
num += 1
- avg = aubio_freqtomidi(sum / float(num))
+ if num == 0:
+ avg = 0; med = 0
+ else:
+ avg = aubio_freqtomidi(sum / float(num))
+ med = aubio_freqtomidi(short_find(res,len(res)/2))
avgdist = self.truth - avg
- med = aubio_freqtomidi(short_find(res,len(res)/2))
meddist = self.truth - med
return avgdist, meddist
@@ -295,4 +321,184 @@
outplot=options.outplot)
+class taskonset(task):
+ def __init__(self,input,output=None,params=None):
+ """ open the input file and initialize arguments
+ parameters should be set *before* calling this method.
+ """
+ task.__init__(self,input,params=params)
+ self.opick = onsetpick(self.params.bufsize,
+ self.params.hopsize,
+ self.channels,
+ self.myvec,
+ self.params.threshold,
+ mode=get_onset_mode(self.params.onsetmode),
+ derivate=self.params.derivate)
+ self.olist = []
+ self.ofunc = []
+ self.d,self.d2 = [],[]
+ self.maxofunc = 0
+ if self.params.localmin:
+ ovalist = [0., 0., 0., 0., 0.]
+ def __call__(self):
+ task.__call__(self)
+ isonset,val = self.opick.do(self.myvec)
+ if (aubio_silence_detection(self.myvec(),self.params.silence)):
+ isonset=0
+ if self.params.storefunc:
+ self.ofunc.append(val)
+ if self.params.localmin:
+ if val > 0: ovalist.append(val)
+ else: ovalist.append(0)
+ ovalist.pop(0)
+ if (isonset == 1):
+ if self.params.localmin:
+ i=len(self.ovalist)-1
+ # find local minima before peak
+ while self.ovalist[i-1] < self.ovalist[i] and i > 0:
+ i -= 1
+ now = (self.frameread+1-i)
+ else:
+ now = self.frameread
+ if now < 0 :
+ now = 0
+ return now, val
+
+ def eval(self,lres):
+ from txtfile import read_datafile
+ from onsetcompare import onset_roc
+ amode = 'roc'
+ vmode = 'verbose'
+ vmode = ''
+ for i in range(len(lres)): lres[i] = lres[i][0]*self.params.step
+ ltru = read_datafile(self.input.replace('.wav','.txt'),depth=0)
+ if vmode=='verbose':
+ print "Running with mode %s" % self.params.mode,
+ print " and threshold %f" % self.params.threshold,
+ print " on file", input
+ #print ltru; print lres
+ if amode == 'localisation':
+ l = onset_diffs(ltru,lres,self.params.tol)
+ mean = 0
+ for i in l: mean += i
+ if len(l): print "%.3f" % (mean/len(l))
+ else: print "?0"
+ elif amode == 'roc':
+ self.orig, self.missed, self.merged, \
+ self.expc, self.bad, self.doubled = \
+ onset_roc(ltru,lres,self.params.tol)
+
+ def plot(self,onsets,ofunc):
+ import Gnuplot, Gnuplot.funcutils
+ import aubio.txtfile
+ import os.path
+ import numarray
+ from aubio.onsetcompare import onset_roc
+
+ self.lenofunc = len(ofunc)
+ self.maxofunc = max(max(ofunc), self.maxofunc)
+ # onset detection function
+ downtime = numarray.arange(len(ofunc))/self.step
+ self.d.append(Gnuplot.Data(downtime,ofunc,with='lines'))
+
+ # detected onsets
+ x1 = numarray.array(onsets)/self.step
+ y1 = self.maxofunc*numarray.ones(len(onsets))
+ self.d.append(Gnuplot.Data(x1,y1,with='impulses'))
+ self.d2.append(Gnuplot.Data(x1,-y1,with='impulses'))
+
+ # check if datafile exists truth
+ datafile = self.input.replace('.wav','.txt')
+ if datafile == self.input: datafile = ""
+ if not os.path.isfile(datafile):
+ self.title = "truth file not found"
+ t = Gnuplot.Data(0,0,with='impulses')
+ else:
+ t_onsets = aubio.txtfile.read_datafile(datafile)
+ y2 = self.maxofunc*numarray.ones(len(t_onsets))
+ x2 = numarray.array(t_onsets).resize(len(t_onsets))
+ self.d2.append(Gnuplot.Data(x2,y2,with='impulses'))
+
+ tol = 0.050
+
+ orig, missed, merged, expc, bad, doubled = \
+ onset_roc(x2,x1,tol)
+ self.title = "GD %2.3f%% FP %2.3f%%" % \
+ ((100*float(orig-missed-merged)/(orig)),
+ (100*float(bad+doubled)/(orig)))
+
+
+ def plotplot(self,outplot=None):
+ from aubio.gnuplot import gnuplot_init, audio_to_array, make_audio_plot
+ import re
+ # audio data
+ time,data = audio_to_array(self.input)
+ self.d2.append(make_audio_plot(time,data))
+ # prepare the plot
+ g = gnuplot_init(outplot)
+
+ g('set title \'%s %s\'' % (re.sub('.*/','',self.input),self.title))
+
+ g('set multiplot')
+
+ # hack to align left axis
+ g('set lmargin 15')
+
+ # plot waveform and onsets
+ g('set size 1,0.3')
+ g('set origin 0,0.7')
+ g('set xrange [0:%f]' % max(time))
+ g('set yrange [-1:1]')
+ g.ylabel('amplitude')
+ g.plot(*self.d2)
+
+ g('unset title')
+
+ # plot onset detection function
+ g('set size 1,0.7')
+ g('set origin 0,0')
+ g('set xrange [0:%f]' % (self.lenofunc/self.step))
+ g('set yrange [0:%f]' % (self.maxofunc*1.01))
+ g.xlabel('time')
+ g.ylabel('onset detection value')
+ g.plot(*self.d)
+
+ g('unset multiplot')
+
+class taskcut(task):
+ def __init__(self,input,slicetimes,params=None,output=None):
+ """ open the input file and initialize arguments
+ parameters should be set *before* calling this method.
+ """
+ task.__init__(self,input,output=None,params=params)
+ self.newname = "%s%s%09.5f%s%s" % (self.input.split(".")[0].split("/")[-1],".",
+ self.frameread/self.step,".",self.input.split(".")[-1])
+ self.fileo = sndfile(self.newname,model=self.filei)
+ self.myvec = fvec(self.params.hopsize,self.channels)
+ self.mycopy = fvec(self.params.hopsize,self.channels)
+ self.slicetimes = slicetimes
+
+ def __call__(self):
+ task.__call__(self)
+ # write to current file
+ if len(self.slicetimes) and self.frameread >= self.slicetimes[0]:
+ self.slicetimes.pop(0)
+ # write up to 1st zero crossing
+ zerocross = 0
+ while ( abs( self.myvec.get(zerocross,0) ) > self.params.zerothres ):
+ zerocross += 1
+ writesize = self.fileo.write(zerocross,self.myvec)
+ fromcross = 0
+ while (zerocross < self.readsize):
+ for i in range(self.channels):
+ self.mycopy.set(self.myvec.get(zerocross,i),fromcross,i)
+ fromcross += 1
+ zerocross += 1
+ del self.fileo
+ self.fileo = sndfile("%s%s%09.5f%s%s" %
+ (self.input.split(".")[0].split("/")[-1],".",
+ self.frameread/self.step,".",self.input.split(".")[-1]),model=self.filei)
+ writesize = self.fileo.write(fromcross,self.mycopy)
+ else:
+ writesize = self.fileo.write(self.readsize,self.myvec)