[core] Use pipe to redirect stdout #49548

Closed
wants to merge 13 commits into from

Contributor

@dentiny dentiny commented Jan 2, 2025

A follow-up to the oversized log file issue. It adds support for rotating log files by redirecting stdout and stderr through the logging module.

High-level overview:

  • The application writes to the write end of a pipe
  • Background threads poll the read end of the pipe and keep streaming its content to the target file via logging
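
The two steps above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: the helper name `open_pipe_with_rotation_sketch`, the logger name, and the thread name are hypothetical, and it uses a single reader thread together with the standard library's `RotatingFileHandler`.

```python
import logging
import os
import threading
from logging.handlers import RotatingFileHandler


def open_pipe_with_rotation_sketch(fname, max_bytes, backup_count):
    """Hypothetical helper: returns (write_stream, reader_thread)."""
    read_fd, write_fd = os.pipe()

    # Logger that owns the target file and rotates it by size.
    logger = logging.getLogger(f"pipe_rotation_sketch.{fname}")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = RotatingFileHandler(
        fname, maxBytes=max_bytes, backupCount=backup_count
    )
    handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(handler)

    def pump():
        # Blocks on the read end; the loop ends at EOF, i.e. once the
        # write end has been closed.
        with os.fdopen(read_fd, "r") as reader:
            for line in reader:
                logger.info(line.rstrip("\n"))
        handler.close()

    thread = threading.Thread(target=pump, name="pipe-log-pump", daemon=True)
    thread.start()
    return os.fdopen(write_fd, "w"), thread
```

A caller would write to the returned stream (or install it as `sys.stdout`), then close it and join the thread so that buffered content reaches the file before exit.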

Update: This PR only does rotation for stdout files; as before, stderr is less important than stdout and much trickier to get right, because more unit tests listen to stderr.

Closes #49563

@dentiny dentiny force-pushed the hjiang/pipe-application-log branch from e2f2050 to 2dad6b1 Compare January 2, 2025 22:34
@dentiny dentiny added the go add ONLY when ready to merge, run all tests label Jan 3, 2025
python/ray/_private/log_rotation_with_pipe.py (outdated, resolved)
python/ray/_private/log_rotation_with_pipe.py (outdated, resolved)
python/ray/_private/ray_logging/__init__.py (outdated, resolved)
@dentiny dentiny requested a review from jjyao January 3, 2025 06:11
@dentiny dentiny force-pushed the hjiang/pipe-application-log branch from b3ef8de to 3129f18 Compare January 3, 2025 06:18
Comment on lines +12 to +44
class PipeStreamWriteHandle(IO[AnyStr]):
    def __init__(self, write_fd: int, log_name: str):
        self.name = log_name
        self.write_fd = write_fd
        self.stream = os.fdopen(write_fd, "w")

    def name(self):
        return self.name

    def fileno(self):
        return self.write_fd

    def write(self, s: str) -> int:
        return self.stream.write(s)

    def flush(self):
        self.stream.flush()

    def close(self):
        self.stream.close()

    def isatty(self) -> bool:
        return False

    def readable(self) -> bool:
        return False

    def writable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return False

Collaborator

Doesn't configure_log_file only require this object to have a fileno() method, and not the others?

Contributor Author

write, flush, name, and fileno are definitely needed; I ran into issues when any of them was missing.
The others were suggested by GPT; I didn't verify them one by one.

My personal opinion is that it doesn't hurt to implement these overrides:

  • All of these override functions are straightforward and short enough
  • Some attribute functions help explain the class, e.g. readable and writable.

@dentiny dentiny requested a review from jjyao January 3, 2025 06:33
Contributor Author

dentiny commented Jan 3, 2025

There are CI failures; I will take a look during the day.

Collaborator

@MengjinYan MengjinYan left a comment

Thanks!!

Also, a minor general comment: we should probably also update the wiki page here

Ray supports log rotation of log files. Note that not all components support log rotation. (Raylet, Python, and Java worker logs do not rotate).
to indicate the change.

python/ray/_private/log_rotation_with_pipe.py (outdated, resolved)
@dentiny dentiny requested a review from MengjinYan January 4, 2025 02:25
Signed-off-by: dentiny <[email protected]>
@dentiny dentiny requested a review from a team as a code owner January 6, 2025 19:03
Signed-off-by: dentiny <[email protected]>
@dentiny dentiny changed the title [core] Use pipe to redirect stdout/stderr [core] Use pipe to redirect stdout Jan 7, 2025
Signed-off-by: dentiny <[email protected]>

atexit.register(cleanup)

return pipe_write_stream
Collaborator

Do we need the PipeStreamWriteHandle wrapper? Can we directly return open(write_fd)?

Contributor Author

open takes a path-like object, not a fd
Ref: https://docs.python.org/3/library/functions.html#open

Contributor Author

fdopen is the interface we want for wrapping an fd in a file object
Ref: https://docs.python.org/3/library/os.html#os.fdopen
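
For readers following along, a minimal sketch of wrapping both ends of a pipe with `os.fdopen` is shown below; the variable names are illustrative only. As an aside, the Python documentation describes `os.fdopen` as an alias of the built-in `open()` whose first argument must be an integer, so either spelling can wrap a descriptor.

```python
import os

read_fd, write_fd = os.pipe()

# os.fdopen wraps an existing integer descriptor in a file object.
writer = os.fdopen(write_fd, "w")
reader = os.fdopen(read_fd, "r")

# The wrapper reports the descriptor it was built from.
fd_matches = writer.fileno() == write_fd

writer.write("hello\n")
writer.close()  # flushes buffered text, then closes the write end

first_line = reader.readline()
reader.close()
```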

Collaborator

Can we use fdopen then?

Contributor Author

How do you plan to provide name and write then?

Collaborator

I see. I think write() is supported by the object returned by fdopen(), so the remaining issue is name. I guess you can do:

file_like_io_object = os.fdopen(write_fd)
file_like_io_object.name = "xxxx"

Contributor Author

Just tried

Traceback (most recent call last):
  File "logging_redirection_test.py", line 13, in <module>
    _LOG_STDOUT_FNAME, _ROTATION_MAX_BYTE, _ROTATION_FILE_NUM)
  File "/home/ubuntu/py-logging-redirection/pipe_logging.py", line 90, in open_pipe_with_rotation
    pipe_write_stream.name = fname
AttributeError: attribute 'name' of '_io.TextIOWrapper' objects is not writable

Contributor Author

Code snippet:

pipe_write_stream = os.fdopen(write_fd, "w")
pipe_write_stream.name = fname

Collaborator

It seems we can follow https://stackoverflow.com/questions/60622854/how-to-instantiate-an-io-textiowrapper-object-with-a-name-attribute so that we don't need to implement our own IO subclass with all these methods.
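
One way to get a text stream with a custom name without a hand-written IO subclass is sketched below. It is an assumption about how this could be done in CPython, not necessarily what the linked answer proposes: the name is set on the raw `io.FileIO` layer, and the buffered and text layers look `name` up on the object they wrap. The name `worker.out` is a placeholder.

```python
import io
import os

read_fd, write_fd = os.pipe()

# Set the name on the raw layer; the layers stacked on top of it
# delegate their `name` attribute to the object they wrap.
raw = io.FileIO(write_fd, "w")
raw.name = "worker.out"  # hypothetical log name
stream = io.TextIOWrapper(io.BufferedWriter(raw), line_buffering=True)
stream_name = stream.name

# Assigning on the text layer itself fails, as in the traceback above.
try:
    stream.name = "other.out"
    assignment_failed = False
except AttributeError:
    assignment_failed = True

stream.write("hello\n")
stream.close()
with os.fdopen(read_fd) as reader:
    received = reader.read()
```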

@dentiny dentiny requested a review from jjyao January 7, 2025 07:03

By default, logs rotate when they reach 512MB (maxBytes), and have a maximum of five backup files (backupCount). Indexes are appended to all backup files (e.g., `raylet.out.1`)
To change the log rotation configuration, specify environment variables. For example,
By default, ray doesn't rotate for internal components. To change the log rotation configuration, specify environment variables. For example,
Collaborator

Are there some files that are rotated by default? What are they?

with cond:
    # An empty line is read over, which indicates write side has
    # closed.
    if line == _EOF_TOKEN:
Collaborator

Why do we need _EOF_TOKEN? If you close the pipe, won't the reader know? readline() should return an empty string to indicate EOF.
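
The behavior described here can be checked with a small sketch (names are illustrative): `readline()` returns `""` only at EOF, while a blank line arrives as `"\n"`. One caveat worth keeping in mind is that the reader sees EOF only after every copy of the write descriptor is closed, including duplicates made with `dup2` or inherited by child processes.

```python
import os
import threading

read_fd, write_fd = os.pipe()
lines = []


def read_until_eof():
    with os.fdopen(read_fd, "r") as reader:
        while True:
            line = reader.readline()
            if line == "":  # EOF: every write end has been closed
                break
            lines.append(line)


reader_thread = threading.Thread(target=read_until_eof, name="pipe-reader-sketch")
reader_thread.start()

with os.fdopen(write_fd, "w") as writer:
    writer.write("first\n")
    writer.write("\n")  # a blank line is "\n", not ""
    writer.write("last\n")

# Leaving the `with` block closed the write end, so the reader exits.
reader_thread.join()
```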

Comment on lines +111 to +112
read_thread = threading.Thread(target=read_log_content_from_pipe, daemon=True)
dump_thread = threading.Thread(target=dump_log_content_to_buffer, daemon=True)
Collaborator

Can you give these two threads names?
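
For example, `threading.Thread` accepts a `name` argument, which then shows up in debuggers and in log records that include `%(threadName)s`. The names below are placeholders, and the two target functions are stubbed out so the snippet runs on its own.

```python
import threading


def read_log_content_from_pipe():
    """Stub for illustration; the real function is defined in the PR."""


def dump_log_content_to_buffer():
    """Stub for illustration; the real function is defined in the PR."""


# Hypothetical thread names; pick whatever fits the project's conventions.
read_thread = threading.Thread(
    target=read_log_content_from_pipe, name="PipeLogReader", daemon=True
)
dump_thread = threading.Thread(
    target=dump_log_content_to_buffer, name="PipeLogDumper", daemon=True
)
```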

"log file, default is "
f"{ray_constants.LOGGING_ROTATE_BYTES} bytes.",
default=sys.maxsize,
help="Specify the max bytes for rotating " "log file, default no rotation.",
Collaborator

Suggested change
help="Specify the max bytes for rotating " "log file, default no rotation.",
help="Specify the max bytes for rotating log file, default no rotation.",

logger.info(cur_content)

# Python `atexit` hook only invokes when no other non-daemon threads running.
read_thread = threading.Thread(target=read_log_content_from_pipe, daemon=True)
Collaborator

Actually, thinking about it more, I think it's better to implement this in C++ so that the same implementation can be used for all language frontends (e.g. Python, Java, C++). The current approach only works for Python workers. I think implementing it in C++ shouldn't be too much harder?

Contributor Author

OK, I will create another PR implemented in C++; I will try to get it out by tomorrow.

Labels
go add ONLY when ready to merge, run all tests
Development

Successfully merging this pull request may close these issues.

[Core] Log rotation on workers
4 participants