Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/git/refs/log.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

149 statements  

1# This module is part of GitPython and is released under the 

2# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

3 

4__all__ = ["RefLog", "RefLogEntry"] 

5 

6from mmap import mmap 

7import os.path as osp 

8import re 

9import time as _time 

10 

11from git.compat import defenc 

12from git.objects.util import ( 

13 Serializable, 

14 altz_to_utctz_str, 

15 parse_date, 

16) 

17from git.util import ( 

18 Actor, 

19 LockedFD, 

20 LockFile, 

21 assure_directory_exists, 

22 bin_to_hex, 

23 file_contents_ro_filepath, 

24 to_native_path, 

25) 

26 

27# typing ------------------------------------------------------------------ 

28 

29from typing import Iterator, List, Tuple, TYPE_CHECKING, Union 

30 

31from git.types import PathLike 

32 

33if TYPE_CHECKING: 

34 from io import BytesIO 

35 

36 from git.config import GitConfigParser, SectionConstraint 

37 from git.refs import SymbolicReference 

38 

39# ------------------------------------------------------------------------------ 

40 

41 

42class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]): 

43 """Named tuple allowing easy access to the revlog data fields.""" 

44 

45 _re_hexsha_only = re.compile(r"^[0-9A-Fa-f]{40}$") 

46 

47 __slots__ = () 

48 

49 def __repr__(self) -> str: 

50 """Representation of ourselves in git reflog format.""" 

51 return self.format() 

52 

53 def format(self) -> str: 

54 """:return: A string suitable to be placed in a reflog file.""" 

55 act = self.actor 

56 time = self.time 

57 return "{} {} {} <{}> {!s} {}\t{}\n".format( 

58 self.oldhexsha, 

59 self.newhexsha, 

60 act.name, 

61 act.email, 

62 time[0], 

63 altz_to_utctz_str(time[1]), 

64 self.message, 

65 ) 

66 

67 @property 

68 def oldhexsha(self) -> str: 

69 """The hexsha to the commit the ref pointed to before the change.""" 

70 return self[0] 

71 

72 @property 

73 def newhexsha(self) -> str: 

74 """The hexsha to the commit the ref now points to, after the change.""" 

75 return self[1] 

76 

77 @property 

78 def actor(self) -> Actor: 

79 """Actor instance, providing access.""" 

80 return self[2] 

81 

82 @property 

83 def time(self) -> Tuple[int, int]: 

84 """Time as tuple: 

85 

86 * [0] = ``int(time)`` 

87 * [1] = ``int(timezone_offset)`` in :attr:`time.altzone` format 

88 """ 

89 return self[3] 

90 

91 @property 

92 def message(self) -> str: 

93 """Message describing the operation that acted on the reference.""" 

94 return self[4] 

95 

96 @classmethod 

97 def new( 

98 cls, 

99 oldhexsha: str, 

100 newhexsha: str, 

101 actor: Actor, 

102 time: int, 

103 tz_offset: int, 

104 message: str, 

105 ) -> "RefLogEntry": # skipcq: PYL-W0621 

106 """:return: New instance of a :class:`RefLogEntry`""" 

107 if not isinstance(actor, Actor): 

108 raise ValueError("Need actor instance, got %s" % actor) 

109 # END check types 

110 return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), message)) 

111 

112 @classmethod 

113 def from_line(cls, line: bytes) -> "RefLogEntry": 

114 """:return: New :class:`RefLogEntry` instance from the given revlog line. 

115 

116 :param line: 

117 Line bytes without trailing newline 

118 

119 :raise ValueError: 

120 If `line` could not be parsed. 

121 """ 

122 line_str = line.decode(defenc) 

123 fields = line_str.split("\t", 1) 

124 if len(fields) == 1: 

125 info, msg = fields[0], None 

126 elif len(fields) == 2: 

127 info, msg = fields 

128 else: 

129 raise ValueError("Line must have up to two TAB-separated fields." " Got %s" % repr(line_str)) 

130 # END handle first split 

131 

132 oldhexsha = info[:40] 

133 newhexsha = info[41:81] 

134 for hexsha in (oldhexsha, newhexsha): 

135 if not cls._re_hexsha_only.match(hexsha): 

136 raise ValueError("Invalid hexsha: %r" % (hexsha,)) 

137 # END if hexsha re doesn't match 

