이 포스트는 Github 접속 제약이 있을 경우를 위한 것이며, 아래와 동일 내용을 실행 결과와 함께 Jupyter notebook으로도 보실 수 있습니다.
If you have no trouble connecting to GitHub, you can also view the same content, together with its execution results, as a Jupyter notebook.
5-1. Data load & pre-processing function
import tensorflow as tf
import numpy as np
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.datasets import mnist
from time import time
import os
tf.__version__
(train_data, train_labels), (test_data, test_labels) = mnist.load_data()
# Print the shape of each raw MNIST split, one line per array.
for caption, split in (
    ("train_data.shape : ", train_data),
    ("train_labels.shape : ", train_labels),
    ("test_data.shape : ", test_data),
    ("test_labels.shape : ", test_labels),
):
    print(caption, split.shape)
- data max 값 확인
# Largest value stored in the first training sample (no assignment; the value
# is only evaluated for inspection).
train_data[0].max()
- channel 차원 추가 : [N, 28, 28] -> [N, 28, 28, 1]
# Shape after appending a trailing axis of size 1; the result is not assigned,
# so train_data itself is left unchanged here.
print(np.expand_dims(train_data, axis=-1).shape)
- 분류 개수(클래스 수) 차원 추가 (one-hot 인코딩) : [N,] -> [N, 10]
# Shape of the training labels after one-hot encoding them into 10 classes.
print(to_categorical(train_labels, 10).shape)
def load_mnist():
    """Load MNIST and return model-ready arrays.

    Images get a trailing channel axis and are scaled by ``normalize``;
    labels are one-hot encoded over 10 classes.

    Returns:
        Tuple ``(train_data, train_labels, test_data, test_labels)``.
    """
    (train_images, train_targets), (test_images, test_targets) = mnist.load_data()

    # [N, 28, 28] -> [N, 28, 28, 1]
    train_images = train_images[..., np.newaxis]
    test_images = test_images[..., np.newaxis]
    train_images, test_images = normalize(train_images, test_images)

    # [N,] -> [N, 10]
    train_targets = to_categorical(train_targets, 10)
    test_targets = to_categorical(test_targets, 10)

    return train_images, train_targets, test_images, test_targets
def normalize(train_data, test_data):
    """Cast both arrays to float32 and divide them by 255.0.

    Args:
        train_data: Array of training samples.
        test_data: Array of test samples.

    Returns:
        Tuple ``(train_data, test_data)`` of the scaled float32 arrays.
    """
    scaled_train, scaled_test = (
        split.astype(np.float32) / 255.0 for split in (train_data, test_data)
    )
    return scaled_train, scaled_test
5-2. Performance function
- Loss 함수 : tf.keras.losses.categorical_crossentropy(y_true, y_pred, from_logits=False, label_smoothing=0)
# Two samples over three classes: one-hot targets and predicted probabilities.
y_true = [[0, 1, 0], [0, 0, 1]]
y_pred = [[0.05, 0.95, 0], [0.1, 0.8, 0.1]]
# One cross-entropy value per sample (from_logits defaults to False).
loss = tf.keras.losses.categorical_crossentropy(y_true=y_true, y_pred=y_pred)
print(loss.numpy())
def loss_fn(model, images, labels):
    """Return the mean categorical cross-entropy of ``model`` on a batch.

    The model is called with ``training=True`` and its outputs are treated
    as logits (``from_logits=True``).

    Args:
        model: Callable Keras model producing one logit per class.
        images: Batch of inputs fed to the model.
        labels: One-hot targets matching the model output.

    Returns:
        Scalar tensor: the cross-entropy averaged over the batch.
    """
    logits = model(images, training=True)
    per_example = tf.keras.losses.categorical_crossentropy(
        y_true=labels, y_pred=logits, from_logits=True
    )
    return tf.reduce_mean(per_example)
def accuracy_fn(model, images, labels):
    """Return the fraction of samples whose arg-max prediction matches the label.

    The model is called with ``training=False``.

    Args:
        model: Callable Keras model producing one score per class.
        images: Batch of inputs fed to the model.
        labels: One-hot targets; the arg-max over the last axis is the class.

    Returns:
        Scalar float32 tensor with the batch accuracy.
    """
    logits = model(images, training=False)
    predicted_class = tf.argmax(logits, -1)
    true_class = tf.argmax(labels, -1)
    is_correct = tf.cast(tf.equal(predicted_class, true_class), tf.float32)
    return tf.reduce_mean(is_correct)
def grad(model, images, labels):
    """Return the gradients of ``loss_fn`` with respect to ``model.variables``.

    Args:
        model: Keras model whose variables are differentiated.
        images: Batch of inputs.
        labels: One-hot targets for the batch.

    Returns:
        The result of ``tape.gradient`` for ``model.variables``, so callers
        can pair the two with ``zip``.
    """
    with tf.GradientTape() as tape:
        batch_loss = loss_fn(model, images, labels)
    gradients = tape.gradient(batch_loss, model.variables)
    return gradients
