Coverage Report

Created: 2025-06-13 06:55

/src/glib/gio/ginputstream.c
Line
Count
Source (jump to first uncovered line)
1
/* GIO - GLib Input, Output and Streaming Library
2
 * 
3
 * Copyright (C) 2006-2007 Red Hat, Inc.
4
 *
5
 * SPDX-License-Identifier: LGPL-2.1-or-later
6
 *
7
 * This library is free software; you can redistribute it and/or
8
 * modify it under the terms of the GNU Lesser General Public
9
 * License as published by the Free Software Foundation; either
10
 * version 2.1 of the License, or (at your option) any later version.
11
 *
12
 * This library is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
 * Lesser General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU Lesser General
18
 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
19
 *
20
 * Author: Alexander Larsson <alexl@redhat.com>
21
 */
22
23
#include "config.h"
24
#include <glib.h>
25
#include "glibintl.h"
26
27
#include "ginputstream.h"
28
#include "gioprivate.h"
29
#include "gseekable.h"
30
#include "gcancellable.h"
31
#include "gasyncresult.h"
32
#include "gioerror.h"
33
#include "gpollableinputstream.h"
34
35
/**
36
 * SECTION:ginputstream
37
 * @short_description: Base class for implementing streaming input
38
 * @include: gio/gio.h
39
 *
40
 * #GInputStream has functions to read from a stream (g_input_stream_read()),
41
 * to close a stream (g_input_stream_close()) and to skip some content
42
 * (g_input_stream_skip()). 
43
 *
44
 * To copy the content of an input stream to an output stream without 
45
 * manually handling the reads and writes, use g_output_stream_splice().
46
 *
47
 * See the documentation for #GIOStream for details of thread safety of
48
 * streaming APIs.
49
 *
50
 * All of these functions have async variants too.
51
 **/
52
53
struct _GInputStreamPrivate {
54
  guint closed : 1;
55
  guint pending : 1;
56
  GAsyncReadyCallback outstanding_callback;
57
};
58
59
G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
60
61
static gssize   g_input_stream_real_skip         (GInputStream         *stream,
62
              gsize                 count,
63
              GCancellable         *cancellable,
64
              GError              **error);
65
static void     g_input_stream_real_read_async   (GInputStream         *stream,
66
              void                 *buffer,
67
              gsize                 count,
68
              int                   io_priority,
69
              GCancellable         *cancellable,
70
              GAsyncReadyCallback   callback,
71
              gpointer              user_data);
72
static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
73
              GAsyncResult         *result,
74
              GError              **error);
75
static void     g_input_stream_real_skip_async   (GInputStream         *stream,
76
              gsize                 count,
77
              int                   io_priority,
78
              GCancellable         *cancellable,
79
              GAsyncReadyCallback   callback,
80
              gpointer              data);
81
static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
82
              GAsyncResult         *result,
83
              GError              **error);
84
static void     g_input_stream_real_close_async  (GInputStream         *stream,
85
              int                   io_priority,
86
              GCancellable         *cancellable,
87
              GAsyncReadyCallback   callback,
88
              gpointer              data);
89
static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
90
              GAsyncResult         *result,
91
              GError              **error);
92
93
static void
94
g_input_stream_dispose (GObject *object)
95
0
{
96
0
  GInputStream *stream;
97
98
0
  stream = G_INPUT_STREAM (object);
99
  
100
0
  if (!stream->priv->closed)
101
0
    g_input_stream_close (stream, NULL, NULL);
102
103
0
  G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
104
0
}
105
106
107
static void
108
g_input_stream_class_init (GInputStreamClass *klass)
109
0
{
110
0
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
111
  
112
0
  gobject_class->dispose = g_input_stream_dispose;
113
  
114
0
  klass->skip = g_input_stream_real_skip;
115
0
  klass->read_async = g_input_stream_real_read_async;
116
0
  klass->read_finish = g_input_stream_real_read_finish;
117
0
  klass->skip_async = g_input_stream_real_skip_async;
118
0
  klass->skip_finish = g_input_stream_real_skip_finish;
119
0
  klass->close_async = g_input_stream_real_close_async;
120
0
  klass->close_finish = g_input_stream_real_close_finish;
121
0
}
122
123
static void
124
g_input_stream_init (GInputStream *stream)
125
0
{
126
0
  stream->priv = g_input_stream_get_instance_private (stream);
127
0
}
128
129
/**
130
 * g_input_stream_read:
131
 * @stream: a #GInputStream.
132
 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
133
 *     a buffer to read data into (which should be at least count bytes long).
134
 * @count: (in): the number of bytes that will be read from the stream
135
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
136
 * @error: location to store the error occurring, or %NULL to ignore
137
 *
138
 * Tries to read @count bytes from the stream into the buffer starting at
139
 * @buffer. Will block during this read.
140
 * 
141
 * If count is zero returns zero and does nothing. A value of @count
142
 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
143
 *
144
 * On success, the number of bytes read into the buffer is returned.
145
 * It is not an error if this is not the same as the requested size, as it
146
 * can happen e.g. near the end of a file. Zero is returned on end of file
147
 * (or if @count is zero),  but never otherwise.
148
 *
149
 * The returned @buffer is not a nul-terminated string, it can contain nul bytes
150
 * at any position, and this function doesn't nul-terminate the @buffer.
151
 *
152
 * If @cancellable is not %NULL, then the operation can be cancelled by
153
 * triggering the cancellable object from another thread. If the operation
154
 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
155
 * operation was partially finished when the operation was cancelled the
156
 * partial result will be returned, without an error.
157
 *
158
 * On error -1 is returned and @error is set accordingly.
159
 * 
160
 * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
161
 **/
162
gssize
163
g_input_stream_read  (GInputStream  *stream,
164
          void          *buffer,
165
          gsize          count,
166
          GCancellable  *cancellable,
167
          GError       **error)
168
0
{
169
0
  GInputStreamClass *class;
170
0
  gssize res;
171
172
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
173
0
  g_return_val_if_fail (buffer != NULL, 0);
174
175
0
  if (count == 0)
176
0
    return 0;
177
  
178
0
  if (((gssize) count) < 0)
179
0
    {
180
0
      g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
181
0
       _("Too large count value passed to %s"), G_STRFUNC);
182
0
      return -1;
183
0
    }
184
185
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
186
187
0
  if (class->read_fn == NULL) 
188
0
    {
189
0
      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
190
0
                           _("Input stream doesn’t implement read"));
191
0
      return -1;
192
0
    }
193
194
0
  if (!g_input_stream_set_pending (stream, error))
195
0
    return -1;
196
197
0
  if (cancellable)
198
0
    g_cancellable_push_current (cancellable);
199
  
200
0
  res = class->read_fn (stream, buffer, count, cancellable, error);
201
202
0
  if (cancellable)
203
0
    g_cancellable_pop_current (cancellable);
204
  
205
0
  g_input_stream_clear_pending (stream);
206
207
0
  return res;
