# pylint: disable=unused-argument
from __future__ import annotations
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import IO, Callable, Generator, NamedTuple, Optional, Sequence
from dagster._core.storage.compute_log_manager import ComputeIOType
# Chunk size, in bytes, requested per read of captured log data (4 * 1024 * 1024). A chunk that
# reaches this size is treated as a sign that more data may still be pending.
MAX_BYTES_CHUNK_READ = 4194304 # 4 MB
class CapturedLogContext(
    NamedTuple(
        "_CapturedLogContext",
        [("log_key", Sequence[str]), ("external_url", Optional[str])],
    )
):
    """
    Describes the context in which a set of logs is being captured.

    External logging sidecar implementations can populate ``external_url`` to direct dagit to an
    external url for viewing compute logs, instead of a Dagster-managed location.

    Fields:
        log_key (Sequence[str]): The log key identifying the captured logs.
        external_url (Optional[str]): External url where the logs can be viewed; defaults to
            ``None``.
    """

    def __new__(cls, log_key: Sequence[str], external_url: Optional[str] = None):
        # Only purpose of overriding __new__ is to make ``external_url`` optional.
        return super().__new__(cls, log_key, external_url)
class CapturedLogData(
    NamedTuple(
        "_CapturedLogData",
        [
            ("log_key", Sequence[str]),
            ("stdout", Optional[bytes]),
            ("stderr", Optional[bytes]),
            ("cursor", Optional[str]),
        ],
    )
):
    """
    Holds captured log bytes: either one partial chunk of a capture or the capture in full.

    Fields:
        log_key (Sequence[str]): The log key identifying the captured logs.
        stdout (Optional[bytes]): Raw stdout bytes, if any.
        stderr (Optional[bytes]): Raw stderr bytes, if any.
        cursor (Optional[str]): Cursor offset associated with a partial chunk, if any.
    """

    def __new__(
        cls,
        log_key: Sequence[str],
        stdout: Optional[bytes] = None,
        stderr: Optional[bytes] = None,
        cursor: Optional[str] = None,
    ):
        # Everything except the log key is optional and defaults to None.
        return super().__new__(
            cls,
            log_key=log_key,
            stdout=stdout,
            stderr=stderr,
            cursor=cursor,
        )
class CapturedLogMetadata(
    NamedTuple(
        "_CapturedLogMetadata",
        [
            ("stdout_location", Optional[str]),
            ("stderr_location", Optional[str]),
            ("stdout_download_url", Optional[str]),
            ("stderr_download_url", Optional[str]),
        ],
    )
):
    """
    Metadata describing captured log data.

    Fields:
        stdout_location (Optional[str]): Display string for where the stdout data lives.
        stderr_location (Optional[str]): Display string for where the stderr data lives.
        stdout_download_url (Optional[str]): URL for directly downloading the captured stdout.
        stderr_download_url (Optional[str]): URL for directly downloading the captured stderr.
    """

    def __new__(
        cls,
        stdout_location: Optional[str] = None,
        stderr_location: Optional[str] = None,
        stdout_download_url: Optional[str] = None,
        stderr_download_url: Optional[str] = None,
    ):
        # All four fields are optional; forwarded in declaration order.
        return super().__new__(
            cls,
            stdout_location,
            stderr_location,
            stdout_download_url,
            stderr_download_url,
        )
class CapturedLogSubscription:
    """Delivers captured log data for one log key to an observer callback.

    The subscription starts without an observer. Calling the instance with a callback attaches
    it, immediately pushes whatever data the manager has from the current cursor onward, and
    marks the subscription complete if the manager reports that the capture has finished.

    Args:
        manager (CapturedLogManager): Manager used to read log data and to unsubscribe.
        log_key (Sequence[str]): The log key identifying the captured logs.
        cursor (Optional[str]): Cursor to start reading from; updated as data is delivered.
    """

    def __init__(self, manager: CapturedLogManager, log_key: Sequence[str], cursor: Optional[str]):
        self._manager = manager
        self._log_key = log_key
        self._cursor = cursor
        self._observer: Optional[Callable[[CapturedLogData], None]] = None
        self.is_complete = False

    def __call__(self, observer: Optional[Callable[[CapturedLogData], None]]):
        """Attach ``observer``, push available data to it, and return this subscription."""
        self._observer = observer
        self.fetch()
        if self._manager.is_capture_complete(self._log_key):
            self.complete()
        return self

    @property
    def log_key(self):
        """The log key this subscription reads from."""
        return self._log_key

    def dispose(self):
        """Detach the observer and deregister this subscription from the manager."""
        self._observer = None
        self._manager.unsubscribe(self)

    def fetch(self):
        """Read log data from the manager in chunks and hand each new chunk to the observer.

        Does nothing when no observer is attached. Keeps reading while either stream of the
        last chunk reached the maximum chunk size, since more data may be pending.
        """
        should_fetch = True
        while should_fetch:
            # Re-read the observer on every pass: the callback may dispose this subscription
            # while further chunks are still pending, which clears ``self._observer``.
            observer = self._observer
            if not observer:
                return

            log_data = self._manager.get_log_data(
                self._log_key,
                self._cursor,
                max_bytes=MAX_BYTES_CHUNK_READ,
            )
            # Always notify when no cursor has been recorded yet; afterwards only when the
            # cursor has moved.
            if not self._cursor or log_data.cursor != self._cursor:
                observer(log_data)
                self._cursor = log_data.cursor
            should_fetch = _has_max_data(log_data.stdout) or _has_max_data(log_data.stderr)

    def complete(self):
        """Flag the subscription as complete."""
        self.is_complete = True
