Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/gitdb/stream.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

303 statements  

1# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors 

2# 

3# This module is part of GitDB and is released under 

4# the New BSD License: https://opensource.org/license/bsd-3-clause/ 

5 

6from io import BytesIO 

7 

8import mmap 

9import os 

10import sys 

11import zlib 

12 

13from gitdb.fun import ( 

14 msb_size, 

15 stream_copy, 

16 apply_delta_data, 

17 connect_deltas, 

18 delta_types 

19) 

20 

21from gitdb.util import ( 

22 allocate_memory, 

23 LazyMixin, 

24 make_sha, 

25 write, 

26 close, 

27) 

28 

29from gitdb.const import NULL_BYTE, BYTE_SPACE 

30from gitdb.utils.encoding import force_bytes 

31 

32has_perf_mod = False 

33try: 

34 from gitdb_speedups._perf import apply_delta as c_apply_delta 

35 has_perf_mod = True 

36except ImportError: 

37 pass 

38 

# Public names of this module. Each name is listed exactly once.
__all__ = ('DecompressMemMapReader', 'FDCompressedSha1Writer', 'DeltaApplyReader',
           'Sha1Writer', 'FlexibleSha1Writer', 'ZippedStoreShaWriter',
           'FDStream', 'NullStream')

42 

43 

44#{ RO Streams 

45 