208
0
}
209
210
/**
211
 * g_input_stream_read_all:
212
 * @stream: a #GInputStream.
213
 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
214
 *     a buffer to read data into (which should be at least count bytes long).
215
 * @count: (in): the number of bytes that will be read from the stream
216
 * @bytes_read: (out): location to store the number of bytes that was read from the stream
217
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
218
 * @error: location to store the error occurring, or %NULL to ignore
219
 *
220
 * Tries to read @count bytes from the stream into the buffer starting at
221
 * @buffer. Will block during this read.
222
 *
223
 * This function is similar to g_input_stream_read(), except it tries to
224
 * read as many bytes as requested, only stopping on an error or end of stream.
225
 *
226
 * On a successful read of @count bytes, or if we reached the end of the
227
 * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
228
 * read into @buffer.
229
 * 
230
 * If there is an error during the operation %FALSE is returned and @error
231
 * is set to indicate the error status.
232
 *
233
 * As a special exception to the normal conventions for functions that
234
 * use #GError, if this function returns %FALSE (and sets @error) then
235
 * @bytes_read will be set to the number of bytes that were successfully
236
 * read before the error was encountered.  This functionality is only
237
 * available from C.  If you need it from another language then you must
238
 * write your own loop around g_input_stream_read().
239
 *
240
 * Returns: %TRUE on success, %FALSE if there was an error
241
 **/
242
gboolean
243
g_input_stream_read_all (GInputStream  *stream,
244
       void          *buffer,
245
       gsize          count,
246
       gsize         *bytes_read,
247
       GCancellable  *cancellable,
248
       GError       **error)
249
0
{
250
0
  gsize _bytes_read;
251
0
  gssize res;
252
253
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
254
0
  g_return_val_if_fail (buffer != NULL, FALSE);
255
256
0
  _bytes_read = 0;
257
0
  while (_bytes_read < count)
258
0
    {
259
0
      res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
260
0
         cancellable, error);
261
0
      if (res == -1)
262
0
  {
263
0
    if (bytes_read)
264
0
      *bytes_read = _bytes_read;
265
0
    return FALSE;
266
0
  }
267
      
268
0
      if (res == 0)
269
0
  break;
270
271
0
      _bytes_read += res;
272
0
    }
273
274
0
  if (bytes_read)
275
0
    *bytes_read = _bytes_read;
276
0
  return TRUE;
277
0
}
278
279
/**
280
 * g_input_stream_read_bytes:
281
 * @stream: a #GInputStream.
282
 * @count: maximum number of bytes that will be read from the stream. Common
283
 * values include 4096 and 8192.
284
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
285
 * @error: location to store the error occurring, or %NULL to ignore
286
 *
287
 * Like g_input_stream_read(), this tries to read @count bytes from
288
 * the stream in a blocking fashion. However, rather than reading into
289
 * a user-supplied buffer, this will create a new #GBytes containing
290
 * the data that was read. This may be easier to use from language
291
 * bindings.
292
 *
293
 * If count is zero, returns a zero-length #GBytes and does nothing. A
294
 * value of @count larger than %G_MAXSSIZE will cause a
295
 * %G_IO_ERROR_INVALID_ARGUMENT error.
296
 *
297
 * On success, a new #GBytes is returned. It is not an error if the
298
 * size of this object is not the same as the requested size, as it
299
 * can happen e.g. near the end of a file. A zero-length #GBytes is
300
 * returned on end of file (or if @count is zero), but never
301
 * otherwise.
302
 *
303
 * If @cancellable is not %NULL, then the operation can be cancelled by
304
 * triggering the cancellable object from another thread. If the operation
305
 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
306
 * operation was partially finished when the operation was cancelled the
307
 * partial result will be returned, without an error.
308
 *
309
 * On error %NULL is returned and @error is set accordingly.
310
 *
311
 * Returns: (transfer full): a new #GBytes, or %NULL on error
312
 *
313
 * Since: 2.34
314
 **/
315
GBytes *
316
g_input_stream_read_bytes (GInputStream  *stream,
317
         gsize          count,
318
         GCancellable  *cancellable,
319
         GError       **error)
320
0
{
321
0
  guchar *buf;
322
0
  gssize nread;
323
324
0
  buf = g_malloc (count);
325
0
  nread = g_input_stream_read (stream, buf, count, cancellable, error);
326
0
  if (nread == -1)
327
0
    {
328
0
      g_free (buf);
329
0
      return NULL;
330
0
    }
331
0
  else if (nread == 0)
332
0
    {
333
0
      g_free (buf);
334
0
      return g_bytes_new_static ("", 0);
335
0
    }
336
0
  else
337
0
    return g_bytes_new_take (buf, nread);
338
0
}
339
340
/**
341
 * g_input_stream_skip:
342
 * @stream: a #GInputStream.
343
 * @count: the number of bytes that will be skipped from the stream
344
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore. 
345
 * @error: location to store the error occurring, or %NULL to ignore
346
 *
347
 * Tries to skip @count bytes from the stream. Will block during the operation.
348
 *
349
 * This is identical to g_input_stream_read(), from a behaviour standpoint,
350
 * but the bytes that are skipped are not returned to the user. Some
351
 * streams have an implementation that is more efficient than reading the data.
352
 *
353
 * This function is optional for inherited classes, as the default implementation
354
 * emulates it using read.
355
 *
356
 * If @cancellable is not %NULL, then the operation can be cancelled by
357
 * triggering the cancellable object from another thread. If the operation
358
 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
359
 * operation was partially finished when the operation was cancelled the
360
 * partial result will be returned, without an error.
361
 *
362
 * Returns: Number of bytes skipped, or -1 on error
363
 **/
364
gssize
365
g_input_stream_skip (GInputStream  *stream,
366
         gsize          count,
367
         GCancellable  *cancellable,
368
         GError       **error)
369
0
{
370
0
  GInputStreamClass *class;
371
0
  gssize res;
372
373
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
374
375
0
  if (count == 0)
376
0
    return 0;
377
378
0
  if (((gssize) count) < 0)
379
0
    {
380
0
      g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
381
0
       _("Too large count value passed to %s"), G_STRFUNC);
382
0
      return -1;
383
0
    }
384
  
385
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
386
387
0
  if (!g_input_stream_set_pending (stream, error))
388
0
    return -1;
389
390
0
  if (cancellable)
391
0
    g_cancellable_push_current (cancellable);
392
  
393
0
  res = class->skip (stream, count, cancellable, error);
394
395
0
  if (cancellable)
396
0
    g_cancellable_pop_current (cancellable);
397
  
398
0
  g_input_stream_clear_pending (stream);
399
400
0
  return res;
401
0
}
402
403
static gssize
404
g_input_stream_real_skip (GInputStream  *stream,
405
        gsize          count,
406
        GCancellable  *cancellable,
407
        GError       **error)
