Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: make transfer list behave like web MessagePort #29319

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ constexpr size_t kFsStatsBufferLength =
V(dns_soa_string, "SOA") \
V(dns_srv_string, "SRV") \
V(dns_txt_string, "TXT") \
V(done_string, "done") \
V(duration_string, "duration") \
V(emit_warning_string, "emitWarning") \
V(encoding_string, "encoding") \
Expand Down Expand Up @@ -272,6 +273,7 @@ constexpr size_t kFsStatsBufferLength =
V(modulus_string, "modulus") \
V(name_string, "name") \
V(netmask_string, "netmask") \
V(next_string, "next") \
V(nistcurve_string, "nistCurve") \
V(nsname_string, "nsname") \
V(ocsp_request_string, "OCSPRequest") \
Expand Down Expand Up @@ -353,6 +355,7 @@ constexpr size_t kFsStatsBufferLength =
V(ticketkeycallback_string, "onticketkeycallback") \
V(timeout_string, "timeout") \
V(tls_ticket_string, "tlsTicket") \
V(transfer_string, "transfer") \
V(ttl_string, "ttl") \
V(type_string, "type") \
V(uid_string, "uid") \
Expand Down
217 changes: 144 additions & 73 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ using v8::Object;
using v8::ObjectTemplate;
using v8::SharedArrayBuffer;
using v8::String;
using v8::Symbol;
using v8::Value;
using v8::ValueDeserializer;
using v8::ValueSerializer;
Expand Down Expand Up @@ -304,7 +305,7 @@ class SerializerDelegate : public ValueSerializer::Delegate {
Maybe<bool> Message::Serialize(Environment* env,
Local<Context> context,
Local<Value> input,
Local<Value> transfer_list_v,
const TransferList& transfer_list_v,
Local<Object> source_port) {
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);
Expand All @@ -317,72 +318,66 @@ Maybe<bool> Message::Serialize(Environment* env,
delegate.serializer = &serializer;

std::vector<Local<ArrayBuffer>> array_buffers;
if (transfer_list_v->IsArray()) {
Local<Array> transfer_list = transfer_list_v.As<Array>();
uint32_t length = transfer_list->Length();
for (uint32_t i = 0; i < length; ++i) {
Local<Value> entry;
if (!transfer_list->Get(context, i).ToLocal(&entry))
return Nothing<bool>();
// Currently, we support ArrayBuffers and MessagePorts.
if (entry->IsArrayBuffer()) {
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
// If we cannot render the ArrayBuffer unusable in this Isolate and
// take ownership of its memory, copying the buffer will have to do.
if (!ab->IsDetachable() || ab->IsExternal() ||
!env->isolate_data()->uses_node_allocator()) {
continue;
}
if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
array_buffers.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate ArrayBuffer"));
return Nothing<bool>();
}
// We simply use the array index in the `array_buffers` list as the
// ID that we write into the serialized buffer.
uint32_t id = array_buffers.size();
array_buffers.push_back(ab);
serializer.TransferArrayBuffer(id, ab);
continue;
} else if (env->message_port_constructor_template()
->HasInstance(entry)) {
// Check if the source MessagePort is being transferred.
if (!source_port.IsEmpty() && entry == source_port) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(env->isolate(),
"Transfer list contains source port"));
return Nothing<bool>();
}
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
if (port == nullptr || port->IsDetached()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"MessagePort in transfer list is already detached"));
return Nothing<bool>();
}
if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
delegate.ports_.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate MessagePort"));
return Nothing<bool>();
}
delegate.ports_.push_back(port);
for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
Local<Value> entry = transfer_list_v[i];
// Currently, we support ArrayBuffers and MessagePorts.
if (entry->IsArrayBuffer()) {
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
// If we cannot render the ArrayBuffer unusable in this Isolate and
// take ownership of its memory, copying the buffer will have to do.
if (!ab->IsDetachable() || ab->IsExternal() ||
!env->isolate_data()->uses_node_allocator()) {
continue;
}

THROW_ERR_INVALID_TRANSFER_OBJECT(env);
return Nothing<bool>();
if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
array_buffers.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate ArrayBuffer"));
return Nothing<bool>();
}
// We simply use the array index in the `array_buffers` list as the
// ID that we write into the serialized buffer.
uint32_t id = array_buffers.size();
array_buffers.push_back(ab);
serializer.TransferArrayBuffer(id, ab);
continue;
} else if (env->message_port_constructor_template()
->HasInstance(entry)) {
// Check if the source MessagePort is being transferred.
if (!source_port.IsEmpty() && entry == source_port) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(env->isolate(),
"Transfer list contains source port"));
return Nothing<bool>();
}
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
if (port == nullptr || port->IsDetached()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"MessagePort in transfer list is already detached"));
return Nothing<bool>();
}
if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
delegate.ports_.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate MessagePort"));
return Nothing<bool>();
}
delegate.ports_.push_back(port);
continue;
}

THROW_ERR_INVALID_TRANSFER_OBJECT(env);
return Nothing<bool>();
}