138 # END for each hexsha 

139 

140 email_end = info.find(">", 82) 

141 if email_end == -1: 

142 raise ValueError("Missing token: >") 

143 # END handle missing end brace 

144 

145 actor = Actor._from_string(info[82 : email_end + 1]) 

146 time, tz_offset = parse_date(info[email_end + 2 :]) # skipcq: PYL-W0621 

147 

148 return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), msg)) 

149 

150 

151class RefLog(List[RefLogEntry], Serializable): 

152 R"""A reflog contains :class:`RefLogEntry`\s, each of which defines a certain state 

153 of the head in question. Custom query methods allow to retrieve log entries by date 

154 or by other criteria. 

155 

156 Reflog entries are ordered. The first added entry is first in the list. The last 

157 entry, i.e. the last change of the head or reference, is last in the list. 

158 """ 

159 

160 __slots__ = ("_path",) 

161 

162 def __new__(cls, filepath: Union[PathLike, None] = None) -> "RefLog": 

163 inst = super().__new__(cls) 

164 return inst 

165 

166 def __init__(self, filepath: Union[PathLike, None] = None) -> None: 

167 """Initialize this instance with an optional filepath, from which we will 

168 initialize our data. The path is also used to write changes back using the 

169 :meth:`write` method.""" 

170 self._path = filepath 

171 if filepath is not None: 

172 self._read_from_file() 

173 # END handle filepath 

174 

175 def _read_from_file(self) -> None: 

176 try: 

177 fmap = file_contents_ro_filepath(self._path, stream=True, allow_mmap=True) 

178 except OSError: 

179 # It is possible and allowed that the file doesn't exist! 

180 return 

181 # END handle invalid log 

182 

183 try: 

184 self._deserialize(fmap) 

185 finally: 

186 fmap.close() 

187 # END handle closing of handle 

188 

189 # { Interface 

190 

191 @classmethod 

192 def from_file(cls, filepath: PathLike) -> "RefLog": 

193 """ 

194 :return: 

195 A new :class:`RefLog` instance containing all entries from the reflog at the 

196 given `filepath`. 

197 

198 :param filepath: 

199 Path to reflog. 

200 

201 :raise ValueError: 

202 If the file could not be read or was corrupted in some way. 

203 """ 

204 return cls(filepath) 

205 

206 @classmethod 

207 def path(cls, ref: "SymbolicReference") -> str: 

208 """ 

209 :return: 

210 String to absolute path at which the reflog of the given ref instance would 

211 be found. The path is not guaranteed to point to a valid file though. 

212 

213 :param ref: 

214 :class:`~git.refs.symbolic.SymbolicReference` instance 

215 """ 

216 return osp.join(ref.repo.git_dir, "logs", to_native_path(ref.path)) 

217 

218 @classmethod 

219 def iter_entries(cls, stream: Union[str, "BytesIO", mmap]) -> Iterator[RefLogEntry]: 

220 """ 

221 :return: 

222 Iterator yielding :class:`RefLogEntry` instances, one for each line read 

223 from the given stream. 

224 

225 :param stream: 

226 File-like object containing the revlog in its native format or string 

227 instance pointing to a file to read. 

228 """ 

229 new_entry = RefLogEntry.from_line 

230 if isinstance(stream, str): 

231 # Default args return mmap since Python 3. 

232 _stream = file_contents_ro_filepath(stream) 

233 assert isinstance(_stream, mmap) 

234 else: 

235 _stream = stream 

236 # END handle stream type 

237 while True: 

238 line = _stream.readline() 

239 if not line: 

240 return 

241 yield new_entry(line.strip()) 

242 # END endless loop 

243 

244 @classmethod 

245 def entry_at(cls, filepath: PathLike, index: int) -> "RefLogEntry": 

246 """ 

247 :return: 

248 :class:`RefLogEntry` at the given index. 

249 

250 :param filepath: 

251 Full path to the index file from which to read the entry. 

252 

253 :param index: 

254 Python list compatible index, i.e. it may be negative to specify an entry 

255 counted from the end of the list. 

256 

257 :raise IndexError: 

258 If the entry didn't exist. 

259 

260 :note: 

261 This method is faster as it only parses the entry at index, skipping all 

262 other lines. Nonetheless, the whole file has to be read if the index is 

263 negative. 

264 """ 

