摘要:参考文献主要参考这篇文章为了方便说明和研究,我这里只是设计了一个非常简单的模型,对高斯分布样本进行生成。上面的后面我设置为后,最后好像很难收敛到真正的高斯分布,总是比真的高斯差那么一点。
参考文献:
主要参考这篇文章 Generative Adversarial Networks, link
为了方便说明和研究,我这里只是设计了一个非常简单的模型,对高斯分布样本进行生成。不过从下面的实验中,我还是发现了一些非常有用的特点,可以加深我们对GAN网络的了解。
GAN原理具体原理可以参考上面的文献,不过在这里还是大概讲一下。
其实GAN的原理非常简单,它有2个子网络组成,一个是Generator,即生成网络,它以噪音样本为输入,通过学习到的权重,把噪音转变(即生成)为有意义的信号;一个是Discriminator,即判别网络,他以信号为输入(可以来自generator生成的信号,也可以是真正的信号),通过学习来判别信号的真假,并输出一个0-1之间的概率。可以把Generator比喻为一个假的印钞机,而Discriminator则是验钞机,他们两个互相竞争,使得印钞机越来越真,同时验钞机也越来越准。但是最终我们是希望Generator越来越真,而Discriminator的输出都是0.5,即难以分辨~~
而在训练的时候,则分两个阶段进行,第一个阶段是Discriminator的学习,此时固定Generator的权重不变,只更新Discriminator的权重。loss函数是:
$$ frac{1}{m}sum_{i=1}^{m}[logD(x^i) + log(1 - D(G(z^i)))] $$
其中m是batch_size, $x$表示真正的信号,$z$表示噪音样本。训练时分别从噪音分布和真实分布中选出m个噪音输入样本和m个真实信号样本,通过对以上的loss function最大化更新Discriminator的权重
第二个阶段是对Generator进行训练,此时的loss function是:
$$ frac{1}{m}sum_{i=1}^{m}[log(1 - D(G(z^i)))] $$
不过,此时是对loss最小化来更新Generator的权重。
另外,这2个阶段并不是交替进行的,而是执行K次Discriminator的更新,再执行1次Generator的更新。
后面的实验结果也显示,K的选择非常关键。
主要工具是 python + keras,用keras实现一些常用的网络特别容易,比如MLP、word2vec、LeNet、lstm等等,github上都有详细demo。但是稍微复杂些的就要费些时间自己写了。不过整体看,依然比用原生tf写要方便。而且,我们还可以把keras当初是学习tf的参考代码,里面很多写法都非常值得借鉴。
废话不多说了,直接上代码吧:
GANmodel只列出最主要的代码
# 这是针对GAN特殊设计的loss function def log_loss_discriminator(y_true, y_pred): return - K.log(K.maximum(K.epsilon(), y_pred)) def log_loss_generator(y_true, y_pred): return K.log(K.maximum(K.epsilon(), 1. - y_pred)) class GANModel: def __init__(self, input_dim, log_dir = None): """ __tensor[0]: 定义了discriminateor的表达式, 对y进行判别,true samples __tensor[1]: 定义了generator的表达式, 对x进行生成,noise samples """ if isinstance(input_dim, list): input_dim_y, input_dim_x = input_dim[0], input_dim[1] elif isinstance(input_dim, int): input_dim_x = input_dim_y = input_dim else: raise ValueError("input_dim should be list or interger, got %r" % input_dim) # 必须使用名字,方便后面分别输入2个信号 self.__inputs = [layers.Input(shape=(input_dim_y,), name = "y"), layers.Input(shape=(input_dim_x,), name = "x")] self.__tensors = [None, None] self.log_dir = log_dir self._discriminate_layers = [] self._generate_layers = [] self.train_status = defaultdict(list) def add_gen_layer(self, layer): self._add_layer(layer, True) def add_discr_layer(self, layer): self._add_layer(layer) def _add_layer(self, layer, for_gen=False): idx = 0 if for_gen: self._generate_layers.append(layer) idx = 1 else: self._discriminate_layers.append(layer) if self.__tensors[idx] is None: self.__tensors[idx] = layer(self.__inputs[idx]) else: self.__tensors[idx] = layer(self.__tensors[idx]) def compile_discriminateor_model(self, optimizer = optimizers.Adam()): if len(self._discriminate_layers) <= 0: raise ValueError("you need to build discriminateor model before compile it") if len(self._generate_layers) <= 0: raise ValueError("you need to build generator model before compile discriminateo model") # 通过指定trainable = False,可以freeze权重的更新。必须放在compile之前 for l in self._discriminate_layers: l.trainable = True for l in self._generate_layers: l.trainable = False discriminateor_out1 = self.__tensors[0] discriminateor_out2 = layers.Lambda(lambda y: 1. - y)(self._discriminate_generated()) # 如果输出2个信号,keras会分别在各个信号上引用loss function,然后累加,对累加的结果进行 # minimize 更新。双下划线的model是参与训练的模型。 self.__discriminateor_model = Model(self.__inputs, [discriminateor_out1, discriminateor_out2]) self.__discriminateor_model.compile(optimizer, loss = log_loss_discriminator) # 这个才是真正的discriminator model self.discriminateor_model = Model(self.__inputs[0], self.__tensors[0]) self.discriminateor_model.compile(optimizer, loss = log_loss_discriminator) if self.log_dir is not None: # 需要安装pydot和graphviz。没有的可以先注释掉 plot_model(self.__discriminateor_model, self.log_dir + "/gan_discriminateor_model.png", show_shapes = True) def compile_generator_model(self, optimizer = optimizers.Adam()): if len(self._discriminate_layers) <= 0: raise ValueError("you need to build discriminateor model before compile generator model") if len(self._generate_layers) <= 0: raise ValueError("you need to build generator model before compile it") for l in self._discriminate_layers: l.trainable = False for l in self._generate_layers: l.trainable = True out = self._discriminate_generated() self.__generator_model = Model(self.__inputs[1], out) self.__generator_model.compile(optimizer, loss = log_loss_generator) # 这个才是真正的Generator模型 self.generator_model = Model(self.__inputs[1], self.__tensors[1]) if self.log_dir is not None: plot_model(self.__generator_model, self.log_dir + "/gan_generator_model.png", show_shapes = True) def train(self, sample_list, epoch = 3, batch_size = 32, step_per = 10, plot=False): """ step_per: 每隔几步训练一次generator,即K """ sample_noise, sample_true = sample_list["x"], sample_list["y"] sample_count = sample_noise.shape[0] batch_count = sample_count // batch_size # 这里比较trick了,因为keras的model必须要一个y。但是gan其实是没有y的。只好伪造一个 # 满足keras的“无理”要求 psudo_y = np.ones((batch_size, ), dtype = "float32") if plot: # plot the real data fig = plt.figure() ax = fig.add_subplot(1,1,1) plt.ion() plt.show() for ei in range(epoch): for i in range(step_per): idx = random.randint(0, batch_count-1) batch_noise = sample_noise[idx * batch_size : (idx+1) * batch_size] idx = random.randint(0, batch_count-1) batch_sample = sample_true[idx * batch_size : (idx+1) * batch_size] self.__discriminateor_model.train_on_batch({ "y": batch_sample, "x": batch_noise}, [psudo_y, psudo_y]) idx = random.randint(0, batch_count-1) batch_noise = sample_noise[idx * batch_size : (idx+1) * batch_size] self.__generator_model.train_on_batch(batch_noise, psudo_y) if plot: gen_result = self.generator_model.predict_on_batch(batch_noise) self.train_status["gen_result"].append(gen_result) dis_result = self.discriminateor_model.predict_on_batch(gen_result) self.train_status["dis_result"].append(dis_result) freq_g, bin_g = np.histogram(gen_result, density=True) # norm to sum1 freq_g = freq_g * (bin_g[1] - bin_g[0]) bin_g = bin_g[:-1] freq_d, bin_d = np.histogram(batch_sample, density=True) freq_d = freq_d * (bin_d[1] - bin_d[0]) bin_d = bin_d[:-1] ax.plot(bin_g, freq_g, "go-", markersize = 4) ax.plot(bin_d, freq_d, "ko-", markersize = 8) gen1d = gen_result.flatten() dis1d = dis_result.flatten() si = np.argsort(gen1d) ax.plot(gen1d[si], dis1d[si], "r--") if (ei+1) % 20 == 0: ax.cla() plt.title("epoch = %d" % (ei+1)) plt.pause(0.05) if plot: plt.ioff() plt.close()main部分
只列出主要部分:从中可以看到主要模型结构和参数取值
step_per = 20 sample_size = args.batch_size * 100 # 整个测试样本集合 noise_dim = 4 signal_dim = 1 x = np.random.uniform(-3, 3, size = (sample_size, noise_dim)) y = np.random.normal(size = (sample_size, signal_dim)) samples = {"x": x, "y": y} gan = GANModel([signal_dim, noise_dim], args.log_dir) gan.add_discr_layer(layers.Dense(200, activation="relu")) gan.add_discr_layer(layers.Dense(50, activation="softmax")) gan.add_discr_layer(layers.Lambda(lambda y: K.max(y, axis=-1, keepdims=True), output_shape = (1,))) gan.add_gen_layer(layers.Dense(200, activation="relu")) gan.add_gen_layer(layers.Dense(100, activation="relu")) gan.add_gen_layer(layers.Dense(50, activation="relu")) gan.add_gen_layer(layers.Dense(signal_dim)) gan.compile_generator_model() loger.info("compile generator finished") gan.compile_discriminateor_model() loger.info("compile discriminator finished") gan.train(samples, args.epoch, args.batch_size, step_per, plot=True)实验结果 K的影响
在论文中,作者就提到K对训练结果影响很大,
使用上面的step_per = 20,我得到的结果比较理想:
可以看到,最后Generator生成的数据(绿线)和真实的高斯分布(黑线)非常接近了,导致Discriminator也变得无法辨认了(p = 0.5)。
但是把step_per设为3后,结果就发散的厉害,比较难收敛:
在文章中,作者也提到,Discriminator和Generator必须匹配好,一般要多训练几次Discriminator再训练一次Generator,这是因为Discriminator是Generator的前提,如果D都没有训练好,那G的更新方向就会不准。
另外,我还发现,noise_dim对结果影响也非常大。上面的noise_dim = 4, 后面我设置为1后,最后好像很难收敛到真正的高斯分布,总是比真的高斯差那么一点。
所以,我的猜测是:Generator的输入其实可以看成是真实信号在其他维度上的映射,通过模型的学习过程,它找到了二者的映射关系,所以反过来可以认为Generator把真实信号分解到了高维空间里,此时,当然是维度越高信号被分解的越好,越容易接近真实信号。
而且,从信号拟合角度看,因为我实验中的高斯信号是非线性的,而使用的激活函数都是线性函数,如果噪音也是1维的,相当于用一堆线性函数去拟合非线性函数,这种情况必须要在一个更高的维度上才能实现。
训练一个稳定的GAN网络是一个非常复杂的过程,所幸已经有大神在这方面做了很多探索。详细请参考这里
完整代码# demo_gan.py # -*- encoding: utf8 -*- """ GAN网络Demo """ import os from os import path import argparse import logging import traceback import random import pickle import numpy as np import tensorflow as tf from keras import optimizers from keras import layers from keras import callbacks, regularizers, activations from keras.engine import Model from keras.utils.vis_utils import plot_model import keras.backend as K from collections import defaultdict from matplotlib import pyplot as plt import app_logger loger = logging.getLogger(__name__) # 注意pred不能为负数,因为pred是一个概率。所以最后一个激活函数的选择要注意 def log_loss_discriminator(y_true, y_pred): return - K.log(K.maximum(K.epsilon(), y_pred)) def log_loss_generator(y_true, y_pred): return K.log(K.maximum(K.epsilon(), 1. - y_pred)) class GANModel: def __init__(self, input_dim, log_dir = None): """ __tensor[0]: 定义了discriminateor的表达式 __tensor[1]: 定义了generator的表达式 """ # discriminateor 对y进行判别,true samples # generator 对x进行生成,noise samples if isinstance(input_dim, list): input_dim_y, input_dim_x = input_dim[0], input_dim[1] elif isinstance(input_dim, int): input_dim_x = input_dim_y = input_dim else: raise ValueError("input_dim should be list or interger, got %r" % input_dim) self.__inputs = [layers.Input(shape=(input_dim_y,), name = "y"), layers.Input(shape=(input_dim_x,), name = "x")] self.__tensors = [None, None] self.log_dir = log_dir self._discriminate_layers = [] self._generate_layers = [] self.train_status = defaultdict(list) def add_gen_layer(self, layer): self._add_layer(layer, True) def add_discr_layer(self, layer): self._add_layer(layer) def _add_layer(self, layer, for_gen=False): idx = 0 if for_gen: self._generate_layers.append(layer) idx = 1 else: self._discriminate_layers.append(layer) if self.__tensors[idx] is None: self.__tensors[idx] = layer(self.__inputs[idx]) else: self.__tensors[idx] = layer(self.__tensors[idx]) def compile_discriminateor_model(self, optimizer = optimizers.Adam()): if len(self._discriminate_layers) <= 0: raise ValueError("you need to build discriminateor model before compile it") if len(self._generate_layers) <= 0: raise ValueError("you need to build generator model before compile discriminateo model") for l in self._discriminate_layers: l.trainable = True for l in self._generate_layers: l.trainable = False discriminateor_out1 = self.__tensors[0] discriminateor_out2 = layers.Lambda(lambda y: 1. - y)(self._discriminate_generated()) self.__discriminateor_model = Model(self.__inputs, [discriminateor_out1, discriminateor_out2]) self.__discriminateor_model.compile(optimizer, loss = log_loss_discriminator) # 这个才是需要的discriminateor model self.discriminateor_model = Model(self.__inputs[0], self.__tensors[0]) self.discriminateor_model.compile(optimizer, loss = log_loss_discriminator) #if self.log_dir is not None: # plot_model(self.__discriminateor_model, self.log_dir + "/gan_discriminateor_model.png", show_shapes = True) def compile_generator_model(self, optimizer = optimizers.Adam()): if len(self._discriminate_layers) <= 0: raise ValueError("you need to build discriminateor model before compile generator model") if len(self._generate_layers) <= 0: raise ValueError("you need to build generator model before compile it") for l in self._discriminate_layers: l.trainable = False for l in self._generate_layers: l.trainable = True out = self._discriminate_generated() self.__generator_model = Model(self.__inputs[1], out) self.__generator_model.compile(optimizer, loss = log_loss_generator) # 这个才是真正需要的模型 self.generator_model = Model(self.__inputs[1], self.__tensors[1]) #if self.log_dir is not None: # plot_model(self.__generator_model, self.log_dir + "/gan_generator_model.png", show_shapes = True) def train(self, sample_list, epoch = 3, batch_size = 32, step_per = 10, plot=False): """ step_per: 每隔几步训练一次generator """ sample_noise, sample_true = sample_list["x"], sample_list["y"] sample_count = sample_noise.shape[0] batch_count = sample_count // batch_size psudo_y = np.ones((batch_size, ), dtype = "float32") if plot: # plot the real data fig = plt.figure() ax = fig.add_subplot(1,1,1) plt.ion() plt.show() for ei in range(epoch): for i in range(step_per): idx = random.randint(0, batch_count-1) batch_noise = sample_noise[idx * batch_size : (idx+1) * batch_size] idx = random.randint(0, batch_count-1) batch_sample = sample_true[idx * batch_size : (idx+1) * batch_size] self.__discriminateor_model.train_on_batch({ "y": batch_sample, "x": batch_noise}, [psudo_y, psudo_y]) idx = random.randint(0, batch_count-1) batch_noise = sample_noise[idx * batch_size : (idx+1) * batch_size] self.__generator_model.train_on_batch(batch_noise, psudo_y) if plot: gen_result = self.generator_model.predict_on_batch(batch_noise) self.train_status["gen_result"].append(gen_result) dis_result = self.discriminateor_model.predict_on_batch(gen_result) self.train_status["dis_result"].append(dis_result) freq_g, bin_g = np.histogram(gen_result, density=True) # norm to sum1 freq_g = freq_g * (bin_g[1] - bin_g[0]) bin_g = bin_g[:-1] freq_d, bin_d = np.histogram(batch_sample, density=True) freq_d = freq_d * (bin_d[1] - bin_d[0]) bin_d = bin_d[:-1] ax.plot(bin_g, freq_g, "go-", markersize = 4) ax.plot(bin_d, freq_d, "ko-", markersize = 8) gen1d = gen_result.flatten() dis1d = dis_result.flatten() si = np.argsort(gen1d) ax.plot(gen1d[si], dis1d[si], "r--") if (ei+1) % 20 == 0: ax.cla() plt.title("epoch = %d" % (ei+1)) plt.pause(0.05) if plot: plt.ioff() plt.close() def save_model(self, path_dir): self.generator_model.save(path_dir + "/gan_generator.h5") self.discriminateor_model.save(path_dir + "/gan_discriminateor.h5") def load_model(self, path_dir): from keras.models import load_model custom_obj = { "log_loss_discriminateor": log_loss_discriminateor, "log_loss_generator": log_loss_generator} self.generator_model = load_model(path_dir + "/gan_generator.h5", custom_obj) self.discriminateor_model = load_model(path_dir + "/gan_discriminateor.h5", custom_obj) def _discriminate_generated(self): # 必须每次重新生成一下 disc_t = self.__tensors[1] for l in self._discriminate_layers: disc_t = l(disc_t) return disc_t if __name__ == "__main__": parser = argparse.ArgumentParser("""gan model demo (gaussian sample)""") parser.add_argument("-m", "--model_dir") parser.add_argument("-log", "--log_dir") parser.add_argument("-b", "--batch_size", type = int, default = 32) parser.add_argument("-log_lvl", "--log_lvl", default = "info", metavar = "可以指定INFO,DEBUG,WARN, ERROR") parser.add_argument("-e", "--epoch", type = int, default = 10) args = parser.parse_args() log_lvl = {"info": logging.INFO, "debug": logging.DEBUG, "warn": logging.WARN, "warning": logging.WARN, "error": logging.ERROR, "err": logging.ERROR}[args.log_lvl.lower()] app_logger.init(log_lvl) loger.info("args: %r" % args) step_per = 20 sample_size = args.batch_size * 100 # 整个测试样本集合 noise_dim = 4 signal_dim = 1 x = np.random.uniform(-3, 3, size = (sample_size, noise_dim)) y = np.random.normal(size = (sample_size, signal_dim)) samples = {"x": x, "y": y} gan = GANModel([signal_dim, noise_dim], args.log_dir) gan.add_discr_layer(layers.Dense(200, activation="relu")) gan.add_discr_layer(layers.Dense(50, activation="softmax")) gan.add_discr_layer(layers.Lambda(lambda y: K.max(y, axis=-1, keepdims=True), output_shape = (1,))) gan.add_gen_layer(layers.Dense(200, activation="relu")) gan.add_gen_layer(layers.Dense(100, activation="relu")) gan.add_gen_layer(layers.Dense(50, activation="relu")) gan.add_gen_layer(layers.Dense(signal_dim)) gan.compile_generator_model() loger.info("compile generator finished") gan.compile_discriminateor_model() loger.info("compile discriminator finished") gan.train(samples, args.epoch, args.batch_size, step_per, plot=True) gen_results = gan.train_status["gen_result"] dis_results = gan.train_status["dis_result"] gen_result = gen_results[-1] dis_result = dis_results[-1] freq_g, bin_g = np.histogram(gen_result, density=True) # norm to sum1 freq_g = freq_g * (bin_g[1] - bin_g[0]) bin_g = bin_g[:-1] freq_d, bin_d = np.histogram(y, bins = 100, density=True) freq_d = freq_d * (bin_d[1] - bin_d[0]) bin_d = bin_d[:-1] plt.plot(bin_g, freq_g, "go-", markersize = 4) plt.plot(bin_d, freq_d, "ko-", markersize = 8) gen1d = gen_result.flatten() dis1d = dis_result.flatten() si = np.argsort(gen1d) plt.plot(gen1d[si], dis1d[si], "r--") plt.savefig("img/gan_results.png") if not path.exists(args.model_dir): os.mkdir(args.model_dir) gan.save_model(args.model_dir) # app_logger.py import logging def init(lvl=logging.DEBUG): log_handler = logging.StreamHandler() # create formatter formatter = logging.Formatter("[%(asctime)s] %(levelname)s %(filename)s:%(funcName)s:%(lineno)d > %(message)s") log_handler.setFormatter(formatter) logging.basicConfig(level = lvl, handlers = [log_handler])
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/44492.html
摘要:深度卷积对抗生成网络是的变体,是一种将卷积引入模型的网络。特点是判别器使用来替代空间池化,生成器使用反卷积使用稳定学习,有助于处理初始化不良导致的训练问题生成器输出层使用激活函数,其它层使用激活函数。 介绍 showImg(https://segmentfault.com/img/bVbkDEF?w=2572&h=1080); 如图所示,GAN网络会同时训练两个模型。生成器:负责生成数...
摘要:深度学习框架的作者人工智能专家最近开发了一个专注于开源项目的讨论合作的平台地址。网站首页表明了它的三个目标专注重要却被小看了的研究问题把研究者联系起来,并鼓励开放的科学合作为想增加机器学习经验的学生提供学习的环境。 深度学习框架Keras的作者、Google人工智能专家François Chollet 最近开发了一个专注于AI开源项目的讨论&合作的平台AI·ON(地址:http://ai-o...
摘要:介绍权重正则化可以减轻深度神经网络模型的过拟合问题,可以提升对新数据的泛化能力。代码展示在卷积层中使用正则化。许多正则化方法通过向训练数据添加噪声来防止过拟合。模型使用损失函数,优化器。 showImg(https://segmentfault.com/img/bVbpa1n?w=384&h=131); 介绍 权重正则化可以减轻深度神经网络模型的过拟合问题,可以提升对新数据的泛化能力。...
阅读 1123·2023-04-26 00:12
阅读 3247·2021-11-17 09:33
阅读 1059·2021-09-04 16:45
阅读 1186·2021-09-02 15:40
阅读 2144·2019-08-30 15:56
阅读 2949·2019-08-30 15:53
阅读 3546·2019-08-30 11:23
阅读 1931·2019-08-29 13:54