class DecompressMemMapReader(LazyMixin):

    """Reads data in chunks from a memory map and decompresses it. The client sees
    only the uncompressed data, respective file-like read calls are handling on-demand
    buffered decompression accordingly

    A constraint on the total size of bytes is activated, simulating
    a logical file within a possibly larger physical memory area

    To read efficiently, you clearly don't want to read individual bytes, instead,
    read a few kilobytes at least.

    **Note:** The chunk-size should be carefully selected as it will involve quite a bit
        of string copying due to the way the zlib is implemented. It's very wasteful,
        hence we try to find a good tradeoff between allocation time and number of
        times we actually allocate. An own zlib implementation would be good here
        to better support streamed reading - it would only need to keep the mmap
        and decompress it into chunks, that's all ... """
    __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close',
                 '_cbr', '_phi')

    max_read_size = 512 * 1024        # currently unused

    def __init__(self, m, close_on_deletion, size=None):
        """Initialize with mmap for stream reading

        :param m: must be content data - use new if you have object data and no size
        :param close_on_deletion: if True, m is closed when this instance is closed
            or deleted
        :param size: total amount of uncompressed bytes to read. If None, `_s` stays
            unset and the header is parsed lazily on first access of the size"""
        self._m = m
        self._zip = zlib.decompressobj()
        self._buf = None                        # buffer of decompressed bytes
        self._buflen = 0                        # length of bytes in buffer
        if size is not None:
            self._s = size                      # size of uncompressed data to read in total
        self._br = 0                            # num uncompressed bytes read
        self._cws = 0                           # start byte of compression window
        self._cwe = 0                           # end byte of compression window
        self._cbr = 0                           # number of compressed bytes read
        self._phi = False                       # is True if we parsed the header info
        self._close = close_on_deletion         # close the memmap on deletion ?

    def _set_cache_(self, attr):
        assert attr == '_s'
        # only happens for size, which is a marker to indicate we still
        # have to parse the header from the stream
        self._parse_header_info()

    def __del__(self):
        self.close()

    def _parse_header_info(self):
        """If this stream contains object data, parse the header info and skip the
        stream to a point where each read will yield object content

        :return: parsed type_string, size
        :raise ValueError: if no NUL terminated header could be found or if it
            is malformed"""
        # read header
        # should really be enough, cgit uses 8192 I believe
        # And for good reason !! This needs to be that high for the header to be read correctly in all cases
        maxb = 8192
        self._s = maxb
        hdr = self.read(maxb)
        hdrend = hdr.find(NULL_BYTE)
        if hdrend < 0:
            # Without this check, find()'s -1 would be used as a slice end and
            # the header would be misparsed silently.
            raise ValueError("Object header is not NUL-terminated within the first %i bytes" % maxb)
        # END handle missing terminator
        typ, size = hdr[:hdrend].split(BYTE_SPACE)
        size = int(size)
        self._s = size

        # adjust internal state to match actual header length that we ignore
        # The buffer will be depleted first on future reads
        self._br = 0
        hdrend += 1
        self._buf = BytesIO(hdr[hdrend:])
        self._buflen = len(hdr) - hdrend

        self._phi = True

        return typ, size

    #{ Interface

    @classmethod
    def new(cls, m, close_on_deletion=False):
        """Create a new DecompressMemMapReader instance for acting as a read-only stream
        This method parses the object header from m and returns the parsed
        type and size, as well as the created stream instance.

        :param m: memory map on which to operate. It must be object data ( header + contents )
        :param close_on_deletion: if True, the memory map will be closed once we are
            being deleted
        :return: tuple(type_string, size, stream instance)"""
        inst = cls(m, close_on_deletion, 0)
        typ, size = inst._parse_header_info()
        return typ, size, inst

    def data(self):
        """:return: random access compatible data we are working on"""
        return self._m

    def close(self):
        """Close our underlying stream of compressed bytes if this was allowed during initialization

        :return: True if we closed the underlying stream
        :note: can be called safely
        """
        closed = False
        if self._close:
            if hasattr(self._m, 'close'):
                self._m.close()
                closed = True
            # only ever try once
            self._close = False
        # END handle resource freeing
        return closed

    def compressed_bytes_read(self):
        """
        :return: number of compressed bytes read. This includes the bytes it
            took to decompress the header ( if there was one )"""
        # ABSTRACT: When decompressing a byte stream, it can be that the first
        # x bytes which were requested match the first x bytes in the loosely
        # compressed datastream. This is the worst-case assumption that the reader
        # does, it assumes that it will get at least X bytes from X compressed bytes
        # in all cases.
        # The caveat is that the object, according to our known uncompressed size,
        # is already complete, but there are still some bytes left in the compressed
        # stream that contribute to the amount of compressed bytes.
        # How can we know that we are truly done, and have read all bytes we need
        # to read ?
        # Without help, we cannot know, as we need to obtain the status of the
        # decompression. If it is not finished, we need to decompress more data
        # until it is finished, to yield the actual number of compressed bytes
        # belonging to the decompressed object
        # We are using a custom zlib module for this, if it's not present,
        # we try to put in additional bytes up for decompression if feasible
        # and check for the unused_data.

        # Only scrub the stream forward if we are officially done with the
        # bytes we were to have.
        if self._br == self._s and not self._zip.unused_data:
            # manipulate the bytes-read to allow our own read method to continue
            # but keep the window at its current position
            self._br = 0
            if hasattr(self._zip, 'status'):
                while self._zip.status == zlib.Z_OK:
                    self.read(mmap.PAGESIZE)
                # END scrub-loop custom zlib
            else:
                # pass in additional pages, until we have unused data
                while not self._zip.unused_data and self._cbr != len(self._m):
                    cbr_before = self._cbr
                    produced = self.read(mmap.PAGESIZE)
                    # read() returns without touching any state when there is
                    # nothing left to request (e.g. a total size of 0). A call
                    # that neither yields data nor consumes input cannot make
                    # progress next time either, so stop instead of spinning.
                    if not produced and self._cbr == cbr_before:
                        break
                # END scrub-loop default zlib
            # END handle stream scrubbing

            # reset bytes read, just to be sure
            self._br = self._s
        # END handle stream scrubbing

        # unused data ends up in the unconsumed tail, which was removed
        # from the count already
        return self._cbr

    #} END interface

    def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
        """Allows to reset the stream to restart reading

        :raise ValueError: If offset and whence are not 0"""
        if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
            raise ValueError("Can only seek to position 0")
        # END handle offset

        self._zip = zlib.decompressobj()
        self._br = self._cws = self._cwe = self._cbr = 0
        if self._phi:
            self._phi = False
            del(self._s)        # trigger header parsing on first access
        # END skip header

    def read(self, size=-1):
        """Read uncompressed bytes from the stream

        :param size: maximum amount of bytes to return. Values smaller than 1
            request everything up to the total size of the stream
        :return: bytes object holding at most size bytes. It is empty once the
            stream is depleted"""
        if size < 1:
            size = self._s - self._br
        else:
            size = min(size, self._s - self._br)
        # END clamp size

        if size == 0:
            return b''
        # END handle depletion

        # deplete the buffer, then just continue using the decompress object
        # which has an own buffer. We just need this to transparently parse the
        # header from the zlib stream
        dat = b''
        if self._buf:
            if self._buflen >= size:
                # have enough data
                dat = self._buf.read(size)
                self._buflen -= size
                self._br += size
                return dat
            else:
                dat = self._buf.read()        # ouch, duplicates data
                size -= self._buflen
                self._br += self._buflen

                self._buflen = 0
                self._buf = None
            # END handle buffer len
        # END handle buffer

        # decompress some data
        # Abstract: zlib needs to operate on chunks of our memory map ( which may
        # be large ), as it will otherwise and always fill in the 'unconsumed_tail'
        # attribute which possible reads our whole map to the end, forcing
        # everything to be read from disk even though just a portion was requested.
        # As this would be a nogo, we workaround it by passing only chunks of data,
        # moving the window into the memory map along as we decompress, which keeps
        # the tail smaller than our chunk-size. This causes 'only' the chunk to be
        # copied once, and another copy of a part of it when it creates the unconsumed
        # tail. We have to use it to hand in the appropriate amount of bytes during
        # the next read.
        #
        # Decompress in a loop until we have produced `size` bytes or run out
        # of progress. Iteration (instead of recursion) keeps the call bounded
        # for streams that consume many input bytes per produced output byte
        # (e.g. zlib stored blocks of length zero); the previous recursive
        # form blew the stack on inputs > ~1500 empty blocks (issue #120
        # follow-up).
        dcompdat = bytearray()
        while True:
            tail = self._zip.unconsumed_tail
            remaining = size - len(dcompdat)
            if tail:
                # move the window, make it as large as size demands. For code-clarity,
                # we just take the chunk from our map again instead of reusing the unconsumed
                # tail. The latter one would save some memory copying, but we could end up
                # with not getting enough data uncompressed, so we had to sort that out as well.
                # Now we just assume the worst case, hence the data is uncompressed and the window
                # needs to be as large as the uncompressed bytes we want to read.
                self._cws = self._cwe - len(tail)
                self._cwe = self._cws + remaining
            else:
                cws = self._cws
                self._cws = self._cwe
                self._cwe = cws + remaining
            # END handle tail

            # if window is too small, make it larger so zip can decompress something
            if self._cwe - self._cws < 8:
                self._cwe = self._cws + 8
            # END adjust winsize

            # takes a slice, but doesn't copy the data, it says ...
            indata = self._m[self._cws:self._cwe]

            # get the actual window end to be sure we don't use it for computations
            self._cwe = self._cws + len(indata)
            chunk = self._zip.decompress(indata, remaining)
            # update the amount of compressed bytes read
            # We feed possibly overlapping chunks, which is why the unconsumed tail
            # has to be taken into consideration, as well as the unused data
            # if we hit the end of the stream
            # NOTE: Behavior changed in PY2.7 onward, which requires special handling to make the tests work properly.
            # They are thorough, and I assume it is truly working.
            # Why is this logic as convoluted as it is ? Please look at the table in
            # https://github.com/gitpython-developers/gitdb/issues/19 to learn about the test-results.
            # Basically, on py2.6, you want to use branch 1, whereas on all other python version, the second branch
            # will be the one that works.
            # However, the zlib VERSIONs as well as the platform check is used to further match the entries in the
            # table in the github issue. This is it ... it was the only way I could make this work everywhere.
            # IT's CERTAINLY GOING TO BITE US IN THE FUTURE ... .
            if getattr(zlib, 'ZLIB_RUNTIME_VERSION', zlib.ZLIB_VERSION) in ('1.2.7', '1.2.5') and not sys.platform == 'darwin':
                unused_datalen = len(self._zip.unconsumed_tail)
            else:
                unused_datalen = len(self._zip.unconsumed_tail) + len(self._zip.unused_data)
            # END handle very special case ...

            consumed = len(indata) - unused_datalen
            self._cbr += consumed
            self._br += len(chunk)
            if chunk:
                dcompdat.extend(chunk)
            # END accumulate output

            # Stop when we have enough or there is no path to more output.
            # `chunk` may legitimately be empty mid-stream when zlib is
            # consuming header / dictionary frames; in that case we keep
            # iterating as long as we are still feeding zlib new bytes
            # (consumed > 0) and zlib has not flagged end-of-stream. The
            # compressed_bytes_read() scrub loop drives this same code with
            # _br manipulated to 0 past zip EOF; it terminates here because
            # `getattr(_zip, 'eof', False)` is True or no compressed bytes
            # are consumed.
            if len(dcompdat) >= size or self._br >= self._s:
                break
            zip_eof = getattr(self._zip, 'eof', False)
            if not chunk and (zip_eof or len(indata) == 0 or consumed == 0):
                break
        # END iterative decompress

        # Prepend our cached data, and hand out an immutable bytes object no
        # matter which path produced the data, so every caller sees one type.
        return dat + bytes(dcompdat)

