Coverage Report

Created: 2025-07-01 07:09

/src/glib/gio/ginputstream.c
Line
Count
Source (jump to first uncovered line)
1
/* GIO - GLib Input, Output and Streaming Library
2
 * 
3
 * Copyright (C) 2006-2007 Red Hat, Inc.
4
 *
5
 * This library is free software; you can redistribute it and/or
6
 * modify it under the terms of the GNU Lesser General Public
7
 * License as published by the Free Software Foundation; either
8
 * version 2.1 of the License, or (at your option) any later version.
9
 *
10
 * This library is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 * Lesser General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU Lesser General
16
 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17
 *
18
 * Author: Alexander Larsson <alexl@redhat.com>
19
 */
20
21
#include "config.h"
22
#include <glib.h>
23
#include "glibintl.h"
24
25
#include "ginputstream.h"
26
#include "gioprivate.h"
27
#include "gseekable.h"
28
#include "gcancellable.h"
29
#include "gasyncresult.h"
30
#include "gioerror.h"
31
#include "gpollableinputstream.h"
32
33
/**
34
 * SECTION:ginputstream
35
 * @short_description: Base class for implementing streaming input
36
 * @include: gio/gio.h
37
 *
38
 * #GInputStream has functions to read from a stream (g_input_stream_read()),
39
 * to close a stream (g_input_stream_close()) and to skip some content
40
 * (g_input_stream_skip()). 
41
 *
42
 * To copy the content of an input stream to an output stream without 
43
 * manually handling the reads and writes, use g_output_stream_splice().
44
 *
45
 * See the documentation for #GIOStream for details of thread safety of
46
 * streaming APIs.
47
 *
48
 * All of these functions have async variants too.
49
 **/
50
51
struct _GInputStreamPrivate {
52
  guint closed : 1;
53
  guint pending : 1;
54
  GAsyncReadyCallback outstanding_callback;
55
};
56
57
G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
58
59
static gssize   g_input_stream_real_skip         (GInputStream         *stream,
60
              gsize                 count,
61
              GCancellable         *cancellable,
62
              GError              **error);
63
static void     g_input_stream_real_read_async   (GInputStream         *stream,
64
              void                 *buffer,
65
              gsize                 count,
66
              int                   io_priority,
67
              GCancellable         *cancellable,
68
              GAsyncReadyCallback   callback,
69
              gpointer              user_data);
70
static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
71
              GAsyncResult         *result,
72
              GError              **error);
73
static void     g_input_stream_real_skip_async   (GInputStream         *stream,
74
              gsize                 count,
75
              int                   io_priority,
76
              GCancellable         *cancellable,
77
              GAsyncReadyCallback   callback,
78
              gpointer              data);
79
static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
80
              GAsyncResult         *result,
81
              GError              **error);
82
static void     g_input_stream_real_close_async  (GInputStream         *stream,
83
              int                   io_priority,
84
              GCancellable         *cancellable,
85
              GAsyncReadyCallback   callback,
86
              gpointer              data);
87
static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
88
              GAsyncResult         *result,
89
              GError              **error);
90
91
static void
92
g_input_stream_dispose (GObject *object)
93
1.72M
{
94
1.72M
  GInputStream *stream;
95
96
1.72M
  stream = G_INPUT_STREAM (object);
97
  
98
1.72M
  if (!stream->priv->closed)
99
1.72M
    g_input_stream_close (stream, NULL, NULL);
100
101
1.72M
  G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
102
1.72M
}
103
104
105
static void
106
g_input_stream_class_init (GInputStreamClass *klass)
107
36
{
108
36
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
109
  
110
36
  gobject_class->dispose = g_input_stream_dispose;
111
  
112
36
  klass->skip = g_input_stream_real_skip;
113
36
  klass->read_async = g_input_stream_real_read_async;
114
36
  klass->read_finish = g_input_stream_real_read_finish;
115
36
  klass->skip_async = g_input_stream_real_skip_async;
116
36
  klass->skip_finish = g_input_stream_real_skip_finish;
117
36
  klass->close_async = g_input_stream_real_close_async;
118
36
  klass->close_finish = g_input_stream_real_close_finish;
119
36
}
120
121
static void
122
g_input_stream_init (GInputStream *stream)
123
1.72M
{
124
1.72M
  stream->priv = g_input_stream_get_instance_private (stream);
125
1.72M
}
126
127
/**
128
 * g_input_stream_read:
129
 * @stream: a #GInputStream.
130
 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
131
 *     a buffer to read data into (which should be at least count bytes long).
132
 * @count: the number of bytes that will be read from the stream
133
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
134
 * @error: location to store the error occurring, or %NULL to ignore
135
 *
136
 * Tries to read @count bytes from the stream into the buffer starting at
137
 * @buffer. Will block during this read.
138
 * 
139
 * If count is zero returns zero and does nothing. A value of @count
140
 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
141
 *
142
 * On success, the number of bytes read into the buffer is returned.
143
 * It is not an error if this is not the same as the requested size, as it
144
 * can happen e.g. near the end of a file. Zero is returned on end of file
145
 * (or if @count is zero),  but never otherwise.
146
 *
147
 * The returned @buffer is not a nul-terminated string, it can contain nul bytes
148
 * at any position, and this function doesn't nul-terminate the @buffer.
149
 *
150
 * If @cancellable is not %NULL, then the operation can be cancelled by
151
 * triggering the cancellable object from another thread. If the operation
152
 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
153
 * operation was partially finished when the operation was cancelled the
154
 * partial result will be returned, without an error.
155
 *
156
 * On error -1 is returned and @error is set accordingly.
157
 * 
158
 * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
159
 **/
160
gssize
161
g_input_stream_read  (GInputStream  *stream,
162
          void          *buffer,
163
          gsize          count,
164
          GCancellable  *cancellable,
165
          GError       **error)
166
219M
{
167
219M
  GInputStreamClass *class;
168
219M
  gssize res;
169
170
219M
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
171
219M
  g_return_val_if_fail (buffer != NULL, 0);
172
173
219M
  if (count == 0)
174
2.24M
    return 0;
175
  
176
217M
  if (((gssize) count) < 0)
177
0
    {
178
0
      g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
179
0
       _("Too large count value passed to %s"), G_STRFUNC);
180
0
      return -1;
181
0
    }
182
183
217M
  class = G_INPUT_STREAM_GET_CLASS (stream);
184
185
217M
  if (class->read_fn == NULL) 
186
0
    {
187
0
      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
188
0
                           _("Input stream doesn’t implement read"));
189
0
      return -1;
190
0
    }
191
192
217M
  if (!g_input_stream_set_pending (stream, error))
193
0
    return -1;
194
195
217M
  if (cancellable)
196
0
    g_cancellable_push_current (cancellable);
197
  
198
217M
  res = class->read_fn (stream, buffer, count, cancellable, error);
199
200
217M
  if (cancellable)
201
0
    g_cancellable_pop_current (cancellable);
