Coverage Report

Created: 2026-03-31 06:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/simdjson/include/simdjson/generic/ondemand/document_stream-inl.h
Line
Count
Source
1
#ifndef SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H
2
3
#ifndef SIMDJSON_CONDITIONAL_INCLUDE
4
#define SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H
5
#include "simdjson/generic/ondemand/base.h"
6
#include "simdjson/generic/ondemand/document_stream.h"
7
#include "simdjson/generic/ondemand/document-inl.h"
8
#include "simdjson/generic/implementation_simdjson_result_base-inl.h"
9
#endif // SIMDJSON_CONDITIONAL_INCLUDE
10
11
#include <algorithm>
12
#include <stdexcept>
13
14
namespace simdjson {
15
namespace SIMDJSON_IMPLEMENTATION {
16
namespace ondemand {
17
18
#ifdef SIMDJSON_THREADS_ENABLED
19
20
0
inline void stage1_worker::finish() {
21
0
  // After calling "run" someone would call finish() to wait
22
0
  // for the end of the processing.
23
0
  // This function will wait until either the thread has done
24
0
  // the processing or, else, the destructor has been called.
25
0
  std::unique_lock<std::mutex> lock(locking_mutex);
26
0
  cond_var.wait(lock, [this]{return has_work == false;});
27
0
}
28
29
// Stops and joins the worker thread (through stop_thread()) so that the
// thread never outlives the stage1_worker instance that owns it.
inline stage1_worker::~stage1_worker() {
  this->stop_thread();
}
35
36
0
inline void stage1_worker::start_thread() {
37
0
  std::unique_lock<std::mutex> lock(locking_mutex);
38
0
  if(thread.joinable()) {
39
0
    return; // This should never happen but we never want to create more than one thread.
40
0
  }
41
0
  thread = std::thread([this]{
42
0
      while(true) {
43
0
        std::unique_lock<std::mutex> thread_lock(locking_mutex);
44
0
        // We wait for either "run" or "stop_thread" to be called.
45
0
        cond_var.wait(thread_lock, [this]{return has_work || !can_work;});
46
0
        // If, for some reason, the stop_thread() method was called (i.e., the
47
0
        // destructor of stage1_worker is called, then we want to immediately destroy
48
0
        // the thread (and not do any more processing).
49
0
        if(!can_work) {
50
0
          break;
51
0
        }
52
0
        this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser,
53
0
              this->_next_batch_start);
54
0
        this->has_work = false;
55
0
        // The condition variable call should be moved after thread_lock.unlock() for performance
56
0
        // reasons but thread sanitizers may report it as a data race if we do.
57
0
        // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
58
0
        cond_var.notify_one(); // will notify "finish"
59
0
        thread_lock.unlock();
60
0
      }
61
0
    }
62
0
  );
63
0
}
64
65
66
0
inline void stage1_worker::stop_thread() {
67
0
  std::unique_lock<std::mutex> lock(locking_mutex);
68
0
  // We have to make sure that all locks can be released.
69
0
  can_work = false;
70
0
  has_work = false;
71
0
  cond_var.notify_all();
72
0
  lock.unlock();
73
0
  if(thread.joinable()) {
74
0
    thread.join();
75
0
  }
76
0
}
77
78
0
// Posts one unit of work to the worker thread.
//
// Records the document_stream to report to (ds), the parser to run stage 1
// with (stage1) and the offset at which the batch starts (next_batch_start),
// then sets has_work and wakes up the worker thread.
inline void stage1_worker::run(document_stream * ds, parser * stage1, size_t next_batch_start) {
  std::unique_lock<std::mutex> guard(locking_mutex);
  owner = ds;
  stage1_thread_parser = stage1;
  _next_batch_start = next_batch_start;
  has_work = true;
  // Notifying after releasing the lock would be faster, but thread sanitizers
  // may then report a data race, so we notify while still holding the lock.
  // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
  cond_var.notify_one(); // tells the worker thread that there is work
  // The mutex is released when `guard` goes out of scope.
}
90
91
#endif  // SIMDJSON_THREADS_ENABLED
92
93
simdjson_inline document_stream::document_stream(
94
  ondemand::parser &_parser,
95
  const uint8_t *_buf,
96
  size_t _len,
97
  size_t _batch_size,
98
  bool _allow_comma_separated
99
) noexcept
100
  : parser{&_parser},
101
    buf{_buf},
102
    len{_len},
103
    batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
104
    allow_comma_separated{_allow_comma_separated},
105
    error{SUCCESS}
106
    #ifdef SIMDJSON_THREADS_ENABLED
107
    , use_thread(_parser.threaded) // we need to make a copy because _parser.threaded can change
108
    #endif
109
{
110
#ifdef SIMDJSON_THREADS_ENABLED
111
  if(worker.get() == nullptr) {
112
    error = MEMALLOC;
113
  }
114
#endif
115
}
116
117
simdjson_inline document_stream::document_stream() noexcept
118
  : parser{nullptr},
119
    buf{nullptr},
120
    len{0},
121
    batch_size{0},
122
    allow_comma_separated{false},
123
    error{UNINITIALIZED}
124
    #ifdef SIMDJSON_THREADS_ENABLED
125
    , use_thread(false)
126
    #endif
127
{
128
}
129
130
// Destructor. With threads enabled, the worker is released explicitly through
// worker.reset(); otherwise there is nothing to do.
simdjson_inline document_stream::~document_stream() noexcept {
#ifdef SIMDJSON_THREADS_ENABLED
  worker.reset();
#endif
}
136
137
0
inline size_t document_stream::size_in_bytes() const noexcept {
138
0
  return len;
139
0
}
140
141
0
inline size_t document_stream::truncated_bytes() const noexcept {
142
0
  if(error == CAPACITY) { return len - batch_start; }
143
0
  return parser->implementation->structural_indexes[parser->implementation->n_structural_indexes] - parser->implementation->structural_indexes[parser->implementation->n_structural_indexes + 1];
144
0
}
145
146
// Default iterator: attached to no stream and already finished.
simdjson_inline document_stream::iterator::iterator() noexcept
  : stream{nullptr},
    finished{true}
{}
149
150
// Iterator over _stream; is_end tells whether it starts out finished.
simdjson_inline document_stream::iterator::iterator(document_stream* _stream, bool is_end) noexcept
  : stream{_stream},
    finished{is_end}
{}
153
154
0
// Returns a result built from the stream's current document (stream->doc)
// and the stream's current error code (stream->error).
simdjson_inline simdjson_result<ondemand::document_reference> document_stream::iterator::operator*() noexcept {
  return simdjson_result<ondemand::document_reference>(
    stream->doc,
    stream->error
  );
}
157
158
0
// Advances the iterator to the next document of the stream.
//
// The iterator becomes finished when the stream already carries an error
// before advancing, or when advancing ends with the EMPTY error (no more
// documents). Any other error raised while advancing leaves the iterator
// unfinished so that it can still be observed through operator*.
simdjson_inline document_stream::iterator& document_stream::iterator::operator++() noexcept {
  // A stream in error must end the iteration no matter what: we do not keep
  // producing documents with errors, nor go past a document with errors.
  // Users are not required to call operator*() between increments, so this
  // must be handled here. It is essential: stream->next() returns right away
  // when stream->error is set, so without this a range loop would never end.
  if (stream->error) { finished = true; }
  stream->next();
  // EMPTY means that was the last document; it is the only error that we do
  // not want to surface through operator*. Other errors are passed along,
  // hence we cannot mark the iterator as finished for them just yet.
  if (stream->error == EMPTY) { finished = true; }
  return *this;
}
184
185
0
// Tells whether the iterator is finished.
simdjson_inline bool document_stream::iterator::at_end() const noexcept {
  return this->finished;
}
188
189
190
0
// Two iterators differ when exactly one of them is finished. Only the
// `finished` flags are compared; the stream pointers are not.
simdjson_inline bool document_stream::iterator::operator!=(const document_stream::iterator &other) const noexcept {
  return this->finished != other.finished;
}
193
194
0
// Two iterators are equal when their `finished` flags match. Only the flags
// are compared; the stream pointers are not.
simdjson_inline bool document_stream::iterator::operator==(const document_stream::iterator &other) const noexcept {
  return this->finished == other.finished;
}
197
198
0
// Starts the stream (see start()) and returns an iterator on it. The
// iterator is created finished when start() ended with the EMPTY error,
// that is, when there are no documents.
simdjson_inline document_stream::iterator document_stream::begin() noexcept {
  start();
  const bool no_documents = (error == EMPTY);
  return iterator(this, no_documents);
}
203
204
0
// Returns the past-the-end iterator: an iterator on this stream that is
// already finished.
simdjson_inline document_stream::iterator document_stream::end() noexcept {
  const bool is_end = true;
  return iterator(this, is_end);
}
207
208
0
inline void document_stream::start() noexcept {
209
0
  if (error) { return; }
210
0
  error = parser->allocate(batch_size);
211
0
  if (error) { return; }
212
0
  // Always run the first stage 1 parse immediately
213
0
  batch_start = 0;
214
0
  error = run_stage1(*parser, batch_start);
215
0
  while(error == EMPTY) {
216
0
    // In exceptional cases, we may start with an empty block
217
0
    batch_start = next_batch_start();
218
0
    if (batch_start >= len) { return; }
219
0
    error = run_stage1(*parser, batch_start);
220
0
  }
221
0
  if (error) { return; }
222
0
  doc_index = batch_start;
223
0
  doc = document(json_iterator(&buf[batch_start], parser));
224
0
  doc.iter._streaming = true;
225
0
226
0
  #ifdef SIMDJSON_THREADS_ENABLED
227
0
  if (use_thread && next_batch_start() < len) {
228
0
    // Kick off the first thread on next batch if needed
229
0
    error = stage1_thread_parser.allocate(batch_size);
230
0
    if (error) { return; }
231
0
    worker->start_thread();
232
0
    start_stage1_thread();
233
0
    if (error) { return; }
234
0
  }
235
0
  #endif // SIMDJSON_THREADS_ENABLED
236
0
}
237
238
0
inline void document_stream::next() noexcept {
239
0
  // We always enter at once once in an error condition.
240
0
  if (error) { return; }
241
0
  next_document();
242
0
  if (error) { return; }
243
0
  auto cur_struct_index = doc.iter._root - parser->implementation->structural_indexes.get();
244
0
  doc_index = batch_start + parser->implementation->structural_indexes[cur_struct_index];
245
0
246
0
  // Check if at end of structural indexes (i.e. at end of batch)
247
0
  if(cur_struct_index >= static_cast<int64_t>(parser->implementation->n_structural_indexes)) {
248
0
    error = EMPTY;
249
0
    // Load another batch (if available)
250
0
    while (error == EMPTY) {
251
0
      batch_start = next_batch_start();
252
0
      if (batch_start >= len) { break; }
253
0
      #ifdef SIMDJSON_THREADS_ENABLED
254
0
      if(use_thread) {
255
0
        load_from_stage1_thread();
256
0
      } else {
257
0
        error = run_stage1(*parser, batch_start);
258
0
      }
259
0
      #else
260
0
      error = run_stage1(*parser, batch_start);
261
0
      #endif
262
0
      /**
263
0
       * Whenever we move to another window, we need to update all pointers to make
264
0
       * it appear as if the input buffer started at the beginning of the window.
265
0
       *
266
0
       * Take this input:
267
0
       *
268
0
       * {"z":5}  {"1":1,"2":2,"4":4} [7,  10,   9]  [15,  11,   12, 13]  [154,  110,   112, 1311]
269
0
       *
270
0
       * Say you process the following window...
271
0
       *
272
0
       * '{"z":5}  {"1":1,"2":2,"4":4} [7,  10,   9]'
273
0
       *
274
0
       * When you do so, the json_iterator has a pointer at the beginning of the memory region
275
0
       * (pointing at the beginning of '{"z"...'.
276
0
       *
277
0
       * When you move to the window that starts at...
278
0
       *
279
0
       * '[7,  10,   9]  [15,  11,   12, 13] ...
280
0
       *
281
0
       * then it is not sufficient to just run stage 1. You also need to re-anchor the
282
0
       * json_iterator so that it believes we are starting at '[7,  10,   9]...'.
283
0
       *
284
0
       * Under the DOM front-end, this gets done automatically because the parser owns
285
0
       * the pointer the data, and when you call stage1 and then stage2 on the same
286
0
       * parser, then stage2 will run on the pointer acquired by stage1.
287
0
       *
288
0
       * That is, stage1 calls "this->buf = _buf" so the parser remembers the buffer that
289
0
       * we used. But json_iterator has no callback when stage1 is called on the parser.
290
0
       * In fact, I think that the parser is unaware of json_iterator.
291
0
       *
292
0
       *
293
0
       * So we need to re-anchor the json_iterator after each call to stage 1 so that
294
0
       * all of the pointers are in sync.
295
0
       */
296
0
      doc.iter = json_iterator(&buf[batch_start], parser);
297
0
      doc.iter._streaming = true;
298
0
      /**
299
0
       * End of resync.
300
0
       */
301
0
302
0
      if (error) { continue; } // If the error was EMPTY, we may want to load another batch.
303
0
      doc_index = batch_start;
304
0
    }
305
0
  }
306
0
}
307
308
0
inline void document_stream::next_document() noexcept {
309
0
  // Go to next place where depth=0 (document depth)
310
0
  error = doc.iter.skip_child(0);
311
0
  if (error) { return; }
312
0
  // Always set depth=1 at the start of document
313
0
  doc.iter._depth = 1;
314
0
  // consume comma if comma separated is allowed
315
0
  if (allow_comma_separated) {
316
0
    error_code ignored = doc.iter.consume_character(',');
317
0
    static_cast<void>(ignored); // ignored on purpose
318
0
  }
319
0
  // Resets the string buffer at the beginning, thus invalidating the strings.
320
0
  doc.iter._string_buf_loc = parser->string_buf.get();
321
0
  doc.iter._root = doc.iter.position();
322
0
}
323
324
0
inline size_t document_stream::next_batch_start() const noexcept {
325
0
  return batch_start + parser->implementation->structural_indexes[parser->implementation->n_structural_indexes];
326
0
}
327
328
0
// Runs stage 1 of parser `p` on the batch that starts at _batch_start.
//
// When what is left of the input fits within batch_size, the whole remainder
// is processed in streaming_final mode; otherwise exactly batch_size bytes
// are processed in streaming_partial mode. Only the structural index of the
// parser is updated: no json_iterator instance is touched.
inline error_code document_stream::run_stage1(ondemand::parser &p, size_t _batch_start) noexcept {
  const size_t remaining = len - _batch_start;
  const bool is_last_batch = (remaining <= batch_size);
  return p.implementation->stage1(
    &buf[_batch_start],
    is_last_batch ? remaining : batch_size,
    is_last_batch ? stage1_mode::streaming_final : stage1_mode::streaming_partial
  );
}
338
339
0
// Returns the stream's doc_index, i.e. the offset of the current document
// as maintained by document_stream::start() and document_stream::next().
simdjson_inline size_t document_stream::iterator::current_index() const noexcept {
  const size_t index = stream->doc_index;
  return index;
}
342
343
0
// Returns a view over the raw text of the current document.
//
// The view starts at current_index() in the stream's buffer. For an object or
// array document it extends to the structural character that brings the
// nesting depth back to zero (included). For a scalar document it extends up
// to the next structural index, with trailing whitespace and NUL bytes
// trimmed (at least one character is always kept).
//
// Structural indexes are relative to the start of the current batch whereas
// current_index() is an offset into the whole buffer, so stream->batch_start
// is added whenever the two are combined.
simdjson_inline std::string_view document_stream::iterator::source() const noexcept {
  auto depth = stream->doc.iter.depth();
  auto cur_struct_index = stream->doc.iter._root - stream->parser->implementation->structural_indexes.get();

  // If at root, process the first token to determine if scalar value
  if (stream->doc.iter.at_root()) {
    switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
      case '{': case '[':   // Depth=1 already at start of document
        break;
      case '}': case ']':
        depth--;
        break;
      default:    // Scalar value document
        // This returns a string spanning from start of value to the beginning of the next document (excluded)
        {
          auto next_index = stream->parser->implementation->structural_indexes[++cur_struct_index];
          // next_index is batch-relative while current_index() is absolute:
          // add batch_start so that the subtraction is meaningful (and does
          // not wrap around) for every batch, not only the first one.
          size_t svlen = stream->batch_start + next_index - current_index();
          const char *start = reinterpret_cast<const char*>(stream->buf) + current_index();
          // std::isspace has undefined behavior for negative arguments, so the
          // character is converted to unsigned char first.
          while(svlen > 1 && (std::isspace(static_cast<unsigned char>(start[svlen-1])) || start[svlen-1] == '\0')) {
            svlen--;
          }
          return std::string_view(start, svlen);
        }
    }
    cur_struct_index++;
  }