5-3. Model function
def flatten():
    """Create a new Keras ``Flatten`` layer."""
    layer = tf.keras.layers.Flatten()
    return layer
def dense(label_dim, weight_init):
    """Create a biased ``Dense`` layer.

    Args:
        label_dim: Number of output units.
        weight_init: Kernel initializer passed to the layer.

    Returns:
        A new ``tf.keras.layers.Dense`` instance (no activation).
    """
    layer = tf.keras.layers.Dense(
        label_dim,
        use_bias=True,
        kernel_initializer=weight_init,
    )
    return layer
def sigmoid():
    """Create a new ``Activation`` layer that applies the sigmoid function."""
    activation = tf.keras.activations.sigmoid
    return tf.keras.layers.Activation(activation)
5-4. Modeling
5-4-1. class version
class create_model_class(tf.keras.Model):
    """Subclassed Keras model: Flatten -> 2 x (Dense(256) + sigmoid) -> Dense.

    The final Dense layer has ``label_dim`` units and no activation, so the
    model outputs logits.
    """

    def __init__(self, label_dim):
        """Build the layer stack.

        Args:
            label_dim: Number of units in the output layer.
        """
        super().__init__()

        self.model = tf.keras.Sequential()
        self.model.add(flatten())

        for _ in range(2):
            # Each layer gets its own initializer: reusing one unseeded
            # initializer instance across layers can make the draws
            # deterministic relative to each other.
            self.model.add(dense(256, tf.keras.initializers.RandomNormal()))
            self.model.add(sigmoid())

        self.model.add(dense(label_dim, tf.keras.initializers.RandomNormal()))

    def call(self, x, training=None, mask=None):
        """Run the forward pass.

        Args:
            x: Batch of inputs.
            training: Training-mode flag, forwarded to the inner model so
                mode-dependent layers added later behave correctly.
            mask: Accepted for Keras API compatibility; not used.

        Returns:
            Output of the inner Sequential model for ``x``.
        """
        return self.model(x, training=training)
5-4-2. function
def create_model_function(label_dim):
    """Build a Sequential model: Flatten -> 2 x (Dense(256) + sigmoid) -> Dense.

    Args:
        label_dim: Number of units in the output layer.

    Returns:
        A ``tf.keras.Sequential`` whose last layer has no activation, so the
        model outputs logits.
    """
    model = tf.keras.Sequential()
    model.add(flatten())

    for _ in range(2):
        # Each layer gets its own initializer: reusing one unseeded
        # initializer instance across layers can make the draws
        # deterministic relative to each other.
        model.add(dense(256, tf.keras.initializers.RandomNormal()))
        model.add(sigmoid())

    model.add(dense(label_dim, tf.keras.initializers.RandomNormal()))

    return model
5-5. Define data & hyper-parameter
""" dataset """
# Pre-processed MNIST arrays produced by load_mnist().
train_x, train_y, test_x, test_y = load_mnist()

""" parameters """
label_dim = 10          # units in the output layer
train_flag = True       # True: train and save a checkpoint; False: evaluate only

learning_rate = 0.001
training_epochs = 1
batch_size = 128
# Number of full batches in one pass over the training set.
training_iterations = len(train_x) // batch_size
5-6. tf.data.Dataset (API) 설정
5-6-1. tf.data.Dataset 사용법
# Toy data: four pairs [k, k+2] for k = 0..3.
# NOTE: this rebinds the module-level name test_data, which earlier held the
# MNIST test images returned by mnist.load_data().
test_data = [[k, k+2] for k in range(4)]
# One dataset element per row of test_data.
test_dataset1 = tf.data.Dataset.from_tensor_slices(test_data)
# Materialize the dataset to inspect its elements.
list(test_dataset1)
- 각 메서드를 하나씩 설정하는 방식
# Shuffle the elements using a buffer of 20 entries, then inspect the result.
test_dataset1 = test_dataset1.shuffle(buffer_size=20)
list(test_dataset1)

# Group the shuffled elements into batches of 4, then inspect the result.
test_dataset1 = test_dataset1.batch(4)
list(test_dataset1)
- 각 메서드를 한꺼번에 설정하는 방식
# Same pipeline as above, built as one chained expression. Parentheses are
# used for line continuation instead of backslashes.
test_dataset2 = (
    tf.data.Dataset.from_tensor_slices(test_data)
    .shuffle(buffer_size=20)
    .batch(4)
)
list(test_dataset2)
5-6-2. tf.data.Dataset 설정
""" Graph Input using Dataset API """
# Training pipeline: shuffle individual samples, cut them into full batches
# (the last partial batch is dropped), and prefetch last so that whole batches
# are prepared while the current one is being consumed.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((train_x, train_y))
    .shuffle(buffer_size=100000)
    .batch(batch_size, drop_remainder=True)
    .prefetch(buffer_size=1)
)