202
  
203
217M
  g_input_stream_clear_pending (stream);
204
205
217M
  return res;
206
217M
}
207
208
/**
209
 * g_input_stream_read_all:
210
 * @stream: a #GInputStream.
211
 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
212
 *     a buffer to read data into (which should be at least count bytes long).
213
 * @count: the number of bytes that will be read from the stream
214
 * @bytes_read: (out): location to store the number of bytes that was read from the stream
215
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
216
 * @error: location to store the error occurring, or %NULL to ignore
217
 *
218
 * Tries to read @count bytes from the stream into the buffer starting at
219
 * @buffer. Will block during this read.
220
 *
221
 * This function is similar to g_input_stream_read(), except it tries to
222
 * read as many bytes as requested, only stopping on an error or end of stream.
223
 *
224
 * On a successful read of @count bytes, or if we reached the end of the
225
 * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
226
 * read into @buffer.
227
 * 
228
 * If there is an error during the operation %FALSE is returned and @error
229
 * is set to indicate the error status.
230
 *
231
 * As a special exception to the normal conventions for functions that
232
 * use #GError, if this function returns %FALSE (and sets @error) then
233
 * @bytes_read will be set to the number of bytes that were successfully
234
 * read before the error was encountered.  This functionality is only
235
 * available from C.  If you need it from another language then you must
236
 * write your own loop around g_input_stream_read().
237
 *
238
 * Returns: %TRUE on success, %FALSE if there was an error
239
 **/
240
gboolean
241
g_input_stream_read_all (GInputStream  *stream,
242
       void          *buffer,
243
       gsize          count,
244
       gsize         *bytes_read,
245
       GCancellable  *cancellable,
246
       GError       **error)
247
0
{
248
0
  gsize _bytes_read;
249
0
  gssize res;
250
251
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
252
0
  g_return_val_if_fail (buffer != NULL, FALSE);
253
254
0
  _bytes_read = 0;
255
0
  while (_bytes_read < count)
256
0
    {
257
0
      res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
258
0
         cancellable, error);
259
0
      if (res == -1)
260
0
  {
261
0
    if (bytes_read)
262
0
      *bytes_read = _bytes_read;
263
0
    return FALSE;
264
0
  }
265
      
266
0
      if (res == 0)
267
0
  break;
268
269
0
      _bytes_read += res;
270
0
    }
271
272
0
  if (bytes_read)
273
0
    *bytes_read = _bytes_read;
274
0
  return TRUE;
275
0
}
276
277
/**
278
 * g_input_stream_read_bytes:
279
 * @stream: a #GInputStream.
280
 * @count: maximum number of bytes that will be read from the stream. Common
281
 * values include 4096 and 8192.
282
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
283
 * @error: location to store the error occurring, or %NULL to ignore
284
 *
285
 * Like g_input_stream_read(), this tries to read @count bytes from
286
 * the stream in a blocking fashion. However, rather than reading into
287
 * a user-supplied buffer, this will create a new #GBytes containing
288
 * the data that was read. This may be easier to use from language
289
 * bindings.
290
 *
291
 * If count is zero, returns a zero-length #GBytes and does nothing. A
292
 * value of @count larger than %G_MAXSSIZE will cause a
293
 * %G_IO_ERROR_INVALID_ARGUMENT error.
294
 *
295
 * On success, a new #GBytes is returned. It is not an error if the
296
 * size of this object is not the same as the requested size, as it
297
 * can happen e.g. near the end of a file. A zero-length #GBytes is
298
 * returned on end of file (or if @count is zero), but never
299
 * otherwise.
300
 *
301
 * If @cancellable is not %NULL, then the operation can be cancelled by
302
 * triggering the cancellable object from another thread. If the operation
303
 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
304
 * operation was partially finished when the operation was cancelled the
305
 * partial result will be returned, without an error.
306
 *
307
 * On error %NULL is returned and @error is set accordingly.
308
 *
309
 * Returns: (transfer full): a new #GBytes, or %NULL on error
310
 *
311
 * Since: 2.34
312
 **/
313
GBytes *
314
g_input_stream_read_bytes (GInputStream  *stream,
315
         gsize          count,
316
         GCancellable  *cancellable,
317
         GError       **error)
318
0
{
319
0
  guchar *buf;
320
0
  gssize nread;
321
322
0
  buf = g_malloc (count);
323
0
  nread = g_input_stream_read (stream, buf, count, cancellable, error);
324
0
  if (nread == -1)
325
0
    {
326
0
      g_free (buf);
327
0
      return NULL;
328
0
    }
329
0
  else if (nread == 0)
330
0
    {
331
0
      g_free (buf);
332
0
      return g_bytes_new_static ("", 0);
333
0
    }
334
0
  else
335
0
    return g_bytes_new_take (buf, nread);
336
0
}
337
338
/**
339
 * g_input_stream_skip:
340
 * @stream: a #GInputStream.
341
 * @count: the number of bytes that will be skipped from the stream
342
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore. 
343
 * @error: location to store the error occurring, or %NULL to ignore
344
 *
345
 * Tries to skip @count bytes from the stream. Will block during the operation.
346
 *
347
 * This is identical to g_input_stream_read(), from a behaviour standpoint,
348
 * but the bytes that are skipped are not returned to the user. Some
349
 * streams have an implementation that is more efficient than reading the data.
350
 *
351
 * This function is optional for inherited classes, as the default implementation
352
 * emulates it using read.
353
 *
354
 * If @cancellable is not %NULL, then the operation can be cancelled by
355
 * triggering the cancellable object from another thread. If the operation
356
 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
357
 * operation was partially finished when the operation was cancelled the
358
 * partial result will be returned, without an error.
359
 *
360
 * Returns: Number of bytes skipped, or -1 on error
361
 **/
362
gssize
363
g_input_stream_skip (GInputStream  *stream,
364
         gsize          count,
365
         GCancellable  *cancellable,
366
         GError       **error)
367
0
{
368
0
  GInputStreamClass *class;
369
0
  gssize res;
370
371
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
372
373
0
  if (count == 0)
374
0
    return 0;
375
376
0
  if (((gssize) count) < 0)
377
0
    {
378
0
      g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
379
0
       _("Too large count value passed to %s"), G_STRFUNC);
380
0
      return -1;
381
0
    }
382
  
383
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
384
385
0
  if (!g_input_stream_set_pending (stream, error))
386
0
    return -1;
387
388
0
  if (cancellable)
389
0
    g_cancellable_push_current (cancellable);
390
  
391
0
  res = class->skip (stream, count, cancellable, error);
392
393
0
  if (cancellable)
394
0
    g_cancellable_pop_current (cancellable);
395
  
396
0
  g_input_stream_clear_pending (stream);
397
398
0
  return res;
399
0
}
400
401
static gssize
402
g_input_stream_real_skip (GInputStream  *stream,
403
        gsize          count,
404
        GCancellable  *cancellable,
405
        GError       **error)
