def get_log_file_handles_with_rotation(
    self,
    name: str,
    unique: bool = False,
    create_out: bool = True,
    create_err: bool = True,
    rotation_max_size: int = sys.maxsize,
    rotation_file_num: int = 1,
) -> Tuple[Optional[IO[AnyStr]], Optional[IO[AnyStr]]]:
    """Open the stdout/stderr log files for `name` as rotating streams.

    The file names are resolved by `self.get_log_file_names`, to which
    `name`, `unique`, `create_out` and `create_err` are forwarded unchanged.
    Every name that is not None is opened with `open_log_with_rotation`.

    Args:
        name: Forwarded to `get_log_file_names`.
        unique: Forwarded to `get_log_file_names`.
        create_out: Forwarded to `get_log_file_names`.
        create_err: Forwarded to `get_log_file_names`.
        rotation_max_size: Passed to `open_log_with_rotation` as
            `rotation_max_size` for both files.
        rotation_file_num: Passed to `open_log_with_rotation` as
            `rotation_file_num` for both files.

    Returns:
        A `(stdout, stderr)` tuple. An entry is None when the corresponding
        file name returned by `get_log_file_names` is None.
    """

    def _open_rotating(fname):
        # A missing file name means "no handle for this stream".
        if fname is None:
            return None
        return open_log_with_rotation(
            fname,
            rotation_max_size=rotation_max_size,
            rotation_file_num=rotation_file_num,
        )

    stdout_fname, stderr_fname = self.get_log_file_names(
        name, unique=unique, create_out=create_out, create_err=create_err
    )
    # stdout is opened before stderr, matching the original ordering.
    stdout_handle = _open_rotating(stdout_fname)
    stderr_handle = _open_rotating(stderr_fname)
    return stdout_handle, stderr_handle
from logging.handlers import RotatingFileHandler


class RotatingFileStream:
    """
    A wrapper around RotatingFileHandler that emulates a writable text stream.

    Text is written to the file verbatim: the handler's line terminator is
    disabled, so no newline is added or removed. Incomplete trailing lines are
    held back until their newline arrives (or until `flush()`/`close()`), so
    that one record handed to the handler always ends on a line boundary and
    a rollover does not separate a line from its newline.

    Attributes:
        name: The path of the active log file.
        logger: The named logger the handler is attached to. It does not
            propagate to ancestor loggers.
    """

    def __init__(self, path, max_bytes, backup_count, encoding="utf-8", mode="a"):
        self.name = path
        # Text received after the last newline, not yet handed to the handler.
        self._pending = ""
        # Guards `_pending`. Re-entrant because the handler's error path may
        # write to sys.stderr, which can be this very stream.
        self._lock = threading.RLock()
        self._handler = RotatingFileHandler(
            path,
            maxBytes=max_bytes,
            backupCount=backup_count,
            encoding=encoding,
            mode=mode,
        )
        # Only log message content without anything else; newlines come from
        # the written text itself, not from the handler.
        self._handler.terminator = ""
        self._handler.setFormatter(logging.Formatter("%(message)s"))

        self.logger = logging.getLogger(f"RotatingLogger_{path}")
        self.logger.setLevel(logging.DEBUG)
        # Never forward records to ancestor loggers: a root handler bound to
        # sys.stdout/sys.stderr would feed the text back into this stream.
        self.logger.propagate = False
        self.logger.addHandler(self._handler)

    def _emit(self, text):
        # Go through this stream's own handler rather than the shared named
        # logger, so a second stream opened on the same path cannot duplicate
        # output. The handler performs the rollover check per record.
        self._handler.handle(logging.makeLogRecord({"msg": text}))

    def write(self, message):
        """Write `message` and return the number of characters accepted."""
        if not message:
            return 0
        with self._lock:
            text = self._pending + message
            # Emit everything up to and including the last newline.
            cut = text.rfind("\n") + 1
            self._pending = text[cut:]
            if cut:
                self._emit(text[:cut])
        return len(message)

    def flush(self):
        """Write out any held-back partial line and flush the file."""
        with self._lock:
            if self._pending:
                text, self._pending = self._pending, ""
                self._emit(text)
        self._handler.flush()

    def close(self):
        """Flush pending text, detach the handler and close the file."""
        self.flush()
        # Detach so that re-opening the same path does not accumulate
        # handlers on the name-cached logger.
        self.logger.removeHandler(self._handler)
        self._handler.close()

    def seekable(self):
        return False

    def writable(self):
        return True

    def isatty(self):
        return False


