import numpy as np import pandas as pd import chainer from chainer import Variable, optimizers, serializers, cuda import chainer.functions as F import chainer.links as L from chainer import training from chainer import serializers from chainer.datasets import tuple_dataset from chainer.training import extensions import glob import os import inspect import config as C # ネットワーク class myNN(chainer.Chain): def __init__(self, class_num=C.class_num): super(myNN, self).__init__( conv1 = L.Convolution2D(in_channels=1, out_channels=32, ksize=(3, 3), stride=(3, 3), pad=0), conv2 = L.Convolution2D(in_channels=32, out_channels=32, ksize=(3, 3), stride=(3, 3), pad=0), conv3 = L.Convolution2D(in_channels=32, out_channels=32, ksize=(3, 3), stride=(3, 3), pad=0), fc1 = L.Linear(in_size=32*19*34, out_size=2048), fc2 = L.Linear(in_size=2048, out_size=1024), fc3 = L.Linear(in_size=1024, out_size=C.class_num), ) def __call__(self, x): c1 = F.relu(self.conv1(x)) c2 = F.relu(self.conv2(c1)) c3 = F.relu(self.conv3(c2)) l1 = F.relu(self.fc1(c3)) l2 = F.relu(self.fc2(l1)) l3 = self.fc3(l2) return l3 # データセットの作成 class MusicSpecDataset(chainer.dataset.DatasetMixin): def __init__(self, home, dtype=np.float32): if not os.path.exists(home): sys.exit() self._dtype = dtype dir_list = os.listdir(home) sort_list = [] for num in dir_list: sort_list.append(int(num)) sort_list.sort() dirs = [] for name in sort_list: dirs.append(os.path.join(home, str(name))) # 正解ラベルとペアにする pairs = [] memo = [] for i, dname in enumerate(dirs): files = glob.glob("{}/*.npy".format(dname)) label = [i] memo.append([dname.split("/")[-1], i]) for fname in files: pairs.append([fname, label]) # 正解ラベルと予測ラベルをlabel.csvに保存 memo = pd.DataFrame(memo, columns=['bpm', 'label']) memo.to_csv(os.path.join(os.path.dirname(home), "label.csv")) self._pairs = pairs def __len__(self): return len(self._pairs) def get_example(self, i): fname = self._pairs[i][0] spec = np.load(fname) if spec.ndim == 2: spec = spec[np.newaxis,:] spec = spec.astype(np.float32) label = np.int32(self._pairs[i][1]) label = np.int32(label[0]) return spec, label # ネットワークをtxtファイルに保存 def SaveParam(epoch, batch, param_path=C.log_path): t = inspect.getsource(myNN) t = t + "epoch:{}".format(epoch) + "\n" + "batch:{}".format(batch) + "\n" with open(os.path.join(param_path, "param.txt"), mode='w') as f: f.write(t) print("Save Path :",os.path.join(param_path)) # 訓練設定 def TrainModel(epoch, batch, device=-1, data_path=C.fft_path, save="simple_tempo.model"): print("Training Start") result = C.log_path # ネットワークの保存 SaveParam(epoch, batch, result) model = L.Classifier(myNN()) # GPUの使用 if device >= 0: chainer.cuda.get_device(device).use() model.to_gpu() optimizer = optimizers.SGD() optimizer.setup(model) # データセットの読み込み dataset = MusicSpecDataset(data_path) split = int(len(dataset) * 0.8) train, test = chainer.datasets.split_dataset_random(dataset, split, seed=0) train_iter = chainer.iterators.MultithreadIterator(train, batch_size=batch) test_iter = chainer.iterators.MultithreadIterator(test, batch_size=batch, repeat=False, shuffle=False) updater = training.StandardUpdater(train_iter, optimizer, device=device) trainer = training.Trainer(updater, (epoch, 'epoch'), out=result) trainer.extend(extensions.Evaluator(test_iter, model, device=device)) trainer.extend(extensions.LogReport()) trainer.extend(extensions.PrintReport(['epoch', 'main/loss', 'main/accuracy', 'validation/main/loss', 'validation/main/accuracy', 'elapsed_time'])) trainer.extend(extensions.snapshot(filename='snapshot_epoch-{.updater.epoch}'), trigger=(10, 'epoch')) trainer.extend(extensions.ProgressBar()) trainer.run() model.to_cpu() serializers.save_npz(os.path.join(result ,save), model)