Skip to content

Commit

Permalink
use pipe to redirect stdout/stderr
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <[email protected]>
  • Loading branch information
dentiny committed Jan 2, 2025
1 parent 044db2f commit 2dad6b1
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 19 deletions.
119 changes: 119 additions & 0 deletions python/ray/_private/log_rotation_with_pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Implement log rotation with pipe."""

from typing import IO, AnyStr
import sys
import os
import logging
import threading
import atexit
from logging.handlers import RotatingFileHandler


class PipeStreamWriteHandle(IO[AnyStr]):
def __init__(self, write_fd: int, log_name: str):
self.name = log_name
self.write_fd = write_fd
self.stream = os.fdopen(write_fd, "w")

def name(self):
return self.name

def fileno(self):
return self.write_fd

def write(self, s: str) -> int:
return self.stream.write(s)

def flush(self):
self.stream.flush()

def close(self):
self.stream.close()

def isatty(self) -> bool:
return False

def readable(self) -> bool:
return False

def writable(self) -> bool:
return True

def seekable(self) -> bool:
return False


def open_pipe_with_rotation(
fname: str, rotation_max_size: int = sys.maxsize, rotation_file_num: int = 1
) -> IO[AnyStr]:
"""Stream content into pipe, which will be listened and dumped to [fname] with
rotation."""
read_fd, write_fd = os.pipe()

log_content = []
stopped = False
lock = threading.Lock()
cond = threading.Condition(lock)

logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = RotatingFileHandler(
fname, maxBytes=rotation_max_size, backupCount=rotation_file_num
)
# Only logging message with nothing else.
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)

# Setup read thread, which continuous read content out of pipe and append to buffer.
def read_log_content_from_pipe():
with os.fdopen(read_fd, "r") as pipe_reader:
while True:
line = pipe_reader.readline()

# An empty line is read over, which indicates write side has closed.
if line == "":
with cond:
cond.notify()
return

# Only buffer new line when not empty.
line = line.strip()
if line:
with cond:
log_content.append(line)
cond.notify()

# Setup logging thread, which continues read content out of log buffer and persist
# via logger. Two threads are used here to avoid blocking write from application.
def dump_log_content_to_buffer():
logger = logging.getLogger()

while True:
with cond:
while not log_content and not stopped:
cond.wait()

if log_content:
content = log_content.pop(0)
logger.info(content)
continue

# Thread requested to stop
return

read_thread = threading.Thread(target=read_log_content_from_pipe)
dump_thread = threading.Thread(target=dump_log_content_to_buffer)
read_thread.start()
dump_thread.start()

pipe_write_stream = PipeStreamWriteHandle(write_fd, fname)

def cleanup():
nonlocal stopped
with lock:
stopped = True
pipe_write_stream.close()

atexit.register(cleanup)

return pipe_write_stream
31 changes: 31 additions & 0 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from ray._private.resource_spec import ResourceSpec
from ray._private.services import serialize_config, get_address
from ray._private.utils import open_log, try_to_create_directory, try_to_symlink
from ray._private.log_rotation_with_pipe import open_pipe_with_rotation

# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray configures it by default automatically
Expand Down Expand Up @@ -884,6 +885,36 @@ def get_log_file_names(
log_stderr = self._get_log_file_name(name, "err", unique=unique)
return log_stdout, log_stderr

def get_log_file_handle_with_rotation(
self,
name: str,
unique: bool = False,
create_out: bool = True,
create_err: bool = True,
rotation_max_size: int = sys.maxsize,
rotation_file_num: int = 1,
) -> Tuple[Optional[IO[AnyStr]], Optional[IO[AnyStr]]]:
"""Similar to `get_log_file_handles`, but enables rotation internally by
writing to a pipe which is listened by background thread."""
log_stdout_fname, log_stderr_fname = self.get_log_file_names(
name, unique=unique, create_out=create_out, create_err=create_err
)
log_stdout = (
None
if log_stdout_fname is None
else open_pipe_with_rotation(
log_stdout_fname, rotation_max_size, rotation_file_num
)
)
log_stderr = (
None
if log_stderr_fname is None
else open_pipe_with_rotation(
log_stderr_fname, rotation_max_size, rotation_file_num
)
)
return log_stdout, log_stderr

def get_log_file_handles(
self,
name: str,
Expand Down
10 changes: 10 additions & 0 deletions python/ray/_private/ray_logging/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ def configure_log_file(out_file, err_file):
)


def configure_log_file_with_pipe(out_pipe_write_fd: int, err_pipe_write_fd: int):
stdout_fileno = sys.stdout.fileno()
stderr_fileno = sys.stderr.fileno()
# C++ logging requires redirecting the stdout file descriptor. Note that
# dup2 will automatically close the old file descriptor before overriding
# it.
os.dup2(out_pipe_write_fd, stdout_fileno)
os.dup2(err_pipe_write_fd, stderr_fileno)


class WorkerStandardStreamDispatcher:
def __init__(self):
self.handlers = []
Expand Down
6 changes: 4 additions & 2 deletions python/ray/_private/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,10 @@
worker = ray._private.worker.global_worker

# Setup log file.
out_file, err_file = node.get_log_file_handles(
get_worker_log_file_name(args.worker_type)
out_file, err_file = node.get_log_file_handle_with_rotation(
get_worker_log_file_name(args.worker_type),
rotation_max_size=args.logging_rotate_bytes,
rotation_file_num=args.logging_rotate_backup_count,
)
configure_log_file(out_file, err_file)
worker.set_out_file(out_file)
Expand Down
37 changes: 20 additions & 17 deletions python/ray/tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,23 +275,26 @@ def component_file_only_one_log_entry(component):

# TODO(hjiang): Enable after log rotation implemented for user application.
#
# # Test application log, which starts with `worker-`.
# # Should be tested separately with other components since "worker" is a substring
# # of "python-core-worker".
# #
# # Check file count.
# application_stdout_paths = []
# for path in paths:
# if path.stem.startswith("worker-") and re.search(r".*\.out(\.\d+)?", str(path))
# # and path.stat().st_size > 0:
# application_stdout_paths.append(path)
# assert len(application_stdout_paths) == 4, application_stdout_paths

# # Check file content, each file should have one line.
# for cur_path in application_stdout_paths:
# with cur_path.open() as f:
# lines = f.readlines()
# assert len(lines) == 1, lines
# Test application log, which starts with `worker-`.
# Should be tested separately with other components since "worker" is a substring
# of "python-core-worker".
#
# Check file count.
application_stdout_paths = []
for path in paths:
if (
path.stem.startswith("worker-")
and re.search(r".*\.out(\.\d+)?", str(path))
and path.stat().st_size > 0
):
application_stdout_paths.append(path)
assert len(application_stdout_paths) == 4, application_stdout_paths

# Check file content, each file should have one line.
for cur_path in application_stdout_paths:
with cur_path.open() as f:
lines = f.readlines()
assert len(lines) == 1, lines


def test_periodic_event_stats(shutdown_only):
Expand Down

0 comments on commit 2dad6b1

Please sign in to comment.