from keras.utils import plot_model# path = "drive/My Drive/Colab Notebooks/アリス"# #path = "." import sys,os,shutil sys.path.append('/content/drive/My Drive/Colab Notebooks/アリス')# import importlib# from keras.callbacks import ModelCheckpoint as 数回置きにセーブ import numpy as np from keras.optimizers import Adam from keras.losses import mean_squared_error as mse from keras.models import load_model, Model, Sequential import keras from keras.layers import * from PIL import Image from sklearn.utils.extmath import cartesian as デカルト積 # ←lambda式な要素を文字列に変換してしまうバグあり。 デカルト積_ = lambda listA, listB: np.array([ [[A,B] for A in listA] for B in listB ]).reshape(-1,2).tolist() import hashlib sha256 = lambda data :int(hashlib.sha256(str(data).encode('utf-8')).hexdigest(), 16) import re """ (path)/(ex_id)/config.txtの書式 1行目に開始エポック番号 2行目にデコーダオンリーモードか否か 3行目に高さ 4行目にデータの条件式 (evalを使用するのでやや危険。i,x,yを使った条件式。但しi=360*x+y) 5行目にデータを分類する式 (eval。i,x,yを使った式で、訓練データならTRAIN,検証データならVAL,テストデータならPUREと評価される必要がある(TRAIN,VAL,PUREは変数名である)) 分類に乱数を使うと、再開のたびに再現性が失われるので非推奨。代わりにハッシュ関数sha256が利用可能 6行目に潜在変数正規ノイズの標準偏差 7行目にバッチサイズ 8行目に何エポックごとにモデルを保存するか 9行目に何回モデルを保存するごとにテストデータに対する応答を予測するか 10行目に学習率 # 11行目以降はモデルを作るためのkerasコード。 (path)/npy/data.npy """ def learn(**kwargs): arg = kwargs # キーワード引数の仕組みを途中で知ったので、つじつま合わせ def 処理(): 実験ID = 読み取る("実験ID") 実験データ = 実験データクラス(実験ID) if 実験データ.存在する(): 再開モード = 読み取る("再開", msg="実験"+str(実験ID)+"は過去に実行されたことがあるようです。\n続きから学習するなら1, 新規学習なら0を入力", tipe=真偽) if 再開モード: 実験データ.config_txtを(読む) モデル = 実験データ.モデルを読む() if モデル: print("モデルを読み込みました(epoch "+文字列(開始エポック番号)+"から再開") else: ファイルが存在しないエラー.投げる("モデルファイルが次のパスに存在しませんでした。\n"+path+"/"+文字列(self.実験ID)+"/weights.hdf5") else: input("新規学習を行うため、既存のデータを消去します。enterで続行") 実験データ.消去する() if 実験データ.開始エポック番号 is None: 実験データ.新規設定する() print("モデルを作成します") # デバグ用 モデル = モデルを作る(実験データ) print("モデルを作成しました。") print("設定を検証します") # デバグ用 try: 実験データ.設定の検証() except 検証エラー as e: メッセージ = e.args[0] 設定エラー.投げる(メッセージ) print("設定を検証しました") # デバグ用 # print(arg["データを分類する式"]) # デバグ用 npyデータ = npyデータクラス(実験データ) print("学習を開始します") try: モデル.学習する(npyデータ) # 未定義 except KeyboardInterrupt: print("学習を強制停止しました。エポック番号は"+文字列(モデル.エポック番号)) def モデルを作る(実験データ): 高さ = 実験データ.get設定辞書("高さ") デコーダオンリー = 実験データ.get設定辞書("デコーダオンリー") コードヒント = \ """\ ・numpy imported as np ・from keras.layers * imported ・from keras.models load_model, Model, Sequential imported ・各層を表現する関数へ渡すキーワード引数は、次のように英訳されます。 """+\ 文字列(get和英辞書())+\ """ また、Kerasの各層を用いて、次のようなラムダ式が定義済みです。 """+\ 文字列(get層lambda_code(高さ)) def 処理_(): return モデルクラス( # 未定義 (デコーダを作る if デコーダオンリー else EAEを作る) () , 実験データ ) def EAEを作る(): exec(get層lambda_code(高さ)) エンコーダ = エンコーダを作る() デコーダ出力層 = デコーダを作る()(エンコーダ.output) カラーバンド出力層 = カラーバンドを作る()(エンコーダ.output) EAE出力 = 層を合体(軸=1)([デコーダ出力層, カラーバンド出力層]) EAE = Model(EAE入力, EAE出力) return EAE def エンコーダを作る(): msg = \ """\ エンコーダのモデルを指示するkerasコードを入力してください。 ・最初の層は「EAE入力」という変数で表現され、出力形状は("""+文字列(高さ)+","+文字列(高さ*2)+""",3)です。 """+コードヒント+\ """ (def直後のインデントは半角スペース4文字を強制します。) 例1: 「(エンコーダ = f()」 「def f(): )」 「 model = 直列モデル()」 「 model.add(EAE入力)」 「 model.add(畳み込み(チャンネル数=2))」 「 model.add(縮小(行=16, 列=32)) # デフォルト値について。活性化関数="relu", フィルタサイズ=(3,3), ストライド=(1,1), パディング="same"」 「 model.add(変形(形状=(2,)))」 「 return model」 「__end__」 例1の通り、関数定義終了後は「__end__」だけを入力してください。(空白やその他の文字を入れないでください) 「\n__end__」が含まれることにより、連続入力は終了します。\ """ # _仮_ = 読み取る_代入式なし("エンコーダ", msg=msg+"\nエンコーダ = f()\ndef f(): \n") # コード = "def エンコーダ生成関数():\n" + _仮_ コード = 読み取る("エンコーダ", msg=msg) if コード.空白を取り除く() =="f()": if 'エンコーダ生成関数' not in arg: コード = "def エンコーダ生成関数():" while 文字列(コード).含まない("\n__end__"): コード = コード+ "\n" + input("def f():" if コード == "def エンコーダ生成関数():" else "") コード = コード.replace("\n__end__", "\n") else: コード = arg['エンコーダ生成関数'] exec(コード) # 危険 コード = "エンコーダ生成関数()" try: エンコーダ = eval(コード) # 危険 except: モデルエラー.投げる("次のコードはエンコーダを正しく生成するkerasコードではありません: "+コード+"\n"+msg) else: try: if エンコーダ.input_shape != (高さ, 高さ*2): モデルエラー.投げる("エンコーダの入力の大きさが異常です。\n"+文字列((高さ, 高さ*2))+"でなくてはいけませんが、"+文字列(エンコーダ.input_shape)+"でした。") elif エンコーダ.output_shape != (2,): モデルエラー.投げる("エンコーダの出力の大きさが異常です。\n"+文字列((2,))+"でなくてはいけませんが、"+文字列(エンコーダ.output_shape)+"でした。") except モデルエラー as e: e.投げる(e.args[0]) return エンコーダ def デコーダを作る(): msg = \ """\ デコーダのモデルを指示するkerasコードを入力してください。 ・最初の層は「潜在空間」という変数で表現され、出力形状は(2,)です。 ・最終的な出力形状は("""+文字列(高さ)+","+文字列(高さ*2)+""")にしてください。 """+コードヒント+\ """ 例1:「(デコーダ = )デコーダ生成関数()」 「(def デコーダ生成関数(): )」 「 model = 直列モデル()」 「 model.add(潜在空間)」 「 model.add(変形(形状=(1,1,2)))」 「 model.add(畳み込み(チャンネル数=3, フィルタサイズ=(5,5))) # デフォルト値について。活性化関数="relu", フィルタサイズ=(3,3), ストライド=(1,1), パディング="same"」 「 model.add(拡大(行=16, 列=32))」 「 return model」 「__end__」 例1の通り、関数定義終了後は「__end__」だけを入力してください。(空白やその他の文字を入れないでください) 「\n__end__」が含まれることにより、連続入力は終了します。\ """ コード = 読み取る("デコーダ", msg=msg) if コード.空白を取り除く() =="デコーダ生成関数()": if 'デコーダ生成関数' not in arg: コード = "def デコーダ生成関数():" while 文字列(コード).含まない("\n__end__"): コード = コード+ "\n" + input("def デコーダ生成関数():" if コード == "def デコーダ生成関数():" else "") コード = コード.replace("\n__end__", "\n") else: コード = arg['デコーダ生成関数'] 関数頭部の正規表現 = "^(def[ \t]+デコーダ生成関数[ \t]*\([ \t]*\)[ \t]*:[ \t]*)\n" if not re.findall(関数頭部の正規表現, コード): モデルエラー.投げる("コードの先頭が関数「デコーダ生成関数()」の頭部ではありませんでした。\n関数頭部は次の正規表現にマッチする必要があります。\n"+関数頭部の正規表現.replace("\t", "\\t").replace("\n","\\n")) try: with open(path+"/auto_generated.py", mode="w") as pyfile: pyfile.write(\ """\ from learn4 import 辞書, キーを英訳, get和英辞書 import keras from keras.layers import * from keras.models import load_model, Model, Sequential """+get層lambda_code(高さ)+"\n"+コード) import auto_generated importlib.reload(auto_generated) from auto_generated import デコーダ生成関数 デコーダ = デコーダ生成関数() except (SyntaxError, NameError): モデルエラー.投げる("次のコードはデコーダを正しく生成するkerasコードではありません: \n-----\n"+コード+"\n-----\n"+msg) except (ModuleNotFoundError, OSError): エラー.投げる("pyファイル\""+(path+"/auto_generated.py")+"\"の自動生成に失敗したようです。") else: try: if デコーダ.input_shape != (None, 2): モデルエラー.投げる("デコーダの入力の大きさが異常です。\n"+文字列((2,))+"でなくてはいけませんが、"+文字列(デコーダ.input_shape)+"でした。") elif デコーダ.output_shape != (None, 高さ, 高さ*2, 3): モデルエラー.投げる("デコーダの出力の大きさが異常です。\n"+文字列((高さ, 高さ*2))+"でなくてはいけませんが、"+文字列(デコーダ.output_shape)+"でした。") except モデルエラー as e: e.投げる(e.args[0]) return デコーダ def カラーバンドを作る(): 青成分 = 定数層(0) 色成分 = 層を合体(軸=1)([潜在空間, 青成分]) 出力層 = 拡大(行=16, 列=32)\ ( 変形((1,1,3))\ (色成分) ) return Model(inputs=潜在空間, outputs=出力層) return 処理_() # return of モデルを作る(実験データ) 読む = "r" 書く = "w+" class 実験データクラス: """ 実験データクラスは、各実験の詳細情報を表現するクラスである。 """ def __init__(self, 実験ID): self.実験ID = 実験ID self.再開モード = False self.開始エポック番号 = None self.__設定項目リストのコード = "[self.開始エポック番号, self.デコーダオンリー, self.高さ, self.データの条件式, self.データを分類する式, self.ノイズの標準偏差, self.バッチサイズ, self.モデル保存頻度, self.テスト頻度, self.学習率]" exec(self.__設定項目リストのコード + " =[ None, None, None, None, None, None, None, None, None, None]") def 存在する(self): return os.path.exists(self.config_txtのパス()) def 消去する(self): return shutil.rmtree(path+"/"+文字列(self.実験ID)) def config_txtのパス(self): return path+"/"+文字列(self.実験ID)+"/config.txt" def レポートのパス(self): return path+"/"+文字列(self.実験ID)+"/report.txt" def result_npyのパス(self): return path+"/"+文字列(self.実験ID)+"/result.npy" def config_txtを(self, モード): with open(self.config_txtのパス(), mode=モード) as f: 一行読む = lambda tipe : tipe(f.readline().replace("\r", "").replace("\n", "")) 一行書く = lambda obj : f.write(文字列(obj).replace("\r", "").replace("\n", "")+"\n") # f.writeメソッドは書き込まれた文字列を返却してくれる 一行読み書き = lambda tipe, obj: 一行読む(tipe) if モード==読む else tipe(一行書く(obj)) self._設定(処理=一行読み書き, tipe="設定項目の型", obj="設定項目") def モデルを読む(self): try: return load_model(path+"/"+文字列(self.実験ID)+"/weights.hdf5") except FileNotFoundError: return False def 新規設定する(self): if not self.存在する(): try:os.mkdir(path+"/"+文字列(self.実験ID)) except:pass self._設定(処理=読み取る, key="設定項目名", tipe="設定項目の型") def 設定の検証(self): 偽ならエラー(0