Skip to content

Commit

Permalink
Merge pull request #12207 from NixOS/mergify/bp/2.24-maintenance/pr-1…
Browse files Browse the repository at this point in the history
…1343

withFramedSink(): Don't use a thread to monitor the other side (backport #11343)
  • Loading branch information
mergify[bot] authored Jan 10, 2025
2 parents e767570 + 931eb85 commit a658baf
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 52 deletions.
3 changes: 3 additions & 0 deletions src/libstore/daemon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,9 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->startWork();
auto pathInfo = [&]() {
// NB: FramedSource must be out of scope before logger->stopWork();
// FIXME: this means that if there is an error
// half-way through, the client will keep sending
// data, since we haven't sent it the error yet.
auto [contentAddressMethod, hashAlgo] = ContentAddressMethod::parseWithAlgo(camStr);
FramedSource source(conn.from);
FileSerialisationMethod dumpMethod;
Expand Down
2 changes: 1 addition & 1 deletion src/libstore/remote-store-connection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct RemoteStore::ConnectionHandle
RemoteStore::Connection & operator * () { return *handle; }
RemoteStore::Connection * operator -> () { return &*handle; }

void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true);
void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true);

void withFramedSink(std::function<void(Sink & sink)> fun);
};
Expand Down
42 changes: 8 additions & 34 deletions src/libstore/remote-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ RemoteStore::ConnectionHandle::~ConnectionHandle()
}
}

void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush)
void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush, bool block)
{
handle->processStderr(&daemonException, sink, source, flush);
handle->processStderr(&daemonException, sink, source, flush, block);
}


Expand Down Expand Up @@ -926,43 +926,17 @@ void RemoteStore::ConnectionHandle::withFramedSink(std::function<void(Sink & sin
{
(*this)->to.flush();

std::exception_ptr ex;

/* Handle log messages / exceptions from the remote on a separate
thread. */
std::thread stderrThread([&]()
{
try {
ReceiveInterrupts receiveInterrupts;
processStderr(nullptr, nullptr, false);
} catch (...) {
ex = std::current_exception();
}
});

Finally joinStderrThread([&]()
{
if (stderrThread.joinable()) {
stderrThread.join();
if (ex) {
try {
std::rethrow_exception(ex);
} catch (...) {
ignoreException();
}
}
}
});

{
FramedSink sink((*this)->to, ex);
FramedSink sink((*this)->to, [&]() {
/* Periodically process stderr messages and exceptions
from the daemon. */
processStderr(nullptr, nullptr, false, false);
});
fun(sink);
sink.flush();
}

stderrThread.join();
if (ex)
std::rethrow_exception(ex);
processStderr(nullptr, nullptr, false);
}

}
15 changes: 11 additions & 4 deletions src/libstore/worker-protocol-connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ static Logger::Fields readFields(Source & from)
return fields;
}

std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush)
std::exception_ptr
WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush, bool block)
{
if (flush)
to.flush();
Expand All @@ -41,6 +42,9 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink

while (true) {

if (!block && !from.hasData())
break;

auto msg = readNum<uint64_t>(from);

if (msg == STDERR_WRITE) {
Expand Down Expand Up @@ -95,8 +99,10 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
logger->result(act, type, fields);
}

else if (msg == STDERR_LAST)
else if (msg == STDERR_LAST) {
assert(block);
break;
}

else
throw Error("got unknown message type %x from Nix daemon", msg);
Expand Down Expand Up @@ -130,9 +136,10 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
}
}

void WorkerProto::BasicClientConnection::processStderr(bool * daemonException, Sink * sink, Source * source, bool flush)
void WorkerProto::BasicClientConnection::processStderr(
bool * daemonException, Sink * sink, Source * source, bool flush, bool block)
{
auto ex = processStderrReturn(sink, source, flush);
auto ex = processStderrReturn(sink, source, flush, block);
if (ex) {
*daemonException = true;
std::rethrow_exception(ex);
Expand Down
5 changes: 3 additions & 2 deletions src/libstore/worker-protocol-connection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ struct WorkerProto::BasicClientConnection : WorkerProto::BasicConnection

virtual void closeWrite() = 0;

std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true);
std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true);

void processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true);
void
processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true);

/**
* Establishes connection, negotiating version.
Expand Down
25 changes: 25 additions & 0 deletions src/libutil/serialise.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#ifdef _WIN32
# include <fileapi.h>
# include "windows-error.hh"
#else
# include <poll.h>
#endif


Expand Down Expand Up @@ -158,6 +160,29 @@ bool FdSource::good()
}


bool FdSource::hasData()
{
if (BufferedSource::hasData()) return true;

while (true) {
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);

struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 0;

auto n = select(fd + 1, &fds, nullptr, nullptr, &timeout);
if (n < 0) {
if (errno == EINTR) continue;
throw SysError("polling file descriptor");
}
return FD_ISSET(fd, &fds);
}
}


size_t StringSource::read(char * data, size_t len)
{
if (pos == s.size()) throw EndOfFile("end of string reached");
Expand Down
29 changes: 18 additions & 11 deletions src/libutil/serialise.hh
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ struct BufferedSource : Source

size_t read(char * data, size_t len) override;

/**
* Return true if the buffer is not empty.
*/
bool hasData();

protected:
Expand Down Expand Up @@ -163,6 +166,13 @@ struct FdSource : BufferedSource
FdSource & operator=(FdSource && s) = default;

bool good() override;

/**
* Return true if the buffer is not empty after a non-blocking
* read.
*/
bool hasData();

protected:
size_t readUnbuffered(char * data, size_t len) override;
private:
Expand Down Expand Up @@ -530,15 +540,16 @@ struct FramedSource : Source
/**
* Write as chunks in the format expected by FramedSource.
*
* The exception_ptr reference can be used to terminate the stream when you
* detect that an error has occurred on the remote end.
* The `checkError` function can be used to terminate the stream when you
* detect that an error has occurred. It does so by throwing an exception.
*/
struct FramedSink : nix::BufferedSink
{
BufferedSink & to;
std::exception_ptr & ex;
std::function<void()> checkError;

FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex)
FramedSink(BufferedSink & to, std::function<void()> && checkError)
: to(to), checkError(checkError)
{ }

~FramedSink()
Expand All @@ -553,13 +564,9 @@ struct FramedSink : nix::BufferedSink

void writeUnbuffered(std::string_view data) override
{
/* Don't send more data if the remote has
encountered an error. */
if (ex) {
auto ex2 = ex;
ex = nullptr;
std::rethrow_exception(ex2);
}
/* Don't send more data if an error has occured. */
checkError();

to << data.size();
to(data);
};
Expand Down

0 comments on commit a658baf

Please sign in to comment.