TensorFlow
Updated: 2020-11-27
Sample code for an MNIST image classification task based on the TensorFlow framework. Click here to download the training dataset.
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
from tensorflow.python.saved_model import tag_constants
from tensorflow.python.saved_model.signature_def_utils_impl import predict_signature_def
old_v = tf.logging.get_verbosity()
tf.logging.set_verbosity(tf.logging.ERROR)
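# allow_growth lets the GPU allocator grow memory usage on demand instead of reserving it all up front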
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
sess = tf.InteractiveSession(config=config)
mnist = input_data.read_data_sets('./train_data/', one_hot=True)
print('training data shape ', mnist.train.images.shape)
print('training label shape ', mnist.train.labels.shape)
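# With one_hot=True, the prints above should show (55000, 784) for images and (55000, 10) for labels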
# Convolution layer
def conv2d(x, filter_shape, strides_x, strides_y, padding, name):
    """
    Args:
        x: 4-D inputs. [batch_size, in_height, in_width, in_channels]
        filter_shape: A list of ints. [filter_height, filter_width, in_channels, out_channels]
        strides_x, strides_y: ints. The stride of the sliding window along the
            height and width dimensions of the input.
        padding: A string from: "SAME", "VALID". The type of padding algorithm to use.
    Returns:
        h_conv_relu: A 4-D tensor. [batch_size, out_height, out_width, out_channels].
            If padding is 'SAME', then out_height == in_height;
            otherwise (with stride 1) out_height = in_height - filter_height + 1.
            The same holds for out_width.
    """
    assert padding in ['SAME', 'VALID']
    strides = [1, strides_x, strides_y, 1]
    with tf.variable_scope(name):
        W_conv = tf.get_variable('w', shape=filter_shape)
        b_conv = tf.get_variable('b', shape=[filter_shape[-1]])
        h_conv = tf.nn.conv2d(x, W_conv, strides=strides, padding=padding)
        h_conv_relu = tf.nn.relu(h_conv + b_conv)
    return h_conv_relu
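# Example (illustrative shapes): with x of shape [50, 28, 28, 1],
# conv2d(x, [5, 5, 1, 32], 1, 1, 'SAME', 'conv1') returns [50, 28, 28, 32];
# with 'VALID' padding the result would be [50, 24, 24, 32].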
def max_pooling(x, k_height, k_width, strides_x, strides_y, padding='SAME'):
    """Max pooling layer."""
    ksize = [1, k_height, k_width, 1]
    strides = [1, strides_x, strides_y, 1]
    h_pool = tf.nn.max_pool(x, ksize, strides, padding)
    return h_pool
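# For example, 2x2 pooling with stride 2 halves each spatial dimension:
# a [batch, 28, 28, 32] input becomes [batch, 14, 14, 32].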
def dropout(x, keep_prob, name=None):
    """Dropout layer."""
    return tf.nn.dropout(x, keep_prob, name=name)
def fc(x, in_size, out_size, name, activation=None):
    """Fully-connected layer.
    Args:
        x: 2-D tensor, [batch_size, in_size]
        in_size: the size of the input tensor.
        out_size: the size of the output tensor.
        activation: 'relu', 'sigmoid' or 'tanh'.
    Returns:
        h_fc: 2-D tensor, [batch_size, out_size].
    """
    if activation is not None:
        assert activation in ['relu', 'sigmoid', 'tanh'], 'Wrong activation function.'
    with tf.variable_scope(name):
        w = tf.get_variable('w', shape=[in_size, out_size], dtype=tf.float32)
        b = tf.get_variable('b', [out_size], dtype=tf.float32)
        h_fc = tf.nn.xw_plus_b(x, w, b)
        if activation == 'relu':
            return tf.nn.relu(h_fc)
        elif activation == 'tanh':
            return tf.nn.tanh(h_fc)
        elif activation == 'sigmoid':
            return tf.nn.sigmoid(h_fc)
        else:
            return h_fc
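# fc computes activation(x @ w + b); tf.nn.xw_plus_b(x, w, b) is just matmul(x, w) + b,
# so a [batch, 1024] input with out_size=10 yields a [batch, 10] tensor.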
with tf.name_scope('inputs'):
    X_ = tf.placeholder(tf.float32, [None, 784])
    y_ = tf.placeholder(tf.float32, [None, 10])

# Reshape X into the 4-D layout the convolution expects
X = tf.reshape(X_, [-1, 28, 28, 1], name="input")
h_conv1 = conv2d(X, [5, 5, 1, 32], 1, 1, 'SAME', 'conv1')         # 28*28*32
h_pool1 = max_pooling(h_conv1, 2, 2, 2, 2)                        # 14*14*32
h_conv2 = conv2d(h_pool1, [5, 5, 32, 64], 1, 1, 'SAME', 'conv2')  # 14*14*64
h_pool2 = max_pooling(h_conv2, 2, 2, 2, 2)                        # 7*7*64
# flatten to a 7*7*64 = 3136-dim vector, then map to 1024 units
h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
h_fc1 = fc(h_pool2_flat, 7*7*64, 1024, 'fc1', 'relu')
# dropout: output has the same shape as h_fc1, but a random subset of values is set to zero
keep_prob = tf.placeholder(tf.float32)
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)
h_fc2 = fc(h_fc1_drop, 1024, 10, 'fc2')
y_conv = tf.nn.softmax(h_fc2)
tf.identity(y_conv, name="Output")
print('Finished building network.')
# Training and evaluation
# Cross entropy -sum(y_ * log(y_conv)); clip to keep log(0) from turning the loss into NaN
cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y_conv, 1e-10, 1.0)))
train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
prediction_labels = tf.argmax(y_conv, axis=1, name="pred_labels")
correct_prediction = tf.equal(prediction_labels, tf.argmax(y_, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
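# Accumulate per-batch test accuracy into a variable so the mean over all
# test batches can be read out after the evaluation loop below.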
test_acc_sum = tf.Variable(0.0)
batch_acc = tf.placeholder(tf.float32)
new_test_acc_sum = tf.add(test_acc_sum, batch_acc)
update = tf.assign(test_acc_sum, new_test_acc_sum)
sess.run(tf.global_variables_initializer())
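# Train for 5000 steps of batch size 50; dropout keeps 50% of activations
# during training and everything (keep_prob=1.0) when measuring accuracy.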
for i in range(5000):
    X_batch, y_batch = mnist.train.next_batch(batch_size=50)
    if (i+1) % 500 == 0:
        train_accuracy = accuracy.eval(feed_dict={X_: X_batch, y_: y_batch, keep_prob: 1.0})
        print("step %d, training acc %g" % (i+1, train_accuracy))
    train_step.run(feed_dict={X_: X_batch, y_: y_batch, keep_prob: 0.5})
# Run the test set only after training finishes, with batch_size=100
for i in range(100):
    X_batch, y_batch = mnist.test.next_batch(batch_size=100)
    test_acc = accuracy.eval(feed_dict={X_: X_batch, y_: y_batch, keep_prob: 1.0})
    update.eval(feed_dict={batch_acc: test_acc})
    if (i+1) % 20 == 0:
        print("testing step %d, test_acc_sum %g" % (i+1, test_acc_sum.eval()))
print(" test_accuracy %g" % (test_acc_sum.eval() / 100.0))
# Export the trained model in SavedModel format
builder = tf.saved_model.builder.SavedModelBuilder('./output/')
signature = predict_signature_def(inputs={'inputs': X_, 'keep_prob': keep_prob},
                                  outputs={'Output': y_conv})
builder.add_meta_graph_and_variables(sess=sess,
                                     tags=[tag_constants.SERVING],
                                     signature_def_map={'predict': signature})
builder.save()
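After builder.save() completes, ./output/ contains a saved_model.pb plus a variables/ directory. The sketch below shows one way to load the exported model back for inference. It is illustrative only: test_images is an assumed [N, 784] array standing in for real input, and the tensor names are read from the 'predict' signature saved above rather than hard-coded.

import numpy as np
import tensorflow as tf
from tensorflow.python.saved_model import tag_constants

with tf.Session(graph=tf.Graph()) as sess:
    # Restore the graph and weights exported by SavedModelBuilder
    meta_graph = tf.saved_model.loader.load(sess, [tag_constants.SERVING], './output/')
    sig = meta_graph.signature_def['predict']
    x_name = sig.inputs['inputs'].name        # the X_ placeholder
    kp_name = sig.inputs['keep_prob'].name    # dropout keep probability
    out_name = sig.outputs['Output'].name     # softmax probabilities
    # test_images: an assumed [N, 784] float32 array of flattened 28x28 digits
    test_images = np.zeros((1, 784), dtype=np.float32)
    probs = sess.run(out_name, feed_dict={x_name: test_images, kp_name: 1.0})
    print('predicted labels:', probs.argmax(axis=1))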