Coverage Report

Created: 2025-11-16 09:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libreoffice/package/source/zipapi/XBufferedThreadedStream.cxx
Line
Count
Source
1
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
/*
3
 * This file is part of the LibreOffice project.
4
 *
5
 * This Source Code Form is subject to the terms of the Mozilla Public
6
 * License, v. 2.0. If a copy of the MPL was not distributed with this
7
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8
 */
9
10
#include "XBufferedThreadedStream.hxx"
11
12
using namespace css::uno;
13
14
namespace {
15
16
class UnzippingThread: public salhelper::Thread
17
{
18
    XBufferedThreadedStream &mxStream;
19
public:
20
22.6k
    explicit UnzippingThread(XBufferedThreadedStream &xStream): Thread("Unzipping"), mxStream(xStream) {}
21
private:
22
    virtual void execute() override
23
22.6k
    {
24
22.6k
        try
25
22.6k
        {
26
22.6k
            mxStream.produce();
27
22.6k
        }
28
22.6k
        catch (...)
29
22.6k
        {
30
10.4k
            mxStream.saveException(std::current_exception());
31
10.4k
        }
32
33
22.6k
        mxStream.setTerminateThread();
34
22.6k
    }
35
};
36
37
}
38
39
XBufferedThreadedStream::XBufferedThreadedStream(
40
                    const Reference<XInputStream>& xSrcStream,
41
                    sal_Int64 nStreamSize)
42
22.6k
: mxSrcStream( xSrcStream )
43
22.6k
, mnPos(0)
44
22.6k
, mnStreamSize( nStreamSize )
45
22.6k
, mnOffset( 0 )
46
22.6k
, mxUnzippingThread( new UnzippingThread(*this) )
47
22.6k
, mbTerminateThread( false )
48
22.6k
{
49
22.6k
    mxUnzippingThread->launch();
50
22.6k
}
51
52
XBufferedThreadedStream::~XBufferedThreadedStream()
53
22.6k
{
54
22.6k
    setTerminateThread();
55
22.6k
    mxUnzippingThread->join();
56
22.6k
}
57
58
/**
59
 * Reads from UnbufferedStream in a separate thread and stores the buffer blocks
60
 * in maPendingBuffers queue for further use.
61
 */
62
void XBufferedThreadedStream::produce()
63
22.6k
{
64
22.6k
    Buffer pProducedBuffer;
65
22.6k
    sal_Int64 nTotalBytesRead(0);
66
22.6k
    std::unique_lock<std::mutex> aGuard( maBufferProtector );
67
22.6k
    do
68
29.9k
    {
69
29.9k
        if( !maUsedBuffers.empty() )
70
17.2k
        {
71
17.2k
            pProducedBuffer = maUsedBuffers.front();
72
17.2k
            maUsedBuffers.pop();
73
17.2k
        }
74
75
29.9k
        aGuard.unlock();
76
29.9k
        nTotalBytesRead += mxSrcStream->readBytes( pProducedBuffer, nBufferSize );
77
78
29.9k
        aGuard.lock();
79
29.9k
        maPendingBuffers.push( pProducedBuffer );
80
29.9k
        maBufferConsumeResume.notify_one();
81
82
29.9k
        if (!mbTerminateThread)
83
17.6k
            maBufferProduceResume.wait( aGuard, [&]{return canProduce(); } );
84
85
29.9k
    } while( !mbTerminateThread && nTotalBytesRead < mnStreamSize );
86
22.6k
}
87
88
/**
89
 * Fetches next available block from maPendingBuffers for use in Reading thread.
90
 */