345 

346 

class DeltaApplyReader(LazyMixin):

    """Stream which resolves a chain of pack deltas against a base object when
    its content is first needed, while trying to keep memory demands low.

    The size of the final object is only known after all deltas were applied,
    unless it is obtained from a pack index.

    An uncompressed delta is laid out as follows (MSB denotes a most significant
    bit encoded dynamic size):

    * MSB Source Size - the size of the base against which the delta was created
    * MSB Target Size - the size of the resulting data after the delta was applied
    * A list of one byte commands (cmd) which are followed by a specific protocol:

     * cmd & 0x80 - copy delta_data[offset:offset+size]

      * Followed by an encoded offset into the delta data
      * Followed by an encoded size of the chunk to copy

     * cmd & 0x7f - insert

      * insert cmd bytes from the delta buffer into the output stream

     * cmd == 0 - invalid operation ( or error in delta stream )
    """
    __slots__ = (
        "_bstream",             # base stream to which to apply the deltas
        "_dstreams",            # tuple of delta stream readers
        "_mm_target",           # memory map of the delta-applied data
        "_size",                # actual number of bytes in _mm_target
        "_br"                   # number of bytes read
    )

    #{ Configuration
    k_max_memory_move = 250 * 1000 * 1000
    #} END configuration

    def __init__(self, stream_list):
        """Set up the reader from a list of streams. All but the last entry are
        delta streams, the last entry is the base object the deltas apply to"""
        assert len(stream_list) > 1, "Need at least one delta and one base stream"

        self._br = 0
        self._dstreams = tuple(stream_list[:-1])
        self._bstream = stream_list[-1]

    def _set_cache_too_slow_without_c(self, attr):
        """Fill `_size` and `_mm_target` by merging all deltas into one before
        applying it. A single delta is handed to the brute force variant"""
        # With only one delta there is nothing to merge, the direct algorithm
        # is the shortest path then.
        # if len(self._dstreams) * self._bstream.size < self.k_max_memory_move:
        if len(self._dstreams) == 1:
            return self._set_cache_brute_(attr)

        # Reverse-merge the deltas until a single chunk list remains.
        merged = connect_deltas(self._dstreams)

        # rbound() is called directly, as the (optional) c version doesn't
        # implement the sequence protocol
        if merged.rbound() == 0:
            self._size = 0
            self._mm_target = allocate_memory(0)
            return
        # END handle empty list

        self._size = merged.rbound()
        self._mm_target = allocate_memory(self._size)

        base_buf = allocate_memory(self._bstream.size)
        stream_copy(self._bstream.read, base_buf.write, self._bstream.size, 256 * mmap.PAGESIZE)

        # APPLY CHUNKS
        merged.apply(base_buf, self._mm_target.write)

        self._mm_target.seek(0)

    def _set_cache_brute_(self, attr):
        """Fill `_size` and `_mm_target` by applying one delta after the other,
        starting with the one closest to the base"""
        copy_chunk = 256 * mmap.PAGESIZE

        # Peek into every delta: read the two MSB sizes and remember the bytes
        # read beyond them together with the amount of bytes the sizes took.
        headers = []
        largest_target = 0
        for delta in self._dstreams:
            head = delta.read(512)          # read the header information + X
            pos, source_size = msb_size(head)
            pos, result_size = msb_size(head, pos)
            headers.append((head[pos:], pos, source_size, result_size))
            if result_size > largest_target:
                largest_target = result_size
        # END for each delta stream

        base_size = self._bstream.size
        scratch_size = largest_target

        # Buffers get swapped when more than one delta is applied, so both have
        # to be able to hold the largest result.
        if len(self._dstreams) > 1:
            base_size = scratch_size = max(base_size, largest_target)
        # END adjust buffer sizes

        # Private, randomly accessible copy of the base object
        source = allocate_memory(base_size)
        stream_copy(self._bstream.read, source.write, base_size, copy_chunk)

        # Scratch space for the (intermediate) results. If the final result is
        # smaller, only a part of it is used.
        target = allocate_memory(scratch_size)

        resolved_size = None
        for idx in range(len(headers) - 1, -1, -1):
            commands, pos, source_size, result_size = headers[idx]
            delta = self._dstreams[idx]

            # Hold the whole delta in memory for fast access. It starts with the
            # commands we already read, the rest comes from the stream. The size
            # passed to stream_copy is larger than necessary.
            delta_buf = allocate_memory(delta.size - pos)
            delta_buf.write(commands)
            stream_copy(delta.read, delta_buf.write, delta.size, copy_chunk)

            #######################################################################
            if 'c_apply_delta' in globals():
                c_apply_delta(source, delta_buf, target)
            else:
                apply_delta_data(source, source_size, delta_buf, len(delta_buf), target.write)
            #######################################################################

            # The result becomes the base of the next delta
            source, target = target, source
            source.seek(0)
            target.seek(0)
            resolved_size = result_size
        # END for each delta to apply

        # NOTE: buffers were swapped at the end of the loop, hence the result
        # is in source. It is already seeked to 0.
        self._mm_target = source
        self._size = resolved_size

    #{ Configuration
    if not has_perf_mod:
        _set_cache_ = _set_cache_brute_
    else:
        _set_cache_ = _set_cache_too_slow_without_c

    #} END configuration

    def read(self, count=0):
        """Read from the resolved data

        :param count: amount of bytes to read. Values smaller than 1 or larger
            than the amount of bytes left read everything that is left
        :return: the data read"""
        remaining = self._size - self._br
        if not (1 <= count <= remaining):
            count = remaining
        # NOTE: we could check for certain size limits, and possibly
        # return buffers instead of strings to prevent byte copying
        chunk = self._mm_target.read(count)
        self._br += len(chunk)
        return chunk

    def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
        """Rewind the stream to read it from the start again

        :raise ValueError: If offset and whence are not 0"""
        rewinds = offset == 0 and whence == getattr(os, 'SEEK_SET', 0)
        if not rewinds:
            raise ValueError("Can only seek to position 0")
        # END handle offset
        self._br = 0
        self._mm_target.seek(0)

    #{ Interface

    @classmethod
    def new(cls, stream_list):
        """
        Turn the given list of streams into a stream which resolves the deltas
        when it is read.

        :param stream_list: two or more stream objects, first stream is a Delta
            to the object that you want to resolve, followed by N additional delta
            streams. The list's last stream must be a non-delta stream.

        :return: Non-Delta OPackStream object whose stream can be used to obtain
            the decompressed resolved data
        :raise ValueError: if the stream list cannot be handled"""
        if len(stream_list) < 2:
            raise ValueError("Need at least two streams")
        # END single object special handling

        base = stream_list[-1]
        if base.type_id in delta_types:
            raise ValueError(
                "Cannot resolve deltas if there is no base object stream, last one was type: %s" % base.type)
        # END check stream
        return cls(stream_list)

    #} END interface

    #{ OInfo like Interface

    @property
    def type(self):
        """:return: type of the base stream"""
        return self._bstream.type

    @property
    def type_id(self):
        """:return: type id of the base stream"""
        return self._bstream.type_id

    @property
    def size(self):
        """:return: number of uncompressed bytes in the stream"""
        return self._size

    #} END oinfo like interface

