A Keras Implementation of GANs

Jaden

Abstract: For ease of explanation and study, I designed a very simple model that generates samples from a Gaussian distribution. One finding: after reducing the noise dimension from 4 to 1, the model seemed unable to fully converge to the true Gaussian, always falling just slightly short of the real thing.

References:

This post mainly follows the paper Generative Adversarial Networks (Goodfellow et al., 2014).

To keep the explanation and experiments simple, I designed a very basic model that generates samples from a Gaussian distribution. Even so, the experiments below reveal some very useful characteristics that can deepen our understanding of GANs.

How GANs Work

The full details are in the reference above, but here is a quick overview.
The idea behind a GAN is actually quite simple: it consists of two sub-networks. One is the Generator, which takes noise samples as input and, through its learned weights, transforms (i.e. generates) that noise into a meaningful signal. The other is the Discriminator, which takes a signal as input (either one produced by the Generator or a real one) and learns to judge whether it is real or fake, outputting a probability between 0 and 1. Think of the Generator as a counterfeit money printer and the Discriminator as a counterfeit detector: the two compete, so the fakes become ever more convincing while the detector becomes ever more accurate. Ultimately, though, we want the Generator to win, leaving the Discriminator outputting 0.5 everywhere, i.e. unable to tell real from fake.
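
In compact form, this competition is the minimax game from the original paper, with value function:

$$ \min_G \max_D V(D,G) = \mathbb{E}_{x \sim p_{data}(x)}[\log D(x)] + \mathbb{E}_{z \sim p_z(z)}[\log(1 - D(G(z)))] $$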

Training proceeds in two phases. In the first phase the Discriminator learns: the Generator's weights are held fixed and only the Discriminator's weights are updated. Its loss function is:

$$ \frac{1}{m}\sum_{i=1}^{m}\left[\log D(x^{(i)}) + \log(1 - D(G(z^{(i)})))\right] $$

where m is the batch_size, $x$ denotes real signals, and $z$ denotes noise samples. At each step, m noise samples and m real samples are drawn from the noise and real distributions respectively, and the Discriminator's weights are updated by maximizing the loss function above.
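
Since Keras optimizers only minimize, the code below implements this maximization by minimizing the negated objective: log_loss_discriminator returns -log(y_pred), and the second model output is wrapped as 1 - D(G(z)), so the two summed losses give exactly

$$ \min_{\theta_D} \; -\frac{1}{m}\sum_{i=1}^{m}\left[\log D(x^{(i)}) + \log(1 - D(G(z^{(i)})))\right] $$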

The second phase trains the Generator, whose loss function is:

$$ \frac{1}{m}\sum_{i=1}^{m}\log(1 - D(G(z^{(i)}))) $$

This time, however, the loss is minimized to update the Generator's weights.
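
(A side note from the original paper: early in training, when the Discriminator confidently rejects generated samples, $\log(1 - D(G(z)))$ saturates and gives weak gradients, so the paper suggests the non-saturating alternative of maximizing

$$ \frac{1}{m}\sum_{i=1}^{m}\log D(G(z^{(i)})) $$

instead. This post keeps the original minimax form, which is what log_loss_generator below implements.)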

Also, the two phases do not simply alternate one for one: the Discriminator is updated K times, then the Generator is updated once.
The experiments below also show that the choice of K is critical.
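
As a minimal sketch of this schedule (pseudocode only; it mirrors the train() method further below, where K is the step_per argument and the two update calls stand in for the train_on_batch calls there):

    for epoch in range(epochs):
        for _ in range(K):
            update_discriminator()   # phase 1: K discriminator updates
        update_generator()           # phase 2: then one generator update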

Implementation

The main tools are Python + Keras. Implementing common networks in Keras is especially easy; MLPs, word2vec, LeNet, LSTMs, and so on all have detailed demos on GitHub. Anything slightly more complex takes some time to write yourself, but overall it is still more convenient than writing raw TensorFlow. Keras is also worth treating as reference code for learning TensorFlow: many of its idioms are well worth borrowing.

Enough talk; here is the code:

GANModel

Only the most important code is listed here.

# Loss functions designed specifically for the GAN
def log_loss_discriminator(y_true, y_pred):
    return - K.log(K.maximum(K.epsilon(), y_pred))
    
def log_loss_generator(y_true, y_pred):
    return K.log(K.maximum(K.epsilon(), 1. - y_pred))
    
class GANModel:
    def __init__(self, 
                 input_dim,
                 log_dir = None):
        """
            __tensor[0]: 定义了discriminateor的表达式,  对y进行判别,true samples
            __tensor[1]: 定义了generator的表达式, 对x进行生成,noise samples
        """
        if isinstance(input_dim, list):
            input_dim_y, input_dim_x = input_dim[0], input_dim[1]
        elif isinstance(input_dim, int):
            input_dim_x = input_dim_y = input_dim
        else:
            raise ValueError("input_dim should be list or interger, got %r" % input_dim) 
        # Names are required so the two signals can be fed separately later
        self.__inputs = [layers.Input(shape=(input_dim_y,), name = "y"), 
                            layers.Input(shape=(input_dim_x,), name = "x")]
        self.__tensors = [None, None] 
        self.log_dir = log_dir
        self._discriminate_layers = []
        self._generate_layers = []
        self.train_status = defaultdict(list)
        
    def add_gen_layer(self, layer):
        self._add_layer(layer, True)
    def add_discr_layer(self, layer):
        self._add_layer(layer)
    def _add_layer(self, layer, for_gen=False):
        idx = 0
        if for_gen:
            self._generate_layers.append(layer)
            idx = 1
        else:
            self._discriminate_layers.append(layer)
        
        if self.__tensors[idx] is None:
            self.__tensors[idx] = layer(self.__inputs[idx])
        else:
            self.__tensors[idx] = layer(self.__tensors[idx])
            
    def compile_discriminateor_model(self, optimizer = optimizers.Adam()):
        if len(self._discriminate_layers) <= 0:
            raise ValueError("you need to build the discriminator model before compiling it")
        if len(self._generate_layers) <= 0:
            raise ValueError("you need to build the generator model before compiling the discriminator model")
        # Setting trainable = False freezes those weights; this must happen before compile
        for l in self._discriminate_layers:
            l.trainable = True
        for l in self._generate_layers:
            l.trainable = False
        discriminateor_out1 = self.__tensors[0]
        discriminateor_out2 = layers.Lambda(lambda y: 1. - y)(self._discriminate_generated())
        # With two outputs, Keras applies the loss function to each output separately,
        # sums the results, and minimizes the total. The double-underscore model is the
        # one actually used for training.
        self.__discriminateor_model = Model(self.__inputs, [discriminateor_out1, discriminateor_out2])
        self.__discriminateor_model.compile(optimizer, 
                                     loss = log_loss_discriminator)
       
        # This is the real discriminator model
        self.discriminateor_model = Model(self.__inputs[0], self.__tensors[0])
        self.discriminateor_model.compile(optimizer, 
                                     loss = log_loss_discriminator)
        if self.log_dir is not None:
            # Requires pydot and graphviz; comment this out if they are not installed
            plot_model(self.__discriminateor_model, self.log_dir + "/gan_discriminateor_model.png", show_shapes = True) 
        
    def compile_generator_model(self, optimizer = optimizers.Adam()):
        if len(self._discriminate_layers) <= 0:
            raise ValueError("you need to build the discriminator model before compiling the generator model")
        if len(self._generate_layers) <= 0:
            raise ValueError("you need to build the generator model before compiling it")
        
        for l in self._discriminate_layers:
            l.trainable = False
        for l in self._generate_layers:
            l.trainable = True
              
        out = self._discriminate_generated()
        self.__generator_model = Model(self.__inputs[1], out)
        self.__generator_model.compile(optimizer, 
                                     loss = log_loss_generator)
        # This is the real Generator model
        self.generator_model = Model(self.__inputs[1], self.__tensors[1])
        if self.log_dir is not None:
            plot_model(self.__generator_model, self.log_dir + "/gan_generator_model.png", show_shapes = True) 

    def train(self, sample_list, epoch = 3, batch_size = 32, step_per = 10, plot=False):
        """
        step_per: number of discriminator updates per generator update, i.e. K
        """
        sample_noise, sample_true = sample_list["x"], sample_list["y"]
        sample_count = sample_noise.shape[0]
        batch_count = sample_count // batch_size 
        # A bit of a hack: a Keras model requires a y, but a GAN has no real y,
        # so we fabricate one to satisfy Keras's "unreasonable" demand
        psudo_y = np.ones((batch_size, ), dtype = "float32")
        if plot:
            # plot the real data
            fig = plt.figure()
            ax = fig.add_subplot(1,1,1)
            plt.ion()
            plt.show() 
        for ei in range(epoch):
            for i in range(step_per):
                idx = random.randint(0, batch_count-1)
                batch_noise = sample_noise[idx * batch_size : (idx+1) * batch_size]
                idx = random.randint(0, batch_count-1)
                batch_sample = sample_true[idx * batch_size : (idx+1) * batch_size]
                self.__discriminateor_model.train_on_batch({
                    "y":  batch_sample,
                    "x": batch_noise}, 
                    [psudo_y, psudo_y])

            idx = random.randint(0, batch_count-1)
            batch_noise = sample_noise[idx * batch_size : (idx+1) * batch_size]
            self.__generator_model.train_on_batch(batch_noise, psudo_y)
            
            if plot:
                gen_result = self.generator_model.predict_on_batch(batch_noise)
                self.train_status["gen_result"].append(gen_result)
                dis_result = self.discriminateor_model.predict_on_batch(gen_result)
                self.train_status["dis_result"].append(dis_result)
                freq_g, bin_g = np.histogram(gen_result, density=True)
                # normalize so the bin frequencies sum to 1
                freq_g = freq_g * (bin_g[1] - bin_g[0])
                bin_g = bin_g[:-1]
                freq_d, bin_d = np.histogram(batch_sample, density=True)
                freq_d = freq_d * (bin_d[1] - bin_d[0])
                bin_d = bin_d[:-1]
                ax.plot(bin_g, freq_g, "go-", markersize = 4)
                ax.plot(bin_d, freq_d, "ko-", markersize = 8)
                gen1d = gen_result.flatten()
                dis1d = dis_result.flatten()
                si = np.argsort(gen1d)
                ax.plot(gen1d[si], dis1d[si], "r--")
                if (ei+1) % 20 == 0:
                    ax.cla()
                plt.title("epoch = %d" % (ei+1))
                plt.pause(0.05)
        if plot:
            plt.ioff()
            plt.close()

The main block

Only the main part is listed; it shows the model structure and parameter values.

    step_per = 20
    sample_size = args.batch_size * 100

    # the full test sample set
    noise_dim = 4
    signal_dim = 1
    x = np.random.uniform(-3, 3, size = (sample_size, noise_dim))
    y = np.random.normal(size = (sample_size, signal_dim))
    samples = {"x": x, 
               "y": y}
    
    gan = GANModel([signal_dim, noise_dim], args.log_dir)
    gan.add_discr_layer(layers.Dense(200, activation="relu"))
    gan.add_discr_layer(layers.Dense(50, activation="softmax"))
    gan.add_discr_layer(layers.Lambda(lambda y: K.max(y, axis=-1, keepdims=True),
                                 output_shape = (1,)))

    gan.add_gen_layer(layers.Dense(200, activation="relu"))
    gan.add_gen_layer(layers.Dense(100, activation="relu"))
    gan.add_gen_layer(layers.Dense(50, activation="relu"))
    gan.add_gen_layer(layers.Dense(signal_dim))
    
    gan.compile_generator_model()
    loger.info("compile generator finished")
    gan.compile_discriminateor_model()
    loger.info("compile discriminator finished")
    
    gan.train(samples, args.epoch, args.batch_size, step_per, plot=True)
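
A note on the discriminator head: because both loss functions take the log of the output, the discriminator must emit a value in (0, 1); the softmax-then-max pair above guarantees this. A single sigmoid unit would be the more conventional choice. A sketch of that variation (my alternative, not part of the original script) would replace the last two add_discr_layer calls with:

    # conventional head: one sigmoid unit outputs the probability directly
    gan.add_discr_layer(layers.Dense(1, activation="sigmoid"))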
Experimental Results

The Effect of K

The paper itself notes that the choice of K strongly affects training.
With step_per = 20 as above, my results were quite good:

As the plot shows, the data generated by the Generator (green line) ends up very close to the true Gaussian distribution (black line), so close that the Discriminator can no longer tell them apart (p = 0.5).

But with step_per set to 3, training diverges badly and struggles to converge:

The paper also notes that the Discriminator and Generator must be well matched; in general the Discriminator should be trained several times for each Generator update. The Discriminator is a prerequisite for the Generator: if D is not trained well, G's update direction will be off.

The Effect of the Noise Dimension

I also found that noise_dim has a large effect on the result. Above, noise_dim = 4; when I later set it to 1, the model seemed unable to fully converge to the true Gaussian, always falling just slightly short of the real thing.

So my guess is this: the Generator's input can be viewed as a projection of the real signal onto other dimensions. Through training, the model finds the mapping between the two, so the Generator can be seen as decomposing the real signal into a higher-dimensional space; naturally, the higher the dimension, the better the signal is decomposed and the easier it is to approach the real one.
From a curve-fitting perspective, the Gaussian density in my experiment is nonlinear, while the activations used (ReLU) are piecewise linear. With 1-dimensional noise, this amounts to fitting a nonlinear function with a pile of piecewise-linear pieces, which only works well in a higher-dimensional space.

Training a stable GAN is a genuinely tricky process; fortunately, others have already done a lot of exploration in this area. See here for details.

Complete Code
# demo_gan.py
# -*- encoding: utf8 -*-
"""
GAN demo
"""
import os
from os import path
import argparse
import logging
import traceback
import random
import pickle
import numpy as np
import tensorflow as tf
from keras import optimizers 
from keras import layers
from keras import callbacks, regularizers, activations
from keras.engine import Model
from keras.utils.vis_utils import plot_model
import keras.backend as K
from collections import defaultdict
from matplotlib import pyplot as plt
import app_logger

loger = logging.getLogger(__name__)

# Note: pred must never be negative, since it is a probability; choose the final activation accordingly
def log_loss_discriminator(y_true, y_pred):
    return - K.log(K.maximum(K.epsilon(), y_pred))
    
def log_loss_generator(y_true, y_pred):
    return K.log(K.maximum(K.epsilon(), 1. - y_pred))

class GANModel:
    def __init__(self, 
                 input_dim,
                 log_dir = None):
        """
            __tensor[0]: 定义了discriminateor的表达式
            __tensor[1]: 定义了generator的表达式
        """
        # discriminateor 对y进行判别,true samples
        # generator 对x进行生成,noise samples
        if isinstance(input_dim, list):
            input_dim_y, input_dim_x = input_dim[0], input_dim[1]
        elif isinstance(input_dim, int):
            input_dim_x = input_dim_y = input_dim
        else:
            raise ValueError("input_dim should be list or interger, got %r" % input_dim) 
    
        self.__inputs = [layers.Input(shape=(input_dim_y,), name = "y"), 
                            layers.Input(shape=(input_dim_x,), name = "x")]
        self.__tensors = [None, None] 
        self.log_dir = log_dir
        self._discriminate_layers = []
        self._generate_layers = []
        self.train_status = defaultdict(list)
        
    def add_gen_layer(self, layer):
        self._add_layer(layer, True)
    def add_discr_layer(self, layer):
        self._add_layer(layer)
    def _add_layer(self, layer, for_gen=False):
        idx = 0
        if for_gen:
            self._generate_layers.append(layer)
            idx = 1
        else:
            self._discriminate_layers.append(layer)
        
        if self.__tensors[idx] is None:
            self.__tensors[idx] = layer(self.__inputs[idx])
        else:
            self.__tensors[idx] = layer(self.__tensors[idx])
            
    def compile_discriminateor_model(self, optimizer = optimizers.Adam()):
        if len(self._discriminate_layers) <= 0:
            raise ValueError("you need to build the discriminator model before compiling it")
        if len(self._generate_layers) <= 0:
            raise ValueError("you need to build the generator model before compiling the discriminator model")
        
        for l in self._discriminate_layers:
            l.trainable = True
        for l in self._generate_layers:
            l.trainable = False
        discriminateor_out1 = self.__tensors[0]
        discriminateor_out2 = layers.Lambda(lambda y: 1. - y)(self._discriminate_generated())
        self.__discriminateor_model = Model(self.__inputs, [discriminateor_out1, discriminateor_out2])
        self.__discriminateor_model.compile(optimizer, 
                                     loss = log_loss_discriminator)
       
        # This is the discriminator model we actually need
        self.discriminateor_model = Model(self.__inputs[0], self.__tensors[0])
        self.discriminateor_model.compile(optimizer, 
                                     loss = log_loss_discriminator)
        #if self.log_dir is not None:
        #    plot_model(self.__discriminateor_model, self.log_dir + "/gan_discriminateor_model.png", show_shapes = True) 
        
    def compile_generator_model(self, optimizer = optimizers.Adam()):
        if len(self._discriminate_layers) <= 0:
            raise ValueError("you need to build the discriminator model before compiling the generator model")
        if len(self._generate_layers) <= 0:
            raise ValueError("you need to build the generator model before compiling it")
        
        for l in self._discriminate_layers:
            l.trainable = False
        for l in self._generate_layers:
            l.trainable = True
              
        out = self._discriminate_generated()
        self.__generator_model = Model(self.__inputs[1], out)
        self.__generator_model.compile(optimizer, 
                                     loss = log_loss_generator)
        # This is the model we actually need
        self.generator_model = Model(self.__inputs[1], self.__tensors[1])
        #if self.log_dir is not None:
        #    plot_model(self.__generator_model, self.log_dir + "/gan_generator_model.png", show_shapes = True) 

    def train(self, sample_list, epoch = 3, batch_size = 32, step_per = 10, plot=False):
        """
        step_per: number of discriminator updates per generator update
        """
        sample_noise, sample_true = sample_list["x"], sample_list["y"]
        sample_count = sample_noise.shape[0]
        batch_count = sample_count // batch_size 
        psudo_y = np.ones((batch_size, ), dtype = "float32")
        if plot:
            # plot the real data
            fig = plt.figure()
            ax = fig.add_subplot(1,1,1)
            plt.ion()
            plt.show() 
        for ei in range(epoch):
            for i in range(step_per):
                idx = random.randint(0, batch_count-1)
                batch_noise = sample_noise[idx * batch_size : (idx+1) * batch_size]
                idx = random.randint(0, batch_count-1)
                batch_sample = sample_true[idx * batch_size : (idx+1) * batch_size]
                self.__discriminateor_model.train_on_batch({
                    "y":  batch_sample,
                    "x": batch_noise}, 
                    [psudo_y, psudo_y])

            idx = random.randint(0, batch_count-1)
            batch_noise = sample_noise[idx * batch_size : (idx+1) * batch_size]
            self.__generator_model.train_on_batch(batch_noise, psudo_y)
            
            if plot:
                gen_result = self.generator_model.predict_on_batch(batch_noise)
                self.train_status["gen_result"].append(gen_result)
                dis_result = self.discriminateor_model.predict_on_batch(gen_result)
                self.train_status["dis_result"].append(dis_result)
                freq_g, bin_g = np.histogram(gen_result, density=True)
                # normalize so the bin frequencies sum to 1
                freq_g = freq_g * (bin_g[1] - bin_g[0])
                bin_g = bin_g[:-1]
                freq_d, bin_d = np.histogram(batch_sample, density=True)
                freq_d = freq_d * (bin_d[1] - bin_d[0])
                bin_d = bin_d[:-1]
                ax.plot(bin_g, freq_g, "go-", markersize = 4)
                ax.plot(bin_d, freq_d, "ko-", markersize = 8)
                gen1d = gen_result.flatten()
                dis1d = dis_result.flatten()
                si = np.argsort(gen1d)
                ax.plot(gen1d[si], dis1d[si], "r--")
                if (ei+1) % 20 == 0:
                    ax.cla()
                plt.title("epoch = %d" % (ei+1))
                plt.pause(0.05)
        if plot:
            plt.ioff()
            plt.close()
            
            
    def save_model(self, path_dir):
        self.generator_model.save(path_dir + "/gan_generator.h5")
        self.discriminateor_model.save(path_dir + "/gan_discriminateor.h5")
    
    def load_model(self, path_dir):
        from keras.models import load_model
        custom_obj = {
            "log_loss_discriminateor": log_loss_discriminateor,
            "log_loss_generator": log_loss_generator}
        self.generator_model = load_model(path_dir + "/gan_generator.h5", custom_obj)
        self.discriminateor_model = load_model(path_dir + "/gan_discriminateor.h5", custom_obj)
    
    def _discriminate_generated(self):
        # the discriminator stack must be re-applied to the generator output each time
        disc_t = self.__tensors[1]
        for l in self._discriminate_layers:
            disc_t = l(disc_t)            
        return disc_t
    
if __name__ == "__main__":
    parser = argparse.ArgumentParser("""gan model demo (gaussian sample)""")
    parser.add_argument("-m", "--model_dir")
    parser.add_argument("-log", "--log_dir")
    parser.add_argument("-b", "--batch_size", type = int, default = 32)
    parser.add_argument("-log_lvl", "--log_lvl", default = "info",
                        metavar = "可以指定INFO,DEBUG,WARN, ERROR")
    parser.add_argument("-e", "--epoch", type = int, default = 10)
    
    args = parser.parse_args()
    
    log_lvl = {"info": logging.INFO,
               "debug": logging.DEBUG,
               "warn": logging.WARN,
               "warning": logging.WARN,
               "error": logging.ERROR,
               "err": logging.ERROR}[args.log_lvl.lower()]
    app_logger.init(log_lvl)
        
    loger.info("args: %r" % args)
    step_per = 20
    sample_size = args.batch_size * 100

    # the full test sample set
    noise_dim = 4
    signal_dim = 1
    x = np.random.uniform(-3, 3, size = (sample_size, noise_dim))
    y = np.random.normal(size = (sample_size, signal_dim))
    samples = {"x": x, 
               "y": y}
    
    gan = GANModel([signal_dim, noise_dim], args.log_dir)
    gan.add_discr_layer(layers.Dense(200, activation="relu"))
    gan.add_discr_layer(layers.Dense(50, activation="softmax"))
    gan.add_discr_layer(layers.Lambda(lambda y: K.max(y, axis=-1, keepdims=True),
                                 output_shape = (1,)))

    gan.add_gen_layer(layers.Dense(200, activation="relu"))
    gan.add_gen_layer(layers.Dense(100, activation="relu"))
    gan.add_gen_layer(layers.Dense(50, activation="relu"))
    gan.add_gen_layer(layers.Dense(signal_dim))
    
    gan.compile_generator_model()
    loger.info("compile generator finished")
    gan.compile_discriminateor_model()
    loger.info("compile discriminator finished")
    
    gan.train(samples, args.epoch, args.batch_size, step_per, plot=True)
    gen_results = gan.train_status["gen_result"]
    dis_results = gan.train_status["dis_result"]

    gen_result = gen_results[-1]
    dis_result = dis_results[-1]
    freq_g, bin_g = np.histogram(gen_result, density=True)
    # normalize so the bin frequencies sum to 1
    freq_g = freq_g * (bin_g[1] - bin_g[0])
    bin_g = bin_g[:-1]
    freq_d, bin_d = np.histogram(y, bins = 100, density=True)
    freq_d = freq_d * (bin_d[1] - bin_d[0])
    bin_d = bin_d[:-1]
    plt.plot(bin_g, freq_g, "go-", markersize = 4)
    plt.plot(bin_d, freq_d, "ko-", markersize = 8)
    gen1d = gen_result.flatten()
    dis1d = dis_result.flatten()
    si = np.argsort(gen1d)
    plt.plot(gen1d[si], dis1d[si], "r--")
    plt.savefig("img/gan_results.png")
    if not path.exists(args.model_dir):
        os.mkdir(args.model_dir)
    gan.save_model(args.model_dir)


# app_logger.py
import logging

def init(lvl=logging.DEBUG):
    log_handler = logging.StreamHandler()
    # create formatter
    formatter = logging.Formatter("[%(asctime)s] %(levelname)s %(filename)s:%(funcName)s:%(lineno)d > %(message)s")
    log_handler.setFormatter(formatter)
    logging.basicConfig(level = lvl, handlers = [log_handler])
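
To run the demo end to end, a hypothetical invocation looks like this (the flags come from the argparse definition above; the directory names are placeholders):

    # -m is where the trained models are saved; -log is optional
    python demo_gan.py -m models -log logs -b 32 -e 200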
