KAFKA-13093: Log compaction should write new segments with record version v2 (KIP-724) (#18321)

Convert v0/v1 record batches to v2 during compaction, even if those batches would otherwise be
rewritten without change. A few important details:

1. A v0 compressed record batch with multiple records is converted into a single v2 record batch.
2. V0 uncompressed records are converted into single-record v2 record batches.
3. V0 records are converted to v2 records with `timestampType` set to `CreateTime` and the
   timestamp set to `-1` (see the sketch below).
4. The `KAFKA-4298` workaround is no longer needed since the conversion to v2 also fixes
   that issue.
5. Removed a log warning that only applied to consumers older than 0.10.1 - they are no longer
   supported.
6. Added back the ability to append records with v0/v1 (for testing only).
7. The creation of the leader epoch cache is no longer optional since the record version
   config is effectively always v2.

Add integration tests; these tests existed before #18267 and have been restored, modified and
extended.
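
The sketch below is not part of the patch; the wrapper class and method name are hypothetical. It condenses the up-conversion path this change introduces in buildRetainedRecordsInto, using only identifiers that appear in the diff that follows: retained records are re-appended through a MemoryRecordsBuilder created with RecordBatch.CURRENT_MAGIC_VALUE, and a batch with no timestamp type (v0) falls back to CreateTime with timestamp -1.

import java.util.List;

import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

// Hypothetical wrapper: the real logic lives in MemoryRecords#buildRetainedRecordsInto (see the diff below).
final class CompactionUpConversionSketch {

    static MemoryRecordsBuilder upConvertRetained(RecordBatch originalBatch,
                                                  List<Record> retainedRecords,
                                                  ByteBufferOutputStream out,
                                                  long deleteHorizonMs) {
        Compression compression = Compression.of(originalBatch.compressionType()).build();
        // v0 batches carry no timestamp type, so compaction falls back to CreateTime with
        // timestamp -1 (NO_TIMESTAMP) rather than generating a log append time (point 3 above).
        TimestampType timestampType = originalBatch.timestampType() == TimestampType.NO_TIMESTAMP_TYPE
            ? TimestampType.CREATE_TIME : originalBatch.timestampType();
        long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME
            ? originalBatch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
        long baseOffset = originalBatch.magic() >= RecordBatch.MAGIC_VALUE_V2
            ? originalBatch.baseOffset() : retainedRecords.get(0).offset();

        // The batch is always rewritten with the current (v2) magic, regardless of the input version.
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(out, RecordBatch.CURRENT_MAGIC_VALUE,
            compression, timestampType, baseOffset, logAppendTime, originalBatch.producerId(),
            originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(),
            originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), out.limit(), deleteHorizonMs);
        for (Record record : retainedRecords)
            builder.append(record);
        // The caller closes the builder (the patch wraps it in try-with-resources) and writes the result.
        return builder;
    }
}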

Reviewers: Jun Rao <[email protected]>
ijuma authored Jan 9, 2025
1 parent a116753 commit cf7029c
Showing 29 changed files with 463 additions and 390 deletions.
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.record;
 
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.message.KRaftVersionRecord;
@@ -137,31 +136,25 @@ public Integer firstBatchSize() {
     /**
      * Filter the records into the provided ByteBuffer.
      *
-     * @param partition The partition that is filtered (used only for logging)
      * @param filter The filter function
      * @param destinationBuffer The byte buffer to write the filtered records to
-     * @param maxRecordBatchSize The maximum record batch size. Note this is not a hard limit: if a batch
-     *                           exceeds this after filtering, we log a warning, but the batch will still be
-     *                           created.
      * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported. For small
      *                                    record batches, allocating a potentially large buffer (64 KB for LZ4) will
      *                                    dominate the cost of decompressing and iterating over the records in the
      *                                    batch. As such, a supplier that reuses buffers will have a significant
      *                                    performance impact.
      * @return A FilterResult with a summary of the output (for metrics) and potentially an overflow buffer
      */
-    public FilterResult filterTo(TopicPartition partition, RecordFilter filter, ByteBuffer destinationBuffer,
-                                 int maxRecordBatchSize, BufferSupplier decompressionBufferSupplier) {
-        return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize, decompressionBufferSupplier);
+    public FilterResult filterTo(RecordFilter filter, ByteBuffer destinationBuffer, BufferSupplier decompressionBufferSupplier) {
+        return filterTo(batches(), filter, destinationBuffer, decompressionBufferSupplier);
     }
 
     /**
      * Note: This method is also used to convert the first timestamp of the batch (which is usually the timestamp of the first record)
      * to the delete horizon of the tombstones or txn markers which are present in the batch.
      */
-    private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
-                                         RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
-                                         BufferSupplier decompressionBufferSupplier) {
+    private static FilterResult filterTo(Iterable<MutableRecordBatch> batches, RecordFilter filter,
+                                         ByteBuffer destinationBuffer, BufferSupplier decompressionBufferSupplier) {
         FilterResult filterResult = new FilterResult(destinationBuffer);
         ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
         for (MutableRecordBatch batch : batches) {
@@ -174,15 +167,9 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             if (batchRetention == BatchRetention.DELETE)
                 continue;
 
-            // We use the absolute offset to decide whether to retain the message or not. Due to KAFKA-4298, we have to
-            // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
-            // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
-            // recopy the messages to the destination buffer.
-            byte batchMagic = batch.magic();
-            List<Record> retainedRecords = new ArrayList<>();
-
-            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
-                batchMagic, true, retainedRecords);
+            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult,
+                filter);
+            List<Record> retainedRecords = iterationResult.retainedRecords;
             boolean containsTombstones = iterationResult.containsTombstones;
             boolean writeOriginalBatch = iterationResult.writeOriginalBatch;
             long maxOffset = iterationResult.maxOffset;
@@ -191,8 +178,8 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
                 // we check if the delete horizon should be set to a new value
                 // in which case, we need to reset the base timestamp and overwrite the timestamp deltas
                 // if the batch does not contain tombstones, then we don't need to overwrite batch
-                boolean needToSetDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
-                    && batch.deleteHorizonMs().isEmpty();
+                boolean needToSetDeleteHorizon = (containsTombstones || containsMarkerForEmptyTxn) &&
+                    batch.deleteHorizonMs().isEmpty();
                 if (writeOriginalBatch && !needToSetDeleteHorizon) {
                     batch.writeTo(bufferOutputStream);
                     filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
@@ -202,26 +189,21 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
                         deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs;
                     else
                         deleteHorizonMs = batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP);
-                    try (final MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs)) {
+                    try (final MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords,
+                            bufferOutputStream, deleteHorizonMs)) {
                         MemoryRecords records = builder.build();
                         int filteredBatchSize = records.sizeInBytes();
-                        if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
-                            log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
-                                    "(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " +
-                                    "increase their fetch sizes.",
-                                partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
-
                         MemoryRecordsBuilder.RecordsInfo info = builder.info();
                         filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.shallowOffsetOfMaxTimestamp,
                             maxOffset, retainedRecords.size(), filteredBatchSize);
                     }
                 }
             } else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
-                if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
+                if (batch.magic() < RecordBatch.MAGIC_VALUE_V2) // should never happen
                     throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
 
                 bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
-                DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
+                DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), RecordBatch.CURRENT_MAGIC_VALUE, batch.producerId(),
                     batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
                     batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
                     batch.isTransactional(), batch.isControlBatch());
@@ -243,23 +225,18 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
     private static BatchFilterResult filterBatch(RecordBatch batch,
                                                  BufferSupplier decompressionBufferSupplier,
                                                  FilterResult filterResult,
-                                                 RecordFilter filter,
-                                                 byte batchMagic,
-                                                 boolean writeOriginalBatch,
-                                                 List<Record> retainedRecords) {
-        long maxOffset = -1;
-        boolean containsTombstones = false;
+                                                 RecordFilter filter) {
         try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
+            long maxOffset = -1;
+            boolean containsTombstones = false;
+            // Convert records with old record versions
+            boolean writeOriginalBatch = batch.magic() >= RecordBatch.CURRENT_MAGIC_VALUE;
+            List<Record> retainedRecords = new ArrayList<>();
             while (iterator.hasNext()) {
                 Record record = iterator.next();
                 filterResult.messagesRead += 1;
 
                 if (filter.shouldRetainRecord(batch, record)) {
-                    // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                    // the corrupted batch with correct data.
-                    if (!record.hasMagic(batchMagic))
-                        writeOriginalBatch = false;
-
                     if (record.offset() > maxOffset)
                         maxOffset = record.offset();
 
@@ -272,17 +249,20 @@ private static BatchFilterResult filterBatch(RecordBatch batch,
                     writeOriginalBatch = false;
                 }
             }
-            return new BatchFilterResult(writeOriginalBatch, containsTombstones, maxOffset);
+            return new BatchFilterResult(retainedRecords, writeOriginalBatch, containsTombstones, maxOffset);
         }
     }
 
     private static class BatchFilterResult {
+        private final List<Record> retainedRecords;
         private final boolean writeOriginalBatch;
         private final boolean containsTombstones;
        private final long maxOffset;
-        private BatchFilterResult(final boolean writeOriginalBatch,
-                                  final boolean containsTombstones,
-                                  final long maxOffset) {
+        private BatchFilterResult(List<Record> retainedRecords,
+                                  final boolean writeOriginalBatch,
+                                  final boolean containsTombstones,
+                                  final long maxOffset) {
+            this.retainedRecords = retainedRecords;
            this.writeOriginalBatch = writeOriginalBatch;
            this.containsTombstones = containsTombstones;
            this.maxOffset = maxOffset;
@@ -293,23 +273,28 @@ private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch origina
                                                              List<Record> retainedRecords,
                                                              ByteBufferOutputStream bufferOutputStream,
                                                              final long deleteHorizonMs) {
-        byte magic = originalBatch.magic();
         Compression compression = Compression.of(originalBatch.compressionType()).build();
-        TimestampType timestampType = originalBatch.timestampType();
+        // V0 has no timestamp type or timestamp, so we set the timestamp type to CREATE_TIME and the timestamp to NO_TIMESTAMP.
+        // Note that this differs from produce up-conversion where the timestamp type topic config is used and the log append
+        // time is generated if the config is LOG_APPEND_TIME. The reason for the different behavior is that there is
+        // no appropriate log append time we can generate at compaction time.
+        TimestampType timestampType = originalBatch.timestampType() == TimestampType.NO_TIMESTAMP_TYPE ?
+            TimestampType.CREATE_TIME : originalBatch.timestampType();
         long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ?
             originalBatch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
-        long baseOffset = magic >= RecordBatch.MAGIC_VALUE_V2 ?
+        long baseOffset = originalBatch.magic() >= RecordBatch.MAGIC_VALUE_V2 ?
            originalBatch.baseOffset() : retainedRecords.get(0).offset();
 
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic,
+        // Convert records with older record versions to the current one
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, RecordBatch.CURRENT_MAGIC_VALUE,
            compression, timestampType, baseOffset, logAppendTime, originalBatch.producerId(),
            originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(),
            originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit(), deleteHorizonMs);
 
        for (Record record : retainedRecords)
            builder.append(record);
 
-        if (magic >= RecordBatch.MAGIC_VALUE_V2)
+        if (originalBatch.magic() >= RecordBatch.MAGIC_VALUE_V2)
            // we must preserve the last offset from the initial batch in order to ensure that the
            // last sequence number from the batch remains even after compaction. Otherwise, the producer
            // could incorrectly see an out of sequence error.
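
For reference, a minimal usage sketch of the simplified filterTo signature (the TopicPartition and max-record-batch-size parameters are gone). It mirrors the compaction-style filter used by the updated tests below; the wrapper class and helper name are illustrative only, not part of the patch.

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;

// Hypothetical helper; the filter body mirrors the updated tests below.
final class FilterToUsageSketch {

    static MemoryRecords compactAwayNullKeys(MemoryRecords records) {
        MemoryRecords.FilterResult result = records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
            @Override
            protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                // Drop batches that end up empty after filtering.
                return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
            }

            @Override
            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                // Keep only records with a non-null key, as a log-compaction-style filter would.
                return record.key() != null;
            }
        }, ByteBuffer.allocate(1024), BufferSupplier.NO_CACHING);

        result.outputBuffer().flip();
        // Retained batches are recopied and, with this change, up-converted to v2 where needed.
        return MemoryRecords.readableRecords(result.outputBuffer());
    }
}
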
@@ -2532,7 +2532,7 @@ public void testUpdatePositionWithLastRecordMissingFromBatch() {
             new SimpleRecord(null, "value".getBytes()));
 
         // Remove the last record to simulate compaction
-        MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter(0, 0) {
+        MemoryRecords.FilterResult result = records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
             @Override
             protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                 return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
@@ -2542,7 +2542,7 @@ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
             protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                 return record.key() != null;
             }
-        }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+        }, ByteBuffer.allocate(1024), BufferSupplier.NO_CACHING);
         result.outputBuffer().flip();
         MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.outputBuffer());
 
@@ -2518,7 +2518,7 @@ public void testUpdatePositionWithLastRecordMissingFromBatch() {
             new SimpleRecord(null, "value".getBytes()));
 
         // Remove the last record to simulate compaction
-        MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter(0, 0) {
+        MemoryRecords.FilterResult result = records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
             @Override
             protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                 return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
@@ -2528,7 +2528,7 @@ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
             protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                 return record.key() != null;
             }
-        }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+        }, ByteBuffer.allocate(1024), BufferSupplier.NO_CACHING);
         result.outputBuffer().flip();
         MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.outputBuffer());
 
@@ -1334,7 +1334,7 @@ public void testFetchWithLastRecordMissingFromBatch() {
             new SimpleRecord(null, "value".getBytes()));
 
         // Remove the last record to simulate compaction
-        MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter(0, 0) {
+        MemoryRecords.FilterResult result = records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
             @Override
             protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                 return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
@@ -1344,7 +1344,7 @@ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
             protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                 return record.key() != null;
             }
-        }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+        }, ByteBuffer.allocate(1024), BufferSupplier.NO_CACHING);
         result.outputBuffer().flip();
         MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.outputBuffer());
 