Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/refs/log.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

150 statements  

1# This module is part of GitPython and is released under the 

2# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

3 

4__all__ = ["RefLog", "RefLogEntry"] 

5 

6from mmap import mmap 

7import re 

8import time as _time 

9 

10from git.compat import defenc 

11from git.objects.util import ( 

12 Serializable, 

13 altz_to_utctz_str, 

14 parse_date, 

15) 

16from git.util import ( 

17 Actor, 

18 LockedFD, 

19 LockFile, 

20 assure_directory_exists, 

21 bin_to_hex, 

22 file_contents_ro_filepath, 

23 to_native_path, 

24) 

25 

26# typing ------------------------------------------------------------------ 

27 

28from typing import Iterator, List, Tuple, TYPE_CHECKING, Union 

29 

30from git.types import PathLike 

31 

32if TYPE_CHECKING: 

33 from io import BytesIO 

34 

35 from git.config import GitConfigParser, SectionConstraint 

36 from git.refs import SymbolicReference 

37 

38# ------------------------------------------------------------------------------ 

39 

40 

class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):
    """Named tuple allowing easy access to the revlog data fields.

    The tuple layout is ``(oldhexsha, newhexsha, actor, (time, tz_offset), message)``;
    each field is also exposed through a read-only property of the same name.
    """

    # Matches exactly 40 hexadecimal characters and nothing else.
    _re_hexsha_only = re.compile(r"^[0-9A-Fa-f]{40}$")

    __slots__ = ()

    def __repr__(self) -> str:
        """Representation of ourselves in git reflog format."""
        return self.format()

    def format(self) -> str:
        """:return: A string suitable to be placed in a reflog file.

        :note:
            Entries created by :meth:`from_line` from a line without a TAB-separated
            message carry ``None`` as their message. Such a message is written as an
            empty string instead of the literal text ``None``.
        """
        act = self.actor
        time = self.time
        # Guard against the None message from_line() produces for message-less
        # lines; formatting it directly would write "None" into the log.
        message = "" if self.message is None else self.message
        return "{} {} {} <{}> {!s} {}\t{}\n".format(
            self.oldhexsha,
            self.newhexsha,
            act.name,
            act.email,
            time[0],
            altz_to_utctz_str(time[1]),
            message,
        )

    @property
    def oldhexsha(self) -> str:
        """The hexsha to the commit the ref pointed to before the change."""
        return self[0]

    @property
    def newhexsha(self) -> str:
        """The hexsha to the commit the ref now points to, after the change."""
        return self[1]

    @property
    def actor(self) -> Actor:
        """Actor instance, providing access."""
        return self[2]

    @property
    def time(self) -> Tuple[int, int]:
        """Time as tuple:

        * [0] = ``int(time)``
        * [1] = ``int(timezone_offset)`` in :attr:`time.altzone` format
        """
        return self[3]

    @property
    def message(self) -> str:
        """Message describing the operation that acted on the reference."""
        return self[4]

    @classmethod
    def new(
        cls,
        oldhexsha: str,
        newhexsha: str,
        actor: Actor,
        time: int,
        tz_offset: int,
        message: str,
    ) -> "RefLogEntry":  # skipcq: PYL-W0621
        """:return: New instance of a :class:`RefLogEntry`

        :raise ValueError:
            If `actor` is not an :class:`~git.util.Actor` instance.
        """
        if not isinstance(actor, Actor):
            # Wrap in a 1-tuple so that a tuple-valued `actor` is rendered as a
            # whole instead of being unpacked as %-format arguments.
            raise ValueError("Need actor instance, got %s" % (actor,))
        # END check types
        return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), message))

    @classmethod
    def from_line(cls, line: bytes) -> "RefLogEntry":
        """:return: New :class:`RefLogEntry` instance from the given revlog line.

        :param line:
            Line bytes without trailing newline

        :raise ValueError:
            If `line` could not be parsed.

        :note:
            If the line contains no TAB, the returned entry's message is ``None``.
        """
        line_str = line.decode(defenc)
        # Everything before the first TAB is the fixed-layout info part, everything
        # after it is the free-form message (which may itself contain TABs).
        info, tab, tail = line_str.partition("\t")
        msg = tail if tab else None

        # Fixed layout: 40 hex chars, one separator, 40 hex chars, one separator.
        oldhexsha = info[:40]
        newhexsha = info[41:81]
        for hexsha in (oldhexsha, newhexsha):
            if not cls._re_hexsha_only.match(hexsha):
                raise ValueError("Invalid hexsha: %r" % (hexsha,))
            # END if hexsha re doesn't match
        # END for each hexsha

        # The actor ("name <email>") starts at offset 82 and ends at the closing ">".
        email_end = info.find(">", 82)
        if email_end == -1:
            raise ValueError("Missing token: >")
        # END handle missing end brace

        actor = Actor._from_string(info[82 : email_end + 1])
        # Skip the ">" and the separator following it to reach the date portion.
        time, tz_offset = parse_date(info[email_end + 2 :])  # skipcq: PYL-W0621

        return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), msg))  # type: ignore [arg-type]