574 

575 

576#} END RO streams 

577 

578 

579#{ W Streams 

580 

class Sha1Writer:

    """Simple stream writer which feeds everything written to it into a sha1,
    whose digest can be queried at any time"""
    __slots__ = "sha1"

    def __init__(self):
        self.sha1 = make_sha()

    #{ Stream Interface

    def write(self, data):
        """Add data to the running sha

        :raise IOError: If not all bytes could be written
        :param data: byte object
        :return: length of incoming data"""
        self.sha1.update(data)
        return len(data)

    # END stream interface

    #{ Interface

    def sha(self, as_hex=False):
        """:return: sha so far
        :param as_hex: if True, sha will be hex-encoded, binary otherwise"""
        return self.sha1.hexdigest() if as_hex else self.sha1.digest()

    #} END interface

613 

614 

class FlexibleSha1Writer(Sha1Writer):

    """Writer producing a sha1 while passing on the written bytes to the given
    write function"""
    __slots__ = 'writer'

    def __init__(self, writer):
        """:param writer: callable taking the written data as its only argument"""
        Sha1Writer.__init__(self)
        self.writer = writer

    def write(self, data):
        """Digest data, then hand it to the configured write function

        :param data: byte object
        :return: length of incoming data, as the base class write does"""
        nbytes = Sha1Writer.write(self, data)
        self.writer(data)
        return nbytes