406
0
{
407
0
  GInputStreamClass *class;
408
0
  gssize ret, read_bytes;
409
0
  char buffer[8192];
410
0
  GError *my_error;
411
412
0
  if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
413
0
    {
414
0
      GSeekable *seekable = G_SEEKABLE (stream);
415
0
      goffset start, end;
416
0
      gboolean success;
417
418
      /* g_seekable_seek() may try to set pending itself */
419
0
      stream->priv->pending = FALSE;
420
421
0
      start = g_seekable_tell (seekable);
422
423
0
      if (g_seekable_seek (G_SEEKABLE (stream),
424
0
                           0,
425
0
                           G_SEEK_END,
426
0
                           cancellable,
427
0
                           NULL))
428
0
        {
429
0
          end = g_seekable_tell (seekable);
430
0
          g_assert (end >= start);
431
0
          if (start > G_MAXSIZE - count || start + count > end)
432
0
            {
433
0
              stream->priv->pending = TRUE;
434
0
              return end - start;
435
0
            }
436
437
0
          success = g_seekable_seek (G_SEEKABLE (stream),
438
0
                                     start + count,
439
0
                                     G_SEEK_SET,
440
0
                                     cancellable,
441
0
                                     error);
442
0
          stream->priv->pending = TRUE;
443
444
0
          if (success)
445
0
            return count;
446
0
          else
447
0
            return -1;
448
0
        }
449
0
    }
450
451
  /* If not seekable, or seek failed, fall back to reading data: */
452
453
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
454
455
0
  read_bytes = 0;
456
0
  while (1)
457
0
    {
458
0
      my_error = NULL;
459
460
0
      ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
461
0
                            cancellable, &my_error);
462
0
      if (ret == -1)
463
0
  {
464
0
    if (read_bytes > 0 &&
465
0
        my_error->domain == G_IO_ERROR &&
466
0
        my_error->code == G_IO_ERROR_CANCELLED)
467
0
      {
468
0
        g_error_free (my_error);
469
0
        return read_bytes;
470
0
      }
471
472
0
    g_propagate_error (error, my_error);
473
0
    return -1;
474
0
  }
475
476
0
      count -= ret;
477
0
      read_bytes += ret;
478
479
0
      if (ret == 0 || count == 0)
480
0
        return read_bytes;
481
0
    }
482
0
}
483
484
/**
485
 * g_input_stream_close:
486
 * @stream: A #GInputStream.
487
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
488
 * @error: location to store the error occurring, or %NULL to ignore
489
 *
490
 * Closes the stream, releasing resources related to it.
491
 *
492
 * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
493
 * Closing a stream multiple times will not return an error.
494
 *
495
 * Streams will be automatically closed when the last reference
496
 * is dropped, but you might want to call this function to make sure 
497
 * resources are released as early as possible.
498
 *
499
 * Some streams might keep the backing store of the stream (e.g. a file descriptor)
500
 * open after the stream is closed. See the documentation for the individual
501
 * stream for details.
502
 *
503
 * On failure the first error that happened will be reported, but the close
504
 * operation will finish as much as possible. A stream that failed to
505
 * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
506
 * is important to check and report the error to the user.
507
 *
508
 * If @cancellable is not %NULL, then the operation can be cancelled by
509
 * triggering the cancellable object from another thread. If the operation
510
 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
511
 * Cancelling a close will still leave the stream closed, but some streams
512
 * can use a faster close that doesn't block to e.g. check errors. 
513
 *
514
 * Returns: %TRUE on success, %FALSE on failure
515
 **/
516
gboolean
517
g_input_stream_close (GInputStream  *stream,
518
          GCancellable  *cancellable,
519
          GError       **error)
520
1.72M
{
521
1.72M
  GInputStreamClass *class;
522
1.72M
  gboolean res;
523
524
1.72M
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
525
526
1.72M
  class = G_INPUT_STREAM_GET_CLASS (stream);
527
528
1.72M
  if (stream->priv->closed)
529
0
    return TRUE;
530
531
1.72M
  res = TRUE;
532
533
1.72M
  if (!g_input_stream_set_pending (stream, error))
534
0
    return FALSE;
535
536
1.72M
  if (cancellable)
537
0
    g_cancellable_push_current (cancellable);
538
539
1.72M
  if (class->close_fn)
540
263k
    res = class->close_fn (stream, cancellable, error);
541
542
1.72M
  if (cancellable)
543
0
    g_cancellable_pop_current (cancellable);
544
545
1.72M
  g_input_stream_clear_pending (stream);
546
  
547
1.72M
  stream->priv->closed = TRUE;
548
  
549
1.72M
  return res;
550
1.72M
}
551
552
static void
553
async_ready_callback_wrapper (GObject      *source_object,
554
            GAsyncResult *res,
555
            gpointer      user_data)
556
0
{
557
0
  GInputStream *stream = G_INPUT_STREAM (source_object);
558
559
0
  g_input_stream_clear_pending (stream);
560
0
  if (stream->priv->outstanding_callback)
561
0
    (*stream->priv->outstanding_callback) (source_object, res, user_data);
562
0
  g_object_unref (stream);
563
0
}
564
565
static void
566
async_ready_close_callback_wrapper (GObject      *source_object,
567
            GAsyncResult *res,
568
            gpointer      user_data)
569
0
{
570
0
  GInputStream *stream = G_INPUT_STREAM (source_object);
571
572
0
  g_input_stream_clear_pending (stream);
573
0
  stream->priv->closed = TRUE;
574
0
  if (stream->priv->outstanding_callback)
575
0
    (*stream->priv->outstanding_callback) (source_object, res, user_data);
576
0
  g_object_unref (stream);
577
0
}
578
579
/**
580
 * g_input_stream_read_async:
581
 * @stream: A #GInputStream.
582
 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
583
 *     a buffer to read data into (which should be at least count bytes long).
584
 * @count: the number of bytes that will be read from the stream
585
 * @io_priority: the [I/O priority][io-priority]
586
 * of the request. 
587
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
588
 * @callback: (scope async): callback to call when the request is satisfied
589
 * @user_data: (closure): the data to pass to callback function
590
 *
591
 * Request an asynchronous read of @count bytes from the stream into the buffer
592
 * starting at @buffer. When the operation is finished @callback will be called. 
593
 * You can then call g_input_stream_read_finish() to get the result of the 
594
 * operation.
595
 *
596
 * During an async request no other sync and async calls are allowed on @stream, and will
597
 * result in %G_IO_ERROR_PENDING errors. 
598
 *
599
 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
600
 *
601
 * On success, the number of bytes read into the buffer will be passed to the
602
 * callback. It is not an error if this is not the same as the requested size, as it
603
 * can happen e.g. near the end of a file, but generally we try to read
604
 * as many bytes as requested. Zero is returned on end of file
605
 * (or if @count is zero),  but never otherwise.
606
 *
607
 * Any outstanding i/o request with higher priority (lower numerical value) will
608
 * be executed before an outstanding request with lower priority. Default
609
 * priority is %G_PRIORITY_DEFAULT.
610
 *
611
 * The asynchronous methods have a default fallback that uses threads to implement
612
 * asynchronicity, so they are optional for inheriting classes. However, if you
613
 * override one you must override all.
614
 **/