148 

149 

class RefLog(List[RefLogEntry], Serializable):
    R"""A reflog contains :class:`RefLogEntry`\s, each of which defines a certain state
    of the head in question. Custom query methods allow to retrieve log entries by date
    or by other criteria.

    Reflog entries are ordered. The first added entry is first in the list. The last
    entry, i.e. the last change of the head or reference, is last in the list.
    """

    __slots__ = ("_path",)

    def __new__(cls, filepath: Union[PathLike, None] = None) -> "RefLog":
        # Accept (and ignore) `filepath` here so that construction with an argument
        # works; the path itself is handled in __init__.
        inst = super().__new__(cls)
        return inst

    def __init__(self, filepath: Union[PathLike, None] = None) -> None:
        """Initialize this instance with an optional filepath, from which we will
        initialize our data. The path is also used to write changes back using the
        :meth:`write` method."""
        self._path = filepath
        if filepath is not None:
            self._read_from_file()
        # END handle filepath

    def _read_from_file(self) -> None:
        """Fill this list with the entries of the file at ``self._path``.

        A file that cannot be opened (:class:`OSError`) leaves the list untouched.
        """
        try:
            fmap = file_contents_ro_filepath(self._path, stream=True, allow_mmap=True)
        except OSError:
            # It is possible and allowed that the file doesn't exist!
            return
        # END handle invalid log

        try:
            self._deserialize(fmap)
        finally:
            fmap.close()
        # END handle closing of handle

    # { Interface

    @classmethod
    def from_file(cls, filepath: PathLike) -> "RefLog":
        """
        :return:
            A new :class:`RefLog` instance containing all entries from the reflog at the
            given `filepath`.

        :param filepath:
            Path to reflog.

        :raise ValueError:
            If the file could not be read or was corrupted in some way.
        """
        return cls(filepath)

    @classmethod
    def path(cls, ref: "SymbolicReference") -> str:
        """
        :return:
            String to absolute path at which the reflog of the given ref instance would
            be found. The path is not guaranteed to point to a valid file though.

        :param ref:
            :class:`~git.refs.symbolic.SymbolicReference` instance

        :raise ValueError:
            If `ref.path` is invalid or escapes the repository's reflog directory.
        """
        return to_native_path(ref._get_validated_reflog_path(ref.repo, ref.path))

    @classmethod
    def iter_entries(cls, stream: Union[str, "BytesIO", mmap]) -> Iterator[RefLogEntry]:
        """
        :return:
            Iterator yielding :class:`RefLogEntry` instances, one for each line read
            from the given stream.

        :param stream:
            File-like object containing the revlog in its native format or string
            instance pointing to a file to read.

        :note:
            If `stream` is a string, the file mapping opened for it is closed once the
            iterator is exhausted, fails, or is closed. A stream object passed in by the
            caller is never closed here.
        """
        new_entry = RefLogEntry.from_line
        opened_here = isinstance(stream, str)
        if opened_here:
            # Default args return mmap since Python 3.
            _stream = file_contents_ro_filepath(stream)
            assert isinstance(_stream, mmap)
        else:
            _stream = stream
        # END handle stream type
        try:
            while True:
                line = _stream.readline()
                if not line:
                    return
                yield new_entry(line.strip())
            # END endless loop
        finally:
            # Only release what we acquired ourselves.
            if opened_here:
                _stream.close()
        # END handle closing of our own mapping

    @classmethod
    def entry_at(cls, filepath: PathLike, index: int) -> "RefLogEntry":
        """
        :return:
            :class:`RefLogEntry` at the given index.

        :param filepath:
            Full path to the reflog file from which to read the entry.

        :param index:
            Python list compatible index, i.e. it may be negative to specify an entry
            counted from the end of the list.

        :raise IndexError:
            If the entry didn't exist.

        :note:
            This method is faster as it only parses the entry at index, skipping all
            other lines. Nonetheless, the whole file has to be read if the index is
            negative.
        """
        with open(filepath, "rb") as fp:
            if index < 0:
                # Counting from the end requires knowing all lines.
                return RefLogEntry.from_line(fp.readlines()[index].strip())
            # Read until index is reached.

            for i in range(index + 1):
                line = fp.readline()
                if not line:
                    raise IndexError(f"Index file ended at line {i + 1}, before given index was reached")
                # END abort on eof
            # END handle runup

            # `line` now holds the line at `index`; the loop ran at least once
            # because `index` is non-negative here.
            return RefLogEntry.from_line(line.strip())
        # END handle index

    def to_file(self, filepath: PathLike) -> None:
        """Write the contents of the reflog instance to a file at the given filepath.

        The data is written through a :class:`~git.util.LockedFD`, which is committed
        on success and rolled back if serialization fails.

        :param filepath:
            Path to file. Its parent directory is passed through
            :func:`~git.util.assure_directory_exists` before writing.
        """
        lfd = LockedFD(filepath)
        assure_directory_exists(filepath, is_file=True)

        fp = lfd.open(write=True, stream=True)
        try:
            self._serialize(fp)
            lfd.commit()
        except BaseException:
            # Roll back on any failure, including KeyboardInterrupt, then re-raise.
            lfd.rollback()
            raise
        # END handle change

    @classmethod
    def append_entry(
        cls,
        config_reader: Union[Actor, "GitConfigParser", "SectionConstraint", None],
        filepath: PathLike,
        oldbinsha: bytes,
        newbinsha: bytes,
        message: str,
        write: bool = True,
    ) -> "RefLogEntry":
        """Append a new log entry to the revlog at filepath.

        :param config_reader:
            Configuration reader of the repository - used to obtain user information.
            May also be an :class:`~git.util.Actor` instance identifying the committer
            directly or ``None``.

        :param filepath:
            Full path to the log file.

        :param oldbinsha:
            Binary sha of the previous commit.

        :param newbinsha:
            Binary sha of the current commit.

        :param message:
            Message describing the change to the reference. Only its first line is
            recorded.

        :param write:
            If ``True``, the changes will be written right away.
            Otherwise the change will not be written.

        :return:
            :class:`RefLogEntry` objects which was appended to the log.

        :raise ValueError:
            If either sha is not exactly 20 bytes long.

        :note:
            As we are append-only, concurrent access is not a problem as we do not
            interfere with readers.
        """

        if len(oldbinsha) != 20 or len(newbinsha) != 20:
            raise ValueError("Shas need to be given in binary format")
        # END handle sha type
        assure_directory_exists(filepath, is_file=True)
        # A reflog entry occupies a single line, so keep only the first message line.
        first_line = message.split("\n")[0]
        if isinstance(config_reader, Actor):
            committer = config_reader  # mypy thinks this is Actor | Gitconfigparser, but why?
        else:
            committer = Actor.committer(config_reader)
        entry = RefLogEntry(
            (
                bin_to_hex(oldbinsha).decode("ascii"),
                bin_to_hex(newbinsha).decode("ascii"),
                committer,
                (int(_time.time()), _time.altzone),
                first_line,
            )
        )

        if write:
            lf = LockFile(filepath)
            lf._obtain_lock_or_raise()
            # Everything after obtaining the lock sits inside try/finally so the lock
            # is released even if opening, writing or closing the file fails.
            try:
                with open(filepath, "ab") as fd:
                    fd.write(entry.format().encode(defenc))
            finally:
                lf._release_lock()
        # END handle write operation
        return entry

    def write(self) -> "RefLog":
        """Write this instance's data to the file we are originating from.

        :return:
            self

        :raise ValueError:
            If this instance was created without a path.
        """
        if self._path is None:
            raise ValueError("Instance was not initialized with a path, use to_file(...) instead")
        # END assert path
        self.to_file(self._path)
        return self

    # } END interface

    # { Serializable Interface

    def _serialize(self, stream: "BytesIO") -> "RefLog":
        """Write every entry, in list order, to `stream` in reflog format.

        :return:
            self
        """
        write = stream.write

        # Write all entries.
        for e in self:
            write(e.format().encode(defenc))
        # END for each entry
        return self

    def _deserialize(self, stream: "BytesIO") -> "RefLog":
        """Append all entries parsed from `stream` to this list.

        :return:
            self
        """
        self.extend(self.iter_entries(stream))
        return self

    # } END serializable interface