ref: fc96bb9cc7c59f99b03f8d1225cc708451b8bf45
parent: 2554a89e02c7fc30a980b4f7e635ceae1ecba5d6
author: Jan Buethe <jbuethe@amazon.de>
date: Fri Jul 26 08:32:53 EDT 2024
added osce testing related scripts (ietf120)
--- /dev/null
+++ b/dnn/torch/osce/stndrd/evaluation/commonvoice_clip_selection.py
@@ -1,0 +1,123 @@
+import argparse
+import os
+import yaml
+import subprocess
+
+import numpy as np
+
+
+
+# Command line interface of the clip selection script.
+parser = argparse.ArgumentParser()
+parser.add_argument('commonvoice_base_dir',
+                    help='folder with one sub-folder per language, each holding validated.tsv and clips/')
+parser.add_argument('output_dir',
+                    help='folder receiving the converted clips and clips.yml')
+parser.add_argument('--clips-per-language', required=False, type=int, default=10,
+                    help='number of clips to select per language (default: 10)')
+parser.add_argument('--seed', required=False, type=int, default=2024,
+                    help='seed for the numpy random number generator (default: 2024)')
+
+
+def select_clips(dir, num_clips=10):
+    """Randomly pick a gender-balanced set of clips from one language folder.
+
+    Reads ``<dir>/validated.tsv`` and draws ``num_clips // 2`` distinct clients
+    that have 'female_feminine' clips and ``num_clips // 2`` distinct clients
+    that have 'male_masculine' clips, then one random clip per drawn client.
+    Randomness comes from the global ``np.random`` state; seed it beforehand
+    for a reproducible selection.
+
+    Args:
+        dir: language folder containing ``validated.tsv`` and a ``clips`` folder.
+        num_clips: total number of clips requested; odd values are rounded down.
+
+    Returns:
+        List of clip paths below ``<dir>/clips`` (female selection first, then
+        male), or ``None`` if the table is not two-dimensional, has fewer rows
+        than ``num_clips`` or holds too few clients of either gender.
+    """
+    if num_clips % 2:
+        print(f"warning: number of clips will be reduced to {num_clips - 1}")
+
+    female = dict()
+    male = dict()
+
+    clips = np.genfromtxt(os.path.join(dir, 'validated.tsv'), delimiter='\t', dtype=str, invalid_raise=False)
+
+    if len(clips.shape) < 2 or len(clips) < num_clips:
+        # not enough data to proceed
+        return None
+
+    # Locate the columns through the header row; fall back to the fixed
+    # positions (1 and 8) when the header does not carry the expected names.
+    header = clips[0].tolist()
+    path_col = header.index('path') if 'path' in header else 1
+    gender_col = header.index('gender') if 'gender' in header else 8
+
+    # Sort the client ids: a plain set of strings is iterated in per-process
+    # hash order, which would make the seeded draws below differ between runs.
+    for client in sorted(set(clips[1:, 0].tolist())):
+        client_clips = clips[clips[:, 0] == client]
+        genders = client_clips[:, gender_col]
+        is_female = genders == 'female_feminine'
+        is_male = genders == 'male_masculine'
+
+        if is_female.any():
+            female[client] = client_clips[is_female]
+        if is_male.any():
+            male[client] = client_clips[is_male]
+
+        if is_female.any() and is_male.any():
+            print(f"both male and female clips under client {client}")
+
+    clips_per_gender = num_clips // 2
+    if min(len(female), len(male)) < clips_per_gender:
+        return None
+
+    clip_dir = os.path.join(dir, 'clips')
+
+    # select num_clips // 2 random female clients, then the same number of
+    # random male clients (the order of the two draws fixes the random stream)
+    female_clip_selection = _draw_clip_paths(female, clips_per_gender, clip_dir, path_col)
+    male_clip_selection = _draw_clip_paths(male, clips_per_gender, clip_dir, path_col)
+
+    return female_clip_selection + male_clip_selection
+
+
+def _draw_clip_paths(clips_by_client, num_clients, clip_dir, path_col):
+    """Draw `num_clients` distinct clients and return one random clip path for each."""
+    clients = np.array(list(clips_by_client.keys()), dtype=str)
+    chosen = clients[np.random.choice(len(clients), num_clients, replace=False)]
+
+    paths = []
+    for c in chosen:
+        rows = clips_by_client[c]
+        s_idx = np.random.randint(0, len(rows))
+        paths.append(os.path.join(clip_dir, rows[s_idx, path_col].item()))
+
+    return paths
+
+
+def ffmpeg_available():
+    """Return True if ``ffmpeg -h`` can be launched and exits with status 0."""
+    try:
+        x = subprocess.run(['ffmpeg', '-h'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+    except (OSError, ValueError):
+        # ffmpeg is missing or could not be started. A bare `except` here
+        # would also swallow KeyboardInterrupt and SystemExit.
+        return False
+    return x.returncode == 0
+
+
+def convert_clips(selection, outdir):
+    """Convert the selected clips to 16 kHz wav files below ``<outdir>/clips``.
+
+    Args:
+        selection: dict mapping a language name to a list of source clip paths.
+        outdir: output folder; its ``clips`` sub-folder is created if missing.
+
+    Returns:
+        Dict mapping each language to the list of converted file names,
+        given relative to ``outdir``.
+
+    Raises:
+        RuntimeError: if ffmpeg is not available or a conversion exits with a
+            non-zero status.
+    """
+    if not ffmpeg_available():
+        raise RuntimeError("ffmpeg not available")
+
+    clipdir = os.path.join(outdir, 'clips')
+    os.makedirs(clipdir, exist_ok=True)
+
+    clipdict = dict()
+
+    for lang, clips in selection.items():
+        clipdict[lang] = []
+        for clip in clips:
+            clipname = os.path.splitext(os.path.split(clip)[-1])[0]
+            target_name = os.path.join('clips', clipname + '.wav')
+            # -y: overwrite files left by an earlier run. Without it ffmpeg
+            # refuses to overwrite (or waits for an answer to a prompt that is
+            # hidden because its output is discarded below).
+            call_args = ['ffmpeg', '-y', '-i', clip, '-ar', '16000', os.path.join(outdir, target_name)]
+            print(call_args)
+            r = subprocess.run(call_args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+            if r.returncode != 0:
+                raise RuntimeError(f'could not execute {call_args}')
+            clipdict[lang].append(target_name)
+
+    return clipdict
+
+
+if __name__ == "__main__":
+    # Parse first so that --help and usage errors work without ffmpeg.
+    args = parser.parse_args()
+
+    if not ffmpeg_available():
+        raise RuntimeError("ffmpeg not available")
+
+    base_dir = args.commonvoice_base_dir
+    output_dir = args.output_dir
+    seed = args.seed
+
+    np.random.seed(seed)
+
+    # os.listdir returns entries in arbitrary order; sort them so that a given
+    # seed consumes the random stream in the same language order everywhere.
+    langs = sorted(os.listdir(base_dir))
+    selection = dict()
+
+    for lang in langs:
+        lang_dir = os.path.join(base_dir, lang)
+        if not os.path.isfile(os.path.join(lang_dir, 'validated.tsv')):
+            # stray file or folder without a clip table
+            print(f"skipping {lang} (no validated.tsv)")
+            continue
+
+        print(f"processing {lang}...")
+        # forward --clips-per-language, which was parsed but never used before
+        clips = select_clips(lang_dir, num_clips=args.clips_per_language)
+        if clips is not None:
+            selection[lang] = clips
+
+    os.makedirs(output_dir, exist_ok=True)
+
+    clips = convert_clips(selection, output_dir)
+
+    # clips.yml maps each language to its converted clips (relative paths)
+    with open(os.path.join(output_dir, 'clips.yml'), 'w') as f:
+        yaml.dump(clips, f)
--- /dev/null
+++ b/dnn/torch/osce/stndrd/evaluation/run_osce_test.py
@@ -1,0 +1,193 @@
+import os
+import argparse
+import yaml
+import subprocess
+
+import numpy as np
+
+from moc2 import compare as moc
+
+# NOTE(review): DEBUG is not referenced anywhere in the visible part of this file.
+DEBUG = False
+
+# Command line interface of the OSCE test runner.
+parser = argparse.ArgumentParser()
+
+parser.add_argument('inputdir', type=str, help='Input folder with test items')
+parser.add_argument('outputdir', type=str, help='Output folder')
+parser.add_argument('bitrate', type=int, help='bitrate to test')
+parser.add_argument('--reference_opus_demo', type=str, default='./opus_demo', help='reference opus_demo binary for generating bitstreams and reference output')
+parser.add_argument('--test_opus_demo', type=str, default='./opus_demo', help='opus_demo binary under test')
+parser.add_argument('--test_opus_demo_options', type=str, default='-dec_complexity 7', help='options for test opus_demo (e.g. "-dec_complexity 7")')
+parser.add_argument('--verbose', type=int, default=0, help='verbosity level: 0 for quiet (default), 1 for reporting individual test results, 2 for reporting per-item scores in failed tests')
+
+def run_opus_encoder(opus_demo_path, input_pcm_path, bitstream_path, application, fs, num_channels, bitrate, options=None, verbose=False):
+    """Encode a raw PCM file into a bitstream with ``opus_demo -e``.
+
+    The encoder is always invoked with ``-bandwidth WB``; `options` are
+    inserted between the fixed arguments and the input/output paths.
+
+    Args:
+        opus_demo_path: opus_demo binary to run.
+        input_pcm_path: input PCM file.
+        bitstream_path: bitstream file to write.
+        application: application argument passed to opus_demo (e.g. "voip").
+        fs: sampling rate passed to opus_demo.
+        num_channels: channel count passed to opus_demo.
+        bitrate: bitrate passed to opus_demo.
+        options: optional list of extra command line arguments.
+        verbose: if true, print the command and keep the tool's output.
+
+    Returns:
+        0 on success, 1 if opus_demo could not be started or exited with a
+        non-zero status.
+    """
+    call_args = [
+        opus_demo_path,
+        "-e",
+        application,
+        str(fs),
+        str(num_channels),
+        str(bitrate),
+        "-bandwidth",
+        "WB",
+    ]
+    call_args += list(options or [])
+    call_args += [input_pcm_path, bitstream_path]
+
+    # discard the tool's output unless verbose reporting was requested
+    redirect = {} if verbose else {'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}
+
+    try:
+        if verbose:
+            print(f"running {call_args}...")
+        result = subprocess.run(call_args, **redirect)
+    except (OSError, ValueError):
+        return 1
+
+    # subprocess.run does not raise on a failing exit status; check it here
+    return 0 if result.returncode == 0 else 1
+
+
+def run_opus_decoder(opus_demo_path, bitstream_path, output_pcm_path, fs, num_channels, options=None, verbose=False):
+    """Decode a bitstream into a raw PCM file with ``opus_demo -d``.
+
+    Args:
+        opus_demo_path: opus_demo binary to run.
+        bitstream_path: bitstream file to decode.
+        output_pcm_path: PCM file to write.
+        fs: sampling rate passed to opus_demo.
+        num_channels: channel count passed to opus_demo.
+        options: optional list of extra command line arguments, inserted
+            before the input/output paths.
+        verbose: if true, print the command and keep the tool's output.
+
+    Returns:
+        0 on success, 1 if opus_demo could not be started or exited with a
+        non-zero status.
+    """
+    call_args = [
+        opus_demo_path,
+        "-d",
+        str(fs),
+        str(num_channels),
+    ]
+    call_args += list(options or [])
+    call_args += [bitstream_path, output_pcm_path]
+
+    # discard the tool's output unless verbose reporting was requested
+    redirect = {} if verbose else {'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}
+
+    try:
+        if verbose:
+            print(f"running {call_args}...")
+        result = subprocess.run(call_args, **redirect)
+    except (OSError, ValueError):
+        return 1
+
+    # subprocess.run does not raise on a failing exit status; check it here
+    return 0 if result.returncode == 0 else 1
+
+def compute_moc_score(reference_pcm, test_pcm, delay=91):
+    """Compare a test PCM file against a reference PCM file with ``moc``.
+
+    Both files are read as int16 samples and divided by 2 ** 15; the first
+    `delay` samples of the test signal are dropped before the comparison.
+    Returns whatever ``moc`` returns for the two signals.
+    """
+    full_scale = 2 ** 15
+
+    reference = np.fromfile(reference_pcm, dtype=np.int16).astype(np.float32) / full_scale
+    degraded = np.fromfile(test_pcm, dtype=np.int16).astype(np.float32) / full_scale
+
+    return moc(reference, degraded[delay:])
+
+def sox(*call_args):
+    """Run ``sox`` with the given command line arguments.
+
+    Returns:
+        0 on success, 1 if sox could not be started or exited with a
+        non-zero status.
+    """
+    try:
+        result = subprocess.run(["sox"] + list(call_args))
+    except (OSError, ValueError):
+        return 1
+
+    # subprocess.run does not raise on a failing exit status; check it here
+    return 0 if result.returncode == 0 else 1
+
+def process_clip_factory(ref_opus_demo, test_opus_demo, test_options):
+    """Build the per-clip processing function for a reference/test binary pair.
+
+    Args:
+        ref_opus_demo: opus_demo binary used for encoding and reference decoding.
+        test_opus_demo: opus_demo binary used for the decoder under test.
+        test_options: list of extra arguments for the decoder under test.
+
+    Returns:
+        ``process_clip(clip_path, processdir, bitrate)``, which converts the
+        clip with sox, encodes it at 16 kHz mono in "voip" mode, decodes the
+        bitstream with both binaries and returns ``(d_ref, d_test)``, the
+        scores of the two decoder outputs against the sox output.
+        It raises RuntimeError if any external tool reports a failure.
+    """
+    def require_success(status, step, clip_path):
+        # Each step consumes the file written by the previous one; continuing
+        # after a failure would score missing files or leftovers of an older run.
+        if status != 0:
+            raise RuntimeError(f"{step} failed for {clip_path}")
+
+    def process_clip(clip_path, processdir, bitrate):
+        # derive paths
+        clipname = os.path.splitext(os.path.split(clip_path)[1])[0]
+        pcm_path = os.path.join(processdir, clipname + ".raw")
+        bitstream_path = os.path.join(processdir, clipname + ".bin")
+        ref_path = os.path.join(processdir, clipname + "_ref.raw")
+        test_path = os.path.join(processdir, clipname + "_test.raw")
+
+        # run sox
+        require_success(sox(clip_path, pcm_path), "sox conversion", clip_path)
+
+        # run encoder
+        require_success(
+            run_opus_encoder(ref_opus_demo, pcm_path, bitstream_path, "voip", 16000, 1, bitrate),
+            "reference encoder", clip_path)
+
+        # run decoder
+        require_success(
+            run_opus_decoder(ref_opus_demo, bitstream_path, ref_path, 16000, 1),
+            "reference decoder", clip_path)
+        require_success(
+            run_opus_decoder(test_opus_demo, bitstream_path, test_path, 16000, 1, options=test_options),
+            "test decoder", clip_path)
+
+        d_ref = compute_moc_score(pcm_path, ref_path)
+        d_test = compute_moc_score(pcm_path, test_path)
+
+        return d_ref, d_test
+
+    return process_clip
+
+def main(inputdir, outputdir, bitrate, reference_opus_demo, test_opus_demo, test_option_string, verbose):
+    """Run the test over every clip listed in ``<inputdir>/clips.yml``.
+
+    For each language every clip is processed into a pair of scores
+    (d_ref, d_test). The relative difference of a clip is
+    ``(d_ref ** 0.5 - d_test ** 0.5) / d_ref ** 0.5``. A language fails if its
+    smallest relative difference is below -0.1 or its mean relative difference
+    is below -0.025. A summary is printed and all scores are saved to
+    ``<outputdir>/results_<options>_<bitrate>.npy``.
+
+    Args:
+        inputdir: folder containing clips.yml and the clips it lists.
+        outputdir: folder for intermediate files (``process/``) and results.
+        bitrate: bitrate handed to the encoder.
+        reference_opus_demo: reference opus_demo binary.
+        test_opus_demo: opus_demo binary under test.
+        test_option_string: whitespace separated options for the binary under test.
+        verbose: 0 quiet, 1 report per-language results, 2 also list per-clip
+            scores of failed languages.
+    """
+    # load clips list
+    with open(os.path.join(inputdir, 'clips.yml'), "r") as f:
+        clips = yaml.safe_load(f)
+
+    # parse test options
+    test_options = test_option_string.split()
+
+    process_clip = process_clip_factory(reference_opus_demo, test_opus_demo, test_options)
+
+    os.makedirs(outputdir, exist_ok=True)
+    processdir = os.path.join(outputdir, 'process')
+    os.makedirs(processdir, exist_ok=True)
+
+    alpha = 0.5  # exponent applied to both scores before they are compared
+
+    num_passed = 0
+    results = dict()
+    min_rel_diff = 1000
+    min_mean = 1000
+    worst_clip = None
+    worst_lang = None
+    for lang, lang_clips in clips.items():
+        if verbose > 0:
+            print(f"processing language {lang}...")
+
+        if not lang_clips:
+            # Nothing to evaluate: np.argmin below would raise on an empty
+            # array. The language is recorded but not counted as passed.
+            results[lang] = np.zeros((0, 2))
+            if verbose > 0:
+                print(f"no clips listed for {lang}, skipping")
+            continue
+
+        results[lang] = np.zeros((len(lang_clips), 2))
+        for i, clip in enumerate(lang_clips):
+            clip_path = os.path.join(inputdir, clip)
+            d_ref, d_test = process_clip(clip_path, processdir, bitrate)
+            results[lang][i, 0] = d_ref
+            results[lang][i, 1] = d_test
+
+        rel_diff = ((results[lang][:, 0] ** alpha - results[lang][:, 1] ** alpha) / (results[lang][:, 0] ** alpha))
+
+        # track the worst single clip over all languages
+        min_idx = np.argmin(rel_diff).item()
+        if rel_diff[min_idx] < min_rel_diff:
+            min_rel_diff = rel_diff[min_idx]
+            worst_clip = lang_clips[min_idx]
+
+        # track the language with the worst mean
+        if np.mean(rel_diff) < min_mean:
+            min_mean = np.mean(rel_diff).item()
+            worst_lang = lang
+
+        if np.min(rel_diff) < -0.1 or np.mean(rel_diff) < -0.025:
+            if verbose > 0:
+                print(f"FAIL ({np.mean(results[lang], axis=0)} {np.mean(rel_diff)} {np.min(rel_diff)})")
+            if verbose > 1:
+                for i, c in enumerate(lang_clips):
+                    print(f" {c:50s} {results[lang][i]} {rel_diff[i]}")
+        else:
+            if verbose > 0:
+                print(f"PASS ({np.mean(results[lang], axis=0)} {np.mean(rel_diff)} {np.min(rel_diff)})")
+            num_passed += 1
+
+    print(f"{num_passed}/{len(clips)} tests passed!")
+
+    print(f"worst case occured at clip {worst_clip} with relative difference of {min_rel_diff}")
+    print(f"worst mean relative difference was {min_mean} for test {worst_lang}")
+
+    # results is a dict of arrays, so it is stored as a pickled object
+    np.save(os.path.join(outputdir, 'results_' + "_".join(test_options) + f"_{bitrate}.npy"), results, allow_pickle=True)
+
+
+if __name__ == "__main__":
+    # Parse the command line and hand every option to main() by name.
+    cli = parser.parse_args()
+
+    main(
+        inputdir=cli.inputdir,
+        outputdir=cli.outputdir,
+        bitrate=cli.bitrate,
+        reference_opus_demo=cli.reference_opus_demo,
+        test_opus_demo=cli.test_opus_demo,
+        test_option_string=cli.test_opus_demo_options,
+        verbose=cli.verbose,
+    )
--
⑨