Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/utils/log/file_task_handler.py: 28%


407 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""File logging handler for tasks.""" 

19 

20from __future__ import annotations 

21 

22import heapq 

23import io 

24import logging 

25import os 

26from collections.abc import Callable, Generator, Iterator 

27from contextlib import suppress 

28from datetime import datetime 

29from enum import Enum 

30from itertools import chain, islice 

31from pathlib import Path 

32from types import GeneratorType 

33from typing import IO, TYPE_CHECKING, TypedDict, cast 

34from urllib.parse import urljoin 

35 

36import pendulum 

37from pydantic import BaseModel, ConfigDict, ValidationError 

38from typing_extensions import NotRequired 

39 

40from airflow.configuration import conf 

41from airflow.executors.executor_loader import ExecutorLoader 

42from airflow.utils.helpers import parse_template_string, render_template 

43from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator 

44from airflow.utils.log.logging_mixin import SetContextPropagate 

45from airflow.utils.log.non_caching_file_handler import NonCachingRotatingFileHandler 

46from airflow.utils.session import NEW_SESSION, provide_session 

47from airflow.utils.state import State, TaskInstanceState 

48 

49if TYPE_CHECKING: 

50 from typing import TypeAlias 

51 

52 from requests import Response 

53 

54 from airflow._shared.logging.remote import ( 

55 LogMessages, 

56 LogResponse, 

57 LogSourceInfo, 

58 RawLogStream, 

59 StreamingLogResponse, 

60 ) 

61 from airflow.executors.base_executor import BaseExecutor 

62 from airflow.models.taskinstance import TaskInstance 

63 from airflow.models.taskinstancehistory import TaskInstanceHistory 

64 

65CHUNK_SIZE = 1024 * 1024 * 5 # 5MB 

66DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1) 

67DEFAULT_SORT_TIMESTAMP = int(DEFAULT_SORT_DATETIME.timestamp() * 1000) 

68SORT_KEY_OFFSET = 10000000 

69"""An offset used by the _create_sort_key utility. 

70 

71Assuming 50 characters per line, an offset of 10,000,000 can represent approximately 500 MB of file data, which is sufficient for use as a constant. 

72""" 

73HEAP_DUMP_SIZE = 5000 

74HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2 

75 

76StructuredLogStream: TypeAlias = Generator["StructuredLogMessage", None, None] 

77"""Structured log stream, containing structured log messages.""" 

78LogHandlerOutputStream: TypeAlias = ( 

79 StructuredLogStream | Iterator["StructuredLogMessage"] | chain["StructuredLogMessage"] 

80) 

81"""Output stream, containing structured log messages or a chain of them.""" 

82ParsedLog: TypeAlias = tuple[datetime | None, int, "StructuredLogMessage"] 

83"""Parsed log record, containing timestamp, line_num and the structured log message.""" 

84ParsedLogStream: TypeAlias = Generator[ParsedLog, None, None] 

85LegacyProvidersLogType: TypeAlias = list["StructuredLogMessage"] | str | list[str] 

86"""Return type used by legacy `_read` methods for Alibaba Cloud, Elasticsearch, OpenSearch, and Redis log handlers. 

87 

88- For Elasticsearch and OpenSearch: returns a list of structured log messages. 

89- For Alibaba Cloud: returns a string. 

90- For Redis: returns a list of strings. 

91""" 

92 

93 

94logger = logging.getLogger(__name__) 

95 

96 

97class LogMetadata(TypedDict): 

98 """Metadata about the log fetching process, including `end_of_log` and `log_pos`.""" 

99 

100 end_of_log: bool 

101 log_pos: NotRequired[int] 

102 # the following attributes are used for Elasticsearch and OpenSearch log handlers 

103 offset: NotRequired[str | int] 

104 # Ensure a string here. Large offset numbers will get JSON.parsed incorrectly 

105 # on the client. Sending as a string prevents this issue. 

106 # https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER 

107 last_log_timestamp: NotRequired[str] 

108 max_offset: NotRequired[str] 

109 
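# NOTE: illustrative sketch, not from the Airflow source. A minimal example of
# the metadata dict a handler could hand back between paginated reads; the
# concrete values below are made up.
def _example_log_metadata() -> LogMetadata:
    return {"end_of_log": False, "log_pos": 1024}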

110 

111class StructuredLogMessage(BaseModel): 

112 """An individual log message.""" 

113 

114 timestamp: datetime | None = None 

115 event: str 

116 

117 # Collisions of sort_key may occur due to duplicated messages. If this happens, the heap will use the second element, 

118 # which is the StructuredLogMessage for comparison. Therefore, we need to define a comparator for it. 

119 def __lt__(self, other: StructuredLogMessage) -> bool: 

120 return self.sort_key < other.sort_key 

121 

122 @property 

123 def sort_key(self) -> datetime: 

124 return self.timestamp or DEFAULT_SORT_DATETIME 

125 

126 # We don't need to cache strings when parsing into this, as almost every line will have different 

127 # values; `extra="allow"` means we'll create extra properties as needed. Only timestamp and event are 

128 # required; everything else is up to whatever is producing the logs. 

129 model_config = ConfigDict(cache_strings=False, extra="allow") 

130 
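# NOTE: illustrative sketch, not from the Airflow source. Shows how a single
# structured log line is parsed; the JSON payload is made up. Unknown keys such
# as "level" survive thanks to extra="allow".
def _example_parse_structured_line() -> StructuredLogMessage:
    line = '{"timestamp": "2024-01-01T00:00:00+00:00", "event": "Task started", "level": "info"}'
    return StructuredLogMessage.model_validate_json(line)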

131 

132class LogType(str, Enum): 

133 """ 

134 Type of service from which we retrieve logs. 

135 

136 :meta private: 

137 """ 

138 

139 TRIGGER = "trigger" 

140 WORKER = "worker" 

141 

142 

143def _set_task_deferred_context_var(): 

144 """ 

145 Tell task log handler that task exited with deferral. 

146 

147 This exists for the sole purpose of telling elasticsearch handler not to 

148 emit end_of_log mark after task deferral. 

149 

150 Depending on how the task is run, we may need to set this in task command or in local task job. 

151 Kubernetes executor requires the local task job invocation; local executor requires the task 

152 command invocation. 

153 

154 :meta private: 

155 """ 

156 logger = logging.getLogger() 

157 with suppress(StopIteration): 

158 h = next(h for h in logger.handlers if hasattr(h, "ctx_task_deferred")) 

159 h.ctx_task_deferred = True 

160 

161 

162def _fetch_logs_from_service(url: str, log_relative_path: str) -> Response: 

163 # Import occurs in function scope for perf. Ref: https://github.com/apache/airflow/pull/21438 

164 import requests 

165 

166 from airflow.api_fastapi.auth.tokens import JWTGenerator, get_signing_key 

167 

168 timeout = conf.getint("api", "log_fetch_timeout_sec", fallback=None) 

169 generator = JWTGenerator( 

170 secret_key=get_signing_key("api", "secret_key"), 

171 # Since we are using a secret key, we need to be explicit about the algorithm here too 

172 algorithm="HS512", 

173 # We must set an empty private key here as otherwise it can be automatically loaded by JWTGenerator 

174 # and secret_key and private_key cannot be set together 

175 private_key=None, # type: ignore[arg-type] 

176 issuer=None, 

177 valid_for=conf.getint("webserver", "log_request_clock_grace", fallback=30), 

178 audience="task-instance-logs", 

179 ) 

180 response = requests.get( 

181 url, 

182 timeout=timeout, 

183 headers={"Authorization": generator.generate({"filename": log_relative_path})}, 

184 stream=True, 

185 ) 

186 response.encoding = "utf-8" 

187 return response 

188 

189 

190_parse_timestamp = conf.getimport("logging", "interleave_timestamp_parser", fallback=None) 

191 

192if not _parse_timestamp: 

193 

194 def _parse_timestamp(line: str): 

195 timestamp_str, _ = line.split(" ", 1) 

196 return pendulum.parse(timestamp_str.strip("[]")) 

197 
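# NOTE: illustrative sketch, not from the Airflow source. The fallback parser
# above expects a line that starts with a bracketed ISO timestamp; the sample
# line is hypothetical.
def _example_parse_timestamp():
    return _parse_timestamp("[2024-01-01T00:00:00+00:00] some log text")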

198 

199def _stream_lines_by_chunk( 

200 log_io: IO[str], 

201) -> RawLogStream: 

202 """ 

203 Stream lines from a file-like IO object. 

204 

205 :param log_io: A file-like IO object to read from. 

206 :return: A generator that yields the individual lines of the stream. 

207 """ 

208 # Skip processing if file is already closed 

209 if log_io.closed: 

210 return 

211 

212 # Seek to beginning if possible 

213 if log_io.seekable(): 

214 try: 

215 log_io.seek(0) 

216 except Exception as e: 

217 logger.error("Error seeking in log stream: %s", e) 

218 return 

219 

220 buffer = "" 

221 while True: 

222 # Check if file is already closed 

223 if log_io.closed: 

224 break 

225 

226 try: 

227 chunk = log_io.read(CHUNK_SIZE) 

228 except Exception as e: 

229 logger.error("Error reading log stream: %s", e) 

230 break 

231 

232 if not chunk: 

233 break 

234 

235 buffer += chunk 

236 *lines, buffer = buffer.split("\n") 

237 yield from lines 

238 

239 if buffer: 

240 yield from buffer.split("\n") 

241 
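# NOTE: illustrative sketch, not from the Airflow source. Streams lines out of
# an in-memory buffer; with the 5MB CHUNK_SIZE the whole string fits in one
# read, but the behaviour is the same for large files.
def _example_stream_lines() -> list[str]:
    # Returns ["line 1", "line 2", "line 3"].
    return list(_stream_lines_by_chunk(io.StringIO("line 1\nline 2\nline 3")))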

242 

243def _log_stream_to_parsed_log_stream( 

244 log_stream: RawLogStream, 

245) -> ParsedLogStream: 

246 """ 

247 Turn a str log stream into a generator of parsed log lines. 

248 

249 :param log_stream: The stream to parse. 

250 :return: A generator of parsed log lines. 

251 """ 

252 from airflow._shared.timezones.timezone import coerce_datetime 

253 

254 timestamp = None 

255 next_timestamp = None 

256 idx = 0 

257 for line in log_stream: 

258 if line: 

259 try: 

260 log = StructuredLogMessage.model_validate_json(line) 

261 except ValidationError: 

262 with suppress(Exception): 

263 # If we can't parse the timestamp, don't attach one to the row 

264 if isinstance(line, str): 

265 next_timestamp = _parse_timestamp(line) 

266 log = StructuredLogMessage(event=str(line), timestamp=next_timestamp) 

267 if log.timestamp: 

268 log.timestamp = coerce_datetime(log.timestamp) 

269 timestamp = log.timestamp 

270 yield timestamp, idx, log 

271 idx += 1 

272 
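# NOTE: illustrative sketch, not from the Airflow source. Parses a small raw
# stream: the first line is valid JSON and becomes a StructuredLogMessage
# directly, the second falls back to timestamp sniffing.
def _example_parsed_stream() -> list[ParsedLog]:
    raw_lines = iter(
        [
            '{"timestamp": "2024-01-01T00:00:00+00:00", "event": "start"}',
            "[2024-01-01T00:00:01+00:00] plain text line",
        ]
    )
    # Each element is a (timestamp, line_number, StructuredLogMessage) tuple.
    return list(_log_stream_to_parsed_log_stream(raw_lines))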

273 

274def _create_sort_key(timestamp: datetime | None, line_num: int) -> int: 

275 """ 

276 Create a sort key for log record, to be used in K-way merge. 

277 

278 :param timestamp: timestamp of the log line 

279 :param line_num: line number of the log line 

280 :return: an integer sort key, which keeps memory overhead low 

281 """ 

282 return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * SORT_KEY_OFFSET + line_num 

283 
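# NOTE: illustrative sketch, not from the Airflow source. Worked example: one
# second after DEFAULT_SORT_DATETIME, line 42 gives
# int(946_684_801.0 * 1000) * 10_000_000 + 42 == 9_466_848_010_000_000_042,
# so keys order first by millisecond timestamp and then by line number.
def _example_sort_key() -> int:
    return _create_sort_key(pendulum.datetime(2000, 1, 1, 0, 0, 1), 42)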

284 

285def _is_sort_key_with_default_timestamp(sort_key: int) -> bool: 

286 """ 

287 Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP. 

288 

289 This is used to identify log records that don't have a timestamp. 

290 

291 :param sort_key: The sort key to check 

292 :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP, False otherwise 

293 """ 

294 # Extract the timestamp part from the sort key (remove the line number part) 

295 timestamp_part = sort_key // SORT_KEY_OFFSET 

296 return timestamp_part == DEFAULT_SORT_TIMESTAMP 

297 

298 

299def _add_log_from_parsed_log_streams_to_heap( 

300 heap: list[tuple[int, StructuredLogMessage]], 

301 parsed_log_streams: dict[int, ParsedLogStream], 

302) -> None: 

303 """ 

304 Add one log record from each parsed log stream to the heap, and remove exhausted log streams from the dict after iterating. 

305 

306 :param heap: heap to store log records 

307 :param parsed_log_streams: dict of parsed log streams 

308 """ 

309 # We intend to initialize the list lazily, as in most cases we don't need to remove any log streams. 

310 # This reduces memory overhead, since this function is called repeatedly until all log streams are empty. 

311 log_stream_to_remove: list[int] | None = None 

312 for idx, log_stream in parsed_log_streams.items(): 

313 record: ParsedLog | None = next(log_stream, None) 

314 if record is None: 

315 if log_stream_to_remove is None: 

316 log_stream_to_remove = [] 

317 log_stream_to_remove.append(idx) 

318 continue 

319 timestamp, line_num, line = record 

320 # take int as sort key to avoid overhead of memory usage 

321 heapq.heappush(heap, (_create_sort_key(timestamp, line_num), line)) 

322 # remove empty log stream from the dict 

323 if log_stream_to_remove is not None: 

324 for idx in log_stream_to_remove: 

325 del parsed_log_streams[idx] 

326 

327 

328def _flush_logs_out_of_heap( 

329 heap: list[tuple[int, StructuredLogMessage]], 

330 flush_size: int, 

331 last_log_container: list[StructuredLogMessage | None], 

332) -> Generator[StructuredLogMessage, None, None]: 

333 """ 

334 Flush logs out of the heap, deduplicating them based on the last log. 

335 

336 :param heap: heap to flush logs from 

337 :param flush_size: number of logs to flush 

338 :param last_log_container: a container to store the last log, to avoid duplicate logs 

339 :return: a generator that yields deduplicated logs 

340 """ 

341 last_log = last_log_container[0] 

342 for _ in range(flush_size): 

343 sort_key, line = heapq.heappop(heap) 

344 if line != last_log or _is_sort_key_with_default_timestamp(sort_key): # dedupe 

345 yield line 

346 last_log = line 

347 # update the last log container with the last log 

348 last_log_container[0] = last_log 

349 

350 

351def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream: 

352 """ 

353 Merge parsed log streams using K-way merge. 

354 

355 By yielding only HALF_HEAP_DUMP_SIZE records once the heap size exceeds HEAP_DUMP_SIZE, we reduce the chance of breaking the global order. 

356 Since there are multiple log streams, we can't guarantee that the records are in global order. 

357 

358 e.g. 

359 

360 log_stream1: ---------- 

361 log_stream2: ---- 

362 log_stream3: -------- 

363 

364 The first record of log_stream3 can be later than the fourth record of log_stream1! 

365 :param log_streams: raw log streams to merge 

366 :return: interleaved log stream 

367 """ 

368 # don't need to push whole tuple into heap, which increases too much overhead 

369 # push only sort_key and line into heap 

370 heap: list[tuple[int, StructuredLogMessage]] = [] 

371 # to allow removing empty streams while iterating, also turn the str stream into parsed log stream 

372 parsed_log_streams: dict[int, ParsedLogStream] = { 

373 idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream in enumerate(log_streams) 

374 } 

375 

376 # keep adding records from logs until all logs are empty 

377 last_log_container: list[StructuredLogMessage | None] = [None] 

378 while parsed_log_streams: 

379 _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams) 

380 

381 # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds HEAP_DUMP_SIZE 

382 if len(heap) >= HEAP_DUMP_SIZE: 

383 yield from _flush_logs_out_of_heap(heap, HALF_HEAP_DUMP_SIZE, last_log_container) 

384 

385 # yield remaining records 

386 yield from _flush_logs_out_of_heap(heap, len(heap), last_log_container) 

387 # free memory 

388 del heap 

389 del parsed_log_streams 

390 
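# NOTE: illustrative sketch, not from the Airflow source. Merges two
# hypothetical single-line streams: the merged output is ordered by timestamp
# regardless of which stream a record came from.
def _example_interleave() -> list[StructuredLogMessage]:
    stream_a = iter(['{"timestamp": "2024-01-01T00:00:01+00:00", "event": "from stream a"}'])
    stream_b = iter(['{"timestamp": "2024-01-01T00:00:00+00:00", "event": "from stream b"}'])
    # "from stream b" is yielded first because its timestamp is earlier.
    return list(_interleave_logs(stream_a, stream_b))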

391 

392def _is_logs_stream_like(log) -> bool: 

393 """Check if the logs are stream-like.""" 

394 return isinstance(log, (chain, GeneratorType)) 

395 

396 

397def _get_compatible_log_stream( 

398 log_messages: LogMessages, 

399) -> RawLogStream: 

400 """ 

401 Convert legacy log message blobs into a generator that yields log lines. 

402 

403 :param log_messages: List of legacy log message strings. 

404 :return: A generator that yields the log lines of each blob in sequence. 

405 """ 

406 yield from chain.from_iterable( 

407 _stream_lines_by_chunk(io.StringIO(log_message)) for log_message in log_messages 

408 ) 

409 
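# NOTE: illustrative sketch, not from the Airflow source. Legacy handlers
# return whole log blobs as strings; this flattens two such hypothetical blobs
# into a single line-by-line stream.
def _example_compatible_stream() -> list[str]:
    # Returns ["line 1", "line 2", "line 3"].
    return list(_get_compatible_log_stream(["line 1\nline 2", "line 3"]))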

410 

411class FileTaskHandler(logging.Handler): 

412 """ 

413 FileTaskHandler is a python log handler that handles and reads task instance logs. 

414 

415 It creates and delegates log handling to `logging.FileHandler` after receiving task 

416 instance context. It reads logs from task instance's host machine. 

417 

418 :param base_log_folder: Base log folder to place logs. 

419 :param max_bytes: maximum size in bytes of the log file before it is rolled over 

420 :param backup_count: number of rolled-over log files to keep 

421 :param delay: if True, defer opening the log file until the first emit 

422 """ 

423 

424 trigger_should_wrap = True 

425 inherits_from_empty_operator_log_message = ( 

426 "Operator inherits from empty operator and thus does not have logs" 

427 ) 

428 executor_instances: dict[str, BaseExecutor] = {} 

429 DEFAULT_EXECUTOR_KEY = "_default_executor" 

430 

431 def __init__( 

432 self, 

433 base_log_folder: str, 

434 max_bytes: int = 0, 

435 backup_count: int = 0, 

436 delay: bool = False, 

437 ): 

438 super().__init__() 

439 self.handler: logging.Handler | None = None 

440 self.local_base = base_log_folder 

441 self.maintain_propagate: bool = False 

442 self.max_bytes = max_bytes 

443 self.backup_count = backup_count 

444 self.delay = delay 

445 """ 

446 If true, overrides default behavior of setting propagate=False 

447 

448 :meta private: 

449 """ 

450 

451 self.ctx_task_deferred = False 

452 """ 

453 If true, task exited with deferral to trigger. 

454 

455 Some handlers emit "end of log" markers, and may not wish to do so when task defers. 

456 """ 

457 

458 def set_context( 

459 self, ti: TaskInstance | TaskInstanceHistory, *, identifier: str | None = None 

460 ) -> None | SetContextPropagate: 

461 """ 

462 Provide task_instance context to airflow task handler. 

463 

464 Generally speaking returns None. But if attr `maintain_propagate` has 

465 been set to propagate, then returns sentinel MAINTAIN_PROPAGATE. This 

466 has the effect of overriding the default behavior to set `propagate` 

467 to False whenever set_context is called. At time of writing, this 

468 functionality is only used in unit testing. 

469 

470 :param ti: task instance object 

471 :param identifier: if set, adds suffix to log file. For use when relaying exceptional messages 

472 to task logs from a context other than task or trigger run 

473 """ 

474 local_loc = self._init_file(ti, identifier=identifier) 

475 self.handler = NonCachingRotatingFileHandler( 

476 local_loc, 

477 encoding="utf-8", 

478 maxBytes=self.max_bytes, 

479 backupCount=self.backup_count, 

480 delay=self.delay, 

481 ) 

482 if self.formatter: 

483 self.handler.setFormatter(self.formatter) 

484 self.handler.setLevel(self.level) 

485 return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None 

486 

487 @staticmethod 

488 def add_triggerer_suffix(full_path, job_id=None): 

489 """ 

490 Derive trigger log filename from task log filename. 

491 

492 E.g. given /path/to/file.log returns /path/to/file.log.trigger.123.log, where 123 

493 is the triggerer id. We use the triggerer ID instead of trigger ID to distinguish 

494 the files because, rarely, the same trigger could get picked up by two different 

495 triggerer instances. 

496 """ 

497 full_path = Path(full_path).as_posix() 

498 full_path += f".{LogType.TRIGGER.value}" 

499 if job_id: 

500 full_path += f".{job_id}.log" 

501 return full_path 

502 
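    # NOTE: illustrative examples only, not from the Airflow source.
    # add_triggerer_suffix("/path/to/file.log", job_id=123)
    #   -> "/path/to/file.log.trigger.123.log"
    # add_triggerer_suffix("/path/to/file.log")
    #   -> "/path/to/file.log.trigger"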

503 def emit(self, record): 

504 if self.handler: 

505 self.handler.emit(record) 

506 

507 def flush(self): 

508 if self.handler: 

509 self.handler.flush() 

510 

511 def close(self): 

512 if self.handler: 

513 self.handler.close() 

514 

515 @provide_session 

516 def _render_filename( 

517 self, ti: TaskInstance | TaskInstanceHistory, try_number: int, session=NEW_SESSION 

518 ) -> str: 

519 """Return the worker log filename.""" 

520 dag_run = ti.get_dagrun(session=session) 

521 

522 date = dag_run.logical_date or dag_run.run_after 

523 formatted_date = date.isoformat() 

524 

525 template = dag_run.get_log_template(session=session).filename 

526 str_tpl, jinja_tpl = parse_template_string(template) 

527 if jinja_tpl: 

528 return render_template( 

529 jinja_tpl, {"ti": ti, "ts": formatted_date, "try_number": try_number}, native=False 

530 ) 

531 

532 if str_tpl: 

533 data_interval = (dag_run.data_interval_start, dag_run.data_interval_end) 

534 if data_interval[0]: 

535 data_interval_start = data_interval[0].isoformat() 

536 else: 

537 data_interval_start = "" 

538 if data_interval[1]: 

539 data_interval_end = data_interval[1].isoformat() 

540 else: 

541 data_interval_end = "" 

542 return str_tpl.format( 

543 dag_id=ti.dag_id, 

544 task_id=ti.task_id, 

545 run_id=ti.run_id, 

546 data_interval_start=data_interval_start, 

547 data_interval_end=data_interval_end, 

548 logical_date=formatted_date, 

549 try_number=try_number, 

550 ) 

551 raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen") 

552 
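    # NOTE: illustrative example only, not from the Airflow source. For a
    # hypothetical str-style template such as
    #   "dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log"
    # the str_tpl.format(...) branch above would render something like
    #   "dag_id=my_dag/run_id=manual__2024-01-01/task_id=my_task/attempt=1.log".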

553 def _get_executor_get_task_log( 

554 self, ti: TaskInstance | TaskInstanceHistory 

555 ) -> Callable[[TaskInstance | TaskInstanceHistory, int], tuple[list[str], list[str]]]: 

556 """ 

557 Get the get_task_log method from the executor of the current task instance. 

558 

559 Since there might be multiple executors, we need to use the executor of the current task instance rather than the default executor. 

560 

561 :param ti: task instance object 

562 :return: get_task_log method of the executor 

563 """ 

564 executor_name = ti.executor or self.DEFAULT_EXECUTOR_KEY 

565 executor = self.executor_instances.get(executor_name) 

566 if executor is not None: 

567 return executor.get_task_log 

568 

569 if executor_name == self.DEFAULT_EXECUTOR_KEY: 

570 self.executor_instances[executor_name] = ExecutorLoader.get_default_executor() 

571 else: 

572 self.executor_instances[executor_name] = ExecutorLoader.load_executor(executor_name) 

573 return self.executor_instances[executor_name].get_task_log 

574 

575 def _read( 

576 self, 

577 ti: TaskInstance | TaskInstanceHistory, 

578 try_number: int, 

579 metadata: LogMetadata | None = None, 

580 ) -> tuple[LogHandlerOutputStream | LegacyProvidersLogType, LogMetadata]: 

581 """ 

582 Template method that contains custom logic of reading logs given the try_number. 

583 

584 :param ti: task instance record 

585 :param try_number: current try_number to read log from 

586 :param metadata: log metadata, 

587 can be used for streaming log reading and auto-tailing. 

588 Following attributes are used: 

589 log_pos: (absolute) Char position up to which the log 

590 was retrieved in previous calls; this part will be 

591 skipped and only the following text returned, to be 

592 appended to the tail. 

593 :return: a stream of log messages and metadata. 

594 Following attributes are used in metadata: 

595 end_of_log: Boolean, True if end of log is reached or False 

596 if further calls might get more log text. 

597 This is determined by the status of the TaskInstance 

598 log_pos: (absolute) Char position to which the log is retrieved 

599 """ 

600 # Task instance here might be different from task instance when 

601 # initializing the handler. Thus explicitly getting log location 

602 # is needed to get correct log path. 

603 worker_log_rel_path = self._render_filename(ti, try_number) 

604 sources: LogSourceInfo = [] 

605 source_list: list[str] = [] 

606 remote_logs: list[RawLogStream] = [] 

607 local_logs: list[RawLogStream] = [] 

608 executor_logs: list[RawLogStream] = [] 

609 served_logs: list[RawLogStream] = [] 

610 with suppress(NotImplementedError): 

611 sources, logs = self._read_remote_logs(ti, try_number, metadata) 

612 if not logs: 

613 remote_logs = [] 

614 elif isinstance(logs, list) and isinstance(logs[0], str): 

615 # If the logs are in legacy format, convert them to a generator of log lines 

616 remote_logs = [ 

617 # We don't need to use the log_pos here, as we are using the metadata to track the position 

618 _get_compatible_log_stream(cast("list[str]", logs)) 

619 ] 

620 elif isinstance(logs, list) and _is_logs_stream_like(logs[0]): 

621 # If the logs are already in a stream-like format, we can use them directly 

622 remote_logs = cast("list[RawLogStream]", logs) 

623 else: 

624 # If the logs are in a different format, raise an error 

625 raise TypeError("Logs should be either a list of strings or a generator of log lines.") 

626 # Extend LogSourceInfo 

627 source_list.extend(sources) 

628 has_k8s_exec_pod = False 

629 if ti.state == TaskInstanceState.RUNNING: 

630 executor_get_task_log = self._get_executor_get_task_log(ti) 

631 response = executor_get_task_log(ti, try_number) 

632 if response: 

633 sources, logs = response 

634 # make the logs stream-like compatible 

635 executor_logs = [_get_compatible_log_stream(logs)] 

636 if sources: 

637 source_list.extend(sources) 

638 has_k8s_exec_pod = True 

639 if not (remote_logs and ti.state not in State.unfinished): 

640 # when finished, if we have remote logs, no need to check local 

641 worker_log_full_path = Path(self.local_base, worker_log_rel_path) 

642 sources, local_logs = self._read_from_local(worker_log_full_path) 

643 source_list.extend(sources) 

644 if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) and not has_k8s_exec_pod: 

645 sources, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) 

646 source_list.extend(sources) 

647 elif ti.state not in State.unfinished and not (local_logs or remote_logs): 

648 # ordinarily we don't check served logs, with the assumption that users set up 

649 # remote logging or shared drive for logs for persistence, but that's not always true 

650 # so even if task is done, if no local logs or remote logs are found, we'll check the worker 

651 sources, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) 

652 source_list.extend(sources) 

653 

654 out_stream: LogHandlerOutputStream = _interleave_logs( 

655 *local_logs, 

656 *remote_logs, 

657 *executor_logs, 

658 *served_logs, 

659 ) 

660 

661 # Log message source details are grouped: they are not relevant for most users and can 

662 # distract them from finding the root cause of their errors 

663 header = [ 

664 StructuredLogMessage(event="::group::Log message source details", sources=source_list), # type: ignore[call-arg] 

665 StructuredLogMessage(event="::endgroup::"), 

666 ] 

667 end_of_log = ti.try_number != try_number or ti.state not in ( 

668 TaskInstanceState.RUNNING, 

669 TaskInstanceState.DEFERRED, 

670 ) 

671 

672 with LogStreamAccumulator(out_stream, HEAP_DUMP_SIZE) as stream_accumulator: 

673 log_pos = stream_accumulator.total_lines 

674 out_stream = stream_accumulator.stream 

675 

676 # skip log stream until the last position 

677 if metadata and "log_pos" in metadata: 

678 islice(out_stream, metadata["log_pos"]) 

679 else: 

680 # first time reading log, add messages before interleaved log stream 

681 out_stream = chain(header, out_stream) 

682 

683 return out_stream, { 

684 "end_of_log": end_of_log, 

685 "log_pos": log_pos, 

686 } 

687 

688 @staticmethod 

690 def _get_pod_namespace(ti: TaskInstance | TaskInstanceHistory): 

691 pod_override = getattr(ti.executor_config, "pod_override", None) 

692 metadata = getattr(pod_override, "metadata", None) 

693 namespace = None 

694 with suppress(Exception): 

695 namespace = getattr(metadata, "namespace", None) 

696 return namespace or conf.get("kubernetes_executor", "namespace") 

697 

698 def _get_log_retrieval_url( 

699 self, 

700 ti: TaskInstance | TaskInstanceHistory, 

701 log_relative_path: str, 

702 log_type: LogType | None = None, 

703 ) -> tuple[str, str]: 

704 """Given TI, generate URL with which to fetch logs from service log server.""" 

705 if log_type == LogType.TRIGGER: 

706 if not ti.triggerer_job: 

707 raise RuntimeError("Could not build triggerer log URL; no triggerer job.") 

708 config_key = "triggerer_log_server_port" 

709 config_default = 8794 

710 hostname = ti.triggerer_job.hostname 

711 log_relative_path = self.add_triggerer_suffix(log_relative_path, job_id=ti.triggerer_job.id) 

712 else: 

713 hostname = ti.hostname 

714 config_key = "worker_log_server_port" 

715 config_default = 8793 

716 return ( 

717 urljoin( 

718 f"http://{hostname}:{conf.get('logging', config_key, fallback=config_default)}/log/", 

719 log_relative_path, 

720 ), 

721 log_relative_path, 

722 ) 

723 
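    # NOTE: illustrative note only, not from the Airflow source. With the
    # defaults above, a worker log URL looks roughly like
    #   http://<hostname>:8793/log/<log_relative_path>
    # while a triggerer log URL uses port 8794 and the ".trigger.<job_id>.log" suffix.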

724 def read( 

725 self, 

726 task_instance: TaskInstance | TaskInstanceHistory, 

727 try_number: int | None = None, 

728 metadata: LogMetadata | None = None, 

729 ) -> tuple[LogHandlerOutputStream, LogMetadata]: 

730 """ 

731 Read logs of given task instance from local machine. 

732 

733 :param task_instance: task instance object 

734 :param try_number: task instance try_number to read logs from. If None 

735 it returns the log of task_instance.try_number 

736 :param metadata: log metadata, can be used for streaming log reading and auto-tailing. 

737 :return: a stream of structured log messages and the updated log metadata 

738 """ 

739 if try_number is None: 

740 try_number = task_instance.try_number 

741 

742 if try_number == 0 and task_instance.state in ( 

743 TaskInstanceState.SKIPPED, 

744 TaskInstanceState.UPSTREAM_FAILED, 

745 ): 

746 logs = [StructuredLogMessage(event="Task was skipped, no logs available.")] 

747 return chain(logs), {"end_of_log": True} 

748 

749 if try_number is None or try_number < 1: 

750 logs = [ 

751 StructuredLogMessage( # type: ignore[call-arg] 

752 level="error", event=f"Error fetching the logs. Try number {try_number} is invalid." 

753 ) 

754 ] 

755 return chain(logs), {"end_of_log": True} 

756 

757 # compatibility for es_task_handler and os_task_handler 

758 read_result = self._read(task_instance, try_number, metadata) 

759 out_stream, metadata = read_result 

760 # If the out_stream is None or empty, return the read result 

761 if not out_stream: 

762 out_stream = cast("Generator[StructuredLogMessage, None, None]", out_stream) 

763 return out_stream, metadata 

764 

765 if _is_logs_stream_like(out_stream): 

766 out_stream = cast("Generator[StructuredLogMessage, None, None]", out_stream) 

767 return out_stream, metadata 

768 if isinstance(out_stream, list) and isinstance(out_stream[0], StructuredLogMessage): 

769 out_stream = cast("list[StructuredLogMessage]", out_stream) 

770 return (log for log in out_stream), metadata 

771 if isinstance(out_stream, list) and isinstance(out_stream[0], str): 

772 # If the out_stream is a list of strings, convert it to a generator 

773 out_stream = cast("list[str]", out_stream) 

774 raw_stream = _stream_lines_by_chunk(io.StringIO("".join(out_stream))) 

775 out_stream = (log for _, _, log in _log_stream_to_parsed_log_stream(raw_stream)) 

776 return out_stream, metadata 

777 if isinstance(out_stream, str): 

778 # If the out_stream is a string, convert it to a generator 

779 raw_stream = _stream_lines_by_chunk(io.StringIO(out_stream)) 

780 out_stream = (log for _, _, log in _log_stream_to_parsed_log_stream(raw_stream)) 

781 return out_stream, metadata 

782 raise TypeError( 

783 "Invalid log stream type. Expected a generator of StructuredLogMessage, list of StructuredLogMessage, list of str or str." 

784 f" Got {type(out_stream).__name__} instead." 

785 f" Content type: {type(out_stream[0]).__name__ if isinstance(out_stream, (list, tuple)) and out_stream else 'empty'}" 

786 ) 

787 
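    # NOTE: illustrative sketch only, not from the Airflow source. A typical
    # auto-tailing loop built on read(); `handler` and `ti` are assumed to
    # already exist.
    #
    #   metadata: LogMetadata = {"end_of_log": False}
    #   while not metadata["end_of_log"]:
    #       stream, metadata = handler.read(ti, try_number=1, metadata=metadata)
    #       for message in stream:
    #           print(message.event)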

788 @staticmethod 

789 def _prepare_log_folder(directory: Path, new_folder_permissions: int): 

790 """ 

791 Prepare the log folder and ensure its mode is as configured. 

792 

793 To handle log writing when tasks are impersonated, the log files need to 

794 be writable by the user that runs the Airflow command and the user 

795 that is impersonated. This is mainly to handle corner cases with the 

796 SubDagOperator. When the SubDagOperator is run, all of the operators 

797 run under the impersonated user and create appropriate log files 

798 as the impersonated user. However, if the user manually runs tasks 

799 of the SubDagOperator through the UI, then the log files are created 

800 by the user that runs the Airflow command. For example, the Airflow 

801 run command may be run by the `airflow_sudoable` user, but the Airflow 

802 tasks may be run by the `airflow` user. If the log files are not 

803 writable by both users, then it's possible that re-running a task 

804 via the UI (or vice versa) results in a permission error as the task 

805 tries to write to a log file created by the other user. 

806 

807 We leave it up to the user to manage their permissions by exposing configuration for both 

808 new folders and new log files. Default is to make new log folders and files group-writeable 

809 to handle most common impersonation use cases. The requirement in this case will be to make 

810 sure that the same group is set as default group for both - impersonated user and main airflow 

811 user. 

812 """ 

813 for parent in reversed(directory.parents): 

814 parent.mkdir(mode=new_folder_permissions, exist_ok=True) 

815 directory.mkdir(mode=new_folder_permissions, exist_ok=True) 

816 

817 def _init_file(self, ti, *, identifier: str | None = None): 

818 """ 

819 Create log directory and give it permissions that are configured. 

820 

821 See above _prepare_log_folder method for more detailed explanation. 

822 

823 :param ti: task instance object 

824 :return: relative log path of the given task instance 

825 """ 

826 new_file_permissions = int( 

827 conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"), 8 

828 ) 

829 local_relative_path = self._render_filename(ti, ti.try_number) 

830 full_path = os.path.join(self.local_base, local_relative_path) 

831 if identifier: 

832 full_path += f".{identifier}.log" 

833 elif ti.is_trigger_log_context is True: 

834 # if this is true, we're invoked via set_context in the context of 

835 # setting up individual trigger logging. return trigger log path. 

836 full_path = self.add_triggerer_suffix(full_path=full_path, job_id=ti.triggerer_job.id) 

837 new_folder_permissions = int( 

838 conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), 8 

839 ) 

840 self._prepare_log_folder(Path(full_path).parent, new_folder_permissions) 

841 

842 if not os.path.exists(full_path): 

843 open(full_path, "a").close() 

844 try: 

845 os.chmod(full_path, new_file_permissions) 

846 except OSError as e: 

847 logger.warning("OSError while changing ownership of the log file: %s", e) 

848 

849 return full_path 

850 

851 @staticmethod 

852 def _read_from_local( 

853 worker_log_path: Path, 

854 ) -> StreamingLogResponse: 

855 sources: LogSourceInfo = [] 

856 log_streams: list[RawLogStream] = [] 

857 paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*")) 

858 if not paths: 

859 return sources, log_streams 

860 

861 for path in paths: 

862 sources.append(os.fspath(path)) 

863 # Read the log file and yield lines 

864 log_streams.append(_stream_lines_by_chunk(open(path, encoding="utf-8"))) 

865 return sources, log_streams 

866 

867 def _read_from_logs_server( 

868 self, 

869 ti: TaskInstance | TaskInstanceHistory, 

870 worker_log_rel_path: str, 

871 ) -> StreamingLogResponse: 

872 sources: LogSourceInfo = [] 

873 log_streams: list[RawLogStream] = [] 

874 try: 

875 log_type = LogType.TRIGGER if getattr(ti, "triggerer_job", False) else LogType.WORKER 

876 url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type) 

877 response = _fetch_logs_from_service(url, rel_path) 

878 if response.status_code == 403: 

879 sources.append( 

880 "!!!! Please make sure that all your Airflow components (e.g. " 

881 "schedulers, api-servers, dag-processors, workers and triggerer) have " 

882 "the same 'secret_key' configured in '[api]' section and " 

883 "time is synchronized on all your machines (for example with ntpd)\n" 

884 "See more at https://airflow.apache.org/docs/apache-airflow/" 

885 "stable/configurations-ref.html#secret-key" 

886 ) 

887 else: 

888 # Check if the resource was properly fetched 

889 response.raise_for_status() 

890 if int(response.headers.get("Content-Length", 0)) > 0: 

891 sources.append(url) 

892 log_streams.append( 

893 _stream_lines_by_chunk(io.TextIOWrapper(cast("IO[bytes]", response.raw))) 

894 ) 

895 except Exception as e: 

896 from requests.exceptions import InvalidURL 

897 

898 if ( 

899 isinstance(e, InvalidURL) 

900 and ti.task is not None 

901 and ti.task.inherits_from_empty_operator is True 

902 ): 

903 sources.append(self.inherits_from_empty_operator_log_message) 

904 else: 

905 sources.append(f"Could not read served logs: {e}") 

906 logger.exception("Could not read served logs") 

907 return sources, log_streams 

908 

909 def _read_remote_logs(self, ti, try_number, metadata=None) -> LogResponse | StreamingLogResponse: 

910 """ 

911 Implement in subclasses to read from the remote service. 

912 

913 This method should return two lists, messages and logs. 

914 

915 * Each element in the messages list should be a single message, 

916 such as, "reading from x file". 

917 * Each element in the logs list should be the content of one file. 

918 """ 

919 remote_io = None 

920 try: 

921 from airflow.logging_config import get_remote_task_log 

922 

923 remote_io = get_remote_task_log() 

924 except Exception: 

925 pass 

926 

927 if remote_io is None: 

928 # Import not found, or explicitly set to None 

929 raise NotImplementedError 

930 

931 # This living here is not really a good plan, but it just about works for now. 

932 # Ideally we move all the read+combine logic in to TaskLogReader and out of the task handler. 

933 path = self._render_filename(ti, try_number) 

934 if stream_method := getattr(remote_io, "stream", None): 

935 # Use .stream interface if provider's RemoteIO supports it 

936 sources, logs = stream_method(path, ti) 

937 return sources, logs or [] 

938 # Fallback to .read interface 

939 sources, logs = remote_io.read(path, ti) 

940 return sources, logs or []