-
Notifications
You must be signed in to change notification settings - Fork 0
/
generator.py
132 lines (115 loc) · 5.17 KB
/
generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import tensorflow as tf
import numpy as np
import os
import shutil
# Fetch the transcript corpus; get_file returns the local path of the file.
path_to_file = tf.keras.utils.get_file(
    'all.txt',
    'https://raw.githubusercontent.com/ayyucedemirbas/Star-Trek-TOS-Transcripts/main/all.txt',
)

# Read the raw bytes and decode them explicitly as UTF-8. The context manager
# closes the file handle deterministically instead of leaking it.
with open(path_to_file, 'rb') as corpus_file:
    text = corpus_file.read().decode(encoding='utf-8')

print('Length of Character {}'.format(len(text)))
print(text[:250])

# Sorted list of every distinct character that occurs in the corpus.
vocab = sorted(set(text))
print('{} unique characters'.format(len(vocab)))
print(vocab)

# Create a mapping from characters to numbers and vice versa
char2idx = {char: index for index, char in enumerate(vocab)}
idx2char = np.array(vocab)

# The whole corpus encoded as an array of vocabulary indices.
text_as_int = np.array([char2idx[c] for c in text])
print("{} is mapped to {}".format(text[:10], text_as_int[:10]))
# Maximum sentence length (in characters) that is fed to the RNN
seq_length = 100

# Every chunk holds seq_length + 1 characters, so that is the divisor here.
chunk_size = seq_length + 1
examples_per_epoch = len(text) // chunk_size
print(examples_per_epoch)

# Creating dataset
# from_tensor_slices is like creating a generator for our dataset and is
# suitable for handling large datasets
char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int)

# .take(n) is like iloc[:n] in pandas: it yields only the first n elements
for char_id in char_dataset.take(5):
    print(idx2char[char_id.numpy()])

# Group the character stream into fixed-size chunks; drop_remainder=True
# discards a final chunk that would be shorter than chunk_size.
sequences = char_dataset.batch(chunk_size, drop_remainder=True)

# Preview the first five chunks decoded back to text.
for chunk_ids in sequences.take(5):
    decoded_chunk = ''.join(idx2char[chunk_ids.numpy()])
    print(repr(decoded_chunk))
def split_input_target(chunk):
    """Split a chunk into an (input, target) pair shifted by one position.

    The input is the chunk without its last element and the target is the
    chunk without its first element, so target[i] is the element that
    follows input[i] in the original chunk.
    """
    input_part = chunk[:-1]
    target_part = chunk[1:]
    return input_part, target_part
# Turn every chunk into an (input, target) pair.
dataset = sequences.map(split_input_target)

# Show the first pair decoded back to text.
for input_example, target_example in dataset.take(1):
    input_text = ''.join(idx2char[input_example.numpy()])
    target_text = ''.join(idx2char[target_example.numpy()])
    print ('Input data: ', repr(input_text))
    print ('Target data:', repr(target_text))

# Walk through the first five time steps of that pair: at each step the
# model receives the input character and is expected to output the target.
first_steps = zip(input_example[:5], target_example[:5])
for i, (input_idx, target_idx) in enumerate(first_steps):
    print("Step {:4d}".format(i))
    print(" input: {} ({:s})".format(input_idx, repr(idx2char[input_idx])))
    print(" expected output: {} ({:s})".format(target_idx, repr(idx2char[target_idx])))

# Number of (input, target) pairs per training batch.
BATCH_SIZE = 64
# Size of the buffer that Dataset.shuffle draws from.
BUFFER_SIZE = 10000

shuffled_dataset = dataset.shuffle(BUFFER_SIZE)
dataset = shuffled_dataset.batch(BATCH_SIZE, drop_remainder=True)
# Bare expression: has no effect when the file runs as a script
# (likely left over from a notebook cell that displayed the dataset).
dataset

# Constants for model
vocab_size = len(vocab)
embedding_dim = 512
# Using half of rnn_units for LSTM
# Speed of training was reduced to half, so I can try 1024 units
rnn_units = 1024
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
    """Build the character-level model: Embedding -> LSTM -> LSTM -> Dense.

    Args:
        vocab_size: Number of distinct characters; used as the embedding
            input size and as the width of the final Dense layer.
        embedding_dim: Size of each character embedding vector.
        rnn_units: Number of units in each of the two LSTM layers.
        batch_size: Fixed batch size baked into the input shape (both LSTM
            layers are created with stateful=True).

    Returns:
        An uncompiled tf.keras.Sequential model. The final Dense layer has
        no activation, so it outputs one raw score per vocabulary entry at
        every time step.
    """
    layers = tf.keras.layers

    embedding = layers.Embedding(
        vocab_size,
        embedding_dim,
        batch_input_shape=[batch_size, None],
    )
    first_lstm = layers.LSTM(
        rnn_units,
        return_sequences=True,
        stateful=True,
        recurrent_initializer='glorot_uniform',
    )
    second_lstm = layers.LSTM(
        rnn_units,
        return_sequences=True,
        stateful=True,
        recurrent_initializer='glorot_uniform',
    )
    output_scores = layers.Dense(vocab_size)

    return tf.keras.Sequential([embedding, first_lstm, second_lstm, output_scores])