615
void
616
g_input_stream_read_async (GInputStream        *stream,
617
         void                *buffer,
618
         gsize                count,
619
         int                  io_priority,
620
         GCancellable        *cancellable,
621
         GAsyncReadyCallback  callback,
622
         gpointer             user_data)
623
0
{
624
0
  GInputStreamClass *class;
625
0
  GError *error = NULL;
626
627
0
  g_return_if_fail (G_IS_INPUT_STREAM (stream));
628
0
  g_return_if_fail (buffer != NULL);
629
630
0
  if (count == 0)
631
0
    {
632
0
      GTask *task;
633
634
0
      task = g_task_new (stream, cancellable, callback, user_data);
635
0
      g_task_set_source_tag (task, g_input_stream_read_async);
636
0
      g_task_return_int (task, 0);
637
0
      g_object_unref (task);
638
0
      return;
639
0
    }
640
  
641
0
  if (((gssize) count) < 0)
642
0
    {
643
0
      g_task_report_new_error (stream, callback, user_data,
644
0
                               g_input_stream_read_async,
645
0
                               G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
646
0
                               _("Too large count value passed to %s"),
647
0
                               G_STRFUNC);
648
0
      return;
649
0
    }
650
651
0
  if (!g_input_stream_set_pending (stream, &error))
652
0
    {
653
0
      g_task_report_error (stream, callback, user_data,
654
0
                           g_input_stream_read_async,
655
0
                           error);
656
0
      return;
657
0
    }
658
659
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
660
0
  stream->priv->outstanding_callback = callback;
661
0
  g_object_ref (stream);
662
0
  class->read_async (stream, buffer, count, io_priority, cancellable,
663
0
         async_ready_callback_wrapper, user_data);
664
0
}
665
666
/**
667
 * g_input_stream_read_finish:
668
 * @stream: a #GInputStream.
669
 * @result: a #GAsyncResult.
670
 * @error: a #GError location to store the error occurring, or %NULL to 
671
 * ignore.
672
 * 
673
 * Finishes an asynchronous stream read operation. 
674
 * 
675
 * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
676
 **/
677
gssize
678
g_input_stream_read_finish (GInputStream  *stream,
679
          GAsyncResult  *result,
680
          GError       **error)
681
0
{
682
0
  GInputStreamClass *class;
683
  
684
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
685
0
  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
686
687
0
  if (g_async_result_legacy_propagate_error (result, error))
688
0
    return -1;
689
0
  else if (g_async_result_is_tagged (result, g_input_stream_read_async))
690
0
    return g_task_propagate_int (G_TASK (result), error);
691
692
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
693
0
  return class->read_finish (stream, result, error);
694
0
}
695
696
typedef struct
697
{
698
  gchar *buffer;
699
  gsize to_read;
700
  gsize bytes_read;
701
} AsyncReadAll;
702
703
static void
704
free_async_read_all (gpointer data)
705
0
{
706
0
  g_slice_free (AsyncReadAll, data);
707
0
}
708
709
static void
710
read_all_callback (GObject      *stream,
711
                   GAsyncResult *result,
712
                   gpointer      user_data)
713
0
{
714
0
  GTask *task = user_data;
715
0
  AsyncReadAll *data = g_task_get_task_data (task);
716
0
  gboolean got_eof = FALSE;
717
718
0
  if (result)
719
0
    {
720
0
      GError *error = NULL;
721
0
      gssize nread;
722
723
0
      nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
724
725
0
      if (nread == -1)
726
0
        {
727
0
          g_task_return_error (task, error);
728
0
          g_object_unref (task);
729
0
          return;
730
0
        }
731
732
0
      g_assert_cmpint (nread, <=, data->to_read);
733
0
      data->to_read -= nread;
734
0
      data->bytes_read += nread;
735
0
      got_eof = (nread == 0);
736
0
    }
737
738
0
  if (got_eof || data->to_read == 0)
739
0
    {
740
0
      g_task_return_boolean (task, TRUE);
741
0
      g_object_unref (task);
742
0
    }
743
744
0
  else
745
0
    g_input_stream_read_async (G_INPUT_STREAM (stream),
746
0
                               data->buffer + data->bytes_read,
747
0
                               data->to_read,
748
0
                               g_task_get_priority (task),
749
0
                               g_task_get_cancellable (task),
750
0
                               read_all_callback, task);
751
0
}
752
753
754
static void
755
read_all_async_thread (GTask        *task,
756
                       gpointer      source_object,
757
                       gpointer      task_data,
758
                       GCancellable *cancellable)
759
0
{
760
0
  GInputStream *stream = source_object;
761
0
  AsyncReadAll *data = task_data;
762
0
  GError *error = NULL;
763
764
0
  if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
765
0
                               g_task_get_cancellable (task), &error))
766
0
    g_task_return_boolean (task, TRUE);
767
0
  else
768
0
    g_task_return_error (task, error);
769
0
}
770
771
/**
772
 * g_input_stream_read_all_async:
773
 * @stream: A #GInputStream
774
 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
775
 *     a buffer to read data into (which should be at least count bytes long)
776
 * @count: the number of bytes that will be read from the stream
777
 * @io_priority: the [I/O priority][io-priority] of the request
778
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
779
 * @callback: (scope async): callback to call when the request is satisfied
780
 * @user_data: (closure): the data to pass to callback function
781
 *
782
 * Request an asynchronous read of @count bytes from the stream into the
783
 * buffer starting at @buffer.
784
 *
785
 * This is the asynchronous equivalent of g_input_stream_read_all().
786
 *
787
 * Call g_input_stream_read_all_finish() to collect the result.
788
 *
789
 * Any outstanding I/O request with higher priority (lower numerical
790
 * value) will be executed before an outstanding request with lower
791
 * priority. Default priority is %G_PRIORITY_DEFAULT.
792
 *
793
 * Since: 2.44
794
 **/
795
void
796
g_input_stream_read_all_async (GInputStream        *stream,
797
                               void                *buffer,
798
                               gsize                count,
799
                               int                  io_priority,
800
                               GCancellable        *cancellable,
801
                               GAsyncReadyCallback  callback,
802
                               gpointer             user_data)