408
0
{
409
0
  GInputStreamClass *class;
410
0
  gssize ret, read_bytes;
411
0
  char buffer[8192];
412
0
  GError *my_error;
413
414
0
  if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
415
0
    {
416
0
      GSeekable *seekable = G_SEEKABLE (stream);
417
0
      goffset start, end;
418
0
      gboolean success;
419
420
      /* g_seekable_seek() may try to set pending itself */
421
0
      stream->priv->pending = FALSE;
422
423
0
      start = g_seekable_tell (seekable);
424
425
0
      if (g_seekable_seek (G_SEEKABLE (stream),
426
0
                           0,
427
0
                           G_SEEK_END,
428
0
                           cancellable,
429
0
                           NULL))
430
0
        {
431
0
          end = g_seekable_tell (seekable);
432
0
          g_assert (start >= 0);
433
0
          g_assert (end >= start);
434
0
          if (start > (goffset) (G_MAXOFFSET - count) ||
435
0
              (goffset) (start + count) > end)
436
0
            {
437
0
              stream->priv->pending = TRUE;
438
0
              return end - start;
439
0
            }
440
441
0
          success = g_seekable_seek (G_SEEKABLE (stream),
442
0
                                     start + count,
443
0
                                     G_SEEK_SET,
444
0
                                     cancellable,
445
0
                                     error);
446
0
          stream->priv->pending = TRUE;
447
448
0
          if (success)
449
0
            return count;
450
0
          else
451
0
            return -1;
452
0
        }
453
0
    }
454
455
  /* If not seekable, or seek failed, fall back to reading data: */
456
457
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
458
459
0
  read_bytes = 0;
460
0
  while (1)
461
0
    {
462
0
      my_error = NULL;
463
464
0
      ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
465
0
                            cancellable, &my_error);
466
0
      if (ret == -1)
467
0
  {
468
0
    if (read_bytes > 0 &&
469
0
        my_error->domain == G_IO_ERROR &&
470
0
        my_error->code == G_IO_ERROR_CANCELLED)
471
0
      {
472
0
        g_error_free (my_error);
473
0
        return read_bytes;
474
0
      }
475
476
0
    g_propagate_error (error, my_error);
477
0
    return -1;
478
0
  }
479
480
0
      count -= ret;
481
0
      read_bytes += ret;
482
483
0
      if (ret == 0 || count == 0)
484
0
        return read_bytes;
485
0
    }
486
0
}
487
488
/**
489
 * g_input_stream_close:
490
 * @stream: A #GInputStream.
491
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
492
 * @error: location to store the error occurring, or %NULL to ignore
493
 *
494
 * Closes the stream, releasing resources related to it.
495
 *
496
 * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
497
 * Closing a stream multiple times will not return an error.
498
 *
499
 * Streams will be automatically closed when the last reference
500
 * is dropped, but you might want to call this function to make sure 
501
 * resources are released as early as possible.
502
 *
503
 * Some streams might keep the backing store of the stream (e.g. a file descriptor)
504
 * open after the stream is closed. See the documentation for the individual
505
 * stream for details.
506
 *
507
 * On failure the first error that happened will be reported, but the close
508
 * operation will finish as much as possible. A stream that failed to
509
 * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
510
 * is important to check and report the error to the user.
511
 *
512
 * If @cancellable is not %NULL, then the operation can be cancelled by
513
 * triggering the cancellable object from another thread. If the operation
514
 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
515
 * Cancelling a close will still leave the stream closed, but some streams
516
 * can use a faster close that doesn't block to e.g. check errors. 
517
 *
518
 * Returns: %TRUE on success, %FALSE on failure
519
 **/
520
gboolean
521
g_input_stream_close (GInputStream  *stream,
522
          GCancellable  *cancellable,
523
          GError       **error)
524
0
{
525
0
  GInputStreamClass *class;
526
0
  gboolean res;
527
528
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
529
530
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
531
532
0
  if (stream->priv->closed)
533
0
    return TRUE;
534
535
0
  res = TRUE;
536
537
0
  if (!g_input_stream_set_pending (stream, error))
538
0
    return FALSE;
539
540
0
  if (cancellable)
541
0
    g_cancellable_push_current (cancellable);
542
543
0
  if (class->close_fn)
544
0
    res = class->close_fn (stream, cancellable, error);
545
546
0
  if (cancellable)
547
0
    g_cancellable_pop_current (cancellable);
548
549
0
  g_input_stream_clear_pending (stream);
550
  
551
0
  stream->priv->closed = TRUE;
552
  
553
0
  return res;
554
0
}
555
556
static void
557
async_ready_callback_wrapper (GObject      *source_object,
558
            GAsyncResult *res,
559
            gpointer      user_data)
560
0
{
561
0
  GInputStream *stream = G_INPUT_STREAM (source_object);
562
563
0
  g_input_stream_clear_pending (stream);
564
0
  if (stream->priv->outstanding_callback)
565
0
    (*stream->priv->outstanding_callback) (source_object, res, user_data);
566
0
  g_object_unref (stream);
567
0
}
568
569
static void
570
async_ready_close_callback_wrapper (GObject      *source_object,
571
            GAsyncResult *res,
572
            gpointer      user_data)
573
0
{
574
0
  GInputStream *stream = G_INPUT_STREAM (source_object);
575
576
0
  g_input_stream_clear_pending (stream);
577
0
  stream->priv->closed = TRUE;
578
0
  if (stream->priv->outstanding_callback)
579
0
    (*stream->priv->outstanding_callback) (source_object, res, user_data);
580
0
  g_object_unref (stream);
581
0
}
582
583
/**
584
 * g_input_stream_read_async:
585
 * @stream: A #GInputStream.
586
 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
587
 *     a buffer to read data into (which should be at least count bytes long).
588
 * @count: (in): the number of bytes that will be read from the stream
589
 * @io_priority: the [I/O priority][io-priority]
590
 * of the request. 
591
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
592
 * @callback: (scope async): a #GAsyncReadyCallback
593
 *   to call when the request is satisfied
594
 * @user_data: the data to pass to callback function
595
 *
596
 * Request an asynchronous read of @count bytes from the stream into the buffer
597
 * starting at @buffer. When the operation is finished @callback will be called. 
598
 * You can then call g_input_stream_read_finish() to get the result of the 
599
 * operation.
600
 *
601
 * During an async request no other sync and async calls are allowed on @stream, and will
602
 * result in %G_IO_ERROR_PENDING errors. 
603
 *
604
 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
605
 *
606
 * On success, the number of bytes read into the buffer will be passed to the
607
 * callback. It is not an error if this is not the same as the requested size, as it
608
 * can happen e.g. near the end of a file, but generally we try to read
609
 * as many bytes as requested. Zero is returned on end of file
610
 * (or if @count is zero),  but never otherwise.
611
 *
612
 * Any outstanding i/o request with higher priority (lower numerical value) will
613
 * be executed before an outstanding request with lower priority. Default
614
 * priority is %G_PRIORITY_DEFAULT.
615
 *
616
 * The asynchronous methods have a default fallback that uses threads to implement
617
 * asynchronicity, so they are optional for inheriting classes. However, if you
618
 * override one you must override all.
619
 **/