# Test pipeline: the whole test set is delivered as a single batch. It is not
# shuffled because the only consumer averages over that one batch, so element
# order cannot affect the result.
test_dataset = (
    tf.data.Dataset.from_tensor_slices((test_x, test_y))
    .batch(len(test_x))
    .prefetch(buffer_size=1)
)
5-7. Checkpoint function
def load(model, checkpoint_dir):
    """Restore ``model`` from the latest checkpoint in ``checkpoint_dir``.

    Args:
        model: Object tracked under the ``dnn`` key of the checkpoint.
        checkpoint_dir: Directory searched for checkpoint state.

    Returns:
        ``(True, counter)`` when a checkpoint was found, where ``counter`` is
        the integer parsed from the second ``-``-separated field of the
        checkpoint file name; ``(False, 0)`` otherwise.
    """
    print(" [*] Reading checkpoints...")
    state = tf.train.get_checkpoint_state(checkpoint_dir)

    # Guard clause: nothing to restore.
    if not state:
        print(" [*] Failed to find a checkpoint")
        return False, 0

    ckpt_name = os.path.basename(state.model_checkpoint_path)
    restore_path = os.path.join(checkpoint_dir, ckpt_name)
    tf.train.Checkpoint(dnn=model).restore(save_path=restore_path)

    counter = int(ckpt_name.split('-')[1])
    print(" [*] Success to read {}".format(ckpt_name))
    return True, counter
def check_folder(dir):
    """Ensure that directory ``dir`` exists and return its path.

    Missing parent directories are created as well. ``exist_ok=True``
    replaces the former check-then-create sequence, which could fail if the
    directory appeared between the check and the creation.

    Args:
        dir: Path of the directory to create if needed.

    Returns:
        The same path that was passed in.

    Raises:
        FileExistsError: If ``dir`` exists but is not a directory.
    """
    os.makedirs(dir, exist_ok=True)
    return dir
5-8. Define model & optimizer & writer
""" Model """
model = create_model_function(label_dim)

""" Training """
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

""" Writer """
model_dir = 'nn_softmax_2'

# Checkpoints live in checkpoints_2/<model_dir>/ and share the prefix <model_dir>.
checkpoint_dir = os.path.join('checkpoints_2', model_dir)
check_folder(checkpoint_dir)
checkpoint_prefix = os.path.join(checkpoint_dir, model_dir)

# TensorBoard summaries are written to logs_2/<model_dir>/.
logs_dir = os.path.join('logs_2', model_dir)
5-9. Restore checkpoint & start train or test phase
# Train (and save a checkpoint) when train_flag is set; otherwise restore the
# latest checkpoint and only report test accuracy.
if train_flag:
    checkpoint = tf.train.Checkpoint(dnn=model)

    # Create the writer that records summaries for TensorBoard.
    summary_writer = tf.summary.create_file_writer(logdir=logs_dir)
    start_time = time()

    # Restore the latest checkpoint if one exists.
    could_load, checkpoint_counter = load(model, checkpoint_dir)

    if could_load:
        # Resume from the epoch implied by the number of steps already taken.
        start_epoch = checkpoint_counter // training_iterations
        counter = checkpoint_counter
        print(" [*] Load SUCCESS")
    else:
        start_epoch = 0
        start_iteration = 0  # not read anywhere in the visible code
        counter = 0
        print(" [!] Load failed...")

    # Train phase.
    with summary_writer.as_default():  # for tensorboard
        for epoch in range(start_epoch, training_epochs):
            print()
            for idx, (train_input, train_label) in enumerate(train_dataset):
                grads = grad(model, train_input, train_label)
                optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))

                # Metrics are measured after the update on the same batch.
                train_loss = loss_fn(model, train_input, train_label)
                train_accuracy = accuracy_fn(model, train_input, train_label)

                # test_dataset yields the whole test set as one batch.
                for test_input, test_label in test_dataset:
                    test_accuracy = accuracy_fn(model, test_input, test_label)

                tf.summary.scalar(name='train_loss', data=train_loss, step=counter)
                tf.summary.scalar(name='train_accuracy', data=train_accuracy, step=counter)
                tf.summary.scalar(name='test_accuracy', data=test_accuracy, step=counter)

                if idx % 50 == 0:
                    print(
                        "Epoch: [%2d] [%5d/%5d] time: %4.4f, train_loss: %.8f, train_accuracy: %.4f, test_Accuracy: %.4f"
                        % (epoch, idx, training_iterations, time() - start_time, train_loss, train_accuracy,
                           test_accuracy))
                counter += 1

        # The step counter is embedded in the file prefix; load() parses it
        # back out of the checkpoint name.
        checkpoint.save(file_prefix=checkpoint_prefix + '-{}'.format(counter))

# Test phase.
else:
    # Use the module-level `model`; the name `network` was never defined and
    # raised NameError whenever train_flag was False.
    _, _ = load(model, checkpoint_dir)
    for test_input, test_label in test_dataset:
        test_accuracy = accuracy_fn(model, test_input, test_label)

    print("test_Accuracy: %.4f" % (test_accuracy))