serializer.WriteHeader();
Expand Down Expand Up @@ -664,7 +659,7 @@ std::unique_ptr<MessagePortData> MessagePort::Detach() {

Maybe<bool> MessagePort::PostMessage(Environment* env,
Local<Value> message_v,
Local<Value> transfer_v) {
const TransferList& transfer_v) {
Isolate* isolate = env->isolate();
Local<Object> obj = object(isolate);
Local<Context> context = obj->CreationContext();
Expand Down Expand Up @@ -705,20 +700,98 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
return Just(true);
}

static Maybe<bool> ReadIterable(Environment* env,
Local<Context> context,
// NOLINTNEXTLINE(runtime/references)
TransferList& transfer_list,
Local<Value> object) {
if (!object->IsObject()) return Just(false);

if (object->IsArray()) {
Local<Array> arr = object.As<Array>();
size_t length = arr->Length();
transfer_list.AllocateSufficientStorage(length);
for (size_t i = 0; i < length; i++) {
if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
return Nothing<bool>();
}
return Just(true);
}

Isolate* isolate = env->isolate();
Local<Value> iterator_method;
if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
.ToLocal(&iterator_method)) return Nothing<bool>();
if (!iterator_method->IsFunction()) return Just(false);

Local<Value> iterator;
if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
.ToLocal(&iterator)) return Nothing<bool>();
if (!iterator->IsObject()) return Just(false);

Local<Value> next;
if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
return Nothing<bool>();
if (!next->IsFunction()) return Just(false);

std::vector<Local<Value>> entries;
while (env->can_call_into_js()) {
Local<Value> result;
if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
.ToLocal(&result)) return Nothing<bool>();
if (!result->IsObject()) return Just(false);

Local<Value> done;
if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
return Nothing<bool>();
if (done->BooleanValue(isolate)) break;

Local<Value> val;
if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
return Nothing<bool>();
entries.push_back(val);
}

transfer_list.AllocateSufficientStorage(entries.size());
std::copy(entries.begin(), entries.end(), &transfer_list[0]);
return Just(true);
}

void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Local<Object> obj = args.This();
Local<Context> context = obj->CreationContext();

if (args.Length() == 0) {
return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
"MessagePort.postMessage");
}

if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
// Browsers ignore null or undefined, and otherwise accept an array or an
// options object.
// TODO(addaleax): Add support for an options object and generic sequence
// support.
// Refs: https://github.com/nodejs/node/pull/28033#discussion_r289964991
return THROW_ERR_INVALID_ARG_TYPE(env,
"Optional transferList argument must be an array");
"Optional transferList argument must be an iterable");
}

TransferList transfer_list;
if (args[1]->IsObject()) {
bool was_iterable;
if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
return;
if (!was_iterable) {
Local<Value> transfer_option;
if (!args[1].As<Object>()->Get(context, env->transfer_string())
.ToLocal(&transfer_option)) return;
if (!transfer_option->IsUndefined()) {
if (!ReadIterable(env, context, transfer_list, transfer_option)
.To(&was_iterable)) return;
if (!was_iterable) {
return THROW_ERR_INVALID_ARG_TYPE(env,
"Optional options.transfer argument must be an iterable");
}
}
}
}

MessagePort* port = Unwrap<MessagePort>(args.This());
Expand All @@ -727,13 +800,11 @@ void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
// transfers.
if (port == nullptr) {
Message msg;
Local<Object> obj = args.This();
Local<Context> context = obj->CreationContext();
USE(msg.Serialize(env, context, args[0], args[1], obj));
USE(msg.Serialize(env, context, args[0], transfer_list, obj));
return;
}

port->PostMessage(env, args[0], args[1]);
port->PostMessage(env, args[0], transfer_list);
}

void MessagePort::Start() {
Expand Down
6 changes: 4 additions & 2 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ namespace worker {
class MessagePortData;
class MessagePort;

typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList;

// Represents a single communication message.
class Message : public MemoryRetainer {
public:
Expand Down Expand Up @@ -44,7 +46,7 @@ class Message : public MemoryRetainer {
v8::Maybe<bool> Serialize(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Value> input,
v8::Local<v8::Value> transfer_list,
const TransferList& transfer_list,
v8::Local<v8::Object> source_port =
v8::Local<v8::Object>());

Expand Down Expand Up @@ -149,7 +151,7 @@ class MessagePort : public HandleWrap {
// serialized with transfers, then silently discarded.
v8::Maybe<bool> PostMessage(Environment* env,
v8::Local<v8::Value> message,
v8::Local<v8::Value> transfer);
const TransferList& transfer);

// Start processing messages on this port as a receiving end.
void Start();
Expand Down
26 changes: 26 additions & 0 deletions test/parallel/test-worker-message-port-terminate-transfer-list.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict';
const common = require('../common');

const { parentPort, MessageChannel, Worker } = require('worker_threads');

// Do not use isMainThread so that this test itself can be run inside a Worker.
if (!process.env.HAS_STARTED_WORKER) {
process.env.HAS_STARTED_WORKER = 1;
const w = new Worker(__filename);
w.once('message', common.mustCall(() => {
w.once('message', common.mustNotCall());
setTimeout(() => w.terminate(), 100);
}));
} else {
const { port1 } = new MessageChannel();

parentPort.postMessage('ready');

// Make sure we don’t end up running JS after the infinite loop is broken.
port1.postMessage({}, {
transfer: (function*() { while (true); })()
});

parentPort.postMessage('UNREACHABLE');
process.kill(process.pid, 'SIGINT');
}
Loading