def open_log_with_rotation(
    path, rotation_max_size=sys.maxsize, rotation_file_num=1, unbuffered=False, **kwargs
):
    """Opens the log file at `path`, which is backed by `logging` module supporting
    rotation.

    Args:
        path: Path of the log file.
        rotation_max_size: Passed to the rotating handler as `maxBytes`.
        rotation_file_num: Passed to the rotating handler as `backupCount`.
        unbuffered: If True, the stream is wrapped in `Unbuffered`.
        **kwargs: Only `encoding` (default "utf-8") and `mode` (default "a")
            are read; any other keyword is ignored.

    Returns:
        A `RotatingFileStream`, wrapped in `Unbuffered` when requested.
    """
    stream = RotatingFileStream(
        path,
        max_bytes=rotation_max_size,
        backup_count=rotation_file_num,
        encoding=kwargs.get("encoding", "utf-8"),
        mode=kwargs.get("mode", "a"),
    )
    return Unbuffered(stream) if unbuffered else stream
memory_limit_filename="/sys/fs/cgroup/memory/memory.limit_in_bytes", diff --git a/python/ray/_private/workers/default_worker.py b/python/ray/_private/workers/default_worker.py index 2a6ffe2bcce9..b4292025cee8 100644 --- a/python/ray/_private/workers/default_worker.py +++ b/python/ray/_private/workers/default_worker.py @@ -9,9 +9,10 @@ import ray._private.ray_constants as ray_constants import ray._private.utils import ray.actor +import sys from ray._private.async_compat import try_install_uvloop from ray._private.parameter import RayParams -from ray._private.ray_logging import configure_log_file, get_worker_log_file_name +from ray._private.ray_logging import get_worker_log_file_name from ray._private.runtime_env.setup_hook import load_and_execute_setup_hook parser = argparse.ArgumentParser( @@ -273,10 +274,13 @@ worker = ray._private.worker.global_worker # Setup log file. - out_file, err_file = node.get_log_file_handles( - get_worker_log_file_name(args.worker_type) + out_file, err_file = node.get_log_file_handles_with_rotation( + get_worker_log_file_name(args.worker_type), + rotation_max_size=args.logging_rotate_bytes, + rotation_file_num=args.logging_rotate_backup_count, ) - configure_log_file(out_file, err_file) + sys.stdout = out_file + sys.stderr = err_file worker.set_out_file(out_file) worker.set_err_file(err_file) diff --git a/python/ray/tests/test_logging.py b/python/ray/tests/test_logging.py index 466c5eb567b5..0f5bac7d2ab6 100644 --- a/python/ray/tests/test_logging.py +++ b/python/ray/tests/test_logging.py @@ -273,25 +273,26 @@ def component_file_only_one_log_entry(component): f"backup count {backup_count}, file count: {file_cnt}" ) - # TODO(hjiang): Enable after log rotation implemented for user application. + # Test application log, which starts with `worker-`. + # Should be tested separately with other components since "worker" is a substring + # of "python-core-worker". # - # # Test application log, which starts with `worker-`. 
- # # Should be tested separately with other components since "worker" is a substring - # # of "python-core-worker". - # # - # # Check file count. - # application_stdout_paths = [] - # for path in paths: - # if path.stem.startswith("worker-") and re.search(r".*\.out(\.\d+)?", str(path)) - # # and path.stat().st_size > 0: - # application_stdout_paths.append(path) - # assert len(application_stdout_paths) == 4, application_stdout_paths - - # # Check file content, each file should have one line. - # for cur_path in application_stdout_paths: - # with cur_path.open() as f: - # lines = f.readlines() - # assert len(lines) == 1, lines + # Check file count. + application_stdout_paths = [] + for path in paths: + if ( + path.stem.startswith("worker-") + and re.search(r".*\.out(\.\d+)?", str(path)) + and path.stat().st_size > 0 + ): + application_stdout_paths.append(path) + assert len(application_stdout_paths) == 4, application_stdout_paths + + # Check file content, each file should have one line. + for cur_path in application_stdout_paths: + with cur_path.open() as f: + lines = f.readlines() + assert len(lines) == 1, lines def test_periodic_event_stats(shutdown_only):