620
void
621
g_input_stream_read_async (GInputStream        *stream,
622
         void                *buffer,
623
         gsize                count,
624
         int                  io_priority,
625
         GCancellable        *cancellable,
626
         GAsyncReadyCallback  callback,
627
         gpointer             user_data)
628
0
{
629
0
  GInputStreamClass *class;
630
0
  GError *error = NULL;
631
632
0
  g_return_if_fail (G_IS_INPUT_STREAM (stream));
633
0
  g_return_if_fail (buffer != NULL);
634
635
0
  if (count == 0)
636
0
    {
637
0
      GTask *task;
638
639
0
      task = g_task_new (stream, cancellable, callback, user_data);
640
0
      g_task_set_source_tag (task, g_input_stream_read_async);
641
0
      g_task_return_int (task, 0);
642
0
      g_object_unref (task);
643
0
      return;
644
0
    }
645
  
646
0
  if (((gssize) count) < 0)
647
0
    {
648
0
      g_task_report_new_error (stream, callback, user_data,
649
0
                               g_input_stream_read_async,
650
0
                               G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
651
0
                               _("Too large count value passed to %s"),
652
0
                               G_STRFUNC);
653
0
      return;
654
0
    }
655
656
0
  if (!g_input_stream_set_pending (stream, &error))
657
0
    {
658
0
      g_task_report_error (stream, callback, user_data,
659
0
                           g_input_stream_read_async,
660
0
                           error);
661
0
      return;
662
0
    }
663
664
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
665
0
  stream->priv->outstanding_callback = callback;
666
0
  g_object_ref (stream);
667
0
  class->read_async (stream, buffer, count, io_priority, cancellable,
668
0
         async_ready_callback_wrapper, user_data);
669
0
}
670
671
/**
672
 * g_input_stream_read_finish:
673
 * @stream: a #GInputStream.
674
 * @result: a #GAsyncResult.
675
 * @error: a #GError location to store the error occurring, or %NULL to 
676
 * ignore.
677
 * 
678
 * Finishes an asynchronous stream read operation. 
679
 * 
680
 * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
681
 **/
682
gssize
683
g_input_stream_read_finish (GInputStream  *stream,
684
          GAsyncResult  *result,
685
          GError       **error)
686
0
{
687
0
  GInputStreamClass *class;
688
  
689
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
690
0
  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
691
692
0
  if (g_async_result_legacy_propagate_error (result, error))
693
0
    return -1;
694
0
  else if (g_async_result_is_tagged (result, g_input_stream_read_async))
695
0
    return g_task_propagate_int (G_TASK (result), error);
696
697
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
698
0
  return class->read_finish (stream, result, error);
699
0
}
700
701
typedef struct
702
{
703
  gchar *buffer;
704
  gsize to_read;
705
  gsize bytes_read;
706
} AsyncReadAll;
707
708
static void
709
free_async_read_all (gpointer data)
710
0
{
711
0
  g_slice_free (AsyncReadAll, data);
712
0
}
713
714
static void
715
read_all_callback (GObject      *stream,
716
                   GAsyncResult *result,
717
                   gpointer      user_data)
718
0
{
719
0
  GTask *task = user_data;
720
0
  AsyncReadAll *data = g_task_get_task_data (task);
721
0
  gboolean got_eof = FALSE;
722
723
0
  if (result)
724
0
    {
725
0
      GError *error = NULL;
726
0
      gssize nread;
727
728
0
      nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
729
730
0
      if (nread == -1)
731
0
        {
732
0
          g_task_return_error (task, error);
733
0
          g_object_unref (task);
734
0
          return;
735
0
        }
736
737
0
      g_assert_cmpint (nread, <=, data->to_read);
738
0
      data->to_read -= nread;
739
0
      data->bytes_read += nread;
740
0
      got_eof = (nread == 0);
741
0
    }
742
743
0
  if (got_eof || data->to_read == 0)
744
0
    {
745
0
      g_task_return_boolean (task, TRUE);
746
0
      g_object_unref (task);
747
0
    }
748
749
0
  else
750
0
    g_input_stream_read_async (G_INPUT_STREAM (stream),
751
0
                               data->buffer + data->bytes_read,
752
0
                               data->to_read,
753
0
                               g_task_get_priority (task),
754
0
                               g_task_get_cancellable (task),
755
0
                               read_all_callback, task);
756
0
}
757
758
759
static void
760
read_all_async_thread (GTask        *task,
761
                       gpointer      source_object,
762
                       gpointer      task_data,
763
                       GCancellable *cancellable)
764
0
{
765
0
  GInputStream *stream = source_object;
766
0
  AsyncReadAll *data = task_data;
767
0
  GError *error = NULL;
768
769
0
  if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
770
0
                               g_task_get_cancellable (task), &error))
771
0
    g_task_return_boolean (task, TRUE);
772
0
  else
773
0
    g_task_return_error (task, error);
774
0
}
775
776
/**
777
 * g_input_stream_read_all_async:
778
 * @stream: A #GInputStream
779
 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
780
 *     a buffer to read data into (which should be at least count bytes long)
781
 * @count: (in): the number of bytes that will be read from the stream
782
 * @io_priority: the [I/O priority][io-priority] of the request
783
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
784
 * @callback: (scope async): a #GAsyncReadyCallback
785
 *   to call when the request is satisfied
786
 * @user_data: the data to pass to callback function
787
 *
788
 * Request an asynchronous read of @count bytes from the stream into the
789
 * buffer starting at @buffer.
790
 *
791
 * This is the asynchronous equivalent of g_input_stream_read_all().
792
 *
793
 * Call g_input_stream_read_all_finish() to collect the result.
794
 *
795
 * Any outstanding I/O request with higher priority (lower numerical
796
 * value) will be executed before an outstanding request with lower
797
 * priority. Default priority is %G_PRIORITY_DEFAULT.
798
 *
799
 * Since: 2.44
800
 **/
801
void
802
g_input_stream_read_all_async (GInputStream        *stream,
803
                               void                *buffer,
804
                               gsize                count,
805
                               int                  io_priority,
806
                               GCancellable        *cancellable,
807
                               GAsyncReadyCallback  callback,
808
                               gpointer             user_data)