# Model used for training; its batch size matches the dataset batches.
model = build_model(vocab_size, embedding_dim, rnn_units, BATCH_SIZE)

# Sanity check: run a single batch through the untrained model.
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

# Here we are choosing the next character randomly based on its probability
first_example_scores = example_batch_predictions[0]
sampled_column = tf.random.categorical(first_example_scores, num_samples=1)
# With num_samples=1 the result has a trailing axis of size 1; squeezing that
# axis leaves a flat array holding one sampled index per time step.
sampled_indices = tf.squeeze(sampled_column, axis=-1).numpy()
# Bare expression: has no effect when the file runs as a script.
sampled_indices

# Decoding what this means
decoded_input = "".join(idx2char[input_example_batch[0]])
print("Input: \n", repr(decoded_input))
print()
decoded_predictions = "".join(idx2char[sampled_indices ])
print("Next Char Predictions: \n", repr(decoded_predictions))
def loss(labels, logits):
    """Return the sparse categorical cross-entropy of logits against labels.

    from_logits=True tells Keras that `logits` are raw, unnormalized scores
    rather than probabilities.
    """
    crossentropy = tf.keras.losses.sparse_categorical_crossentropy
    return crossentropy(labels, logits, from_logits=True)
model.compile(optimizer='adam', loss=loss)

# Directory where the checkpoints will be saved
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files ({epoch} is filled in by the callback)
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True,
)

EPOCHS = 10  # takes a long time
# loss: 0.8658
# epochs should be at least 20
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

# tf.train.latest_checkpoint returns None when the directory holds no
# checkpoint; fail with a clear message instead of passing None on to
# load_weights.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint is None:
    raise FileNotFoundError(
        "No checkpoint found in {!r}; cannot restore trained weights".format(checkpoint_dir)
    )

# Rebuild the same architecture with batch_size=1 and load the trained
# weights into it, so text can be generated one sequence at a time.
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)
model.load_weights(latest_checkpoint)
model.build(tf.TensorShape([1, None]))
model.save('Model_3_LSTM0.h5')
def generate_text(model, start_string, *, num_generate=5000, temperature=0.7):
    """Generate text by sampling characters from the model one at a time.

    Relies on the module-level `char2idx` and `idx2char` lookups.

    Args:
        model: Model called with a batch of size 1; its states are reset
            before generation starts.
        start_string: Non-empty seed text. Every character must be a key of
            `char2idx`.
        num_generate: Number of characters to generate.
        temperature: Positive value the predictions are divided by before
            sampling. Lower values make the output more predictable, higher
            values make it more surprising.

    Returns:
        `start_string` followed by the generated characters.

    Raises:
        ValueError: If `start_string` is empty or `temperature` is not
            positive.
        KeyError: If `start_string` contains a character missing from
            `char2idx`.
    """
    # Validate up front: an empty seed or a non-positive temperature would
    # otherwise only fail (or misbehave) deep inside the sampling loop.
    if not start_string:
        raise ValueError("start_string must contain at least one character")
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    # Converting our start string to numbers (vectorizing) and adding a
    # leading batch axis of size 1.
    input_eval = tf.expand_dims([char2idx[s] for s in start_string], 0)

    # Generated characters are collected here and joined once at the end.
    text_generated = []

    # Here batch size == 1
    model.reset_states()
    for _ in range(num_generate):
        predictions = model(input_eval)
        # Remove the batch axis.
        predictions = tf.squeeze(predictions, 0)

        # Scale by the temperature, then sample; only the sample for the
        # last time step ([-1, 0]) is used as the next character.
        predictions = predictions / temperature
        predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()

        # The sampled character becomes the next input; the model is called
        # again without resetting its states.
        input_eval = tf.expand_dims([predicted_id], 0)
        text_generated.append(idx2char[predicted_id])

    return start_string + ''.join(text_generated)
# Generate a sample seeded with "Spock" and print it.
generated_sample = generate_text(model, start_string=u"Spock")
print(generated_sample)

# Save the generation model to an HDF5 file.
model.save('Model_3_LSTM.h5')