/src/libreoffice/package/source/zipapi/XBufferedThreadedStream.cxx
Line | Count | Source |
1 | | /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | /* |
3 | | * This file is part of the LibreOffice project. |
4 | | * |
5 | | * This Source Code Form is subject to the terms of the Mozilla Public |
6 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
7 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
8 | | */ |
9 | | |
10 | | #include "XBufferedThreadedStream.hxx" |
11 | | |
12 | | using namespace css::uno; |
13 | | |
14 | | namespace { |
15 | | |
16 | | class UnzippingThread: public salhelper::Thread |
17 | | { |
18 | | XBufferedThreadedStream &mxStream; |
19 | | public: |
20 | 22.6k | explicit UnzippingThread(XBufferedThreadedStream &xStream): Thread("Unzipping"), mxStream(xStream) {} |
21 | | private: |
22 | | virtual void execute() override |
23 | 22.6k | { |
24 | 22.6k | try |
25 | 22.6k | { |
26 | 22.6k | mxStream.produce(); |
27 | 22.6k | } |
28 | 22.6k | catch (...) |
29 | 22.6k | { |
30 | 10.4k | mxStream.saveException(std::current_exception()); |
31 | 10.4k | } |
32 | | |
33 | 22.6k | mxStream.setTerminateThread(); |
34 | 22.6k | } |
35 | | }; |
36 | | |
37 | | } |
38 | | |
39 | | XBufferedThreadedStream::XBufferedThreadedStream( |
40 | | const Reference<XInputStream>& xSrcStream, |
41 | | sal_Int64 nStreamSize) |
42 | 22.6k | : mxSrcStream( xSrcStream ) |
43 | 22.6k | , mnPos(0) |
44 | 22.6k | , mnStreamSize( nStreamSize ) |
45 | 22.6k | , mnOffset( 0 ) |
46 | 22.6k | , mxUnzippingThread( new UnzippingThread(*this) ) |
47 | 22.6k | , mbTerminateThread( false ) |
48 | 22.6k | { |
49 | 22.6k | mxUnzippingThread->launch(); |
50 | 22.6k | } |
51 | | |
52 | | XBufferedThreadedStream::~XBufferedThreadedStream() |
53 | 22.6k | { |
54 | 22.6k | setTerminateThread(); |
55 | 22.6k | mxUnzippingThread->join(); |
56 | 22.6k | } |
57 | | |
58 | | /** |
59 | | * Reads from UnbufferedStream in a separate thread and stores the buffer blocks |
60 | | * in maPendingBuffers queue for further use. |
61 | | */ |
62 | | void XBufferedThreadedStream::produce() |
63 | 22.6k | { |
64 | 22.6k | Buffer pProducedBuffer; |
65 | 22.6k | sal_Int64 nTotalBytesRead(0); |
66 | 22.6k | std::unique_lock<std::mutex> aGuard( maBufferProtector ); |
67 | 22.6k | do |
68 | 29.9k | { |
69 | 29.9k | if( !maUsedBuffers.empty() ) |
70 | 17.2k | { |
71 | 17.2k | pProducedBuffer = maUsedBuffers.front(); |
72 | 17.2k | maUsedBuffers.pop(); |
73 | 17.2k | } |
74 | | |
75 | 29.9k | aGuard.unlock(); |
76 | 29.9k | nTotalBytesRead += mxSrcStream->readBytes( pProducedBuffer, nBufferSize ); |
77 | | |
78 | 29.9k | aGuard.lock(); |
79 | 29.9k | maPendingBuffers.push( pProducedBuffer ); |
80 | 29.9k | maBufferConsumeResume.notify_one(); |
81 | | |
82 | 29.9k | if (!mbTerminateThread) |
83 | 17.6k | maBufferProduceResume.wait( aGuard, [&]{return canProduce(); } ); |
84 | | |
85 | 29.9k | } while( !mbTerminateThread && nTotalBytesRead < mnStreamSize ); |
86 | 22.6k | } |
87 | | |
88 | | /** |
89 | | * Fetches next available block from maPendingBuffers for use in Reading thread. |
90 | | */ |
91 | | const Buffer& XBufferedThreadedStream::getNextBlock() |
92 | 51.4k | { |
93 | 51.4k | std::unique_lock<std::mutex> aGuard( maBufferProtector ); |
94 | 51.4k | const sal_Int32 nBufSize = maInUseBuffer.getLength(); |
95 | 51.4k | if( nBufSize <= 0 || mnOffset >= nBufSize ) |
96 | 23.8k | { |
97 | 23.8k | if( mnOffset >= nBufSize ) |
98 | 23.8k | maUsedBuffers.push( maInUseBuffer ); |
99 | | |
100 | 43.7k | maBufferConsumeResume.wait( aGuard, [&]{return canConsume(); } ); |
101 | | |
102 | 23.8k | if( maPendingBuffers.empty() ) |
103 | 7.66k | { |
104 | 7.66k | maInUseBuffer = Buffer(); |
105 | 7.66k | if (maSavedException) |
106 | 7.66k | std::rethrow_exception(maSavedException); |
107 | 7.66k | } |
108 | 16.1k | else |
109 | 16.1k | { |
110 | 16.1k | maInUseBuffer = maPendingBuffers.front(); |
111 | 16.1k | maPendingBuffers.pop(); |
112 | 16.1k | mnOffset = 0; |
113 | | |
114 | 16.1k | if( maPendingBuffers.size() <= nBufferLowWater ) |
115 | 16.0k | maBufferProduceResume.notify_one(); |
116 | 16.1k | } |
117 | 23.8k | } |
118 | | |
119 | 51.4k | return maInUseBuffer; |
120 | 51.4k | } |
121 | | |
122 | | void XBufferedThreadedStream::setTerminateThread() |
123 | 61.6k | { |
124 | 61.6k | std::scoped_lock<std::mutex> aGuard( maBufferProtector ); |
125 | 61.6k | mbTerminateThread = true; |
126 | 61.6k | maBufferProduceResume.notify_one(); |
127 | 61.6k | maBufferConsumeResume.notify_one(); |
128 | 61.6k | } |
129 | | |
130 | | sal_Int32 SAL_CALL XBufferedThreadedStream::readBytes( Sequence< sal_Int8 >& rData, sal_Int32 nBytesToRead ) |
131 | 54.9k | { |
132 | 54.9k | if( !hasBytes() ) |
133 | 8.29k | return 0; |
134 | | |
135 | 46.7k | const sal_Int32 nAvailableSize = static_cast< sal_Int32 > ( std::min< sal_Int64 >( nBytesToRead, remainingSize() ) ); |
136 | 46.7k | rData.realloc( nAvailableSize ); |
137 | 46.7k | auto pData = rData.getArray(); |
138 | 46.7k | sal_Int32 i = 0, nPendingBytes = nAvailableSize; |
139 | | |
140 | 94.8k | while( nPendingBytes ) |
141 | 48.1k | { |
142 | 48.1k | const Buffer &pBuffer = getNextBlock(); |
143 | 48.1k | if( !pBuffer.hasElements() ) |
144 | 0 | { |
145 | 0 | rData.realloc( nAvailableSize - nPendingBytes ); |
146 | 0 | return nAvailableSize - nPendingBytes; |
147 | 0 | } |
148 | 48.1k | const sal_Int32 limit = std::min<sal_Int32>( nPendingBytes, pBuffer.getLength() - mnOffset ); |
149 | | |
150 | 48.1k | memcpy( &pData[i], &pBuffer[mnOffset], limit ); |
151 | | |
152 | 48.1k | nPendingBytes -= limit; |
153 | 48.1k | mnOffset += limit; |
154 | 48.1k | mnPos += limit; |
155 | 48.1k | i += limit; |
156 | 48.1k | } |
157 | | |
158 | 46.7k | return nAvailableSize; |
159 | 46.7k | } |
160 | | |
161 | | sal_Int32 XBufferedThreadedStream::readSomeBytes( sal_Int8* pData, sal_Int32 nBytesToRead ) |
162 | 1.72k | { |
163 | 1.72k | if( !hasBytes() ) |
164 | 0 | return 0; |
165 | | |
166 | 1.72k | const sal_Int32 nAvailableSize = static_cast< sal_Int32 > ( std::min< sal_Int64 >( nBytesToRead, remainingSize() ) ); |
167 | 1.72k | sal_Int32 i = 0, nPendingBytes = nAvailableSize; |
168 | | |
169 | 5.04k | while( nPendingBytes ) |
170 | 3.31k | { |
171 | 3.31k | const Buffer &pBuffer = getNextBlock(); |
172 | 3.31k | if( !pBuffer.hasElements() ) |
173 | 0 | return nAvailableSize - nPendingBytes; |
174 | 3.31k | const sal_Int32 limit = std::min<sal_Int32>( nPendingBytes, pBuffer.getLength() - mnOffset ); |
175 | | |
176 | 3.31k | memcpy( &pData[i], &pBuffer[mnOffset], limit ); |
177 | | |
178 | 3.31k | nPendingBytes -= limit; |
179 | 3.31k | mnOffset += limit; |
180 | 3.31k | mnPos += limit; |
181 | 3.31k | i += limit; |
182 | 3.31k | } |
183 | | |
184 | 1.72k | return nAvailableSize; |
185 | 1.72k | } |
186 | | |
187 | | sal_Int32 SAL_CALL XBufferedThreadedStream::readSomeBytes( Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead ) |
188 | 47.7k | { |
189 | 47.7k | return readBytes( aData, nMaxBytesToRead ); |
190 | 47.7k | } |
191 | | void SAL_CALL XBufferedThreadedStream::skipBytes( sal_Int32 nBytesToSkip ) |
192 | 0 | { |
193 | 0 | if( nBytesToSkip ) |
194 | 0 | { |
195 | 0 | Sequence < sal_Int8 > aSequence( nBytesToSkip ); |
196 | 0 | readBytes( aSequence, nBytesToSkip ); |
197 | 0 | } |
198 | 0 | } |
199 | | |
200 | | sal_Int32 SAL_CALL XBufferedThreadedStream::available() |
201 | 0 | { |
202 | 0 | if( !hasBytes() ) |
203 | 0 | return 0; |
204 | | |
205 | 0 | return static_cast< sal_Int32 > ( std::min< sal_Int64 >( SAL_MAX_INT32, remainingSize() ) ); |
206 | 0 | } |
207 | | |
208 | | void SAL_CALL XBufferedThreadedStream::closeInput() |
209 | 16.3k | { |
210 | 16.3k | setTerminateThread(); |
211 | 16.3k | mxUnzippingThread->join(); |
212 | 16.3k | mxSrcStream->closeInput(); |
213 | 16.3k | } |
214 | | |
215 | | /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ |