[HUDI-8849] Reenable TestExpressionIndex (#12607)
* Reenable TestExpressionIndex

* Fix test failures
lokeshj1703 authored Jan 10, 2025
1 parent 7c1431d commit c8cde9b
Showing 3 changed files with 13 additions and 6 deletions.
File: ExpressionIndexSupport.scala

@@ -82,7 +82,7 @@ class ExpressionIndexSupport(spark: SparkSession,
       val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)
       val expressionIndexRecords = loadExpressionIndexRecords(indexPartition, prunedPartitions, readInMemory)
       loadTransposed(queryReferencedColumns, readInMemory, expressionIndexRecords, expressionIndexQuery) {
-        transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF, Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true))
+        transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF, Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true, Option.apply(indexDefinition)))
       }
     } else if (indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)) {
       val prunedPartitionAndFileNames = getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices, includeLogFiles = true)
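A note on the new argument: the call site wraps `indexDefinition` with `Option.apply` rather than `Some`. In Scala, `Option.apply(null)` evaluates to `None`, while `Some(null)` produces a non-empty option holding `null`, so `Option.apply` is the safer wrapper for a value that may come from Java-facing code. A minimal standalone illustration (plain Scala, not Hudi code):

```scala
// Option.apply is null-safe; Some is not.
val fromNull: Option[String]  = Option.apply(null)   // None
val wrapped: Option[String]   = Some(null)           // Some(null) -- latent NPE hazard
val fromValue: Option[String] = Option.apply("defn") // Some("defn")
```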
File: SparkBaseIndexSupport.scala

@@ -21,7 +21,7 @@ package org.apache.hudi
 import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieIndexDefinition}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.keygen.KeyGenUtils
 import org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR
@@ -102,8 +102,13 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
     (prunedPartitions, prunedFiles)
   }

-  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], fileNamesFromPrunedPartitions: Set[String], isExpressionIndex: Boolean = false): Set[String] = {
-    val indexedCols : Seq[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], fileNamesFromPrunedPartitions: Set[String],
+                                  isExpressionIndex: Boolean = false, indexDefinitionOpt: Option[HoodieIndexDefinition] = Option.empty): Set[String] = {
+    val indexedCols : Seq[String] = if (indexDefinitionOpt.isDefined) {
+      indexDefinitionOpt.get.getSourceFields.asScala.toSeq
+    } else {
+      metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+    }
     val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, isExpressionIndex, indexedCols)).reduce(And)
     if (indexFilter.equals(TrueLiteral)) {
       // if there are any non indexed cols or we can't translate source expr, we have to read all files and may not benefit from col stats lookup.
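The effect of the new `indexDefinitionOpt` parameter: when the caller supplies an index definition (as the expression index path now does), its source fields determine the indexed columns; otherwise the method falls back to the column stats index definition, as before. A standalone sketch of this fallback pattern, using a simplified stand-in for `HoodieIndexDefinition` (not Hudi's API):

```scala
// Stand-in for HoodieIndexDefinition: only the source fields matter here.
case class IndexDef(sourceFields: Seq[String])

object IndexedColsDemo extends App {
  // Plays the role of the COLUMN_STATS partition's definition.
  val columnStatsDef = IndexDef(Seq("price", "ts"))

  // Caller-supplied definition wins; otherwise fall back to column stats.
  def indexedCols(indexDefOpt: Option[IndexDef] = None): Seq[String] =
    indexDefOpt.map(_.sourceFields).getOrElse(columnStatsDef.sourceFields)

  println(indexedCols(Some(IndexDef(Seq("from_unixtime(ts)"))))) // List(from_unixtime(ts))
  println(indexedCols())                                         // List(price, ts)
}
```

`map`/`getOrElse` resolves the same way as the patch's `isDefined`/`get` branch; either form expresses the fallback.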
File: TestExpressionIndex.scala

@@ -57,12 +57,10 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Column, SaveMode, functions}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
-import org.scalatest.Ignore

 import java.util.stream.Collectors
 import scala.collection.JavaConverters

-@Ignore
 class TestExpressionIndex extends HoodieSparkSqlTestBase {

   override protected def beforeAll(): Unit = {
@@ -777,6 +775,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
            |location '$basePath'
            |""".stripMargin)

+      setCompactionConfigs(tableType)
       spark.sql("set hoodie.parquet.small.file.limit=0")
       if (HoodieSparkUtils.gteqSpark3_4) {
         spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -856,6 +855,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
            |location '$basePath'
            |""".stripMargin)

+      setCompactionConfigs(tableType)
       spark.sql("set hoodie.parquet.small.file.limit=0")
       if (HoodieSparkUtils.gteqSpark3_4) {
         spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -1471,6 +1471,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
            |location '$basePath'
            |""".stripMargin)

+      setCompactionConfigs(tableType)
       spark.sql("set hoodie.parquet.small.file.limit=0")
       if (HoodieSparkUtils.gteqSpark3_4) {
         spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -1618,6 +1619,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
            |location '$basePath'
            |""".stripMargin)

+      setCompactionConfigs(tableType)
       spark.sql("set hoodie.parquet.small.file.limit=0")
       spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
       if (HoodieSparkUtils.gteqSpark3_4) {
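Each re-enabled test now calls `setCompactionConfigs(tableType)` before setting the small-file limit. The helper itself is not part of this diff (it lives in the test base class); presumably it pins compaction settings for MOR tables so that background compaction cannot perturb the file slices the assertions rely on. A hypothetical sketch of such a helper (the body is an assumption; `hoodie.compact.inline` and `hoodie.compact.schedule.inline` are real Hudi config keys):

```scala
// Hypothetical sketch only: the actual helper is defined in HoodieSparkSqlTestBase
// and may differ. Shown to make the test setup above concrete.
def setCompactionConfigs(tableType: String): Unit = {
  if (tableType == "mor") {
    // Keep compaction out of the test's way so file slices stay predictable.
    spark.sql("set hoodie.compact.inline=false")
    spark.sql("set hoodie.compact.schedule.inline=false")
  }
}
```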