628 

629 

class ZippedStoreShaWriter(Sha1Writer):

    """Remembers everything someone writes to it in compressed form and
    generates a sha of the uncompressed data"""
    __slots__ = ('buf', 'zip')

    def __init__(self):
        Sha1Writer.__init__(self)
        self.buf = BytesIO()
        self.zip = zlib.compressobj(zlib.Z_BEST_SPEED)

    def __getattr__(self, attr):
        """Forward unknown attributes to the underlying buffer

        :raise AttributeError: if one of our own slots is requested but not set"""
        # We only get here if normal lookup failed. If one of our own slots is
        # missing (instance created without __init__, e.g. while it is being
        # copied or unpickled), looking up self.buf would call us again and
        # recurse without bound.
        if attr in ('buf', 'zip', 'sha1'):
            raise AttributeError(attr)
        return getattr(self.buf, attr)

    def write(self, data):
        """Digest data and store its compressed form

        :param data: byte object
        :return: length of incoming data"""
        alen = Sha1Writer.write(self, data)
        self.buf.write(self.zip.compress(data))

        return alen

    def close(self):
        """Flush the compressor into the buffer. The buffer itself stays open"""
        self.buf.write(self.zip.flush())

    def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
        """Seeking currently only supports to rewind written data
        Multiple writes are not supported

        :raise ValueError: If offset and whence are not 0"""
        if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
            raise ValueError("Can only seek to position 0")
        # END handle offset
        self.buf.seek(0)

    def getvalue(self):
        """:return: all compressed bytes stored in the buffer so far,
            independently of the current stream position"""
        return self.buf.getvalue()