  // Walk the structural characters until the document is closed (depth 0) or
  // the end of the batch's structural indexes is reached.
  while (cur_struct_index <= static_cast<int64_t>(stream->parser->implementation->n_structural_indexes)) {
    switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
      case '{': case '[':
        depth++;
        break;
      case '}': case ']':
        depth--;
        break;
    }
    if (depth == 0) { break; }
    cur_struct_index++;
  }

  return std::string_view(reinterpret_cast<const char*>(stream->buf) + current_index(), stream->parser->implementation->structural_indexes[cur_struct_index] - current_index() + stream->batch_start + 1);
}
387
388
0
// Returns the current error code of the underlying stream.
inline error_code document_stream::iterator::error() const noexcept {
  const error_code stream_error = stream->error;
  return stream_error;
}
391
392
#ifdef SIMDJSON_THREADS_ENABLED
393
394
0
inline void document_stream::load_from_stage1_thread() noexcept {
395
0
  worker->finish();
396
0
  // Swap to the parser that was loaded up in the thread. Make sure the parser has
397
0
  // enough memory to swap to, as well.
398
0
  std::swap(stage1_thread_parser,*parser);
399
0
  error = stage1_thread_error;
400
0
  if (error) { return; }
401
0
402
0
  // If there's anything left, start the stage 1 thread!
403
0
  if (next_batch_start() < len) {
404
0
    start_stage1_thread();
405
0
  }
406
0
}
407
408
0
inline void document_stream::start_stage1_thread() noexcept {
409
0
  // we call the thread on a lambda that will update
410
0
  // this->stage1_thread_error
411
0
  // there is only one thread that may write to this value
412
0
  // TODO this is NOT exception-safe.
413
0
  this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error
414
0
  size_t _next_batch_start = this->next_batch_start();
415
0
416
0
  worker->run(this, & this->stage1_thread_parser, _next_batch_start);
417
0
}
418
419
#endif // SIMDJSON_THREADS_ENABLED
420
421
} // namespace ondemand
422
} // namespace SIMDJSON_IMPLEMENTATION
423
} // namespace simdjson
424
425
namespace simdjson {
426
427
simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
428
  error_code error
429
) noexcept :
430
    implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(error)
431
{
432
}
433
// Builds a result holding a document_stream; the stream is handed over to
// the base class constructor as an rvalue.
simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(SIMDJSON_IMPLEMENTATION::ondemand::document_stream &&value) noexcept
  : implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(std::move(value))
{}
441
442
}
443
444
#endif // SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H