Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Use pipe to redirect stdout #49548

Closed
wants to merge 13 commits into from
119 changes: 119 additions & 0 deletions python/ray/_private/log_rotation_with_pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Implement log rotation with pipe."""

from typing import IO, AnyStr
import sys
import os
import logging
import threading
import atexit
from logging.handlers import RotatingFileHandler


class PipeStreamWriteHandle(IO[AnyStr]):
    """Write-only, file-like wrapper around the write end of a pipe.

    The handle reports ``log_name`` as its ``name`` while every write is
    forwarded to a text stream opened on ``write_fd``.

    Args:
        write_fd: File descriptor of the pipe's write end. The handle takes
            ownership: ``close()`` closes the descriptor.
        log_name: Name reported through the ``name`` attribute.
    """

    def __init__(self, write_fd: int, log_name: str):
        # Kept in a private slot and exposed via the ``name`` property, so the
        # attribute and its accessor no longer compete for the same identifier.
        self._name = log_name
        self.write_fd = write_fd
        self.stream = os.fdopen(write_fd, "w")

    @property
    def name(self) -> str:
        """Name given at construction time (``log_name``)."""
        return self._name

    @name.setter
    def name(self, value: str) -> None:
        # ``name`` used to be a plain instance attribute; stay assignable.
        self._name = value

    @property
    def mode(self) -> str:
        """Mode the underlying stream was opened with."""
        return "w"

    @property
    def closed(self) -> bool:
        """Whether the underlying stream has been closed."""
        return self.stream.closed

    def fileno(self) -> int:
        """Return the pipe write file descriptor passed to the constructor."""
        return self.write_fd

    def write(self, s: str) -> int:
        """Write ``s`` to the pipe and return the number of characters written."""
        return self.stream.write(s)

    def flush(self) -> None:
        """Flush text buffered in the underlying stream into the pipe."""
        self.stream.flush()

    def close(self) -> None:
        """Flush and close the underlying stream (and its file descriptor)."""
        self.stream.close()

    def isatty(self) -> bool:
        return False

    def readable(self) -> bool:
        return False

    def writable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return False

    def __enter__(self) -> "PipeStreamWriteHandle":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

Comment on lines +16 to +48
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't configure_log_file only require this object to have a fileno() method, and none of the others?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write, flush, name, and fileno are definitely needed --- I ran into issues whenever any one of them was missing;
The others were suggested by GPT; I didn't verify them one by one.

My personal opinion is that it doesn't hurt to implement these overrides:

  • All these overriding functions are straightforward and short enough
  • Some attribute functions help explain the class, e.g. readable and writable.


def open_pipe_with_rotation(
    fname: str, rotation_max_size: int = sys.maxsize, rotation_file_num: int = 1
) -> IO[AnyStr]:
    """Return a writable stream whose content is dumped to ``fname`` with rotation.

    A pipe is created and its write end is returned wrapped in a
    ``PipeStreamWriteHandle``. Two background threads service the read end:
    one reads lines out of the pipe into an in-memory buffer, the other drains
    that buffer into a ``RotatingFileHandler``. Two threads are used so that a
    slow file write does not stall draining of the pipe (and, in turn, the
    application writing to it).

    Lines that are empty or whitespace-only are dropped; trailing whitespace
    is removed, leading indentation is preserved.

    Args:
        fname: Path of the log file to write.
        rotation_max_size: Passed as ``maxBytes`` to ``RotatingFileHandler``.
        rotation_file_num: Passed as ``backupCount`` to ``RotatingFileHandler``.

    Returns:
        The write handle; its ``name`` is ``fname``.
    """
    # Upper bound on how long ``cleanup`` waits for each worker thread at exit.
    join_timeout_s = 1.0

    # Create the handler before the pipe so a failure to open ``fname`` does
    # not leak the two pipe descriptors.
    handler = RotatingFileHandler(
        fname, maxBytes=rotation_max_size, backupCount=rotation_file_num
    )
    # Only logging message with nothing else.
    handler.setFormatter(logging.Formatter("%(message)s"))

    read_fd, write_fd = os.pipe()

    log_content = []
    stopped = False
    cond = threading.Condition()

    # Reader thread: continuously reads content out of the pipe and appends
    # it to the buffer.
    def read_log_content_from_pipe():
        nonlocal stopped
        try:
            # errors="replace": undecodable bytes must not kill the reader,
            # otherwise the pipe fills up and the writer blocks.
            with os.fdopen(read_fd, "r", errors="replace") as pipe_reader:
                # readline() returns "" only at EOF, i.e. once every write
                # end of the pipe has been closed.
                for line in iter(pipe_reader.readline, ""):
                    content = line.rstrip()
                    # Only buffer non-blank lines.
                    if content:
                        with cond:
                            log_content.append(content)
                            cond.notify()
        finally:
            # No more input will arrive; let the dump thread drain and exit.
            with cond:
                stopped = True
                cond.notify_all()

    # Dump thread: continuously takes content out of the buffer and persists
    # it through the rotating handler.
    def dump_log_content_to_buffer():
        try:
            while True:
                with cond:
                    while not log_content and not stopped:
                        cond.wait()
                    if not log_content:
                        # Stop requested and buffer fully drained.
                        return
                    # Take the whole batch so the file write below happens
                    # without holding the lock.
                    batch = log_content[:]
                    log_content.clear()
                for content in batch:
                    # Feed the handler directly rather than going through the
                    # root logger: attaching to the root logger would send
                    # every line to every file opened by this function and to
                    # any other root handler.
                    handler.handle(
                        logging.makeLogRecord(
                            {
                                "msg": content,
                                "levelno": logging.INFO,
                                "levelname": "INFO",
                            }
                        )
                    )
        finally:
            handler.close()

    # Daemon threads: the interpreter joins non-daemon threads before running
    # atexit callbacks, so non-daemon workers waiting on ``cleanup`` would
    # keep the process from ever exiting.
    read_thread = threading.Thread(target=read_log_content_from_pipe, daemon=True)
    dump_thread = threading.Thread(target=dump_log_content_to_buffer, daemon=True)
    read_thread.start()
    dump_thread.start()

    pipe_write_stream = PipeStreamWriteHandle(write_fd, fname)

    def cleanup():
        nonlocal stopped
        try:
            # Flushes pending text; if this was the last open write end, the
            # reader sees EOF and finishes on its own.
            pipe_write_stream.close()
            # The write end may have been duplicated elsewhere, in which case
            # EOF never arrives; do not wait for it indefinitely.
            read_thread.join(join_timeout_s)
        finally:
            with cond:
                stopped = True
                cond.notify_all()
            dump_thread.join(join_timeout_s)

    atexit.register(cleanup)

    return pipe_write_stream
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the PipeStreamWriteHandle wrapper, can we directly return open(write_fd)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open takes a path-like object, not a fd
Ref: https://docs.python.org/3/library/functions.html#open

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fdopen is the interface we want to wrap fd into a file object
Ref: https://docs.python.org/3/library/os.html#os.fdopen

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use fdopen then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you plan to provide name and write then?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. write() I think is supported by the object returned by fdopen()? The issue is name then. I guess you can do

file_like_io_object = os.fdopen(write_fd)
file_like_io_object.name = "xxxx"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just tried

Traceback (most recent call last):
  File "logging_redirection_test.py", line 13, in <module>
    _LOG_STDOUT_FNAME, _ROTATION_MAX_BYTE, _ROTATION_FILE_NUM)
  File "/home/ubuntu/py-logging-redirection/pipe_logging.py", line 90, in open_pipe_with_rotation
    pipe_write_stream.name = fname
AttributeError: attribute 'name' of '_io.TextIOWrapper' objects is not writable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code snippet:

pipe_write_stream = os.fdopen(write_fd, "w")
pipe_write_stream.name = fname

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we can do https://stackoverflow.com/questions/60622854/how-to-instantiate-an-io-textiowrapper-object-with-a-name-attribute so we don't need to implement our own IO subclass with all these methods.

31 changes: 31 additions & 0 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from ray._private.resource_spec import ResourceSpec
from ray._private.services import serialize_config, get_address
from ray._private.utils import open_log, try_to_create_directory, try_to_symlink
from ray._private.log_rotation_with_pipe import open_pipe_with_rotation

# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray configures it by default automatically
Expand Down Expand Up @@ -884,6 +885,36 @@ def get_log_file_names(
log_stderr = self._get_log_file_name(name, "err", unique=unique)
return log_stdout, log_stderr

def get_log_file_handle_with_rotation(
    self,
    name: str,
    unique: bool = False,
    create_out: bool = True,
    create_err: bool = True,
    rotation_max_size: int = sys.maxsize,
    rotation_file_num: int = 1,
) -> Tuple[Optional[IO[AnyStr]], Optional[IO[AnyStr]]]:
    """Open rotating, pipe-backed stdout/stderr log handles for ``name``.

    Counterpart of `get_log_file_handles` that enables rotation: each handle
    is the write side of a pipe created by `open_pipe_with_rotation`.

    Args:
        name: Forwarded to `get_log_file_names`.
        unique: Forwarded to `get_log_file_names`.
        create_out: Forwarded to `get_log_file_names`.
        create_err: Forwarded to `get_log_file_names`.
        rotation_max_size: Forwarded to `open_pipe_with_rotation`.
        rotation_file_num: Forwarded to `open_pipe_with_rotation`.

    Returns:
        A ``(stdout_handle, stderr_handle)`` tuple; an entry is ``None`` when
        `get_log_file_names` returned ``None`` for that stream.
    """
    stdout_path, stderr_path = self.get_log_file_names(
        name, unique=unique, create_out=create_out, create_err=create_err
    )

    stdout_handle = None
    if stdout_path is not None:
        stdout_handle = open_pipe_with_rotation(
            stdout_path, rotation_max_size, rotation_file_num
        )

    stderr_handle = None
    if stderr_path is not None:
        stderr_handle = open_pipe_with_rotation(
            stderr_path, rotation_max_size, rotation_file_num
        )

    return stdout_handle, stderr_handle

def get_log_file_handles(
self,
name: str,
Expand Down
10 changes: 10 additions & 0 deletions python/ray/_private/ray_logging/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ def configure_log_file(out_file, err_file):
)


def configure_log_file_with_pipe(out_pipe_write_fd: int, err_pipe_write_fd: int):
    """Redirect the stdout/stderr file descriptors to the given pipe write ends.

    Args:
        out_pipe_write_fd: Descriptor that stdout's descriptor is replaced with.
        err_pipe_write_fd: Descriptor that stderr's descriptor is replaced with.
    """
    # Resolve both destination descriptors before redirecting anything.
    redirections = (
        (out_pipe_write_fd, sys.stdout.fileno()),
        (err_pipe_write_fd, sys.stderr.fileno()),
    )
    # C++ logging requires the redirection to happen at the file-descriptor
    # level. dup2 closes the destination descriptor automatically before
    # overriding it, so no explicit close is needed.
    for pipe_fd, std_fd in redirections:
        os.dup2(pipe_fd, std_fd)


class WorkerStandardStreamDispatcher:
def __init__(self):
self.handlers = []
Expand Down
6 changes: 4 additions & 2 deletions python/ray/_private/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,10 @@
worker = ray._private.worker.global_worker

# Setup log file.
out_file, err_file = node.get_log_file_handles(
get_worker_log_file_name(args.worker_type)
out_file, err_file = node.get_log_file_handle_with_rotation(
get_worker_log_file_name(args.worker_type),
rotation_max_size=args.logging_rotate_bytes,
rotation_file_num=args.logging_rotate_backup_count,
)
configure_log_file(out_file, err_file)
worker.set_out_file(out_file)
Expand Down
37 changes: 20 additions & 17 deletions python/ray/tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,23 +275,26 @@ def component_file_only_one_log_entry(component):

# TODO(hjiang): Enable after log rotation implemented for user application.
#
# # Test application log, which starts with `worker-`.
# # Should be tested separately with other components since "worker" is a substring
# # of "python-core-worker".
# #
# # Check file count.
# application_stdout_paths = []
# for path in paths:
# if path.stem.startswith("worker-") and re.search(r".*\.out(\.\d+)?", str(path))
# # and path.stat().st_size > 0:
# application_stdout_paths.append(path)
# assert len(application_stdout_paths) == 4, application_stdout_paths

# # Check file content, each file should have one line.
# for cur_path in application_stdout_paths:
# with cur_path.open() as f:
# lines = f.readlines()
# assert len(lines) == 1, lines
# Test application log, which starts with `worker-`.
# Should be tested separately with other components since "worker" is a substring
# of "python-core-worker".
#
# Check file count.
application_stdout_paths = []
for path in paths:
if (
path.stem.startswith("worker-")
and re.search(r".*\.out(\.\d+)?", str(path))
and path.stat().st_size > 0
):
application_stdout_paths.append(path)
assert len(application_stdout_paths) == 4, application_stdout_paths

# Check file content, each file should have one line.
for cur_path in application_stdout_paths:
with cur_path.open() as f:
lines = f.readlines()
assert len(lines) == 1, lines


def test_periodic_event_stats(shutdown_only):
Expand Down
Loading