803
0
{
804
0
  AsyncReadAll *data;
805
0
  GTask *task;
806
807
0
  g_return_if_fail (G_IS_INPUT_STREAM (stream));
808
0
  g_return_if_fail (buffer != NULL || count == 0);
809
810
0
  task = g_task_new (stream, cancellable, callback, user_data);
811
0
  data = g_slice_new0 (AsyncReadAll);
812
0
  data->buffer = buffer;
813
0
  data->to_read = count;
814
815
0
  g_task_set_source_tag (task, g_input_stream_read_all_async);
816
0
  g_task_set_task_data (task, data, free_async_read_all);
817
0
  g_task_set_priority (task, io_priority);
818
819
  /* If async reads are going to be handled via the threadpool anyway
820
   * then we may as well do it with a single dispatch instead of
821
   * bouncing in and out.
822
   */
823
0
  if (g_input_stream_async_read_is_via_threads (stream))
824
0
    {
825
0
      g_task_run_in_thread (task, read_all_async_thread);
826
0
      g_object_unref (task);
827
0
    }
828
0
  else
829
0
    read_all_callback (G_OBJECT (stream), NULL, task);
830
0
}
831
832
/**
833
 * g_input_stream_read_all_finish:
834
 * @stream: a #GInputStream
835
 * @result: a #GAsyncResult
836
 * @bytes_read: (out): location to store the number of bytes that was read from the stream
837
 * @error: a #GError location to store the error occurring, or %NULL to ignore
838
 *
839
 * Finishes an asynchronous stream read operation started with
840
 * g_input_stream_read_all_async().
841
 *
842
 * As a special exception to the normal conventions for functions that
843
 * use #GError, if this function returns %FALSE (and sets @error) then
844
 * @bytes_read will be set to the number of bytes that were successfully
845
 * read before the error was encountered.  This functionality is only
846
 * available from C.  If you need it from another language then you must
847
 * write your own loop around g_input_stream_read_async().
848
 *
849
 * Returns: %TRUE on success, %FALSE if there was an error
850
 *
851
 * Since: 2.44
852
 **/
853
gboolean
854
g_input_stream_read_all_finish (GInputStream  *stream,
855
                                GAsyncResult  *result,
856
                                gsize         *bytes_read,
857
                                GError       **error)
858
0
{
859
0
  GTask *task;
860
861
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
862
0
  g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
863
864
0
  task = G_TASK (result);
865
866
0
  if (bytes_read)
867
0
    {
868
0
      AsyncReadAll *data = g_task_get_task_data (task);
869
870
0
      *bytes_read = data->bytes_read;
871
0
    }
872
873
0
  return g_task_propagate_boolean (task, error);
874
0
}
875
876
static void
877
read_bytes_callback (GObject      *stream,
878
         GAsyncResult *result,
879
         gpointer      user_data)
880
0
{
881
0
  GTask *task = user_data;
882
0
  guchar *buf = g_task_get_task_data (task);
883
0
  GError *error = NULL;
884
0
  gssize nread;
885
0
  GBytes *bytes = NULL;
886
887
0
  nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
888
0
              result, &error);
889
0
  if (nread == -1)
890
0
    {
891
0
      g_free (buf);
892
0
      g_task_return_error (task, error);
893
0
    }
894
0
  else if (nread == 0)
895
0
    {
896
0
      g_free (buf);
897
0
      bytes = g_bytes_new_static ("", 0);
898
0
    }
899
0
  else
900
0
    bytes = g_bytes_new_take (buf, nread);
901
902
0
  if (bytes)
903
0
    g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
904
905
0
  g_object_unref (task);
906
0
}
907
908
/**
909
 * g_input_stream_read_bytes_async:
910
 * @stream: A #GInputStream.
911
 * @count: the number of bytes that will be read from the stream
912
 * @io_priority: the [I/O priority][io-priority] of the request
913
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
914
 * @callback: (scope async): callback to call when the request is satisfied
915
 * @user_data: (closure): the data to pass to callback function
916
 *
917
 * Request an asynchronous read of @count bytes from the stream into a
918
 * new #GBytes. When the operation is finished @callback will be
919
 * called. You can then call g_input_stream_read_bytes_finish() to get the
920
 * result of the operation.
921
 *
922
 * During an async request no other sync and async calls are allowed
923
 * on @stream, and will result in %G_IO_ERROR_PENDING errors.
924
 *
925
 * A value of @count larger than %G_MAXSSIZE will cause a
926
 * %G_IO_ERROR_INVALID_ARGUMENT error.
927
 *
928
 * On success, the new #GBytes will be passed to the callback. It is
929
 * not an error if this is smaller than the requested size, as it can
930
 * happen e.g. near the end of a file, but generally we try to read as
931
 * many bytes as requested. Zero is returned on end of file (or if
932
 * @count is zero), but never otherwise.
933
 *
934
 * Any outstanding I/O request with higher priority (lower numerical
935
 * value) will be executed before an outstanding request with lower
936
 * priority. Default priority is %G_PRIORITY_DEFAULT.
937
 *
938
 * Since: 2.34
939
 **/
940
void
941
g_input_stream_read_bytes_async (GInputStream          *stream,
942
         gsize                  count,
943
         int                    io_priority,
944
         GCancellable          *cancellable,
945
         GAsyncReadyCallback    callback,
946
         gpointer               user_data)
947
0
{
948
0
  GTask *task;
949
0
  guchar *buf;
950
951
0
  task = g_task_new (stream, cancellable, callback, user_data);
952
0
  g_task_set_source_tag (task, g_input_stream_read_bytes_async);
953
954
0
  buf = g_malloc (count);
955
0
  g_task_set_task_data (task, buf, NULL);
956
957
0
  g_input_stream_read_async (stream, buf, count,
958
0
                             io_priority, cancellable,
959
0
                             read_bytes_callback, task);
960
0
}
961
962
/**
963
 * g_input_stream_read_bytes_finish:
964
 * @stream: a #GInputStream.
965
 * @result: a #GAsyncResult.
966
 * @error: a #GError location to store the error occurring, or %NULL to
967
 *   ignore.
968
 *
969
 * Finishes an asynchronous stream read-into-#GBytes operation.
970
 *
971
 * Returns: (transfer full): the newly-allocated #GBytes, or %NULL on error
972
 *
973
 * Since: 2.34
974
 **/
975
GBytes *
976
g_input_stream_read_bytes_finish (GInputStream  *stream,
977
          GAsyncResult  *result,
978
          GError       **error)
979
0
{
980
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
981
0
  g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
982
983
0
  return g_task_propagate_pointer (G_TASK (result), error);
984
0
}
985
986
/**
987
 * g_input_stream_skip_async:
988
 * @stream: A #GInputStream.
989
 * @count: the number of bytes that will be skipped from the stream
990
 * @io_priority: the [I/O priority][io-priority] of the request
991
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
992
 * @callback: (scope async): callback to call when the request is satisfied
993
 * @user_data: (closure): the data to pass to callback function
994
 *
995
 * Request an asynchronous skip of @count bytes from the stream.
996
 * When the operation is finished @callback will be called.
997
 * You can then call g_input_stream_skip_finish() to get the result
998
 * of the operation.
999
 *
1000
 * During an async request no other sync and async calls are allowed,
1001
 * and will result in %G_IO_ERROR_PENDING errors.
1002
 *
1003
 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
1004
 *
1005
 * On success, the number of bytes skipped will be passed to the callback.
1006
 * It is not an error if this is not the same as the requested size, as it
1007
 * can happen e.g. near the end of a file, but generally we try to skip
1008
 * as many bytes as requested. Zero is returned on end of file
1009
 * (or if @count is zero), but never otherwise.
1010
 *
1011
 * Any outstanding i/o request with higher priority (lower numerical value)
1012
 * will be executed before an outstanding request with lower priority.
1013
 * Default priority is %G_PRIORITY_DEFAULT.
1014
 *
1015
 * The asynchronous methods have a default fallback that uses threads to
1016
 * implement asynchronicity, so they are optional for inheriting classes.
1017
 * However, if you override one, you must override all.
1018
 **/