663 

664 

class FDCompressedSha1Writer(Sha1Writer):

    """Digests data written to it, making the sha available, then compress the
    data and write it to the file descriptor

    **Note:** operates on raw file descriptors
    **Note:** for this to work, you have to use the close-method of this instance"""
    __slots__ = ("fd", "sha1", "zip")

    # default exception
    exc = IOError("Failed to write all bytes to filedescriptor")

    def __init__(self, fd):
        """:param fd: file descriptor to write the compressed data to"""
        super().__init__()
        self.fd = fd
        self.zip = zlib.compressobj(zlib.Z_BEST_SPEED)

    #{ Stream Interface

    def write(self, data):
        """Digest data, compress it and write the result to our file descriptor

        :raise IOError: If not all bytes could be written
        :return: length of incoming data"""
        self.sha1.update(data)
        cdata = self.zip.compress(data)
        bytes_written = write(self.fd, cdata)

        if bytes_written != len(cdata):
            raise self.exc

        return len(data)

    def close(self):
        """Flush the compressor to the file descriptor and close the descriptor

        The descriptor is closed even if flushing or writing fails, so that it
        does not leak.

        :raise IOError: If not all bytes could be written
        :return: result of closing the file descriptor"""
        try:
            remainder = self.zip.flush()
            if write(self.fd, remainder) != len(remainder):
                raise self.exc
        finally:
            result = close(self.fd)
        # END assure descriptor is closed
        return result

    #} END stream interface

703 

704 

class FDStream:

    """A simple wrapper providing the most basic functions on a file descriptor
    with the fileobject interface. Cannot use os.fdopen as the resulting stream
    takes ownership"""
    __slots__ = ("_fd", '_pos')

    def __init__(self, fd):
        """:param fd: file descriptor to operate on"""
        self._fd = fd
        self._pos = 0       # amount of bytes read and written through this instance

    def write(self, data):
        """Write data to the file descriptor

        :return: number of bytes actually written, which is also the amount by
            which the position is advanced"""
        written = os.write(self._fd, data)
        self._pos += written
        return written

    def read(self, count=0):
        """Read from the file descriptor

        :param count: maximum amount of bytes to read. If 0, the size reported
            by fstat for the descriptor is used, to read everything
        :return: bytes read"""
        if count == 0:
            # we only know the descriptor, so ask it for the size
            count = os.fstat(self._fd).st_size
        # END handle read everything

        data = os.read(self._fd, count)
        self._pos += len(data)
        return data

    def fileno(self):
        """:return: the file descriptor we operate on"""
        return self._fd

    def tell(self):
        """:return: amount of bytes read and written through this instance"""
        return self._pos

    def close(self):
        """Close the file descriptor"""
        close(self._fd)

737 

738 

class NullStream:

    """A stream that does nothing but providing a stream interface.
    Use it like /dev/null"""
    __slots__ = tuple()

    def read(self, size=0):
        """:return: always an empty bytes object, like the other streams
            of this module do once they are depleted"""
        return b''

    def close(self):
        """Does nothing"""
        pass

    def write(self, data):
        """Discard data

        :return: length of incoming data"""
        return len(data)

753 

754 

755#} END W streams