265 with open(filepath, "rb") as fp: 

266 if index < 0: 

267 return RefLogEntry.from_line(fp.readlines()[index].strip()) 

268 # Read until index is reached. 

269 

270 for i in range(index + 1): 

271 line = fp.readline() 

272 if not line: 

273 raise IndexError(f"Index file ended at line {i + 1}, before given index was reached") 

274 # END abort on eof 

275 # END handle runup 

276 

277 return RefLogEntry.from_line(line.strip()) 

278 # END handle index 

279 

280 def to_file(self, filepath: PathLike) -> None: 

281 """Write the contents of the reflog instance to a file at the given filepath. 

282 

283 :param filepath: 

284 Path to file. Parent directories are assumed to exist. 

285 """ 

286 lfd = LockedFD(filepath) 

287 assure_directory_exists(filepath, is_file=True) 

288 

289 fp = lfd.open(write=True, stream=True) 

290 try: 

291 self._serialize(fp) 

292 lfd.commit() 

293 except BaseException: 

294 lfd.rollback() 

295 raise 

296 # END handle change 

297 

298 @classmethod 

299 def append_entry( 

300 cls, 

301 config_reader: Union[Actor, "GitConfigParser", "SectionConstraint", None], 

302 filepath: PathLike, 

303 oldbinsha: bytes, 

304 newbinsha: bytes, 

305 message: str, 

306 write: bool = True, 

307 ) -> "RefLogEntry": 

308 """Append a new log entry to the revlog at filepath. 

309 

310 :param config_reader: 

311 Configuration reader of the repository - used to obtain user information. 

312 May also be an :class:`~git.util.Actor` instance identifying the committer 

313 directly or ``None``. 

314 

315 :param filepath: 

316 Full path to the log file. 

317 

318 :param oldbinsha: 

319 Binary sha of the previous commit. 

320 

321 :param newbinsha: 

322 Binary sha of the current commit. 

323 

324 :param message: 

325 Message describing the change to the reference. 

326 

327 :param write: 

328 If ``True``, the changes will be written right away. 

329 Otherwise the change will not be written. 

330 

331 :return: 

332 :class:`RefLogEntry` objects which was appended to the log. 

333 

334 :note: 

335 As we are append-only, concurrent access is not a problem as we do not 

336 interfere with readers. 

337 """ 

338 

339 if len(oldbinsha) != 20 or len(newbinsha) != 20: 

340 raise ValueError("Shas need to be given in binary format") 

341 # END handle sha type 

342 assure_directory_exists(filepath, is_file=True) 

343 first_line = message.split("\n")[0] 

344 if isinstance(config_reader, Actor): 

345 committer = config_reader # mypy thinks this is Actor | Gitconfigparser, but why? 

346 else: 

347 committer = Actor.committer(config_reader) 

348 entry = RefLogEntry( 

349 ( 

350 bin_to_hex(oldbinsha).decode("ascii"), 

351 bin_to_hex(newbinsha).decode("ascii"), 

352 committer, 

353 (int(_time.time()), _time.altzone), 

354 first_line, 

355 ) 

356 ) 

357 

358 if write: 

359 lf = LockFile(filepath) 

360 lf._obtain_lock_or_raise() 

361 fd = open(filepath, "ab") 

362 try: 

363 fd.write(entry.format().encode(defenc)) 

364 finally: 

365 fd.close() 

366 lf._release_lock() 

367 # END handle write operation 

368 return entry 

369 

370 def write(self) -> "RefLog": 

371 """Write this instance's data to the file we are originating from. 

372 

373 :return: 

374 self 

375 """ 

376 if self._path is None: 

377 raise ValueError("Instance was not initialized with a path, use to_file(...) instead") 

378 # END assert path 

379 self.to_file(self._path) 

380 return self 

381 

382 # } END interface 

383 

384 # { Serializable Interface 

385 

386 def _serialize(self, stream: "BytesIO") -> "RefLog": 

387 write = stream.write 

388 

389 # Write all entries. 

390 for e in self: 

391 write(e.format().encode(defenc)) 

392 # END for each entry 

393 return self 

394 

395 def _deserialize(self, stream: "BytesIO") -> "RefLog": 

396 self.extend(self.iter_entries(stream)) 

397 return self 

398 

399 # } END serializable interface