1019
void
1020
g_input_stream_skip_async (GInputStream        *stream,
1021
         gsize                count,
1022
         int                  io_priority,
1023
         GCancellable        *cancellable,
1024
         GAsyncReadyCallback  callback,
1025
         gpointer             user_data)
1026
0
{
1027
0
  GInputStreamClass *class;
1028
0
  GError *error = NULL;
1029
1030
0
  g_return_if_fail (G_IS_INPUT_STREAM (stream));
1031
1032
0
  if (count == 0)
1033
0
    {
1034
0
      GTask *task;
1035
1036
0
      task = g_task_new (stream, cancellable, callback, user_data);
1037
0
      g_task_set_source_tag (task, g_input_stream_skip_async);
1038
0
      g_task_return_int (task, 0);
1039
0
      g_object_unref (task);
1040
0
      return;
1041
0
    }
1042
  
1043
0
  if (((gssize) count) < 0)
1044
0
    {
1045
0
      g_task_report_new_error (stream, callback, user_data,
1046
0
                               g_input_stream_skip_async,
1047
0
                               G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1048
0
                               _("Too large count value passed to %s"),
1049
0
                               G_STRFUNC);
1050
0
      return;
1051
0
    }
1052
1053
0
  if (!g_input_stream_set_pending (stream, &error))
1054
0
    {
1055
0
      g_task_report_error (stream, callback, user_data,
1056
0
                           g_input_stream_skip_async,
1057
0
                           error);
1058
0
      return;
1059
0
    }
1060
1061
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1062
0
  stream->priv->outstanding_callback = callback;
1063
0
  g_object_ref (stream);
1064
0
  class->skip_async (stream, count, io_priority, cancellable,
1065
0
         async_ready_callback_wrapper, user_data);
1066
0
}
1067
1068
/**
1069
 * g_input_stream_skip_finish:
1070
 * @stream: a #GInputStream.
1071
 * @result: a #GAsyncResult.
1072
 * @error: a #GError location to store the error occurring, or %NULL to 
1073
 * ignore.
1074
 * 
1075
 * Finishes a stream skip operation.
1076
 * 
1077
 * Returns: the size of the bytes skipped, or `-1` on error.
1078
 **/
1079
gssize
1080
g_input_stream_skip_finish (GInputStream  *stream,
1081
          GAsyncResult  *result,
1082
          GError       **error)
1083
0
{
1084
0
  GInputStreamClass *class;
1085
1086
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
1087
0
  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1088
1089
0
  if (g_async_result_legacy_propagate_error (result, error))
1090
0
    return -1;
1091
0
  else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
1092
0
    return g_task_propagate_int (G_TASK (result), error);
1093
1094
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1095
0
  return class->skip_finish (stream, result, error);
1096
0
}
1097
1098
/**
1099
 * g_input_stream_close_async:
1100
 * @stream: A #GInputStream.
1101
 * @io_priority: the [I/O priority][io-priority] of the request
1102
 * @cancellable: (nullable): optional cancellable object
1103
 * @callback: (scope async): callback to call when the request is satisfied
1104
 * @user_data: (closure): the data to pass to callback function
1105
 *
1106
 * Requests an asynchronous closes of the stream, releasing resources related to it.
1107
 * When the operation is finished @callback will be called. 
1108
 * You can then call g_input_stream_close_finish() to get the result of the 
1109
 * operation.
1110
 *
1111
 * For behaviour details see g_input_stream_close().
1112
 *
1113
 * The asynchronous methods have a default fallback that uses threads to implement
1114
 * asynchronicity, so they are optional for inheriting classes. However, if you
1115
 * override one you must override all.
1116
 **/
1117
void
1118
g_input_stream_close_async (GInputStream        *stream,
1119
          int                  io_priority,
1120
          GCancellable        *cancellable,
1121
          GAsyncReadyCallback  callback,
1122
          gpointer             user_data)
1123
0
{
1124
0
  GInputStreamClass *class;
1125
0
  GError *error = NULL;
1126
1127
0
  g_return_if_fail (G_IS_INPUT_STREAM (stream));
1128
1129
0
  if (stream->priv->closed)
1130
0
    {
1131
0
      GTask *task;
1132
1133
0
      task = g_task_new (stream, cancellable, callback, user_data);
1134
0
      g_task_set_source_tag (task, g_input_stream_close_async);
1135
0
      g_task_return_boolean (task, TRUE);
1136
0
      g_object_unref (task);
1137
0
      return;
1138
0
    }
1139
1140
0
  if (!g_input_stream_set_pending (stream, &error))
1141
0
    {
1142
0
      g_task_report_error (stream, callback, user_data,
1143
0
                           g_input_stream_close_async,
1144
0
                           error);
1145
0
      return;
1146
0
    }
1147
  
1148
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1149
0
  stream->priv->outstanding_callback = callback;
1150
0
  g_object_ref (stream);
1151
0
  class->close_async (stream, io_priority, cancellable,
1152
0
          async_ready_close_callback_wrapper, user_data);
1153
0
}
1154
1155
/**
1156
 * g_input_stream_close_finish:
1157
 * @stream: a #GInputStream.
1158
 * @result: a #GAsyncResult.
1159
 * @error: a #GError location to store the error occurring, or %NULL to 
1160
 * ignore.
1161
 * 
1162
 * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
1163
 * 
1164
 * Returns: %TRUE if the stream was closed successfully.
1165
 **/
1166
gboolean
1167
g_input_stream_close_finish (GInputStream  *stream,
1168
           GAsyncResult  *result,
1169
           GError       **error)
1170
0
{
1171
0
  GInputStreamClass *class;
1172
1173
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1174
0
  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1175
1176
0
  if (g_async_result_legacy_propagate_error (result, error))
1177
0
    return FALSE;
1178
0
  else if (g_async_result_is_tagged (result, g_input_stream_close_async))
1179
0
    return g_task_propagate_boolean (G_TASK (result), error);
1180
1181
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1182
0
  return class->close_finish (stream, result, error);
1183
0
}
1184
1185
/**
1186
 * g_input_stream_is_closed:
1187
 * @stream: input stream.
1188
 * 
1189
 * Checks if an input stream is closed.
1190
 * 
1191
 * Returns: %TRUE if the stream is closed.
1192
 **/
