Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/keras/src/utils/timeseries_dataset.py: 11%

63 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-03 07:57 +0000

1# Copyright 2020 The TensorFlow Authors. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# ============================================================================== 

15"""Keras timeseries dataset utilities.""" 

16 

17import numpy as np 

18import tensorflow.compat.v2 as tf 

19 

20# isort: off 

21from tensorflow.python.util.tf_export import keras_export 

22 

23 

@keras_export(
    "keras.utils.timeseries_dataset_from_array",
    "keras.preprocessing.timeseries_dataset_from_array",
    v1=[],
)
def timeseries_dataset_from_array(
    data,
    targets,
    sequence_length,
    sequence_stride=1,
    sampling_rate=1,
    batch_size=128,
    shuffle=False,
    seed=None,
    start_index=None,
    end_index=None,
):
    """Creates a dataset of sliding windows over a timeseries provided as array.

    This function takes in a sequence of data-points gathered at
    equal intervals, along with time series parameters such as
    length of the sequences/windows, spacing between two sequence/windows, etc.,
    to produce batches of timeseries inputs and targets.

    Args:
      data: Numpy array or eager tensor
        containing consecutive data points (timesteps).
        Axis 0 is expected to be the time dimension.
      targets: Targets corresponding to timesteps in `data`.
        `targets[i]` should be the target
        corresponding to the window that starts at index `i`
        (see example 2 below).
        Pass None if you don't have target data (in this case the dataset will
        only yield the input data).
      sequence_length: Length of the output sequences (in number of timesteps).
      sequence_stride: Period between successive output sequences.
        For stride `s`, output samples would
        start at index `data[i]`, `data[i + s]`, `data[i + 2 * s]`, etc.
      sampling_rate: Period between successive individual timesteps
        within sequences. For rate `r`, timesteps
        `data[i], data[i + r], ... data[i + sequence_length]`
        are used for creating a sample sequence.
      batch_size: Number of timeseries samples in each batch
        (except maybe the last one). If `None`, the data will not be batched
        (the dataset will yield individual samples).
      shuffle: Whether to shuffle output samples,
        or instead draw them in chronological order.
      seed: Optional int; random seed for shuffling.
      start_index: Optional int; data points earlier (exclusive)
        than `start_index` will not be used
        in the output sequences. This is useful to reserve part of the
        data for test or validation.
      end_index: Optional int; data points later (exclusive) than `end_index`
        will not be used in the output sequences.
        This is useful to reserve part of the data for test or validation.

    Returns:
      A tf.data.Dataset instance. If `targets` was passed, the dataset yields
      tuple `(batch_of_sequences, batch_of_targets)`. If not, the dataset yields
      only `batch_of_sequences`.

    Example 1:

    Consider indices `[0, 1, ... 98]`.
    With `sequence_length=10, sampling_rate=2, sequence_stride=3`,
    `shuffle=False`, the dataset will yield batches of sequences
    composed of the following indices:

    ```
    First sequence:  [0  2  4  6  8 10 12 14 16 18]
    Second sequence: [3  5  7  9 11 13 15 17 19 21]
    Third sequence:  [6  8 10 12 14 16 18 20 22 24]
    ...
    Last sequence:   [78 80 82 84 86 88 90 92 94 96]
    ```

    In this case the last 2 data points are discarded since no full sequence
    can be generated to include them (the next sequence would have started
    at index 81, and thus its last step would have gone over 98).

    Example 2: Temporal regression.

    Consider an array `data` of scalar values, of shape `(steps,)`.
    To generate a dataset that uses the past 10
    timesteps to predict the next timestep, you would use:

    ```python
    input_data = data[:-10]
    targets = data[10:]
    dataset = tf.keras.utils.timeseries_dataset_from_array(
        input_data, targets, sequence_length=10)
    for batch in dataset:
      inputs, targets = batch
      assert np.array_equal(inputs[0], data[:10])  # First sequence: steps [0-9]
      # Corresponding target: step 10
      assert np.array_equal(targets[0], data[10])
      break
    ```

    Example 3: Temporal regression for many-to-many architectures.

    Consider two arrays of scalar values `X` and `Y`,
    both of shape `(100,)`. The resulting dataset should consist of samples
    with 20 timestamps each. The samples should not overlap.
    To generate a dataset that uses the current timestamp
    to predict the corresponding target timestep, you would use:

    ```python
    X = np.arange(100)
    Y = X*2

    sample_length = 20
    input_dataset = tf.keras.utils.timeseries_dataset_from_array(
        X, None, sequence_length=sample_length, sequence_stride=sample_length)
    target_dataset = tf.keras.utils.timeseries_dataset_from_array(
        Y, None, sequence_length=sample_length, sequence_stride=sample_length)

    for batch in zip(input_dataset, target_dataset):
      inputs, targets = batch
      assert np.array_equal(inputs[0], X[:sample_length])

      # second sample equals output timestamps 20-40
      assert np.array_equal(targets[1], Y[sample_length:2*sample_length])
      break
    ```
    """
    # Use `is not None` (not truthiness) so that 0 is actually validated.
    # With the previous `if end_index:` form, `end_index=0` skipped the whole
    # validation branch — making the `end_index <= 0` check below dead code
    # for the very value it guards — and then fell through to produce a
    # negative window count and a silently empty dataset instead of the
    # documented ValueError. `start_index=0` likewise skipped validation
    # (harmless on its own, but it also disabled the start/end ordering
    # check for the `start_index=0, end_index=0` case).
    if start_index is not None:
        if start_index < 0:
            raise ValueError(
                "`start_index` must be 0 or greater. Received: "
                f"start_index={start_index}"
            )
        if start_index >= len(data):
            raise ValueError(
                "`start_index` must be lower than the length of the "
                f"data. Received: start_index={start_index}, for data "
                f"of length {len(data)}"
            )
    if end_index is not None:
        if start_index is not None and end_index <= start_index:
            raise ValueError(
                "`end_index` must be higher than `start_index`. "
                f"Received: start_index={start_index}, and "
                f"end_index={end_index} "
            )
        if end_index >= len(data):
            raise ValueError(
                "`end_index` must be lower than the length of the "
                f"data. Received: end_index={end_index}, for data of "
                f"length {len(data)}"
            )
        if end_index <= 0:
            raise ValueError(
                "`end_index` must be higher than 0. "
                f"Received: end_index={end_index}"
            )

    # Validate strides
    if sampling_rate <= 0:
        raise ValueError(
            "`sampling_rate` must be higher than 0. Received: "
            f"sampling_rate={sampling_rate}"
        )
    if sampling_rate >= len(data):
        raise ValueError(
            "`sampling_rate` must be lower than the length of the "
            f"data. Received: sampling_rate={sampling_rate}, for data "
            f"of length {len(data)}"
        )
    if sequence_stride <= 0:
        raise ValueError(
            "`sequence_stride` must be higher than 0. Received: "
            f"sequence_stride={sequence_stride}"
        )
    if sequence_stride >= len(data):
        raise ValueError(
            "`sequence_stride` must be lower than the length of the "
            f"data. Received: sequence_stride={sequence_stride}, for "
            f"data of length {len(data)}"
        )

    if start_index is None:
        start_index = 0
    if end_index is None:
        end_index = len(data)

    # Determine the lowest dtype to store start positions (to lower memory
    # usage).
    num_seqs = end_index - start_index - (sequence_length - 1) * sampling_rate
    if targets is not None:
        num_seqs = min(num_seqs, len(targets))
    if num_seqs < 2147483647:
        index_dtype = "int32"
    else:
        index_dtype = "int64"

    # Generate start positions
    start_positions = np.arange(0, num_seqs, sequence_stride, dtype=index_dtype)
    if shuffle:
        if seed is None:
            seed = np.random.randint(1e6)
        rng = np.random.RandomState(seed)
        rng.shuffle(start_positions)

    sequence_length = tf.cast(sequence_length, dtype=index_dtype)
    sampling_rate = tf.cast(sampling_rate, dtype=index_dtype)

    positions_ds = tf.data.Dataset.from_tensors(start_positions).repeat()

    # For each initial window position, generates indices of the window elements
    indices = tf.data.Dataset.zip(
        (tf.data.Dataset.range(len(start_positions)), positions_ds)
    ).map(
        lambda i, positions: tf.range(
            positions[i],
            positions[i] + sequence_length * sampling_rate,
            sampling_rate,
        ),
        num_parallel_calls=tf.data.AUTOTUNE,
    )

    dataset = sequences_from_indices(data, indices, start_index, end_index)
    if targets is not None:
        indices = tf.data.Dataset.zip(
            (tf.data.Dataset.range(len(start_positions)), positions_ds)
        ).map(
            lambda i, positions: positions[i],
            num_parallel_calls=tf.data.AUTOTUNE,
        )
        target_ds = sequences_from_indices(
            targets, indices, start_index, end_index
        )
        dataset = tf.data.Dataset.zip((dataset, target_ds))
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    if batch_size is not None:
        if shuffle:
            # Shuffle locally at each iteration
            dataset = dataset.shuffle(buffer_size=batch_size * 8, seed=seed)
        dataset = dataset.batch(batch_size)
    else:
        if shuffle:
            dataset = dataset.shuffle(buffer_size=1024, seed=seed)
    return dataset

267 

268 

def sequences_from_indices(array, indices_ds, start_index, end_index):
    """Gather timestep windows of `array` at the indices yielded by `indices_ds`.

    The slice `array[start_index:end_index]` is wrapped in a one-element
    dataset and repeated, so each index tensor from `indices_ds` is paired
    with the full slice; `tf.gather` then extracts the requested timesteps.
    Returns a tf.data.Dataset yielding one gathered window per index tensor.
    """
    window_source = array[start_index:end_index]
    source_ds = tf.data.Dataset.from_tensors(window_source).repeat()

    def _gather_window(steps, inds):
        return tf.gather(steps, inds)

    paired = tf.data.Dataset.zip((source_ds, indices_ds))
    return paired.map(_gather_window, num_parallel_calls=tf.data.AUTOTUNE)

276