809
0
{
810
0
  AsyncReadAll *data;
811
0
  GTask *task;
812
813
0
  g_return_if_fail (G_IS_INPUT_STREAM (stream));
814
0
  g_return_if_fail (buffer != NULL || count == 0);
815
816
0
  task = g_task_new (stream, cancellable, callback, user_data);
817
0
  data = g_slice_new0 (AsyncReadAll);
818
0
  data->buffer = buffer;
819
0
  data->to_read = count;
820
821
0
  g_task_set_source_tag (task, g_input_stream_read_all_async);
822
0
  g_task_set_task_data (task, data, free_async_read_all);
823
0
  g_task_set_priority (task, io_priority);
824
825
  /* If async reads are going to be handled via the threadpool anyway
826
   * then we may as well do it with a single dispatch instead of
827
   * bouncing in and out.
828
   */
829
0
  if (g_input_stream_async_read_is_via_threads (stream))
830
0
    {
831
0
      g_task_run_in_thread (task, read_all_async_thread);
832
0
      g_object_unref (task);
833
0
    }
834
0
  else
835
0
    read_all_callback (G_OBJECT (stream), NULL, task);
836
0
}
837
838
/**
839
 * g_input_stream_read_all_finish:
840
 * @stream: a #GInputStream
841
 * @result: a #GAsyncResult
842
 * @bytes_read: (out): location to store the number of bytes that was read from the stream
843
 * @error: a #GError location to store the error occurring, or %NULL to ignore
844
 *
845
 * Finishes an asynchronous stream read operation started with
846
 * g_input_stream_read_all_async().
847
 *
848
 * As a special exception to the normal conventions for functions that
849
 * use #GError, if this function returns %FALSE (and sets @error) then
850
 * @bytes_read will be set to the number of bytes that were successfully
851
 * read before the error was encountered.  This functionality is only
852
 * available from C.  If you need it from another language then you must
853
 * write your own loop around g_input_stream_read_async().
854
 *
855
 * Returns: %TRUE on success, %FALSE if there was an error
856
 *
857
 * Since: 2.44
858
 **/
859
gboolean
860
g_input_stream_read_all_finish (GInputStream  *stream,
861
                                GAsyncResult  *result,
862
                                gsize         *bytes_read,
863
                                GError       **error)
864
0
{
865
0
  GTask *task;
866
867
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
868
0
  g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
869
870
0
  task = G_TASK (result);
871
872
0
  if (bytes_read)
873
0
    {
874
0
      AsyncReadAll *data = g_task_get_task_data (task);
875
876
0
      *bytes_read = data->bytes_read;
877
0
    }
878
879
0
  return g_task_propagate_boolean (task, error);
880
0
}
881
882
static void
883
read_bytes_callback (GObject      *stream,
884
         GAsyncResult *result,
885
         gpointer      user_data)
886
0
{
887
0
  GTask *task = user_data;
888
0
  guchar *buf = g_task_get_task_data (task);
889
0
  GError *error = NULL;
890
0
  gssize nread;
891
0
  GBytes *bytes = NULL;
892
893
0
  nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
894
0
              result, &error);
895
0
  if (nread == -1)
896
0
    {
897
0
      g_free (buf);
898
0
      g_task_return_error (task, error);
899
0
    }
900
0
  else if (nread == 0)
901
0
    {
902
0
      g_free (buf);
903
0
      bytes = g_bytes_new_static ("", 0);
904
0
    }
905
0
  else
906
0
    bytes = g_bytes_new_take (buf, nread);
907
908
0
  if (bytes)
909
0
    g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
910
911
0
  g_object_unref (task);
912
0
}
913
914
/**
915
 * g_input_stream_read_bytes_async:
916
 * @stream: A #GInputStream.
917
 * @count: the number of bytes that will be read from the stream
918
 * @io_priority: the [I/O priority][io-priority] of the request
919
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
920
 * @callback: (scope async): a #GAsyncReadyCallback
921
 *   to call when the request is satisfied
922
 * @user_data: the data to pass to callback function
923
 *
924
 * Request an asynchronous read of @count bytes from the stream into a
925
 * new #GBytes. When the operation is finished @callback will be
926
 * called. You can then call g_input_stream_read_bytes_finish() to get the
927
 * result of the operation.
928
 *
929
 * During an async request no other sync and async calls are allowed
930
 * on @stream, and will result in %G_IO_ERROR_PENDING errors.
931
 *
932
 * A value of @count larger than %G_MAXSSIZE will cause a
933
 * %G_IO_ERROR_INVALID_ARGUMENT error.
934
 *
935
 * On success, the new #GBytes will be passed to the callback. It is
936
 * not an error if this is smaller than the requested size, as it can
937
 * happen e.g. near the end of a file, but generally we try to read as
938
 * many bytes as requested. Zero is returned on end of file (or if
939
 * @count is zero), but never otherwise.
940
 *
941
 * Any outstanding I/O request with higher priority (lower numerical
942
 * value) will be executed before an outstanding request with lower
943
 * priority. Default priority is %G_PRIORITY_DEFAULT.
944
 *
945
 * Since: 2.34
946
 **/
947
void
948
g_input_stream_read_bytes_async (GInputStream          *stream,
949
         gsize                  count,
950
         int                    io_priority,
951
         GCancellable          *cancellable,
952
         GAsyncReadyCallback    callback,
953
         gpointer               user_data)
954
0
{
955
0
  GTask *task;
956
0
  guchar *buf;
957
958
0
  task = g_task_new (stream, cancellable, callback, user_data);
959
0
  g_task_set_source_tag (task, g_input_stream_read_bytes_async);
960
961
0
  buf = g_malloc (count);
962
0
  g_task_set_task_data (task, buf, NULL);
963
964
0
  g_input_stream_read_async (stream, buf, count,
965
0
                             io_priority, cancellable,
966
0
                             read_bytes_callback, task);
967
0
}
968
969
/**
970
 * g_input_stream_read_bytes_finish:
971
 * @stream: a #GInputStream.
972
 * @result: a #GAsyncResult.
973
 * @error: a #GError location to store the error occurring, or %NULL to
974
 *   ignore.
975
 *
976
 * Finishes an asynchronous stream read-into-#GBytes operation.
977
 *
978
 * Returns: (transfer full): the newly-allocated #GBytes, or %NULL on error
979
 *
980
 * Since: 2.34
981
 **/
982
GBytes *
983
g_input_stream_read_bytes_finish (GInputStream  *stream,
984
          GAsyncResult  *result,
985
          GError       **error)
986
0
{
987
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
988
0
  g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
989
990
0
  return g_task_propagate_pointer (G_TASK (result), error);
991
0
}
992
993
/**
994
 * g_input_stream_skip_async:
995
 * @stream: A #GInputStream.
996
 * @count: the number of bytes that will be skipped from the stream
997
 * @io_priority: the [I/O priority][io-priority] of the request
998
 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
999
 * @callback: (scope async): a #GAsyncReadyCallback
1000
 *   to call when the request is satisfied
1001
 * @user_data: the data to pass to callback function
1002
 *
1003
 * Request an asynchronous skip of @count bytes from the stream.
1004
 * When the operation is finished @callback will be called.
1005
 * You can then call g_input_stream_skip_finish() to get the result
1006
 * of the operation.
1007
 *
1008
 * During an async request no other sync and async calls are allowed,
1009
 * and will result in %G_IO_ERROR_PENDING errors.
1010
 *
1011
 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
1012
 *
1013
 * On success, the number of bytes skipped will be passed to the callback.
1014
 * It is not an error if this is not the same as the requested size, as it
1015
 * can happen e.g. near the end of a file, but generally we try to skip
1016
 * as many bytes as requested. Zero is returned on end of file
1017
 * (or if @count is zero), but never otherwise.
1018
 *
1019
 * Any outstanding i/o request with higher priority (lower numerical value)
1020
 * will be executed before an outstanding request with lower priority.
1021
 * Default priority is %G_PRIORITY_DEFAULT.
1022
 *
1023
 * The asynchronous methods have a default fallback that uses threads to
1024
 * implement asynchronicity, so they are optional for inheriting classes.
1025
 * However, if you override one, you must override all.
1026
 **/