1193
gboolean
1194
g_input_stream_is_closed (GInputStream *stream)
1195
0
{
1196
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1197
  
1198
0
  return stream->priv->closed;
1199
0
}
1200
 
1201
/**
1202
 * g_input_stream_has_pending:
1203
 * @stream: input stream.
1204
 * 
1205
 * Checks if an input stream has pending actions.
1206
 * 
1207
 * Returns: %TRUE if @stream has pending actions.
1208
 **/  
1209
gboolean
1210
g_input_stream_has_pending (GInputStream *stream)
1211
0
{
1212
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1213
  
1214
0
  return stream->priv->pending;
1215
0
}
1216
1217
/**
1218
 * g_input_stream_set_pending:
1219
 * @stream: input stream
1220
 * @error: a #GError location to store the error occurring, or %NULL to 
1221
 * ignore.
1222
 * 
1223
 * Sets @stream to have actions pending. If the pending flag is
1224
 * already set or @stream is closed, it will return %FALSE and set
1225
 * @error.
1226
 *
1227
 * Returns: %TRUE if pending was previously unset and is now set.
1228
 **/
1229
gboolean
1230
g_input_stream_set_pending (GInputStream *stream, GError **error)
1231
219M
{
1232
219M
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1233
  
1234
219M
  if (stream->priv->closed)
1235
0
    {
1236
0
      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1237
0
                           _("Stream is already closed"));
1238
0
      return FALSE;
1239
0
    }
1240
  
1241
219M
  if (stream->priv->pending)
1242
0
    {
1243
0
      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1244
    /* Translators: This is an error you get if there is already an
1245
     * operation running against this stream when you try to start
1246
     * one */
1247
0
     _("Stream has outstanding operation"));
1248
0
      return FALSE;
1249
0
    }
1250
  
1251
219M
  stream->priv->pending = TRUE;
1252
219M
  return TRUE;
1253
219M
}
1254
1255
/**
1256
 * g_input_stream_clear_pending:
1257
 * @stream: input stream
1258
 * 
1259
 * Clears the pending flag on @stream.
1260
 **/
1261
void
1262
g_input_stream_clear_pending (GInputStream *stream)
1263
219M
{
1264
219M
  g_return_if_fail (G_IS_INPUT_STREAM (stream));
1265
  
1266
219M
  stream->priv->pending = FALSE;
1267
219M
}
1268
1269
/*< internal >
1270
 * g_input_stream_async_read_is_via_threads:
1271
 * @stream: input stream
1272
 *
1273
 * Checks if an input stream's read_async function uses threads.
1274
 *
1275
 * Returns: %TRUE if @stream's read_async function uses threads.
1276
 **/
