Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/utils/log/file_task_handler.py: 29%

416 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""File logging handler for tasks.""" 

19 

20from __future__ import annotations 

21 

22import heapq 

23import io 

24import logging 

25import os 

26from collections.abc import Callable, Generator, Iterator 

27from contextlib import suppress 

28from datetime import datetime 

29from enum import Enum 

30from itertools import chain, islice 

31from pathlib import Path 

32from types import GeneratorType 

33from typing import IO, TYPE_CHECKING, TypedDict, cast 

34from urllib.parse import urljoin 

35 

36import pendulum 

37from pydantic import BaseModel, ConfigDict, ValidationError 

38from typing_extensions import NotRequired 

39 

40from airflow.configuration import conf 

41from airflow.executors.executor_loader import ExecutorLoader 

42from airflow.utils.helpers import parse_template_string, render_template 

43from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator 

44from airflow.utils.log.logging_mixin import SetContextPropagate 

45from airflow.utils.log.non_caching_file_handler import NonCachingRotatingFileHandler 

46from airflow.utils.session import NEW_SESSION, provide_session 

47from airflow.utils.state import State, TaskInstanceState 

48 

49if TYPE_CHECKING: 

50 from typing import TypeAlias 

51 

52 from requests import Response 

53 

54 from airflow.executors.base_executor import BaseExecutor 

55 from airflow.models.taskinstance import TaskInstance 

56 from airflow.models.taskinstancehistory import TaskInstanceHistory 

57 

58CHUNK_SIZE = 1024 * 1024 * 5 # 5MB 

59DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1) 

60DEFAULT_SORT_TIMESTAMP = int(DEFAULT_SORT_DATETIME.timestamp() * 1000) 

61SORT_KEY_OFFSET = 10000000 

62"""An offset used by the _create_sort_key utility. 

63 

64Assuming 50 characters per line, an offset of 10,000,000 lines can represent approximately 500 MB of log data, which is more than enough headroom for a fixed constant. 

65""" 

66HEAP_DUMP_SIZE = 5000 

67HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2 

68 

69# These types are similar, but have distinct names to make processing them less error prone 

70LogMessages: TypeAlias = list[str] 

71"""The legacy format of log messages before 3.0.4""" 

72LogSourceInfo: TypeAlias = list[str] 

73"""Information _about_ the log fetching process for display to a user""" 

74RawLogStream: TypeAlias = Generator[str, None, None] 

75"""Raw log stream, containing unparsed log lines.""" 

76LogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None] 

77"""Legacy log response, containing source information and log messages.""" 

78StreamingLogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]] 

79"""Streaming log response, containing source information, stream of log lines.""" 

80StructuredLogStream: TypeAlias = Generator["StructuredLogMessage", None, None] 

81"""Structured log stream, containing structured log messages.""" 

82LogHandlerOutputStream: TypeAlias = ( 

83 StructuredLogStream | Iterator["StructuredLogMessage"] | chain["StructuredLogMessage"] 

84) 

85"""Output stream, containing structured log messages or a chain of them.""" 

86ParsedLog: TypeAlias = tuple[datetime | None, int, "StructuredLogMessage"] 

87"""Parsed log record, containing timestamp, line_num and the structured log message.""" 

88ParsedLogStream: TypeAlias = Generator[ParsedLog, None, None] 

89LegacyProvidersLogType: TypeAlias = list["StructuredLogMessage"] | str | list[str] 

90"""Return type used by legacy `_read` methods for Alibaba Cloud, Elasticsearch, OpenSearch, and Redis log handlers. 

91 

92- For Elasticsearch and OpenSearch: returns a list of structured log messages. 

93- For Alibaba Cloud: returns a string. 

94- For Redis: returns a list of strings. 

95""" 

96 

97 

98logger = logging.getLogger(__name__) 

99 

100 

101class LogMetadata(TypedDict): 

102 """Metadata about the log fetching process, including `end_of_log` and `log_pos`.""" 

103 

104 end_of_log: bool 

105 log_pos: NotRequired[int] 

106 # the following attributes are used for Elasticsearch and OpenSearch log handlers 

107 offset: NotRequired[str | int] 

108 # Ensure a string here. Large offset numbers will get JSON.parsed incorrectly 

109 # on the client. Sending as a string prevents this issue. 

110 # https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER 

111 last_log_timestamp: NotRequired[str] 

112 max_offset: NotRequired[str] 

113 

114 

115class StructuredLogMessage(BaseModel): 

116 """An individual log message.""" 

117 

118 timestamp: datetime | None = None 

119 event: str 

120 

121 # Collisions of sort_key may occur due to duplicated messages. If this happens, the heap falls back to comparing 

122 # the second tuple element, the StructuredLogMessage itself. Therefore, we need to define a comparator for it. 

123 def __lt__(self, other: StructuredLogMessage) -> bool: 

124 return self.sort_key < other.sort_key 

125 

126 @property 

127 def sort_key(self) -> datetime: 

128 return self.timestamp or DEFAULT_SORT_DATETIME 

129 

130 # We don't need to cache strings when parsing into this, as almost every line will have different 

131 # values; `extra=allow` means we'll create extra properties as needed. Only timestamp and event are 

132 # required; everything else is up to whatever is producing the logs. 

133 model_config = ConfigDict(cache_strings=False, extra="allow") 

134 

135 

136class LogType(str, Enum): 

137 """ 

138 Type of service from which we retrieve logs. 

139 

140 :meta private: 

141 """ 

142 

143 TRIGGER = "trigger" 

144 WORKER = "worker" 

145 

146 

147def _set_task_deferred_context_var(): 

148 """ 

149 Tell task log handler that task exited with deferral. 

150 

151 This exists for the sole purpose of telling elasticsearch handler not to 

152 emit end_of_log mark after task deferral. 

153 

154 Depending on how the task is run, we may need to set this in task command or in local task job. 

155 Kubernetes executor requires the local task job invocation; local executor requires the task 

156 command invocation. 

157 

158 :meta private: 

159 """ 

160 logger = logging.getLogger() 

161 with suppress(StopIteration): 

162 h = next(h for h in logger.handlers if hasattr(h, "ctx_task_deferred")) 

163 h.ctx_task_deferred = True 

164 

165 
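# A minimal illustrative sketch (not part of the upstream module): a handler that exposes
# ``ctx_task_deferred`` -- such as the FileTaskHandler defined below -- can be attached to the
# root logger, and this helper flips the flag on it. The log folder path is invented.
def _example_mark_task_deferred() -> None:
    handler = FileTaskHandler(base_log_folder="/tmp/airflow-logs-example")
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    try:
        _set_task_deferred_context_var()
        assert handler.ctx_task_deferred is True
    finally:
        root_logger.removeHandler(handler)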

166def _fetch_logs_from_service(url: str, log_relative_path: str) -> Response: 

167 # Import occurs in function scope for perf. Ref: https://github.com/apache/airflow/pull/21438 

168 import requests 

169 

170 from airflow.api_fastapi.auth.tokens import JWTGenerator, get_signing_key 

171 

172 timeout = conf.getint("api", "log_fetch_timeout_sec", fallback=None) 

173 generator = JWTGenerator( 

174 secret_key=get_signing_key("api", "secret_key"), 

175 # Since we are using a secret key, we need to be explicit about the algorithm here too 

176 algorithm="HS512", 

177 # We must set an empty private key here as otherwise it can be automatically loaded by JWTGenerator 

178 # and secret_key and private_key cannot be set together 

179 private_key=None, # type: ignore[arg-type] 

180 issuer=None, 

181 valid_for=conf.getint("webserver", "log_request_clock_grace", fallback=30), 

182 audience="task-instance-logs", 

183 ) 

184 response = requests.get( 

185 url, 

186 timeout=timeout, 

187 headers={"Authorization": generator.generate({"filename": log_relative_path})}, 

188 stream=True, 

189 ) 

190 response.encoding = "utf-8" 

191 return response 

192 

193 

194_parse_timestamp = conf.getimport("logging", "interleave_timestamp_parser", fallback=None) 

195 

196if not _parse_timestamp: 

197 

198 def _parse_timestamp(line: str): 

199 timestamp_str, _ = line.split(" ", 1) 

200 return pendulum.parse(timestamp_str.strip("[]")) 

201 

202 
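# A minimal illustrative sketch (not part of the upstream module): the fallback parser above
# assumes each line starts with a bracketed ISO-8601 timestamp followed by a space. The sample
# line is invented.
def _example_parse_timestamp() -> None:
    line = "[2024-05-01T12:30:45.123+00:00] {taskinstance.py:123} INFO - starting task"
    parsed = _parse_timestamp(line)
    assert parsed is not None and (parsed.year, parsed.month, parsed.day) == (2024, 5, 1)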

203def _stream_lines_by_chunk( 

204 log_io: IO[str], 

205) -> RawLogStream: 

206 """ 

207 Stream lines from a file-like IO object. 

208 

209 :param log_io: A file-like IO object to read from. 

210 :return: A generator that yields individual log lines. 

211 """ 

212 # Skip processing if file is already closed 

213 if log_io.closed: 

214 return 

215 

216 # Seek to beginning if possible 

217 if log_io.seekable(): 

218 try: 

219 log_io.seek(0) 

220 except Exception as e: 

221 logger.error("Error seeking in log stream: %s", e) 

222 return 

223 

224 buffer = "" 

225 while True: 

226 # Check if file is already closed 

227 if log_io.closed: 

228 break 

229 

230 try: 

231 chunk = log_io.read(CHUNK_SIZE) 

232 except Exception as e: 

233 logger.error("Error reading log stream: %s", e) 

234 break 

235 

236 if not chunk: 

237 break 

238 

239 buffer += chunk 

240 *lines, buffer = buffer.split("\n") 

241 yield from lines 

242 

243 if buffer: 

244 yield from buffer.split("\n") 

245 

246 
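# A minimal illustrative sketch (not part of the upstream module): _stream_lines_by_chunk works
# on any file-like object and buffers partial lines across CHUNK_SIZE reads.
def _example_stream_lines_by_chunk() -> None:
    log_io = io.StringIO("first line\nsecond line\nthird line")
    assert list(_stream_lines_by_chunk(log_io)) == ["first line", "second line", "third line"]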

247def _log_stream_to_parsed_log_stream( 

248 log_stream: RawLogStream, 

249) -> ParsedLogStream: 

250 """ 

251 Turn a str log stream into a generator of parsed log lines. 

252 

253 :param log_stream: The stream to parse. 

254 :return: A generator of parsed log lines. 

255 """ 

256 from airflow._shared.timezones.timezone import coerce_datetime 

257 

258 timestamp = None 

259 next_timestamp = None 

260 idx = 0 

261 for line in log_stream: 

262 if line: 

263 try: 

264 log = StructuredLogMessage.model_validate_json(line) 

265 except ValidationError: 

266 with suppress(Exception): 

267 # If we can't parse the timestamp, don't attach one to the row 

268 if isinstance(line, str): 

269 next_timestamp = _parse_timestamp(line) 

270 log = StructuredLogMessage(event=str(line), timestamp=next_timestamp) 

271 if log.timestamp: 

272 log.timestamp = coerce_datetime(log.timestamp) 

273 timestamp = log.timestamp 

274 yield timestamp, idx, log 

275 idx += 1 

276 

277 
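# A minimal illustrative sketch (not part of the upstream module): one structured (JSON) line
# and one plain-text line pass through the parser. The JSON line is validated into a
# StructuredLogMessage directly; the plain line falls back to timestamp parsing and is wrapped
# with the raw text as its event. Both sample lines are invented.
def _example_parse_mixed_lines() -> None:
    raw_lines = iter(
        [
            '{"timestamp": "2024-05-01T00:00:00+00:00", "event": "structured entry"}',
            "[2024-05-01T00:00:01+00:00] plain text entry",
        ]
    )
    parsed = list(_log_stream_to_parsed_log_stream(raw_lines))
    assert [line_num for _, line_num, _ in parsed] == [0, 1]
    assert parsed[0][2].event == "structured entry"
    assert parsed[1][2].event == "[2024-05-01T00:00:01+00:00] plain text entry"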

278def _create_sort_key(timestamp: datetime | None, line_num: int) -> int: 

279 """ 

280 Create a sort key for log record, to be used in K-way merge. 

281 

282 :param timestamp: timestamp of the log line 

283 :param line_num: line number of the log line 

284 :return: an integer sort key, which avoids the memory overhead of comparing full records 

285 """ 

286 return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * SORT_KEY_OFFSET + line_num 

287 

288 

289def _is_sort_key_with_default_timestamp(sort_key: int) -> bool: 

290 """ 

291 Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP. 

292 

293 This is used to identify log records that don't have a timestamp. 

294 

295 :param sort_key: The sort key to check 

296 :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP, False otherwise 

297 """ 

298 # Extract the timestamp part from the sort key (remove the line number part) 

299 timestamp_part = sort_key // SORT_KEY_OFFSET 

300 return timestamp_part == DEFAULT_SORT_TIMESTAMP 

301 

302 
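# A minimal illustrative sketch (not part of the upstream module): the sort key packs the
# millisecond timestamp and the line number into one integer, and records without a timestamp
# fall back to DEFAULT_SORT_DATETIME so they can be recognised later.
def _example_sort_key_round_trip() -> None:
    ts = pendulum.datetime(2024, 5, 1, 12, 0, 0)
    key = _create_sort_key(ts, line_num=42)
    assert key // SORT_KEY_OFFSET == int(ts.timestamp() * 1000)  # timestamp part
    assert key % SORT_KEY_OFFSET == 42  # line number part
    key_without_ts = _create_sort_key(None, line_num=7)
    assert _is_sort_key_with_default_timestamp(key_without_ts)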

303def _add_log_from_parsed_log_streams_to_heap( 

304 heap: list[tuple[int, StructuredLogMessage]], 

305 parsed_log_streams: dict[int, ParsedLogStream], 

306) -> None: 

307 """ 

308 Add one log record from each parsed log stream to the heap, and remove exhausted log streams from the dict after iterating. 

309 

310 :param heap: heap to store log records 

311 :param parsed_log_streams: dict of parsed log streams 

312 """ 

313 # We intend to initialize the list lazily, as in most cases we don't need to remove any log streams. 

314 # This reduces memory overhead, since this function is called repeatedly until all log streams are empty. 

315 log_stream_to_remove: list[int] | None = None 

316 for idx, log_stream in parsed_log_streams.items(): 

317 record: ParsedLog | None = next(log_stream, None) 

318 if record is None: 

319 if log_stream_to_remove is None: 

320 log_stream_to_remove = [] 

321 log_stream_to_remove.append(idx) 

322 continue 

323 timestamp, line_num, line = record 

324 # take int as sort key to avoid overhead of memory usage 

325 heapq.heappush(heap, (_create_sort_key(timestamp, line_num), line)) 

326 # remove empty log stream from the dict 

327 if log_stream_to_remove is not None: 

328 for idx in log_stream_to_remove: 

329 del parsed_log_streams[idx] 

330 

331 

332def _flush_logs_out_of_heap( 

333 heap: list[tuple[int, StructuredLogMessage]], 

334 flush_size: int, 

335 last_log_container: list[StructuredLogMessage | None], 

336) -> Generator[StructuredLogMessage, None, None]: 

337 """ 

338 Flush logs out of the heap, deduplicating them based on the last log. 

339 

340 :param heap: heap to flush logs from 

341 :param flush_size: number of logs to flush 

342 :param last_log_container: a container to store the last log, to avoid duplicate logs 

343 :return: a generator that yields deduplicated logs 

344 """ 

345 last_log = last_log_container[0] 

346 for _ in range(flush_size): 

347 sort_key, line = heapq.heappop(heap) 

348 if line != last_log or _is_sort_key_with_default_timestamp(sort_key): # dedupe 

349 yield line 

350 last_log = line 

351 # update the last log container with the last log 

352 last_log_container[0] = last_log 

353 

354 

355def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream: 

356 """ 

357 Merge parsed log streams using K-way merge. 

358 

359 By yielding HALF_HEAP_DUMP_SIZE records when heap size exceeds HEAP_DUMP_SIZE, we can reduce the chance of messing up the global order. 

360 Since there are multiple log streams, we can't guarantee that the records are in global order. 

361 

362 e.g. 

363 

364 log_stream1: ---------- 

365 log_stream2: ---- 

366 log_stream3: -------- 

367 

368 The first record of log_stream3 is later than the fourth record of log_stream1! 

369 :param log_streams: raw log streams to merge 

370 :return: interleaved log stream 

371 """ 

372 # Don't push the whole parsed tuple onto the heap, which would add too much overhead; 

373 # push only (sort_key, line). 

374 heap: list[tuple[int, StructuredLogMessage]] = [] 

375 # to allow removing empty streams while iterating, also turn the str stream into parsed log stream 

376 parsed_log_streams: dict[int, ParsedLogStream] = { 

377 idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream in enumerate(log_streams) 

378 } 

379 

380 # keep adding records from logs until all logs are empty 

381 last_log_container: list[StructuredLogMessage | None] = [None] 

382 while parsed_log_streams: 

383 _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams) 

384 

385 # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds HEAP_DUMP_SIZE 

386 if len(heap) >= HEAP_DUMP_SIZE: 

387 yield from _flush_logs_out_of_heap(heap, HALF_HEAP_DUMP_SIZE, last_log_container) 

388 

389 # yield remaining records 

390 yield from _flush_logs_out_of_heap(heap, len(heap), last_log_container) 

391 # free memory 

392 del heap 

393 del parsed_log_streams 

394 

395 
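# A minimal illustrative sketch (not part of the upstream module): two small raw streams with
# bracketed timestamps are merged into a single stream ordered by timestamp, regardless of
# which stream a line came from. The sample lines are invented.
def _example_interleave_two_streams() -> None:
    stream_a = iter(
        ["[2024-05-01T00:00:00+00:00] a-first", "[2024-05-01T00:00:02+00:00] a-second"]
    )
    stream_b = iter(["[2024-05-01T00:00:01+00:00] b-first"])
    events = [log.event for log in _interleave_logs(stream_a, stream_b)]
    assert events == [
        "[2024-05-01T00:00:00+00:00] a-first",
        "[2024-05-01T00:00:01+00:00] b-first",
        "[2024-05-01T00:00:02+00:00] a-second",
    ]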

396def _is_logs_stream_like(log) -> bool: 

397 """Check if the logs are stream-like.""" 

398 return isinstance(log, (chain, GeneratorType)) 

399 

400 

401def _get_compatible_log_stream( 

402 log_messages: LogMessages, 

403) -> RawLogStream: 

404 """ 

405 Convert legacy log message blobs into a generator that yields log lines. 

406 

407 :param log_messages: List of legacy log message strings. 

408 :return: A generator that yields the log lines of each message in sequence. 

409 """ 

410 yield from chain.from_iterable( 

411 _stream_lines_by_chunk(io.StringIO(log_message)) for log_message in log_messages 

412 ) 

413 

414 

415class FileTaskHandler(logging.Handler): 

416 """ 

417 FileTaskHandler is a python log handler that handles and reads task instance logs. 

418 

419 It creates and delegates log handling to `logging.FileHandler` after receiving task 

420 instance context. It reads logs from task instance's host machine. 

421 

422 :param base_log_folder: Base log folder to place logs. 

423 :param max_bytes: max bytes size for the log file 

424 :param backup_count: backup file count for the log file 

425 :param delay: if True, defer opening the log file until the first emit 

426 """ 

427 

428 trigger_should_wrap = True 

429 inherits_from_empty_operator_log_message = ( 

430 "Operator inherits from empty operator and thus does not have logs" 

431 ) 

432 executor_instances: dict[str, BaseExecutor] = {} 

433 DEFAULT_EXECUTOR_KEY = "_default_executor" 

434 

435 def __init__( 

436 self, 

437 base_log_folder: str, 

438 max_bytes: int = 0, 

439 backup_count: int = 0, 

440 delay: bool = False, 

441 ): 

442 super().__init__() 

443 self.handler: logging.Handler | None = None 

444 self.local_base = base_log_folder 

445 self.maintain_propagate: bool = False 

446 """ 

447 If true, overrides default behavior of setting propagate=False 

448 

449 :meta private: 

450 """ 

451 self.max_bytes = max_bytes 

452 self.backup_count = backup_count 

453 self.delay = delay 

454 

455 self.ctx_task_deferred = False 

456 """ 

457 If true, task exited with deferral to trigger. 

458 

459 Some handlers emit "end of log" markers, and may not wish to do so when task defers. 

460 """ 

461 

462 def set_context( 

463 self, ti: TaskInstance | TaskInstanceHistory, *, identifier: str | None = None 

464 ) -> None | SetContextPropagate: 

465 """ 

466 Provide task_instance context to airflow task handler. 

467 

468 Generally speaking returns None. But if attr `maintain_propagate` has 

469 been set to propagate, then returns sentinel MAINTAIN_PROPAGATE. This 

470 has the effect of overriding the default behavior to set `propagate` 

471 to False whenever set_context is called. At time of writing, this 

472 functionality is only used in unit testing. 

473 

474 :param ti: task instance object 

475 :param identifier: if set, adds suffix to log file. For use when relaying exceptional messages 

476 to task logs from a context other than task or trigger run 

477 """ 

478 local_loc = self._init_file(ti, identifier=identifier) 

479 self.handler = NonCachingRotatingFileHandler( 

480 local_loc, 

481 encoding="utf-8", 

482 maxBytes=self.max_bytes, 

483 backupCount=self.backup_count, 

484 delay=self.delay, 

485 ) 

486 if self.formatter: 

487 self.handler.setFormatter(self.formatter) 

488 self.handler.setLevel(self.level) 

489 return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None 

490 

491 @staticmethod 

492 def add_triggerer_suffix(full_path, job_id=None): 

493 """ 

494 Derive trigger log filename from task log filename. 

495 

496 E.g. given /path/to/file.log returns /path/to/file.log.trigger.123.log, where 123 

497 is the triggerer id. We use the triggerer ID instead of trigger ID to distinguish 

498 the files because, rarely, the same trigger could get picked up by two different 

499 triggerer instances. 

500 """ 

501 full_path = Path(full_path).as_posix() 

502 full_path += f".{LogType.TRIGGER.value}" 

503 if job_id: 

504 full_path += f".{job_id}.log" 

505 return full_path 

506 
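    # Worked example (invented path and job id) of the transformation described in the docstring above:
    #   add_triggerer_suffix("/path/to/attempt=1.log", job_id=123)
    #   -> "/path/to/attempt=1.log.trigger.123.log"
    #   add_triggerer_suffix("/path/to/attempt=1.log")
    #   -> "/path/to/attempt=1.log.trigger"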

507 def emit(self, record): 

508 if self.handler: 

509 self.handler.emit(record) 

510 

511 def flush(self): 

512 if self.handler: 

513 self.handler.flush() 

514 

515 def close(self): 

516 if self.handler: 

517 self.handler.close() 

518 

519 @provide_session 

520 def _render_filename( 

521 self, ti: TaskInstance | TaskInstanceHistory, try_number: int, session=NEW_SESSION 

522 ) -> str: 

523 """Return the worker log filename.""" 

524 dag_run = ti.get_dagrun(session=session) 

525 

526 date = dag_run.logical_date or dag_run.run_after 

527 formatted_date = date.isoformat() 

528 

529 template = dag_run.get_log_template(session=session).filename 

530 str_tpl, jinja_tpl = parse_template_string(template) 

531 if jinja_tpl: 

532 return render_template( 

533 jinja_tpl, {"ti": ti, "ts": formatted_date, "try_number": try_number}, native=False 

534 ) 

535 

536 if str_tpl: 

537 data_interval = (dag_run.data_interval_start, dag_run.data_interval_end) 

538 if data_interval[0]: 

539 data_interval_start = data_interval[0].isoformat() 

540 else: 

541 data_interval_start = "" 

542 if data_interval[1]: 

543 data_interval_end = data_interval[1].isoformat() 

544 else: 

545 data_interval_end = "" 

546 return str_tpl.format( 

547 dag_id=ti.dag_id, 

548 task_id=ti.task_id, 

549 run_id=ti.run_id, 

550 data_interval_start=data_interval_start, 

551 data_interval_end=data_interval_end, 

552 logical_date=formatted_date, 

553 try_number=try_number, 

554 ) 

555 raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen") 

556 
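    # Worked example (hypothetical template and values): with a purely str.format-style log
    # filename template such as
    #   "dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log"
    # (the real template comes from the DagRun's stored log template), this method would render
    # something like
    #   "dag_id=example_dag/run_id=manual__2024-05-01/task_id=print_date/attempt=1.log".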

557 def _get_executor_get_task_log( 

558 self, ti: TaskInstance | TaskInstanceHistory 

559 ) -> Callable[[TaskInstance | TaskInstanceHistory, int], tuple[list[str], list[str]]]: 

560 """ 

561 Get the get_task_log method from the executor of the current task instance. 

562 

563 Since there might be multiple executors, we need to get the executor of the current task instance instead of the default executor. 

564 

565 :param ti: task instance object 

566 :return: get_task_log method of the executor 

567 """ 

568 executor_name = ti.executor or self.DEFAULT_EXECUTOR_KEY 

569 executor = self.executor_instances.get(executor_name) 

570 if executor is not None: 

571 return executor.get_task_log 

572 

573 if executor_name == self.DEFAULT_EXECUTOR_KEY: 

574 self.executor_instances[executor_name] = ExecutorLoader.get_default_executor() 

575 else: 

576 self.executor_instances[executor_name] = ExecutorLoader.load_executor(executor_name) 

577 return self.executor_instances[executor_name].get_task_log 

578 

579 def _read( 

580 self, 

581 ti: TaskInstance | TaskInstanceHistory, 

582 try_number: int, 

583 metadata: LogMetadata | None = None, 

584 ) -> tuple[LogHandlerOutputStream | LegacyProvidersLogType, LogMetadata]: 

585 """ 

586 Template method that contains custom logic of reading logs given the try_number. 

587 

588 :param ti: task instance record 

589 :param try_number: current try_number to read log from 

590 :param metadata: log metadata, 

591 can be used for streaming log reading and auto-tailing. 

592 The following attributes are used: 

593 log_pos: (absolute) number of log lines that were 

594 retrieved in previous calls; this 

595 part is skipped and only the lines that 

596 follow are returned to be added to the tail. 

597 :return: log stream and metadata. 

598 The following attributes are used in metadata: 

599 end_of_log: Boolean, True if the end of the log is reached or False 

600 if further calls might get more log text. 

601 This is determined by the status of the TaskInstance. 

602 log_pos: (absolute) number of log lines retrieved so far 

603 """ 

604 # Task instance here might be different from task instance when 

605 # initializing the handler. Thus explicitly getting log location 

606 # is needed to get correct log path. 

607 worker_log_rel_path = self._render_filename(ti, try_number) 

608 sources: LogSourceInfo = [] 

609 source_list: list[str] = [] 

610 remote_logs: list[RawLogStream] = [] 

611 local_logs: list[RawLogStream] = [] 

612 executor_logs: list[RawLogStream] = [] 

613 served_logs: list[RawLogStream] = [] 

614 with suppress(NotImplementedError): 

615 sources, logs = self._read_remote_logs(ti, try_number, metadata) 

616 if not logs: 

617 remote_logs = [] 

618 elif isinstance(logs, list) and isinstance(logs[0], str): 

619 # If the logs are in legacy format, convert them to a generator of log lines 

620 remote_logs = [ 

621 # We don't need to use the log_pos here, as we are using the metadata to track the position 

622 _get_compatible_log_stream(cast("list[str]", logs)) 

623 ] 

624 elif isinstance(logs, list) and _is_logs_stream_like(logs[0]): 

625 # If the logs are already in a stream-like format, we can use them directly 

626 remote_logs = cast("list[RawLogStream]", logs) 

627 else: 

628 # If the logs are in a different format, raise an error 

629 raise TypeError("Logs should be either a list of strings or a generator of log lines.") 

630 # Extend LogSourceInfo 

631 source_list.extend(sources) 

632 has_k8s_exec_pod = False 

633 if ti.state == TaskInstanceState.RUNNING: 

634 executor_get_task_log = self._get_executor_get_task_log(ti) 

635 response = executor_get_task_log(ti, try_number) 

636 if response: 

637 sources, logs = response 

638 # make the logs stream-like compatible 

639 executor_logs = [_get_compatible_log_stream(logs)] 

640 if sources: 

641 source_list.extend(sources) 

642 has_k8s_exec_pod = True 

643 if not (remote_logs and ti.state not in State.unfinished): 

644 # when finished, if we have remote logs, no need to check local 

645 worker_log_full_path = Path(self.local_base, worker_log_rel_path) 

646 sources, local_logs = self._read_from_local(worker_log_full_path) 

647 source_list.extend(sources) 

648 if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) and not has_k8s_exec_pod: 

649 sources, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) 

650 source_list.extend(sources) 

651 elif ti.state not in State.unfinished and not (local_logs or remote_logs): 

652 # ordinarily we don't check served logs, with the assumption that users set up 

653 # remote logging or shared drive for logs for persistence, but that's not always true 

654 # so even if task is done, if no local logs or remote logs are found, we'll check the worker 

655 sources, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) 

656 source_list.extend(sources) 

657 

658 out_stream: LogHandlerOutputStream = _interleave_logs( 

659 *local_logs, 

660 *remote_logs, 

661 *executor_logs, 

662 *served_logs, 

663 ) 

664 

665 # Log message source details are grouped: they are not relevant for most users and can 

666 # distract them from finding the root cause of their errors 

667 header = [ 

668 StructuredLogMessage(event="::group::Log message source details", sources=source_list), # type: ignore[call-arg] 

669 StructuredLogMessage(event="::endgroup::"), 

670 ] 

671 end_of_log = ti.try_number != try_number or ti.state not in ( 

672 TaskInstanceState.RUNNING, 

673 TaskInstanceState.DEFERRED, 

674 ) 

675 

676 with LogStreamAccumulator(out_stream, HEAP_DUMP_SIZE) as stream_accumulator: 

677 log_pos = stream_accumulator.total_lines 

678 out_stream = stream_accumulator.stream 

679 

680 # skip log stream until the last position 

681 if metadata and "log_pos" in metadata: 

682 out_stream = islice(out_stream, metadata["log_pos"], None) 

683 else: 

684 # first time reading log, add messages before interleaved log stream 

685 out_stream = chain(header, out_stream) 

686 

687 return out_stream, { 

688 "end_of_log": end_of_log, 

689 "log_pos": log_pos, 

690 } 

691 

692 @staticmethod 

694 def _get_pod_namespace(ti: TaskInstance | TaskInstanceHistory): 

695 pod_override = getattr(ti.executor_config, "pod_override", None) 

696 metadata = getattr(pod_override, "metadata", None) 

697 namespace = None 

698 with suppress(Exception): 

699 namespace = getattr(metadata, "namespace", None) 

700 return namespace or conf.get("kubernetes_executor", "namespace") 

701 

702 def _get_log_retrieval_url( 

703 self, 

704 ti: TaskInstance | TaskInstanceHistory, 

705 log_relative_path: str, 

706 log_type: LogType | None = None, 

707 ) -> tuple[str, str]: 

708 """Given TI, generate URL with which to fetch logs from service log server.""" 

709 if log_type == LogType.TRIGGER: 

710 if not ti.triggerer_job: 

711 raise RuntimeError("Could not build triggerer log URL; no triggerer job.") 

712 config_key = "triggerer_log_server_port" 

713 config_default = 8794 

714 hostname = ti.triggerer_job.hostname 

715 log_relative_path = self.add_triggerer_suffix(log_relative_path, job_id=ti.triggerer_job.id) 

716 else: 

717 hostname = ti.hostname 

718 config_key = "worker_log_server_port" 

719 config_default = 8793 

720 return ( 

721 urljoin( 

722 f"http://{hostname}:{conf.get('logging', config_key, fallback=config_default)}/log/", 

723 log_relative_path, 

724 ), 

725 log_relative_path, 

726 ) 

727 
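    # Worked example (invented hostname and path): for a worker log with hostname "worker-1",
    # the default port 8793, and relative path "dag_id=d/run_id=r/task_id=t/attempt=1.log",
    # this returns
    #   ("http://worker-1:8793/log/dag_id=d/run_id=r/task_id=t/attempt=1.log",
    #    "dag_id=d/run_id=r/task_id=t/attempt=1.log")
    # For trigger logs, the triggerer hostname, default port 8794, and the
    # ".trigger.<job_id>.log" suffix are used instead.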

728 def read( 

729 self, 

730 task_instance: TaskInstance | TaskInstanceHistory, 

731 try_number: int | None = None, 

732 metadata: LogMetadata | None = None, 

733 ) -> tuple[LogHandlerOutputStream, LogMetadata]: 

734 """ 

735 Read logs of given task instance from local machine. 

736 

737 :param task_instance: task instance object 

738 :param try_number: task instance try_number to read logs from. If None 

739 it returns the log of task_instance.try_number 

740 :param metadata: log metadata, can be used for streaming log reading and auto-tailing. 

741 :return: a log output stream and the updated log metadata 

742 """ 

743 if try_number is None: 

744 try_number = task_instance.try_number 

745 

746 if try_number == 0 and task_instance.state in ( 

747 TaskInstanceState.SKIPPED, 

748 TaskInstanceState.UPSTREAM_FAILED, 

749 ): 

750 logs = [StructuredLogMessage(event="Task was skipped, no logs available.")] 

751 return chain(logs), {"end_of_log": True} 

752 

753 if try_number is None or try_number < 1: 

754 logs = [ 

755 StructuredLogMessage( # type: ignore[call-arg] 

756 level="error", event=f"Error fetching the logs. Try number {try_number} is invalid." 

757 ) 

758 ] 

759 return chain(logs), {"end_of_log": True} 

760 

761 # compatibility for es_task_handler and os_task_handler 

762 read_result = self._read(task_instance, try_number, metadata) 

763 out_stream, metadata = read_result 

764 # If the out_stream is None or empty, return the read result 

765 if not out_stream: 

766 out_stream = cast("Generator[StructuredLogMessage, None, None]", out_stream) 

767 return out_stream, metadata 

768 

769 if _is_logs_stream_like(out_stream): 

770 out_stream = cast("Generator[StructuredLogMessage, None, None]", out_stream) 

771 return out_stream, metadata 

772 if isinstance(out_stream, list) and isinstance(out_stream[0], StructuredLogMessage): 

773 out_stream = cast("list[StructuredLogMessage]", out_stream) 

774 return (log for log in out_stream), metadata 

775 if isinstance(out_stream, list) and isinstance(out_stream[0], str): 

776 # If the out_stream is a list of strings, convert it to a generator 

777 out_stream = cast("list[str]", out_stream) 

778 raw_stream = _stream_lines_by_chunk(io.StringIO("".join(out_stream))) 

779 out_stream = (log for _, _, log in _log_stream_to_parsed_log_stream(raw_stream)) 

780 return out_stream, metadata 

781 if isinstance(out_stream, str): 

782 # If the out_stream is a string, convert it to a generator 

783 raw_stream = _stream_lines_by_chunk(io.StringIO(out_stream)) 

784 out_stream = (log for _, _, log in _log_stream_to_parsed_log_stream(raw_stream)) 

785 return out_stream, metadata 

786 raise TypeError( 

787 "Invalid log stream type. Expected a generator of StructuredLogMessage, list of StructuredLogMessage, list of str or str." 

788 f" Got {type(out_stream).__name__} instead." 

789 f" Content type: {type(out_stream[0]).__name__ if isinstance(out_stream, (list, tuple)) and out_stream else 'empty'}" 

790 ) 

791 

792 @staticmethod 

793 def _prepare_log_folder(directory: Path, new_folder_permissions: int): 

794 """ 

795 Prepare the log folder and ensure its mode is as configured. 

796 

797 To handle log writing when tasks are impersonated, the log files need to 

798 be writable by the user that runs the Airflow command and the user 

799 that is impersonated. This is mainly to handle corner cases with the 

800 SubDagOperator. When the SubDagOperator is run, all of the operators 

801 run under the impersonated user and create appropriate log files 

802 as the impersonated user. However, if the user manually runs tasks 

803 of the SubDagOperator through the UI, then the log files are created 

804 by the user that runs the Airflow command. For example, the Airflow 

805 run command may be run by the `airflow_sudoable` user, but the Airflow 

806 tasks may be run by the `airflow` user. If the log files are not 

807 writable by both users, then it's possible that re-running a task 

808 via the UI (or vice versa) results in a permission error as the task 

809 tries to write to a log file created by the other user. 

810 

811 We leave it up to the user to manage their permissions by exposing configuration for both 

812 new folders and new log files. Default is to make new log folders and files group-writeable 

813 to handle most common impersonation use cases. The requirement in this case will be to make 

814 sure that the same group is set as default group for both - impersonated user and main airflow 

815 user. 

816 """ 

817 for parent in reversed(directory.parents): 

818 parent.mkdir(mode=new_folder_permissions, exist_ok=True) 

819 directory.mkdir(mode=new_folder_permissions, exist_ok=True) 

820 

821 def _init_file(self, ti, *, identifier: str | None = None): 

822 """ 

823 Create log directory and give it permissions that are configured. 

824 

825 See above _prepare_log_folder method for more detailed explanation. 

826 

827 :param ti: task instance object 

828 :return: full path of the log file for the given task instance 

829 """ 

830 new_file_permissions = int( 

831 conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"), 8 

832 ) 

833 local_relative_path = self._render_filename(ti, ti.try_number) 

834 full_path = os.path.join(self.local_base, local_relative_path) 

835 if identifier: 

836 full_path += f".{identifier}.log" 

837 elif ti.is_trigger_log_context is True: 

838 # if this is true, we're invoked via set_context in the context of 

839 # setting up individual trigger logging. return trigger log path. 

840 full_path = self.add_triggerer_suffix(full_path=full_path, job_id=ti.triggerer_job.id) 

841 new_folder_permissions = int( 

842 conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), 8 

843 ) 

844 self._prepare_log_folder(Path(full_path).parent, new_folder_permissions) 

845 

846 if not os.path.exists(full_path): 

847 open(full_path, "a").close() 

848 try: 

849 os.chmod(full_path, new_file_permissions) 

850 except OSError as e: 

851 logger.warning("OSError while changing ownership of the log file: %s", e) 

852 

853 return full_path 

854 

855 @staticmethod 

856 def _read_from_local( 

857 worker_log_path: Path, 

858 ) -> StreamingLogResponse: 

859 sources: LogSourceInfo = [] 

860 log_streams: list[RawLogStream] = [] 

861 paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*")) 

862 if not paths: 

863 return sources, log_streams 

864 

865 for path in paths: 

866 sources.append(os.fspath(path)) 

867 # Read the log file and yield lines 

868 log_streams.append(_stream_lines_by_chunk(open(path, encoding="utf-8"))) 

869 return sources, log_streams 

870 

871 def _read_from_logs_server( 

872 self, 

873 ti: TaskInstance | TaskInstanceHistory, 

874 worker_log_rel_path: str, 

875 ) -> StreamingLogResponse: 

876 sources: LogSourceInfo = [] 

877 log_streams: list[RawLogStream] = [] 

878 try: 

879 log_type = LogType.TRIGGER if getattr(ti, "triggerer_job", False) else LogType.WORKER 

880 url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type) 

881 response = _fetch_logs_from_service(url, rel_path) 

882 if response.status_code == 403: 

883 sources.append( 

884 "!!!! Please make sure that all your Airflow components (e.g. " 

885 "schedulers, api-servers, dag-processors, workers and triggerer) have " 

886 "the same 'secret_key' configured in '[api]' section and " 

887 "time is synchronized on all your machines (for example with ntpd)\n" 

888 "See more at https://airflow.apache.org/docs/apache-airflow/" 

889 "stable/configurations-ref.html#secret-key" 

890 ) 

891 else: 

892 # Check if the resource was properly fetched 

893 response.raise_for_status() 

894 if int(response.headers.get("Content-Length", 0)) > 0: 

895 sources.append(url) 

896 log_streams.append( 

897 _stream_lines_by_chunk(io.TextIOWrapper(cast("IO[bytes]", response.raw))) 

898 ) 

899 except Exception as e: 

900 from requests.exceptions import InvalidURL 

901 

902 if ( 

903 isinstance(e, InvalidURL) 

904 and ti.task is not None 

905 and ti.task.inherits_from_empty_operator is True 

906 ): 

907 sources.append(self.inherits_from_empty_operator_log_message) 

908 else: 

909 sources.append(f"Could not read served logs: {e}") 

910 logger.exception("Could not read served logs") 

911 return sources, log_streams 

912 

913 def _read_remote_logs(self, ti, try_number, metadata=None) -> LogResponse | StreamingLogResponse: 

914 """ 

915 Implement in subclasses to read from the remote service. 

916 

917 This method should return two lists, messages and logs. 

918 

919 * Each element in the messages list should be a single message, 

920 such as, "reading from x file". 

921 * Each element in the logs list should be the content of one file. 

922 """ 

923 remote_io = None 

924 try: 

925 from airflow.logging_config import REMOTE_TASK_LOG 

926 

927 remote_io = REMOTE_TASK_LOG 

928 except Exception: 

929 pass 

930 

931 if remote_io is None: 

932 # Import not found, or explicitly set to None 

933 raise NotImplementedError 

934 

935 # This living here is not really a good plan, but it just about works for now. 

936 # Ideally we move all the read+combine logic in to TaskLogReader and out of the task handler. 

937 path = self._render_filename(ti, try_number) 

938 if stream_method := getattr(remote_io, "stream", None): 

939 # Use .stream interface if provider's RemoteIO supports it 

940 sources, logs = stream_method(path, ti) 

941 return sources, logs or [] 

942 # Fallback to .read interface 

943 sources, logs = remote_io.read(path, ti) 

944 return sources, logs or []
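# A minimal illustrative sketch (not part of the upstream module): a subclass that shows the
# contract of _read_remote_logs -- return human-readable source descriptions plus either legacy
# string blobs or stream-like objects. The remote storage here is imaginary.
class _ExampleRemoteTaskHandler(FileTaskHandler):
    def _read_remote_logs(self, ti, try_number, metadata=None) -> StreamingLogResponse:
        relative_path = self._render_filename(ti, try_number)
        sources: LogSourceInfo = [f"Found logs in imaginary remote storage at {relative_path}"]
        # One stream per remote log file; a single hard-coded line stands in for whatever the
        # imaginary backend would return.
        log_streams: list[RawLogStream] = [
            _stream_lines_by_chunk(io.StringIO("[2024-05-01T00:00:00+00:00] remote line\n"))
        ]
        return sources, log_streams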