1027
void
1028
g_input_stream_skip_async (GInputStream        *stream,
1029
         gsize                count,
1030
         int                  io_priority,
1031
         GCancellable        *cancellable,
1032
         GAsyncReadyCallback  callback,
1033
         gpointer             user_data)
1034
0
{
1035
0
  GInputStreamClass *class;
1036
0
  GError *error = NULL;
1037
1038
0
  g_return_if_fail (G_IS_INPUT_STREAM (stream));
1039
1040
0
  if (count == 0)
1041
0
    {
1042
0
      GTask *task;
1043
1044
0
      task = g_task_new (stream, cancellable, callback, user_data);
1045
0
      g_task_set_source_tag (task, g_input_stream_skip_async);
1046
0
      g_task_return_int (task, 0);
1047
0
      g_object_unref (task);
1048
0
      return;
1049
0
    }
1050
  
1051
0
  if (((gssize) count) < 0)
1052
0
    {
1053
0
      g_task_report_new_error (stream, callback, user_data,
1054
0
                               g_input_stream_skip_async,
1055
0
                               G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1056
0
                               _("Too large count value passed to %s"),
1057
0
                               G_STRFUNC);
1058
0
      return;
1059
0
    }
1060
1061
0
  if (!g_input_stream_set_pending (stream, &error))
1062
0
    {
1063
0
      g_task_report_error (stream, callback, user_data,
1064
0
                           g_input_stream_skip_async,
1065
0
                           error);
1066
0
      return;
1067
0
    }
1068
1069
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1070
0
  stream->priv->outstanding_callback = callback;
1071
0
  g_object_ref (stream);
1072
0
  class->skip_async (stream, count, io_priority, cancellable,
1073
0
         async_ready_callback_wrapper, user_data);
1074
0
}
1075
1076
/**
1077
 * g_input_stream_skip_finish:
1078
 * @stream: a #GInputStream.
1079
 * @result: a #GAsyncResult.
1080
 * @error: a #GError location to store the error occurring, or %NULL to 
1081
 * ignore.
1082
 * 
1083
 * Finishes a stream skip operation.
1084
 * 
1085
 * Returns: the size of the bytes skipped, or `-1` on error.
1086
 **/
1087
gssize
1088
g_input_stream_skip_finish (GInputStream  *stream,
1089
          GAsyncResult  *result,
1090
          GError       **error)
1091
0
{
1092
0
  GInputStreamClass *class;
1093
1094
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
1095
0
  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1096
1097
0
  if (g_async_result_legacy_propagate_error (result, error))
1098
0
    return -1;
1099
0
  else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
1100
0
    return g_task_propagate_int (G_TASK (result), error);
1101
1102
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1103
0
  return class->skip_finish (stream, result, error);
1104
0
}
1105
1106
/**
1107
 * g_input_stream_close_async:
1108
 * @stream: A #GInputStream.
1109
 * @io_priority: the [I/O priority][io-priority] of the request
1110
 * @cancellable: (nullable): optional cancellable object
1111
 * @callback: (scope async): a #GAsyncReadyCallback
1112
 *   to call when the request is satisfied
1113
 * @user_data: the data to pass to callback function
1114
 *
1115
 * Requests an asynchronous closes of the stream, releasing resources related to it.
1116
 * When the operation is finished @callback will be called. 
1117
 * You can then call g_input_stream_close_finish() to get the result of the 
1118
 * operation.
1119
 *
1120
 * For behaviour details see g_input_stream_close().
1121
 *
1122
 * The asynchronous methods have a default fallback that uses threads to implement
1123
 * asynchronicity, so they are optional for inheriting classes. However, if you
1124
 * override one you must override all.
1125
 **/
1126
void
1127
g_input_stream_close_async (GInputStream        *stream,
1128
          int                  io_priority,
1129
          GCancellable        *cancellable,
1130
          GAsyncReadyCallback  callback,
1131
          gpointer             user_data)
1132
0
{
1133
0
  GInputStreamClass *class;
1134
0
  GError *error = NULL;
1135
1136
0
  g_return_if_fail (G_IS_INPUT_STREAM (stream));
1137
1138
0
  if (stream->priv->closed)
1139
0
    {
1140
0
      GTask *task;
1141
1142
0
      task = g_task_new (stream, cancellable, callback, user_data);
1143
0
      g_task_set_source_tag (task, g_input_stream_close_async);
1144
0
      g_task_return_boolean (task, TRUE);
1145
0
      g_object_unref (task);
1146
0
      return;
1147
0
    }
1148
1149
0
  if (!g_input_stream_set_pending (stream, &error))
1150
0
    {
1151
0
      g_task_report_error (stream, callback, user_data,
1152
0
                           g_input_stream_close_async,
1153
0
                           error);
1154
0
      return;
1155
0
    }
1156
  
1157
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1158
0
  stream->priv->outstanding_callback = callback;
1159
0
  g_object_ref (stream);
1160
0
  class->close_async (stream, io_priority, cancellable,
1161
0
          async_ready_close_callback_wrapper, user_data);
1162
0
}
1163
1164
/**
1165
 * g_input_stream_close_finish:
1166
 * @stream: a #GInputStream.
1167
 * @result: a #GAsyncResult.
1168
 * @error: a #GError location to store the error occurring, or %NULL to 
1169
 * ignore.
1170
 * 
1171
 * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
1172
 * 
1173
 * Returns: %TRUE if the stream was closed successfully.
1174
 **/
1175
gboolean
1176
g_input_stream_close_finish (GInputStream  *stream,
1177
           GAsyncResult  *result,
1178
           GError       **error)
1179
0
{
1180
0
  GInputStreamClass *class;
1181
1182
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1183
0
  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1184
1185
0
  if (g_async_result_legacy_propagate_error (result, error))
1186
0
    return FALSE;
1187
0
  else if (g_async_result_is_tagged (result, g_input_stream_close_async))
1188
0
    return g_task_propagate_boolean (G_TASK (result), error);
1189
1190
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1191
0
  return class->close_finish (stream, result, error);
1192
0
}
1193
1194
/**
1195
 * g_input_stream_is_closed:
1196
 * @stream: input stream.
1197
 * 
1198
 * Checks if an input stream is closed.
1199
 * 
1200
 * Returns: %TRUE if the stream is closed.
1201
 **/