1277
gboolean
1278
g_input_stream_async_read_is_via_threads (GInputStream *stream)
1279
0
{
1280
0
  GInputStreamClass *class;
1281
1282
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1283
1284
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1285
1286
0
  return (class->read_async == g_input_stream_real_read_async &&
1287
0
      !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1288
0
        g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1289
0
}
1290
1291
/*< internal >
1292
 * g_input_stream_async_close_is_via_threads:
1293
 * @stream: input stream
1294
 *
1295
 * Checks if an input stream's close_async function uses threads.
1296
 *
1297
 * Returns: %TRUE if @stream's close_async function uses threads.
1298
 **/
1299
gboolean
1300
g_input_stream_async_close_is_via_threads (GInputStream *stream)
1301
0
{
1302
0
  GInputStreamClass *class;
1303
1304
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1305
1306
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1307
1308
0
  return class->close_async == g_input_stream_real_close_async;
1309
0
}
1310
1311
/********************************************
1312
 *   Default implementation of async ops    *
1313
 ********************************************/
1314
1315
typedef struct {
1316
  void   *buffer;
1317
  gsize   count;
1318
} ReadData;
1319
1320
static void
1321
free_read_data (ReadData *op)
1322
0
{
1323
0
  g_slice_free (ReadData, op);
1324
0
}
1325
1326
static void
1327
read_async_thread (GTask        *task,
1328
                   gpointer      source_object,
1329
                   gpointer      task_data,
1330
                   GCancellable *cancellable)
1331
0
{
1332
0
  GInputStream *stream = source_object;
1333
0
  ReadData *op = task_data;
1334
0
  GInputStreamClass *class;
1335
0
  GError *error = NULL;
1336
0
  gssize nread;
1337
 
1338
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1339
1340
0
  nread = class->read_fn (stream,
1341
0
                          op->buffer, op->count,
1342
0
                          g_task_get_cancellable (task),
1343
0
                          &error);
1344
0
  if (nread == -1)
1345
0
    g_task_return_error (task, error);
1346
0
  else
1347
0
    g_task_return_int (task, nread);
1348
0
}
1349
1350
static void read_async_pollable (GPollableInputStream *stream,
1351
                                 GTask                *task);
1352
1353
static gboolean
1354
read_async_pollable_ready (GPollableInputStream *stream,
1355
         gpointer              user_data)
1356
0
{
1357
0
  GTask *task = user_data;
1358
1359
0
  read_async_pollable (stream, task);
1360
0
  return FALSE;
1361
0
}
1362
1363
static void
1364
read_async_pollable (GPollableInputStream *stream,
1365
                     GTask                *task)
1366
0
{
1367
0
  ReadData *op = g_task_get_task_data (task);
1368
0
  GError *error = NULL;
1369
0
  gssize nread;
1370
1371
0
  if (g_task_return_error_if_cancelled (task))
1372
0
    return;
1373
1374
0
  nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1375
0
    read_nonblocking (stream, op->buffer, op->count, &error);
1376
1377
0
  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1378
0
    {
1379
0
      GSource *source;
1380
1381
0
      g_error_free (error);
1382
1383
0
      source = g_pollable_input_stream_create_source (stream,
1384
0
                                                      g_task_get_cancellable (task));
1385
0
      g_task_attach_source (task, source,
1386
0
                            (GSourceFunc) read_async_pollable_ready);
1387
0
      g_source_unref (source);
1388
0
      return;
1389
0
    }
1390
1391
0
  if (nread == -1)
1392
0
    g_task_return_error (task, error);
1393
0
  else
1394
0
    g_task_return_int (task, nread);
1395
  /* g_input_stream_real_read_async() unrefs task */
1396
0
}
1397
1398
1399
static void
1400
g_input_stream_real_read_async (GInputStream        *stream,
1401
        void                *buffer,
1402
        gsize                count,
1403
        int                  io_priority,
1404
        GCancellable        *cancellable,
1405
        GAsyncReadyCallback  callback,
1406
        gpointer             user_data)
1407
0
{
1408
0
  GTask *task;
1409
0
  ReadData *op;
1410
  
1411
0
  op = g_slice_new0 (ReadData);
1412
0
  task = g_task_new (stream, cancellable, callback, user_data);
1413
0
  g_task_set_source_tag (task, g_input_stream_real_read_async);
1414
0
  g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1415
0
  g_task_set_priority (task, io_priority);
1416
0
  op->buffer = buffer;
1417
0
  op->count = count;
1418
1419
0
  if (!g_input_stream_async_read_is_via_threads (stream))
1420
0
    read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1421
0
  else
1422
0
    g_task_run_in_thread (task, read_async_thread);
1423
0
  g_object_unref (task);
1424
0
}
1425
1426
static gssize
1427
g_input_stream_real_read_finish (GInputStream  *stream,
1428
         GAsyncResult  *result,
1429
         GError       **error)
1430
0
{
1431
0
  g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1432
1433
0
  return g_task_propagate_int (G_TASK (result), error);
1434
0
}
1435
1436
1437
static void
1438
skip_async_thread (GTask        *task,
1439
                   gpointer      source_object,
1440
                   gpointer      task_data,
1441
                   GCancellable *cancellable)
1442
0
{
1443
0
  GInputStream *stream = source_object;
1444
0
  gsize count = GPOINTER_TO_SIZE (task_data);
1445
0
  GInputStreamClass *class;
1446
0
  GError *error = NULL;
1447
0
  gssize ret;
1448
1449
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1450
0
  ret = class->skip (stream, count,
1451
0
                     g_task_get_cancellable (task),
1452
0
                     &error);
1453
0
  if (ret == -1)
1454
0
    g_task_return_error (task, error);
1455
0
  else
1456
0
    g_task_return_int (task, ret);
1457
0
}
1458
1459
typedef struct {
1460
  char buffer[8192];
1461
  gsize count;
1462
  gsize count_skipped;
1463
} SkipFallbackAsyncData;
1464
1465
static void
1466
skip_callback_wrapper (GObject      *source_object,
1467
           GAsyncResult *res,
1468
           gpointer      user_data)
1469
0
{
1470
0
  GInputStreamClass *class;
1471
0
  GTask *task = user_data;
1472
0
  SkipFallbackAsyncData *data = g_task_get_task_data (task);
1473
0
  GError *error = NULL;
1474
0
  gssize ret;
1475
1476
0
  ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1477
1478
0
  if (ret > 0)
1479
0
    {
1480
0
      data->count -= ret;
1481
0
      data->count_skipped += ret;
1482
1483
0
      if (data->count > 0)
1484
0
  {
1485
0
    class = G_INPUT_STREAM_GET_CLASS (source_object);
1486
0
    class->read_async (G_INPUT_STREAM (source_object),
1487
0
                             data->buffer, MIN (8192, data->count),
1488
0
                             g_task_get_priority (task),
1489
0
                             g_task_get_cancellable (task),
1490
0
                             skip_callback_wrapper, task);
1491
0
    return;
1492
0
  }
1493
0
    }
1494
1495
0
  if (ret == -1 &&
1496
0
      g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1497
0
      data->count_skipped)
1498
0
    {
1499
      /* No error, return partial read */
1500
0
      g_clear_error (&error);
1501
0
    }
1502
1503
0
  if (error)
1504
0
    g_task_return_error (task, error);
1505
0
  else
1506
0
    g_task_return_int (task, data->count_skipped);
1507
0
  g_object_unref (task);
1508
0
 }
1509
1510
static void
1511
g_input_stream_real_skip_async (GInputStream        *stream,
1512
        gsize                count,
1513
        int                  io_priority,
1514
        GCancellable        *cancellable,
1515
        GAsyncReadyCallback  callback,
1516
        gpointer             user_data)
1517
0
{
1518
0
  GInputStreamClass *class;
1519
0
  SkipFallbackAsyncData *data;
1520
0
  GTask *task;
1521
1522
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1523
1524
0
  task = g_task_new (stream, cancellable, callback, user_data);
1525
0
  g_task_set_source_tag (task, g_input_stream_real_skip_async);
1526
0
  g_task_set_priority (task, io_priority);
1527
1528
0
  if (g_input_stream_async_read_is_via_threads (stream))
1529
0
    {
1530
      /* Read is thread-using async fallback.
1531
       * Make skip use threads too, so that we can use a possible sync skip
1532
       * implementation. */
1533
0
      g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1534
1535
0
      g_task_run_in_thread (task, skip_async_thread);
1536
0
      g_object_unref (task);
1537
0
    }
1538
0
  else
1539
0
    {
1540
      /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1541
      
1542
      /* There is a custom async read function, lets use that. */
1543
0
      data = g_new (SkipFallbackAsyncData, 1);
1544
0
      data->count = count;
1545
0
      data->count_skipped = 0;
1546
0
      g_task_set_task_data (task, data, g_free);
1547
0
      g_task_set_check_cancellable (task, FALSE);
1548
0
      class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1549
0
       skip_callback_wrapper, task);
1550
0
    }
1551
1552
0
}
1553
1554
static gssize
1555
g_input_stream_real_skip_finish (GInputStream  *stream,
1556
         GAsyncResult  *result,
1557
         GError       **error)
1558
0
{
1559
0
  g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1560
1561
0
  return g_task_propagate_int (G_TASK (result), error);
1562
0
}
1563
1564
static void
1565
close_async_thread (GTask        *task,
1566
                    gpointer      source_object,
1567
                    gpointer      task_data,
1568
                    GCancellable *cancellable)
1569
0
{
1570
0
  GInputStream *stream = source_object;
1571
0
  GInputStreamClass *class;
1572
0
  GError *error = NULL;
1573
0
  gboolean result;
1574
1575
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1576
0
  if (class->close_fn)
1577
0
    {
1578
0
      result = class->close_fn (stream,
1579
0
                                g_task_get_cancellable (task),
1580
0
                                &error);
1581
0
      if (!result)
1582
0
        {
1583
0
          g_task_return_error (task, error);
1584
0
          return;
1585
0
        }
1586
0
    }
1587
1588
0
  g_task_return_boolean (task, TRUE);
1589
0
}
1590
1591
static void
1592
g_input_stream_real_close_async (GInputStream        *stream,
1593
         int                  io_priority,
1594
         GCancellable        *cancellable,
1595
         GAsyncReadyCallback  callback,
1596
         gpointer             user_data)
1597
0
{
1598
0
  GTask *task;
1599
1600
0
  task = g_task_new (stream, cancellable, callback, user_data);
1601
0
  g_task_set_source_tag (task, g_input_stream_real_close_async);
1602
0
  g_task_set_check_cancellable (task, FALSE);
1603
0
  g_task_set_priority (task, io_priority);
1604
  
1605
0
  g_task_run_in_thread (task, close_async_thread);
1606
0
  g_object_unref (task);
1607
0
}
1608
1609
static gboolean
1610
g_input_stream_real_close_finish (GInputStream  *stream,
1611
          GAsyncResult  *result,
1612
          GError       **error)
1613
0
{
1614
0
  g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1615
1616
0
  return g_task_propagate_boolean (G_TASK (result), error);
1617
0
}