[HUDI-8800] Introduce SingleSparkConsistentBucketClusteringExecutionStrategy to improve performance (#12537)
TheR1sing3un authored Jan 11, 2025
1 parent 45486bf commit 257979f
Showing 10 changed files with 479 additions and 287 deletions.
@@ -61,6 +61,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
public static final String SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy";
public static final String SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.SingleSparkJobConsistentHashingExecutionStrategy";
public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy";
public static final String PLAN_PARTITION_FILTER_MODE =
@@ -653,9 +655,11 @@ private void validate() {
ValidationUtils.checkArgument(
planStrategy.equalsIgnoreCase(SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY),
"Consistent hashing bucket index only supports clustering plan strategy : " + SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
String clusteringConfigString = clusteringConfig.getString(EXECUTION_STRATEGY_CLASS_NAME);
ValidationUtils.checkArgument(
clusteringConfig.getString(EXECUTION_STRATEGY_CLASS_NAME).equals(SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY),
"Consistent hashing bucket index only supports clustering execution strategy : " + SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY);
clusteringConfigString.equals(SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY) || clusteringConfigString.equals(SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY),
"Consistent hashing bucket index only supports clustering execution strategy : " + SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY + " or "
+ SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY);
}
}
}
@@ -683,7 +687,7 @@ private String getDefaultPlanStrategyClassName(EngineType engineType) {
private String getDefaultExecutionStrategyClassName(EngineType engineType) {
switch (engineType) {
case SPARK:
return isConsistentHashingBucketIndex() ? SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY : SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY;
return isConsistentHashingBucketIndex() ? SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY : SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY;
case FLINK:
case JAVA:
return JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY;
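For reference, a minimal sketch of the writer properties that exercise this change. The property keys ("hoodie.index.type", "hoodie.index.bucket.engine", "hoodie.clustering.execution.strategy.class") are the standard Hudi options and are assumed here rather than taken from this diff; the strategy class name comes from the constant introduced above. With the consistent-hashing bucket engine selected, the explicit override is optional, since the Spark default execution strategy now resolves to the single-Spark-job variant.

import java.util.Properties;

public class ConsistentHashingClusteringConfigExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Consistent-hashing bucket index; with this commit, the Spark default clustering
    // execution strategy resolves to SingleSparkJobConsistentHashingExecutionStrategy.
    props.setProperty("hoodie.index.type", "BUCKET");
    props.setProperty("hoodie.index.bucket.engine", "CONSISTENT_HASHING");
    // Optional explicit override (class name taken from the constant added above).
    props.setProperty("hoodie.clustering.execution.strategy.class",
        "org.apache.hudi.client.clustering.run.strategy.SingleSparkJobConsistentHashingExecutionStrategy");
    props.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}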
@@ -18,17 +18,31 @@

package org.apache.hudi.table.action.cluster.strategy;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;

/**
@@ -41,12 +55,14 @@ public abstract class ClusteringExecutionStrategy<T, I, K, O> implements Seriali
private final transient HoodieEngineContext engineContext;
protected final HoodieWriteConfig writeConfig;
protected final HoodieRecordType recordType;
protected final Schema readerSchemaWithMetaFields;

public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
this.writeConfig = writeConfig;
this.hoodieTable = table;
this.engineContext = engineContext;
this.recordType = table.getConfig().getRecordMerger().getRecordType();
this.readerSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
}

/**
@@ -67,4 +83,52 @@ protected HoodieEngineContext getEngineContext() {
protected HoodieWriteConfig getWriteConfig() {
return this.writeConfig;
}

protected ClosableIterator<HoodieRecord<T>> getRecordIteratorWithLogFiles(ClusteringOperation operation, String instantTime, long maxMemory,
Option<BaseKeyGenerator> keyGeneratorOpt, Option<HoodieFileReader> baseFileReaderOpt) {
HoodieWriteConfig config = getWriteConfig();
HoodieTable table = getHoodieTable();
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withStorage(table.getStorage())
.withBasePath(table.getMetaClient().getBasePath())
.withLogFilePaths(operation.getDeltaFilePaths())
.withReaderSchema(readerSchemaWithMetaFields)
.withLatestInstantTime(instantTime)
.withMaxMemorySizeInBytes(maxMemory)
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(operation.getPartitionPath())
.withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withRecordMerger(config.getRecordMerger())
.withTableMetaClient(table.getMetaClient())
.build();

try {
return new HoodieFileSliceReader(baseFileReaderOpt, scanner, readerSchemaWithMetaFields, tableConfig.getPreCombineField(), config.getRecordMerger(),
tableConfig.getProps(),
tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp())), keyGeneratorOpt);
} catch (IOException e) {
throw new HoodieClusteringException("Error reading file slices", e);
}
}

protected ClosableIterator<HoodieRecord<T>> getRecordIteratorWithBaseFileOnly(Option<BaseKeyGenerator> keyGeneratorOpt, HoodieFileReader baseFileReader) {
// NOTE: Records have to be cloned here to make sure that, if a record holds a low-level
//       engine-specific payload pointing into a shared, mutable (underlying) buffer, we get a
//       clean copy of it, since these records will be shuffled later.
ClosableIterator<HoodieRecord> baseRecordsIterator;
try {
baseRecordsIterator = baseFileReader.getRecordIterator(readerSchemaWithMetaFields);
} catch (IOException e) {
throw new HoodieClusteringException("Error reading base file", e);
}
return new CloseableMappingIterator(
baseRecordsIterator,
rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchemaWithMetaFields, writeConfig.getProps(), keyGeneratorOpt));
}
}
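The two protected helpers added above are the shared extraction point: getRecordIteratorWithLogFiles merges a file slice's base file (if any) with its log files via HoodieMergedLogRecordScanner, while getRecordIteratorWithBaseFileOnly clones and re-wraps records from a base-file-only slice. A rough fragment showing how a concrete strategy might dispatch between them; only the two helpers and the types visible in this diff are real, and the method itself, its name, and its parameters are illustrative:

// Hypothetical method inside a concrete ClusteringExecutionStrategy subclass (not part of this commit).
// `operation`, `instantTime`, `maxMemory` and `baseFileReaderOpt` are supplied by the caller.
protected ClosableIterator<HoodieRecord<T>> readFileSlice(ClusteringOperation operation, String instantTime,
                                                          long maxMemory, Option<HoodieFileReader> baseFileReaderOpt) {
  if (operation.getDeltaFilePaths().isEmpty()) {
    // No log files: read the base file directly; the helper copies each record before handing it out.
    return getRecordIteratorWithBaseFileOnly(Option.empty(), baseFileReaderOpt.get());
  }
  // Log files present: merge base and log records for this file slice.
  return getRecordIteratorWithLogFiles(operation, instantTime, maxMemory, Option.empty(), baseFileReaderOpt);
}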
@@ -526,7 +526,7 @@ public void testConsistentBucketIndexDefaultClusteringConfig() {
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING).build())
.build();
assertEquals(HoodieClusteringConfig.SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY, writeConfig.getClusteringPlanStrategyClass());
assertEquals(HoodieClusteringConfig.SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY, writeConfig.getClusteringExecutionStrategyClass());
assertEquals(HoodieClusteringConfig.SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY, writeConfig.getClusteringExecutionStrategyClass());
}

@Test
@@ -24,30 +24,23 @@
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.JavaTaskContextSupplier;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.client.utils.LazyConcatenatingIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -57,12 +50,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
@@ -167,55 +158,19 @@ private List<HoodieRecord<T>> readRecordsForGroup(HoodieClusteringGroup clusteri
private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps,
String instantTime) {
HoodieWriteConfig config = getWriteConfig();
HoodieTable table = getHoodieTable();
List<HoodieRecord<T>> records = new ArrayList<>();

clusteringOps.forEach(clusteringOp -> {
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new JavaTaskContextSupplier(), config);
LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
Option<HoodieFileReader> baseFileReader = Option.empty();
HoodieMergedLogRecordScanner scanner = null;
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
scanner = HoodieMergedLogRecordScanner.newBuilder()
.withStorage(table.getStorage())
.withBasePath(table.getMetaClient().getBasePath())
.withLogFilePaths(clusteringOp.getDeltaFilePaths())
.withReaderSchema(readerSchema)
.withLatestInstantTime(instantTime)
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(clusteringOp.getPartitionPath())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withRecordMerger(config.getRecordMerger())
.build();

baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
? Option.empty()
: Option.of(HoodieIOFactory.getIOFactory(table.getStorage()).getReaderFactory(recordType)
.getFileReader(config, new StoragePath(clusteringOp.getDataFilePath())));
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
Option<BaseKeyGenerator> keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
Iterator<HoodieRecord<T>> fileSliceReader = new HoodieFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPreCombineField(), writeConfig.getRecordMerger(),
tableConfig.getProps(),
tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp())), keyGeneratorOpt);
fileSliceReader.forEachRemaining(records::add);
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
} finally {
if (scanner != null) {
scanner.close();
}
if (baseFileReader.isPresent()) {
baseFileReader.get().close();
}
}
});
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new JavaTaskContextSupplier(), config);
LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);

List<Supplier<ClosableIterator<HoodieRecord<T>>>> suppliers = new ArrayList<>(clusteringOps.size());
clusteringOps.forEach(op -> suppliers.add(() -> {
Option<HoodieFileReader> baseFileReader = ClusteringUtils.getBaseFileReader(getHoodieTable().getStorage(), recordType, getWriteConfig(), op.getDataFilePath());
return getRecordIteratorWithLogFiles(op, instantTime, maxMemoryPerCompaction, Option.empty(), baseFileReader);
}));
LazyConcatenatingIterator<HoodieRecord<T>> lazyIterator = new LazyConcatenatingIterator<>(suppliers);

lazyIterator.forEachRemaining(records::add);
lazyIterator.close();
return records;
}

@@ -224,21 +179,17 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperati
*/
private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
List<HoodieRecord<T>> records = new ArrayList<>();
clusteringOps.forEach(clusteringOp -> {
try (HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(getHoodieTable().getStorage())
.getReaderFactory(recordType)
.getFileReader(getHoodieTable().getConfig(), new StoragePath(clusteringOp.getDataFilePath()))) {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
Iterator<HoodieRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into the records(List).
recordIterator.forEachRemaining(record -> records.add(record.copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, new Properties(), Option.empty())));
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
}
});
List<Supplier<ClosableIterator<HoodieRecord<T>>>> suppliers = new ArrayList<>(clusteringOps.size());
clusteringOps.forEach(
op -> suppliers.add(() -> {
Option<HoodieFileReader> baseFileReaderOpt = ClusteringUtils.getBaseFileReader(getHoodieTable().getStorage(), recordType, getWriteConfig(), op.getDataFilePath());
ValidationUtils.checkArgument(baseFileReaderOpt.isPresent(), "Base file reader should be present for base file only read.");
return getRecordIteratorWithBaseFileOnly(Option.empty(), baseFileReaderOpt.get());
}));
LazyConcatenatingIterator<HoodieRecord<T>> lazyIterator = new LazyConcatenatingIterator<>(suppliers);

lazyIterator.forEachRemaining(records::add);
lazyIterator.close();
return records;
}
}
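The performance-relevant change in this file is that both read paths now build a list of iterator Suppliers and hand them to LazyConcatenatingIterator, so a file group's reader is opened only once the previous one is exhausted, instead of every group being read eagerly up front. Below is a toy, self-contained illustration of that pattern; it is not Hudi's LazyConcatenatingIterator, and only the supplier-list constructor plus the forEachRemaining/close usage are taken from the diff above.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

/** Toy sketch of lazy concatenation over iterator suppliers (illustrative only). */
public class LazyConcatDemo<T> implements Iterator<T> {
  private final Iterator<Supplier<Iterator<T>>> suppliers;
  private Iterator<T> current = null;

  public LazyConcatDemo(List<Supplier<Iterator<T>>> sources) {
    this.suppliers = sources.iterator();
  }

  @Override
  public boolean hasNext() {
    // Open the next source only when the previous one is exhausted, so at most
    // one underlying "reader" is materialized at any time.
    while ((current == null || !current.hasNext()) && suppliers.hasNext()) {
      current = suppliers.next().get();
    }
    return current != null && current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }

  public static void main(String[] args) {
    List<Supplier<Iterator<Integer>>> sources = new ArrayList<>();
    sources.add(() -> Arrays.asList(1, 2).iterator());
    sources.add(() -> Arrays.asList(3).iterator());
    new LazyConcatDemo<>(sources).forEachRemaining(System.out::println); // prints 1 2 3
  }
}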