1202
gboolean
1203
g_input_stream_is_closed (GInputStream *stream)
1204
0
{
1205
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1206
  
1207
0
  return stream->priv->closed;
1208
0
}
1209
 
1210
/**
1211
 * g_input_stream_has_pending:
1212
 * @stream: input stream.
1213
 * 
1214
 * Checks if an input stream has pending actions.
1215
 * 
1216
 * Returns: %TRUE if @stream has pending actions.
1217
 **/  
1218
gboolean
1219
g_input_stream_has_pending (GInputStream *stream)
1220
0
{
1221
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1222
  
1223
0
  return stream->priv->pending;
1224
0
}
1225
1226
/**
1227
 * g_input_stream_set_pending:
1228
 * @stream: input stream
1229
 * @error: a #GError location to store the error occurring, or %NULL to 
1230
 * ignore.
1231
 * 
1232
 * Sets @stream to have actions pending. If the pending flag is
1233
 * already set or @stream is closed, it will return %FALSE and set
1234
 * @error.
1235
 *
1236
 * Returns: %TRUE if pending was previously unset and is now set.
1237
 **/
1238
gboolean
1239
g_input_stream_set_pending (GInputStream *stream, GError **error)
1240
0
{
1241
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1242
  
1243
0
  if (stream->priv->closed)
1244
0
    {
1245
0
      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1246
0
                           _("Stream is already closed"));
1247
0
      return FALSE;
1248
0
    }
1249
  
1250
0
  if (stream->priv->pending)
1251
0
    {
1252
0
      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1253
    /* Translators: This is an error you get if there is already an
1254
     * operation running against this stream when you try to start
1255
     * one */
1256
0
     _("Stream has outstanding operation"));
1257
0
      return FALSE;
1258
0
    }
1259
  
1260
0
  stream->priv->pending = TRUE;
1261
0
  return TRUE;
1262
0
}
1263
1264
/**
1265
 * g_input_stream_clear_pending:
1266
 * @stream: input stream
1267
 * 
1268
 * Clears the pending flag on @stream.
1269
 **/
1270
void
1271
g_input_stream_clear_pending (GInputStream *stream)
1272
0
{
1273
0
  g_return_if_fail (G_IS_INPUT_STREAM (stream));
1274
  
1275
0
  stream->priv->pending = FALSE;
1276
0
}
1277
1278
/*< internal >
1279
 * g_input_stream_async_read_is_via_threads:
1280
 * @stream: input stream
1281
 *
1282
 * Checks if an input stream's read_async function uses threads.
1283
 *
1284
 * Returns: %TRUE if @stream's read_async function uses threads.
1285
 **/