def _has_max_data(chunk: Optional[bytes]) -> bool:
    """Return True when ``chunk`` holds at least ``MAX_BYTES_CHUNK_READ`` bytes.

    A missing (``None``) or empty chunk yields ``False``. The result is coerced to a real
    ``bool`` so that the declared return type holds even for falsy chunks.
    """
    return bool(chunk) and len(chunk) >= MAX_BYTES_CHUNK_READ
class CapturedLogManager(ABC):
    """Abstract base class for capturing the unstructured logs (stdout/stderr) in the current
    process, stored / retrieved with a provided log_key.
    """

    @abstractmethod
    @contextmanager
    def capture_logs(self, log_key: Sequence[str]) -> Generator[CapturedLogContext, None, None]:
        """Context manager for capturing the stdout/stderr within the current process, and
        persisting it under the given log key.

        Args:
            log_key (List[String]): The log key identifying the captured logs
        """

    @abstractmethod
    @contextmanager
    def open_log_stream(
        self, log_key: Sequence[str], io_type: ComputeIOType
    ) -> Generator[Optional[IO], None, None]:
        """Context manager for providing an IO stream that enables the caller to write to a log
        stream managed by the captured log manager, to be read later using the given log key.

        Args:
            log_key (List[String]): The log key identifying the captured logs
            io_type (ComputeIOType): The kind of log stream to open
        """

    @abstractmethod
    def is_capture_complete(self, log_key: Sequence[str]) -> bool:
        """Flag indicating when the log capture for a given log key has completed.

        Args:
            log_key (List[String]): The log key identifying the captured logs

        Returns:
            Boolean
        """

    @abstractmethod
    def get_log_data(
        self,
        log_key: Sequence[str],
        cursor: Optional[str] = None,
        max_bytes: Optional[int] = None,
    ) -> CapturedLogData:
        """Returns a chunk of the captured stdout/stderr logs for a given log key.

        Args:
            log_key (List[String]): The log key identifying the captured logs
            cursor (Optional[str]): A cursor representing the position of the log chunk to fetch
            max_bytes (Optional[int]): A limit on the size of the log chunk to fetch

        Returns:
            CapturedLogData
        """

    @abstractmethod
    def get_log_metadata(self, log_key: Sequence[str]) -> CapturedLogMetadata:
        """Returns the metadata of the captured logs for a given log key, including
        displayable information on where the logs are persisted.

        Args:
            log_key (List[String]): The log key identifying the captured logs

        Returns:
            CapturedLogMetadata
        """

    @abstractmethod
    def delete_logs(
        self, log_key: Optional[Sequence[str]] = None, prefix: Optional[Sequence[str]] = None
    ):
        """Deletes the captured logs for a given log key or log key prefix.

        Args:
            log_key (Optional[List[String]]): The log key of the logs to delete
            prefix (Optional[List[String]]): The prefix of the log keys to delete
        """

    @abstractmethod
    def subscribe(
        self, log_key: Sequence[str], cursor: Optional[str] = None
    ) -> CapturedLogSubscription:
        """Registers an observable object for log data.

        Args:
            log_key (List[String]): The log key identifying the captured logs
            cursor (Optional[String]): The string cursor marking the position within the log stream

        Returns:
            CapturedLogSubscription
        """

    @abstractmethod
    def unsubscribe(self, subscription: CapturedLogSubscription):
        """Deregisters an observable object from receiving log updates.

        Args:
            subscription (CapturedLogSubscription): subscription object which manages when to send
                back data to the subscriber
        """

    def build_log_key_for_run(self, run_id: str, step_key: str) -> Sequence[str]:
        """Legacy adapter to translate run_id/key to captured log manager-based log_key.

        Args:
            run_id (str): The run id
            step_key (str): The step key

        Returns:
            The log key ``[run_id, "compute_logs", step_key]``
        """
        return [run_id, "compute_logs", step_key]