Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Use rotation log file for application logging #49544

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
from ray._raylet import GcsClient, get_session_key_from_storage
from ray._private.resource_spec import ResourceSpec
from ray._private.services import serialize_config, get_address
from ray._private.utils import open_log, try_to_create_directory, try_to_symlink
from ray._private.utils import (
open_log,
open_log_with_rotation,
try_to_create_directory,
try_to_symlink,
)

# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray configures it by default automatically
Expand Down Expand Up @@ -913,6 +918,38 @@ def get_log_file_handles(
log_stderr = None if log_stderr_fname is None else open_log(log_stderr_fname)
return log_stdout, log_stderr

def get_log_file_handles_with_rotation(
self,
name: str,
unique: bool = False,
create_out: bool = True,
create_err: bool = True,
rotation_max_size: int = sys.maxsize,
rotation_file_num: int = 1,
) -> Tuple[Optional[IO[AnyStr]], Optional[IO[AnyStr]]]:
log_stdout_fname, log_stderr_fname = self.get_log_file_names(
name, unique=unique, create_out=create_out, create_err=create_err
)
log_stdout = (
None
if log_stdout_fname is None
else open_log_with_rotation(
log_stdout_fname,
rotation_max_size=rotation_max_size,
rotation_file_num=rotation_file_num,
)
)
log_stderr = (
None
if log_stderr_fname is None
else open_log_with_rotation(
log_stderr_fname,
rotation_max_size=rotation_max_size,
rotation_file_num=rotation_file_num,
)
)
return log_stdout, log_stderr

def _get_log_file_name(
self,
name: str,
Expand Down
59 changes: 59 additions & 0 deletions python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import threading
import time
from urllib.parse import urlencode, unquote, urlparse, parse_qsl, urlunparse
from logging.handlers import RotatingFileHandler
import warnings
from inspect import signature
from pathlib import Path
Expand Down Expand Up @@ -420,6 +421,46 @@ def __getattr__(self, attr):
return getattr(self.stream, attr)


class RotatingFileStream:
"""
A wrapper around RotatingFileHandler that emulates a stream interface.
"""

def __init__(self, path, max_bytes, backup_count, encoding="utf-8", mode="a"):
self.name = path
self.logger = logging.getLogger(f"RotatingLogger_{path}")
self.logger.setLevel(logging.DEBUG)
handler = RotatingFileHandler(
path,
maxBytes=max_bytes,
backupCount=backup_count,
encoding=encoding,
mode=mode,
)
# Only log message content without anything else.
formatter = logging.Formatter("%(message)s")
handler.setFormatter(formatter)
self.logger.addHandler(handler)

def write(self, message):
if message.strip(): # Avoid writing empty messages
self.logger.debug(message)

def flush(self):
for handler in self.logger.handlers:
handler.flush()

def close(self):
for handler in self.logger.handlers:
handler.close()

def name(self):
return self.name

def seekable(self):
return False


def open_log(path, unbuffered=False, **kwargs):
"""
Opens the log file at `path`, with the provided kwargs being given to
Expand All @@ -436,6 +477,24 @@ def open_log(path, unbuffered=False, **kwargs):
return stream


def open_log_with_rotation(
path, rotation_max_size=sys.maxsize, rotation_file_num=1, unbuffered=False, **kwargs
):
"""Opens the log file at `path`, which is backed by `logging` module supporting
rotation."""
stream = RotatingFileStream(
path,
max_bytes=rotation_max_size,
backup_count=rotation_file_num,
encoding=kwargs.get("encoding", "utf-8"),
mode=kwargs.get("mode", "a"),
)
if unbuffered:
return Unbuffered(stream)
else:
return stream


def get_system_memory(
# For cgroups v1:
memory_limit_filename="/sys/fs/cgroup/memory/memory.limit_in_bytes",
Expand Down
12 changes: 8 additions & 4 deletions python/ray/_private/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import ray._private.ray_constants as ray_constants
import ray._private.utils
import ray.actor
import sys
from ray._private.async_compat import try_install_uvloop
from ray._private.parameter import RayParams
from ray._private.ray_logging import configure_log_file, get_worker_log_file_name
from ray._private.ray_logging import get_worker_log_file_name
from ray._private.runtime_env.setup_hook import load_and_execute_setup_hook

parser = argparse.ArgumentParser(
Expand Down Expand Up @@ -273,10 +274,13 @@
worker = ray._private.worker.global_worker

# Setup log file.
out_file, err_file = node.get_log_file_handles(
get_worker_log_file_name(args.worker_type)
out_file, err_file = node.get_log_file_handles_with_rotation(
get_worker_log_file_name(args.worker_type),
rotation_max_size=args.logging_rotate_bytes,
rotation_file_num=args.logging_rotate_backup_count,
)
configure_log_file(out_file, err_file)
sys.stdout = out_file
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work if user's actor/task calls c++ code and in c++ there is std::cout << "xxxx"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redo in #49548

sys.stderr = err_file
worker.set_out_file(out_file)
worker.set_err_file(err_file)

Expand Down
37 changes: 19 additions & 18 deletions python/ray/tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,25 +273,26 @@ def component_file_only_one_log_entry(component):
f"backup count {backup_count}, file count: {file_cnt}"
)

# TODO(hjiang): Enable after log rotation implemented for user application.
# Test application log, which starts with `worker-`.
# Should be tested separately with other components since "worker" is a substring
# of "python-core-worker".
#
# # Test application log, which starts with `worker-`.
# # Should be tested separately with other components since "worker" is a substring
# # of "python-core-worker".
# #
# # Check file count.
# application_stdout_paths = []
# for path in paths:
# if path.stem.startswith("worker-") and re.search(r".*\.out(\.\d+)?", str(path))
# # and path.stat().st_size > 0:
# application_stdout_paths.append(path)
# assert len(application_stdout_paths) == 4, application_stdout_paths

# # Check file content, each file should have one line.
# for cur_path in application_stdout_paths:
# with cur_path.open() as f:
# lines = f.readlines()
# assert len(lines) == 1, lines
# Check file count.
application_stdout_paths = []
for path in paths:
if (
path.stem.startswith("worker-")
and re.search(r".*\.out(\.\d+)?", str(path))
and path.stat().st_size > 0
):
application_stdout_paths.append(path)
assert len(application_stdout_paths) == 4, application_stdout_paths

# Check file content, each file should have one line.
for cur_path in application_stdout_paths:
with cur_path.open() as f:
lines = f.readlines()
assert len(lines) == 1, lines


def test_periodic_event_stats(shutdown_only):
Expand Down
Loading