diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
index 8dd9b62a62c1..983b6482d6f6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
@@ -82,7 +82,7 @@ class ExpressionIndexSupport(spark: SparkSession,
         val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)
         val expressionIndexRecords = loadExpressionIndexRecords(indexPartition, prunedPartitions, readInMemory)
         loadTransposed(queryReferencedColumns, readInMemory, expressionIndexRecords, expressionIndexQuery) {
-          transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF, Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true))
+          transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF, Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true, Option.apply(indexDefinition)))
         }
       } else if (indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)) {
         val prunedPartitionAndFileNames = getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices, includeLogFiles = true)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index 06c012ac6089..9cd53a75ff5b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -21,7 +21,7 @@ package org.apache.hudi
 import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieIndexDefinition}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.keygen.KeyGenUtils
 import org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR
@@ -102,8 +102,13 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
     (prunedPartitions, prunedFiles)
   }
 
-  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], fileNamesFromPrunedPartitions: Set[String], isExpressionIndex: Boolean = false): Set[String] = {
-    val indexedCols : Seq[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], fileNamesFromPrunedPartitions: Set[String],
+                                  isExpressionIndex: Boolean = false, indexDefinitionOpt: Option[HoodieIndexDefinition] = Option.empty): Set[String] = {
+    val indexedCols : Seq[String] = if (indexDefinitionOpt.isDefined) {
+      indexDefinitionOpt.get.getSourceFields.asScala.toSeq
+    } else {
+      metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+    }
     val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, isExpressionIndex, indexedCols)).reduce(And)
     if (indexFilter.equals(TrueLiteral)) {
       // if there are any non indexed cols or we can't translate source expr, we have to read all files and may not benefit from col stats lookup.
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
index 4a57e1ed59b3..9d8c5613f1b1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
@@ -57,12 +57,10 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Column, SaveMode, functions}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
-import org.scalatest.Ignore
 
 import java.util.stream.Collectors
 import scala.collection.JavaConverters
 
-@Ignore
 class TestExpressionIndex extends HoodieSparkSqlTestBase {
 
   override protected def beforeAll(): Unit = {
@@ -777,6 +775,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
            |location '$basePath'
            |""".stripMargin)
 
+      setCompactionConfigs(tableType)
       spark.sql("set hoodie.parquet.small.file.limit=0")
       if (HoodieSparkUtils.gteqSpark3_4) {
         spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -856,6 +855,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
            |location '$basePath'
            |""".stripMargin)
 
+      setCompactionConfigs(tableType)
       spark.sql("set hoodie.parquet.small.file.limit=0")
       if (HoodieSparkUtils.gteqSpark3_4) {
         spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -1471,6 +1471,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
            |location '$basePath'
            |""".stripMargin)
 
+      setCompactionConfigs(tableType)
       spark.sql("set hoodie.parquet.small.file.limit=0")
       if (HoodieSparkUtils.gteqSpark3_4) {
         spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -1618,6 +1619,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
            |location '$basePath'
            |""".stripMargin)
 
+      setCompactionConfigs(tableType)
       spark.sql("set hoodie.parquet.small.file.limit=0")
       spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
       if (HoodieSparkUtils.gteqSpark3_4) {
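Reviewer note (not part of the patch): the behavioral core of this change is in getCandidateFiles, which now prefers the source fields of an explicitly passed index definition and only falls back to the column-stats index definition when none is supplied, so an expression index prunes files against its own indexed columns. Below is a minimal standalone Scala sketch of that fallback; IndexDefinition and columnStatsDefinition are simplified stand-ins for illustration, not the Hudi API.

object IndexedColsFallbackSketch {
  // Hypothetical stand-in for HoodieIndexDefinition: only the piece the
  // fallback logic needs, i.e. the list of indexed source fields.
  final case class IndexDefinition(sourceFields: Seq[String])

  // Stand-in for the column-stats index definition that the metadata
  // table would normally provide via metaClient.getIndexMetadata.
  val columnStatsDefinition: IndexDefinition = IndexDefinition(Seq("price", "ts"))

  // Mirrors the new branch in getCandidateFiles: use the caller-supplied
  // definition's source fields if present, else fall back to column stats.
  def indexedCols(indexDefinitionOpt: Option[IndexDefinition]): Seq[String] =
    indexDefinitionOpt
      .map(_.sourceFields)
      .getOrElse(columnStatsDefinition.sourceFields)

  def main(args: Array[String]): Unit = {
    // Expression index path: its own source fields win.
    assert(indexedCols(Some(IndexDefinition(Seq("from_unixtime(ts)")))) == Seq("from_unixtime(ts)"))
    // Default path: fall back to the column-stats definition.
    assert(indexedCols(None) == Seq("price", "ts"))
  }
}

Threading the definition through as an Option keeps existing call sites unchanged while letting ExpressionIndexSupport pass its indexDefinition explicitly instead of the method hard-coding a lookup of PARTITION_NAME_COLUMN_STATS.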