1286
gboolean
1287
g_input_stream_async_read_is_via_threads (GInputStream *stream)
1288
0
{
1289
0
  GInputStreamClass *class;
1290
1291
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1292
1293
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1294
1295
0
  return (class->read_async == g_input_stream_real_read_async &&
1296
0
      !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1297
0
        g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1298
0
}
1299
1300
/*< internal >
1301
 * g_input_stream_async_close_is_via_threads:
1302
 * @stream: input stream
1303
 *
1304
 * Checks if an input stream's close_async function uses threads.
1305
 *
1306
 * Returns: %TRUE if @stream's close_async function uses threads.
1307
 **/
1308
gboolean
1309
g_input_stream_async_close_is_via_threads (GInputStream *stream)
1310
0
{
1311
0
  GInputStreamClass *class;
1312
1313
0
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1314
1315
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1316
1317
0
  return class->close_async == g_input_stream_real_close_async;
1318
0
}
1319
1320
/********************************************
1321
 *   Default implementation of async ops    *
1322
 ********************************************/
1323
1324
typedef struct {
1325
  void   *buffer;
1326
  gsize   count;
1327
} ReadData;
1328
1329
static void
1330
free_read_data (ReadData *op)
1331
0
{
1332
0
  g_slice_free (ReadData, op);
1333
0
}
1334
1335
static void
1336
read_async_thread (GTask        *task,
1337
                   gpointer      source_object,
1338
                   gpointer      task_data,
1339
                   GCancellable *cancellable)
1340
0
{
1341
0
  GInputStream *stream = source_object;
1342
0
  ReadData *op = task_data;
1343
0
  GInputStreamClass *class;
1344
0
  GError *error = NULL;
1345
0
  gssize nread;
1346
 
1347
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1348
1349
0
  nread = class->read_fn (stream,
1350
0
                          op->buffer, op->count,
1351
0
                          g_task_get_cancellable (task),
1352
0
                          &error);
1353
0
  if (nread == -1)
1354
0
    g_task_return_error (task, error);
1355
0
  else
1356
0
    g_task_return_int (task, nread);
1357
0
}
1358
1359
static void read_async_pollable (GPollableInputStream *stream,
1360
                                 GTask                *task);
1361
1362
static gboolean
1363
read_async_pollable_ready (GPollableInputStream *stream,
1364
         gpointer              user_data)
1365
0
{
1366
0
  GTask *task = user_data;
1367
1368
0
  read_async_pollable (stream, task);
1369
0
  return FALSE;
1370
0
}
1371
1372
static void
1373
read_async_pollable (GPollableInputStream *stream,
1374
                     GTask                *task)
1375
0
{
1376
0
  ReadData *op = g_task_get_task_data (task);
1377
0
  GError *error = NULL;
1378
0
  gssize nread;
1379
1380
0
  if (g_task_return_error_if_cancelled (task))
1381
0
    return;
1382
1383
0
  nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1384
0
    read_nonblocking (stream, op->buffer, op->count, &error);
1385
1386
0
  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1387
0
    {
1388
0
      GSource *source;
1389
1390
0
      g_error_free (error);
1391
1392
0
      source = g_pollable_input_stream_create_source (stream,
1393
0
                                                      g_task_get_cancellable (task));
1394
0
      g_task_attach_source (task, source,
1395
0
                            (GSourceFunc) read_async_pollable_ready);
1396
0
      g_source_unref (source);
1397
0
      return;
1398
0
    }
1399
1400
0
  if (nread == -1)
1401
0
    g_task_return_error (task, error);
1402
0
  else
1403
0
    g_task_return_int (task, nread);
1404
  /* g_input_stream_real_read_async() unrefs task */
1405
0
}
1406
1407
1408
static void
1409
g_input_stream_real_read_async (GInputStream        *stream,
1410
        void                *buffer,
1411
        gsize                count,
1412
        int                  io_priority,
1413
        GCancellable        *cancellable,
1414
        GAsyncReadyCallback  callback,
1415
        gpointer             user_data)
1416
0
{
1417
0
  GTask *task;
1418
0
  ReadData *op;
1419
  
1420
0
  op = g_slice_new0 (ReadData);
1421
0
  task = g_task_new (stream, cancellable, callback, user_data);
1422
0
  g_task_set_source_tag (task, g_input_stream_real_read_async);
1423
0
  g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1424
0
  g_task_set_priority (task, io_priority);
1425
0
  op->buffer = buffer;
1426
0
  op->count = count;
1427
1428
0
  if (!g_input_stream_async_read_is_via_threads (stream))
1429
0
    read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1430
0
  else
1431
0
    g_task_run_in_thread (task, read_async_thread);
1432
0
  g_object_unref (task);
1433
0
}
1434
1435
static gssize
1436
g_input_stream_real_read_finish (GInputStream  *stream,
1437
         GAsyncResult  *result,
1438
         GError       **error)
1439
0
{
1440
0
  g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1441
1442
0
  return g_task_propagate_int (G_TASK (result), error);
1443
0
}
1444
1445
1446
static void
1447
skip_async_thread (GTask        *task,
1448
                   gpointer      source_object,
1449
                   gpointer      task_data,
1450
                   GCancellable *cancellable)
1451
0
{
1452
0
  GInputStream *stream = source_object;
1453
0
  gsize count = GPOINTER_TO_SIZE (task_data);
1454
0
  GInputStreamClass *class;
1455
0
  GError *error = NULL;
1456
0
  gssize ret;
1457
1458
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1459
0
  ret = class->skip (stream, count,
1460
0
                     g_task_get_cancellable (task),
1461
0
                     &error);
1462
0
  if (ret == -1)
1463
0
    g_task_return_error (task, error);
1464
0
  else
1465
0
    g_task_return_int (task, ret);
1466
0
}
1467
1468
typedef struct {
1469
  char buffer[8192];
1470
  gsize count;
1471
  gsize count_skipped;
1472
} SkipFallbackAsyncData;
1473
1474
static void
1475
skip_callback_wrapper (GObject      *source_object,
1476
           GAsyncResult *res,
1477
           gpointer      user_data)
1478
0
{
1479
0
  GInputStreamClass *class;
1480
0
  GTask *task = user_data;
1481
0
  SkipFallbackAsyncData *data = g_task_get_task_data (task);
1482
0
  GError *error = NULL;
1483
0
  gssize ret;
1484
1485
0
  ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1486
1487
0
  if (ret > 0)
1488
0
    {
1489
0
      data->count -= ret;
1490
0
      data->count_skipped += ret;
1491
1492
0
      if (data->count > 0)
1493
0
  {
1494
0
    class = G_INPUT_STREAM_GET_CLASS (source_object);
1495
0
    class->read_async (G_INPUT_STREAM (source_object),
1496
0
                             data->buffer, MIN (8192, data->count),
1497
0
                             g_task_get_priority (task),
1498
0
                             g_task_get_cancellable (task),
1499
0
                             skip_callback_wrapper, task);
1500
0
    return;
1501
0
  }
1502
0
    }
1503
1504
0
  if (ret == -1 &&
1505
0
      g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1506
0
      data->count_skipped)
1507
0
    {
1508
      /* No error, return partial read */
1509
0
      g_clear_error (&error);
1510
0
    }
1511
1512
0
  if (error)
1513
0
    g_task_return_error (task, error);
1514
0
  else
1515
0
    g_task_return_int (task, data->count_skipped);
1516
0
  g_object_unref (task);
1517
0
 }
1518
1519
static void
1520
g_input_stream_real_skip_async (GInputStream        *stream,
1521
        gsize                count,
1522
        int                  io_priority,
1523
        GCancellable        *cancellable,
1524
        GAsyncReadyCallback  callback,
1525
        gpointer             user_data)
1526
0
{
1527
0
  GInputStreamClass *class;
1528
0
  SkipFallbackAsyncData *data;
1529
0
  GTask *task;
1530
1531
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1532
1533
0
  task = g_task_new (stream, cancellable, callback, user_data);
1534
0
  g_task_set_source_tag (task, g_input_stream_real_skip_async);
1535
0
  g_task_set_priority (task, io_priority);
1536
1537
0
  if (g_input_stream_async_read_is_via_threads (stream))
1538
0
    {
1539
      /* Read is thread-using async fallback.
1540
       * Make skip use threads too, so that we can use a possible sync skip
1541
       * implementation. */
1542
0
      g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1543
1544
0
      g_task_run_in_thread (task, skip_async_thread);
1545
0
      g_object_unref (task);
1546
0
    }
1547
0
  else
1548
0
    {
1549
      /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1550
      
1551
      /* There is a custom async read function, lets use that. */
1552
0
      data = g_new (SkipFallbackAsyncData, 1);
1553
0
      data->count = count;
1554
0
      data->count_skipped = 0;
1555
0
      g_task_set_task_data (task, data, g_free);
1556
0
      g_task_set_check_cancellable (task, FALSE);
1557
0
      class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1558
0
       skip_callback_wrapper, task);
1559
0
    }
1560
1561
0
}
1562
1563
static gssize
1564
g_input_stream_real_skip_finish (GInputStream  *stream,
1565
         GAsyncResult  *result,
1566
         GError       **error)
1567
0
{
1568
0
  g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1569
1570
0
  return g_task_propagate_int (G_TASK (result), error);
1571
0
}
1572
1573
static void
1574
close_async_thread (GTask        *task,
1575
                    gpointer      source_object,
1576
                    gpointer      task_data,
1577
                    GCancellable *cancellable)
1578
0
{
1579
0
  GInputStream *stream = source_object;
1580
0
  GInputStreamClass *class;
1581
0
  GError *error = NULL;
1582
0
  gboolean result;
1583
1584
0
  class = G_INPUT_STREAM_GET_CLASS (stream);
1585
0
  if (class->close_fn)
1586
0
    {
1587
0
      result = class->close_fn (stream,
1588
0
                                g_task_get_cancellable (task),
1589
0
                                &error);
1590
0
      if (!result)
1591
0
        {
1592
0
          g_task_return_error (task, error);
1593
0
          return;
1594
0
        }
1595
0
    }
1596
1597
0
  g_task_return_boolean (task, TRUE);
1598
0
}
1599
1600
static void
1601
g_input_stream_real_close_async (GInputStream        *stream,
1602
         int                  io_priority,
1603
         GCancellable        *cancellable,
1604
         GAsyncReadyCallback  callback,
1605
         gpointer             user_data)
1606
0
{
1607
0
  GTask *task;
1608
1609
0
  task = g_task_new (stream, cancellable, callback, user_data);
1610
0
  g_task_set_source_tag (task, g_input_stream_real_close_async);
1611
0
  g_task_set_check_cancellable (task, FALSE);
1612
0
  g_task_set_priority (task, io_priority);
1613
  
1614
0
  g_task_run_in_thread (task, close_async_thread);
1615
0
  g_object_unref (task);
1616
0
}
1617
1618
static gboolean
1619
g_input_stream_real_close_finish (GInputStream  *stream,
1620
          GAsyncResult  *result,
1621
          GError       **error)
1622
0
{
1623
0
  g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1624
1625
0
  return g_task_propagate_boolean (G_TASK (result), error);
1626
0
}