diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala
index 08f30a7c062..c2166145913 100644
--- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala
+++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala
@@ -21,6 +21,7 @@ import org.apache.spark.ml.param.shared.{HasFeaturesCol => HasFeaturesColSpark,
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.sql._
 import org.apache.spark.sql.types._
+import org.slf4j.Logger
 
 import java.net.{ServerSocket, Socket}
 import java.util.concurrent.Executors
@@ -277,9 +278,10 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
   private def generateDataset(ac: BaseAggregatedColumns,
                               referenceDataset: Option[LightGBMDataset],
                               schema: StructType,
-                              datasetParams: String): LightGBMDataset = {
+                              datasetParams: String,
+                              log: Logger): LightGBMDataset = {
     val dataset = try {
-      val datasetInner = ac.generateDataset(referenceDataset, datasetParams)
+      val datasetInner = ac.generateDataset(referenceDataset, datasetParams, log)
       getOptGroupCol.foreach(_ => datasetInner.addGroupColumn(ac.getGroups))
       datasetInner.setFeatureNames(getSlotNamesWithMetadata(schema(getFeaturesCol)), ac.getNumCols)
       datasetInner
@@ -301,13 +303,13 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
     val columnParams = getColumnParams
     val datasetParams = getDatasetParams(trainParams.categoricalFeatures, trainParams.executionParams.numThreads)
     beforeGenerateTrainDataset(batchIndex, columnParams, schema, log, trainParams)
-    val trainDataset = generateDataset(aggregatedColumns, None, schema, datasetParams)
+    val trainDataset = generateDataset(aggregatedColumns, None, schema, datasetParams, log)
     try {
       afterGenerateTrainDataset(batchIndex, columnParams, schema, log, trainParams)
 
       val validDatasetOpt = validationData.map { vd =>
         beforeGenerateValidDataset(batchIndex, columnParams, schema, log, trainParams)
-        val out = generateDataset(vd, Some(trainDataset), schema, datasetParams)
+        val out = generateDataset(vd, Some(trainDataset), schema, datasetParams, log)
         afterGenerateValidDataset(batchIndex, columnParams, schema, log, trainParams)
         out
       }
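Reviewer note: the change above threads the per-task SLF4J Logger down into dataset generation, so the dataset-size logging added in DatasetAggregator.scala (further below) lands in the executor logs. A minimal sketch of the pattern, with illustrative names (makeDataset is not from this PR):

    import org.slf4j.{Logger, LoggerFactory}

    object LoggerThreadingSketch {
      // Stand-in for generateDataset: the caller's logger is an explicit
      // parameter rather than a logger constructed inside the helper.
      def makeDataset(datasetParams: String, log: Logger): Unit =
        log.info(s"generating dataset with params: $datasetParams")

      def main(args: Array[String]): Unit = {
        val log: Logger = LoggerFactory.getLogger(getClass)
        makeDataset("max_bin=255", log)
      }
    }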
diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/SharedState.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/SharedState.scala
index 45d103730bd..9dc200754df 100644
--- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/SharedState.scala
+++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/SharedState.scala
@@ -12,37 +12,25 @@ import org.slf4j.Logger
 
 import java.util.concurrent.CountDownLatch
 
-class SharedState(columnParams: ColumnParams,
-                  schema: StructType,
-                  trainParams: TrainParams) {
-  val mainExecutorWorker: Long = LightGBMUtils.getTaskId
-  val useSingleDataset: Boolean = trainParams.executionParams.useSingleDatasetMode
+class SharedDatasetState(columnParams: ColumnParams,
+                         schema: StructType,
+                         trainParams: TrainParams,
+                         sharedState: SharedState) {
   val chunkSize: Int = trainParams.executionParams.chunkSize
+  val useSingleDataset: Boolean = trainParams.executionParams.useSingleDatasetMode
   val matrixType: String = trainParams.executionParams.matrixType
 
   lazy val denseAggregatedColumns: BaseDenseAggregatedColumns = new DenseSyncAggregatedColumns(chunkSize)
 
   lazy val sparseAggregatedColumns: BaseSparseAggregatedColumns = new SparseSyncAggregatedColumns(chunkSize)
 
-  def getArrayType(rowsIter: Iterator[Row], matrixType: String): (Iterator[Row], Boolean) = {
-    if (matrixType == "auto") {
-      sampleRowsForArrayType(rowsIter, columnParams)
-    } else if (matrixType == "sparse") {
-      (rowsIter: Iterator[Row], true)
-    } else if (matrixType == "dense") {
-      (rowsIter: Iterator[Row], false)
-    } else {
-      throw new Exception(s"Invalid parameter matrix type specified: ${matrixType}")
-    }
-  }
-
   def prep(iter: Iterator[Row]): BaseChunkedColumns = {
     val (concatRowsIter: Iterator[Row], isSparseHere: Boolean) = getArrayType(iter, matrixType)
     val peekableIter = new PeekingIterator(concatRowsIter)
     // Note: the first worker sets "is sparse", other workers read it
-    linkIsSparse(isSparseHere)
+    sharedState.linkIsSparse(isSparseHere)
 
-    if (!isSparse.get) {
+    if (!sharedState.isSparse.get) {
       new DenseChunkedColumns(peekableIter, columnParams, schema, chunkSize)
     } else {
       new SparseChunkedColumns(peekableIter, columnParams, schema, chunkSize, useSingleDataset)
@@ -50,7 +38,7 @@ class SharedState(columnParams: ColumnParams,
   }
 
   def merge(ts: BaseChunkedColumns): BaseAggregatedColumns = {
-    val isSparseVal = isSparse.get
+    val isSparseVal = sharedState.isSparse.get
     val aggregatedColumns = if (!isSparseVal) {
       if (useSingleDataset) denseAggregatedColumns
       else new DenseAggregatedColumns(chunkSize)
@@ -68,6 +56,41 @@ class SharedState(columnParams: ColumnParams,
     aggregatedColumns
   }
 
+  @volatile var arrayProcessedSignal: CountDownLatch = new CountDownLatch(0)
+
+  def incrementArrayProcessedSignal(log: Logger): Int = {
+    this.synchronized {
+      val count = arrayProcessedSignal.getCount.toInt + 1
+      arrayProcessedSignal = new CountDownLatch(count)
+      log.info(s"Task incrementing ArrayProcessedSignal to $count")
+      count
+    }
+  }
+
+  def getArrayType(rowsIter: Iterator[Row], matrixType: String): (Iterator[Row], Boolean) = {
+    if (matrixType == "auto") {
+      sampleRowsForArrayType(rowsIter, columnParams)
+    } else if (matrixType == "sparse") {
+      (rowsIter: Iterator[Row], true)
+    } else if (matrixType == "dense") {
+      (rowsIter: Iterator[Row], false)
+    } else {
+      throw new Exception(s"Invalid parameter matrix type specified: ${matrixType}")
+    }
+  }
+}
+
+class SharedState(columnParams: ColumnParams,
+                  schema: StructType,
+                  trainParams: TrainParams) {
+  val mainExecutorWorker: Long = LightGBMUtils.getTaskId
+  val useSingleDataset: Boolean = trainParams.executionParams.useSingleDatasetMode
+  val chunkSize: Int = trainParams.executionParams.chunkSize
+  val matrixType: String = trainParams.executionParams.matrixType
+
+  val datasetState: SharedDatasetState = new SharedDatasetState(columnParams, schema, trainParams, this)
+  val validationDatasetState: SharedDatasetState = new SharedDatasetState(columnParams, schema, trainParams, this)
+
   @volatile var isSparse: Option[Boolean] = None
 
   def linkIsSparse(isSparse: Boolean): Unit = {
@@ -80,15 +103,9 @@ class SharedState(columnParams: ColumnParams,
     }
   }
 
-  @volatile var arrayProcessedSignal: CountDownLatch = new CountDownLatch(0)
-
   def incrementArrayProcessedSignal(log: Logger): Int = {
-    this.synchronized {
-      val count = arrayProcessedSignal.getCount.toInt + 1
-      arrayProcessedSignal = new CountDownLatch(count)
-      log.info(s"Task incrementing ArrayProcessedSignal to $count")
-      count
-    }
+    datasetState.incrementArrayProcessedSignal(log)
+    validationDatasetState.incrementArrayProcessedSignal(log)
   }
 
   @volatile var doneSignal: CountDownLatch = new CountDownLatch(0)
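Reviewer note: this is the heart of the refactor. The per-dataset mutable state (chunked columns, aggregation buffers, and the array-processed latch) moves into SharedDatasetState, and SharedState now holds one instance for training rows and one for validation rows, while the shared sparse/dense decision stays on the parent. A stripped-down sketch of the swap-in latch pattern each SharedDatasetState now owns (LatchSketch is an illustrative name):

    import java.util.concurrent.CountDownLatch

    class LatchSketch {
      // Each joining task swaps in a fresh latch with one more count;
      // workers later count the latch down as they finish and await it.
      @volatile var arrayProcessedSignal: CountDownLatch = new CountDownLatch(0)

      def increment(): Int = this.synchronized {
        val count = arrayProcessedSignal.getCount.toInt + 1
        arrayProcessedSignal = new CountDownLatch(count)
        count
      }
    }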
diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/TaskTrainingMethods.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/TaskTrainingMethods.scala
index bd09b4d784a..41615863495 100644
--- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/TaskTrainingMethods.scala
+++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/TaskTrainingMethods.scala
@@ -38,13 +38,12 @@ object TaskTrainingMethods {
                  validationData: Option[Broadcast[Array[Row]]],
                  sharedState: SharedState): (BaseAggregatedColumns, Option[BaseAggregatedColumns]) = {
     val aggregatedColumns = {
-      val prepAggregatedColumns = sharedState.prep(inputRows)
-      sharedState.merge(prepAggregatedColumns)
+      val prepAggregatedColumns = sharedState.datasetState.prep(inputRows)
+      sharedState.datasetState.merge(prepAggregatedColumns)
     }
-
     val aggregatedValidationColumns = validationData.map { data =>
-      val prepAggregatedColumns = sharedState.prep(data.value.toIterator)
-      sharedState.merge(prepAggregatedColumns)
+      val prepAggregatedColumns = sharedState.validationDatasetState.prep(data.value.toIterator)
+      sharedState.validationDatasetState.merge(prepAggregatedColumns)
     }
     (aggregatedColumns, aggregatedValidationColumns)
   }
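Reviewer note: prep/merge is a two-phase aggregation: each task first chunks its own partition (prep), then folds the chunks into executor-level aggregated columns (merge). Routing training and validation rows through separate SharedDatasetState instances keeps their buffers and latches from colliding. A schematic of the call shape under simplified types (String stands in for the chunked and aggregated column types; all names here are illustrative):

    object PrepMergeSketch {
      // Hypothetical per-dataset state mirroring SharedDatasetState's prep/merge shape.
      class DatasetStateSketch(name: String) {
        def prep(rows: Iterator[Int]): String = s"$name-chunks(${rows.length})"
        def merge(chunks: String): String = s"$name-aggregate[$chunks]"
      }

      def main(args: Array[String]): Unit = {
        val trainState = new DatasetStateSketch("train")
        val validState = new DatasetStateSketch("valid")
        val trainAgg = trainState.merge(trainState.prep(Iterator(1, 2, 3)))
        val validAgg = Option(Iterator(4, 5)).map(v => validState.merge(validState.prep(v)))
        println((trainAgg, validAgg))
      }
    }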
diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/dataset/DatasetAggregator.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/dataset/DatasetAggregator.scala
index 33ba5cda660..6d2b1c7e161 100644
--- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/dataset/DatasetAggregator.scala
+++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/dataset/DatasetAggregator.scala
@@ -11,6 +11,7 @@ import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
+import org.slf4j.Logger
 
 import java.util.concurrent.atomic.AtomicLong
 import scala.collection.mutable.ListBuffer
@@ -211,7 +212,7 @@ private[lightgbm] abstract class BaseAggregatedColumns(val chunkSize: Int) {
     initScores.foreach(_.delete())
   }
 
-  def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset
+  def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String, log: Logger): LightGBMDataset
 
   def incrementCount(chunkedCols: BaseChunkedColumns): Unit = {
     rowCount.addAndGet(chunkedCols.rowCount)
@@ -327,14 +328,17 @@ private[lightgbm] abstract class BaseDenseAggregatedColumns(chunkSize: Int) exte
 
   def getFeatures: DoubleSwigArray = features
 
-  def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset = {
+  def generateDataset(referenceDataset: Option[LightGBMDataset],
+                      datasetParams: String, log: Logger): LightGBMDataset = {
     val pointer = lightgbmlib.voidpp_handle()
     try {
+      val numRows = rowCount.get().toInt
+      log.info(s"LightGBM task generating dense dataset with $numRows rows and $numCols columns")
       // Generate the dataset for features
       LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromMat(
         lightgbmlib.double_to_voidp_ptr(features.array),
         lightgbmlibConstants.C_API_DTYPE_FLOAT64,
-        rowCount.get().toInt,
+        numRows,
         numCols,
         1,
         datasetParams,
@@ -434,9 +438,12 @@ private[lightgbm] abstract class BaseSparseAggregatedColumns(chunkSize: Int)
     }
   }
 
-  def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset = {
+  def generateDataset(referenceDataset: Option[LightGBMDataset],
+                      datasetParams: String, log: Logger): LightGBMDataset = {
     indexPointerArrayIncrement(getIndexPointers.array)
     val pointer = lightgbmlib.voidpp_handle()
+    val numRows = indptrCount.get() - 1
+    log.info(s"LightGBM task generating sparse dataset with $numRows rows and $numCols columns")
     // Generate the dataset for features
     LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromCSR(
       lightgbmlib.int_to_voidp_ptr(indexPointers.array),
diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/params/LightGBMParams.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/params/LightGBMParams.scala
index b7c27f88216..f50123dbf19 100644
--- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/params/LightGBMParams.scala
+++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/params/LightGBMParams.scala
@@ -61,7 +61,7 @@ trait LightGBMExecutionParams extends Wrappable {
   val useSingleDatasetMode = new BooleanParam(this, "useSingleDatasetMode",
     "Use single dataset execution mode to create a single native dataset per executor (singleton) " +
       "to reduce memory and communication overhead. Note this is disabled when running spark in local mode.")
-  setDefault(useSingleDatasetMode -> false)
+  setDefault(useSingleDatasetMode -> true)
 
   def getUseSingleDatasetMode: Boolean = $(useSingleDatasetMode)
   def setUseSingleDatasetMode(value: Boolean): this.type = set(useSingleDatasetMode, value)
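Reviewer note: flipping this default means existing pipelines silently switch to single dataset mode on upgrade. A user-side sketch for pinning the old behavior (data loading omitted; LightGBMClassifier is used as a representative estimator):

    import com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier

    // useSingleDatasetMode now defaults to true; set it explicitly to false
    // to reproduce the previous per-task dataset behavior.
    val classifier = new LightGBMClassifier()
      .setUseSingleDatasetMode(false)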
diff --git a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split1/VerifyLightGBMClassifier.scala b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split1/VerifyLightGBMClassifier.scala
index 8c52c73951c..24f9a75cb63 100644
--- a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split1/VerifyLightGBMClassifier.scala
+++ b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split1/VerifyLightGBMClassifier.scala
@@ -329,11 +329,14 @@ class VerifyLightGBMClassifier extends Benchmarks with EstimatorFuzzing[LightGBM
       }
     }
 
     val scoredDF1 = baseModel
+      .setUseSingleDatasetMode(false)
       .fit(pimaDF)
       .transform(pimaDF)
+    // Note: run for more iterations than non-custom objective to prevent flakiness
     // Note we intentionally overfit here on the training data and don't do a split
     val scoredDF2 = baseModel
+      .setUseSingleDatasetMode(false)
       .setFObj(new LogLikelihood())
       .setNumIterations(300)
       .fit(pimaDF)
diff --git a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split2/VerifyLightGBMRegressor.scala b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split2/VerifyLightGBMRegressor.scala
index cbca56fc8a1..53177011278 100644
--- a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split2/VerifyLightGBMRegressor.scala
+++ b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split2/VerifyLightGBMRegressor.scala
@@ -27,7 +27,7 @@ class VerifyLightGBMRegressor extends Benchmarks
   verifyLearnerOnRegressionCsvFile("energyefficiency2012_data.train.csv", "Y1", 0,
     Some(Seq("X1", "X2", "X3", "X4", "X5", "X6", "X7", "X8", "Y2")))
   verifyLearnerOnRegressionCsvFile("airfoil_self_noise.train.csv", "Scaled sound pressure level", 1)
-  verifyLearnerOnRegressionCsvFile("Buzz.TomsHardware.train.csv", "Mean Number of display (ND)", -3)
+  verifyLearnerOnRegressionCsvFile("Buzz.TomsHardware.train.csv", "Mean Number of display (ND)", -4)
   verifyLearnerOnRegressionCsvFile("machine.train.csv", "ERP", -2)
   // TODO: Spark doesn't seem to like the column names here because of '.', figure out how to read in the data
   // verifyLearnerOnRegressionCsvFile("slump_test.train.csv", "Compressive Strength (28-day)(Mpa)", 2)