Skip to content

Commit

Permalink
Fix buffer leak during compression
Browse files Browse the repository at this point in the history
Ensure that buffers used for uncompressed message prior to compression
are released.

JAVA-3087
  • Loading branch information
jyemin committed Nov 13, 2018
1 parent 77bddd6 commit 0c01d71
Showing 1 changed file with 34 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,23 +264,29 @@ public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decod
}
private void sendCommandMessage(final CommandMessage message,
final ByteBufferBsonOutput bsonOutput, final SessionContext sessionContext) {
try {
if (sendCompressor == null || SECURITY_SENSITIVE_COMMANDS.contains(message.getCommandDocument(bsonOutput).getFirstKey())) {
if (sendCompressor == null || SECURITY_SENSITIVE_COMMANDS.contains(message.getCommandDocument(bsonOutput).getFirstKey())) {
try {
sendMessage(bsonOutput.getByteBuffers(), message.getId());
} else {
CompressedMessage compressedMessage = new CompressedMessage(message.getOpCode(), bsonOutput.getByteBuffers(),
sendCompressor,
} finally {
bsonOutput.close();
}
} else {
List<ByteBuf> byteBuffers = bsonOutput.getByteBuffers();
ByteBufferBsonOutput compressedBsonOutput;
try {
CompressedMessage compressedMessage = new CompressedMessage(message.getOpCode(), byteBuffers, sendCompressor,
getMessageSettings(description));
ByteBufferBsonOutput compressedBsonOutput = new ByteBufferBsonOutput(this);
compressedBsonOutput = new ByteBufferBsonOutput(this);
compressedMessage.encode(compressedBsonOutput, sessionContext);
try {
sendMessage(compressedBsonOutput.getByteBuffers(), message.getId());
} finally {
compressedBsonOutput.close();
}
} finally {
releaseAllBuffers(byteBuffers);
bsonOutput.close();
}
try {
sendMessage(compressedBsonOutput.getByteBuffers(), message.getId());
} finally {
compressedBsonOutput.close();
}
} finally {
bsonOutput.close();
}
}

Expand Down Expand Up @@ -324,11 +330,15 @@ public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<
sendCommandMessageAsync(message.getId(), decoder, sessionContext, callback, bsonOutput, commandEventSender,
message.isResponseExpected());
} else {
CompressedMessage compressedMessage = new CompressedMessage(message.getOpCode(), bsonOutput.getByteBuffers(),
sendCompressor,
getMessageSettings(description));
compressedMessage.encode(compressedBsonOutput, sessionContext);
bsonOutput.close();
List<ByteBuf> byteBuffers = bsonOutput.getByteBuffers();
try {
CompressedMessage compressedMessage = new CompressedMessage(message.getOpCode(), byteBuffers, sendCompressor,
getMessageSettings(description));
compressedMessage.encode(compressedBsonOutput, sessionContext);
} finally {
releaseAllBuffers(byteBuffers);
bsonOutput.close();
}
sendCommandMessageAsync(message.getId(), decoder, sessionContext, callback, compressedBsonOutput, commandEventSender,
message.isResponseExpected());
}
Expand All @@ -339,6 +349,12 @@ public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<
}
}

private void releaseAllBuffers(final List<ByteBuf> byteBuffers) {
for (ByteBuf cur : byteBuffers) {
cur.release();
}
}

private <T> void sendCommandMessageAsync(final int messageId, final Decoder<T> decoder, final SessionContext sessionContext,
final SingleResultCallback<T> callback, final ByteBufferBsonOutput bsonOutput,
final CommandEventSender commandEventSender, final boolean responseExpected) {
Expand Down

0 comments on commit 0c01d71

Please sign in to comment.