91
const Buffer& XBufferedThreadedStream::getNextBlock()
92
51.4k
{
93
51.4k
    std::unique_lock<std::mutex> aGuard( maBufferProtector );
94
51.4k
    const sal_Int32 nBufSize = maInUseBuffer.getLength();
95
51.4k
    if( nBufSize <= 0 || mnOffset >= nBufSize )
96
23.8k
    {
97
23.8k
        if( mnOffset >= nBufSize )
98
23.8k
            maUsedBuffers.push( maInUseBuffer );
99
100
43.7k
        maBufferConsumeResume.wait( aGuard, [&]{return canConsume(); } );
101
102
23.8k
        if( maPendingBuffers.empty() )
103
7.66k
        {
104
7.66k
            maInUseBuffer = Buffer();
105
7.66k
            if (maSavedException)
106
7.66k
                std::rethrow_exception(maSavedException);
107
7.66k
        }
108
16.1k
        else
109
16.1k
        {
110
16.1k
            maInUseBuffer = maPendingBuffers.front();
111
16.1k
            maPendingBuffers.pop();
112
16.1k
            mnOffset = 0;
113
114
16.1k
            if( maPendingBuffers.size() <= nBufferLowWater )
115
16.0k
                maBufferProduceResume.notify_one();
116
16.1k
        }
117
23.8k
    }
118
119
51.4k
    return maInUseBuffer;
120
51.4k
}
121
122
void XBufferedThreadedStream::setTerminateThread()
123
61.6k
{
124
61.6k
    std::scoped_lock<std::mutex> aGuard( maBufferProtector );
125
61.6k
    mbTerminateThread = true;
126
61.6k
    maBufferProduceResume.notify_one();
127
61.6k
    maBufferConsumeResume.notify_one();
128
61.6k
}
129
130
sal_Int32 SAL_CALL XBufferedThreadedStream::readBytes( Sequence< sal_Int8 >& rData, sal_Int32 nBytesToRead )
131
54.9k
{
132
54.9k
    if( !hasBytes() )
133
8.29k
        return 0;
134
135
46.7k
    const sal_Int32 nAvailableSize = static_cast< sal_Int32 > ( std::min< sal_Int64 >( nBytesToRead, remainingSize() ) );
136
46.7k
    rData.realloc( nAvailableSize );
137
46.7k
    auto pData = rData.getArray();
138
46.7k
    sal_Int32 i = 0, nPendingBytes = nAvailableSize;
139
140
94.8k
    while( nPendingBytes )
141
48.1k
    {
142
48.1k
        const Buffer &pBuffer = getNextBlock();
143
48.1k
        if( !pBuffer.hasElements() )
144
0
        {
145
0
            rData.realloc( nAvailableSize - nPendingBytes );
146
0
            return nAvailableSize - nPendingBytes;
147
0
        }
148
48.1k
        const sal_Int32 limit = std::min<sal_Int32>( nPendingBytes, pBuffer.getLength() - mnOffset );
149
150
48.1k
        memcpy( &pData[i], &pBuffer[mnOffset], limit );
151
152
48.1k
        nPendingBytes -= limit;
153
48.1k
        mnOffset += limit;
154
48.1k
        mnPos += limit;
155
48.1k
        i += limit;
156
48.1k
    }
157
158
46.7k
    return nAvailableSize;
159
46.7k
}
160
161
sal_Int32 XBufferedThreadedStream::readSomeBytes( sal_Int8* pData, sal_Int32 nBytesToRead )
162
1.72k
{
163
1.72k
    if( !hasBytes() )
164
0
        return 0;
165
166
1.72k
    const sal_Int32 nAvailableSize = static_cast< sal_Int32 > ( std::min< sal_Int64 >( nBytesToRead, remainingSize() ) );
167
1.72k
    sal_Int32 i = 0, nPendingBytes = nAvailableSize;
168
169
5.04k
    while( nPendingBytes )
170
3.31k
    {
171
3.31k
        const Buffer &pBuffer = getNextBlock();
172
3.31k
        if( !pBuffer.hasElements() )
173
0
            return nAvailableSize - nPendingBytes;
174
3.31k
        const sal_Int32 limit = std::min<sal_Int32>( nPendingBytes, pBuffer.getLength() - mnOffset );
175
176
3.31k
        memcpy( &pData[i], &pBuffer[mnOffset], limit );
177
178
3.31k
        nPendingBytes -= limit;
179
3.31k
        mnOffset += limit;
180
3.31k
        mnPos += limit;
181
3.31k
        i += limit;
182
3.31k
    }
183
184
1.72k
    return nAvailableSize;
185
1.72k
}
186
187
sal_Int32 SAL_CALL XBufferedThreadedStream::readSomeBytes( Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead )
188
47.7k
{
189
47.7k
    return readBytes( aData, nMaxBytesToRead );
190
47.7k
}
191
void SAL_CALL XBufferedThreadedStream::skipBytes( sal_Int32 nBytesToSkip )
192
0
{
193
0
    if( nBytesToSkip )
194
0
    {
195
0
        Sequence < sal_Int8 > aSequence( nBytesToSkip );
196
0
        readBytes( aSequence, nBytesToSkip );
197
0
    }
198
0
}
199
200
sal_Int32 SAL_CALL XBufferedThreadedStream::available()
201
0
{
202
0
    if( !hasBytes() )
203
0
        return 0;
204
205
0
    return static_cast< sal_Int32 > ( std::min< sal_Int64 >( SAL_MAX_INT32, remainingSize() ) );
206
0
}
207
208
void SAL_CALL XBufferedThreadedStream::closeInput()
209
16.3k
{
210
16.3k
    setTerminateThread();
211
16.3k
    mxUnzippingThread->join();
212
16.3k
    mxSrcStream->closeInput();
213
16.3k
}
214
215
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */