fix: add logging for number of columns and rows when creating datasets, set useSingleDatasetMode=True by default, fix validation dataset handling #1222

Merged (2 commits, Dec 1, 2021)

Changes from all commits
@@ -12,45 +12,33 @@ import org.slf4j.Logger
 
 import java.util.concurrent.CountDownLatch
 
-class SharedState(columnParams: ColumnParams,
-                  schema: StructType,
-                  trainParams: TrainParams) {
-  val mainExecutorWorker: Long = LightGBMUtils.getTaskId
-  val useSingleDataset: Boolean = trainParams.executionParams.useSingleDatasetMode
+class SharedDatasetState(columnParams: ColumnParams,
+                         schema: StructType,
+                         trainParams: TrainParams,
+                         sharedState: SharedState) {
   val chunkSize: Int = trainParams.executionParams.chunkSize
+  val useSingleDataset: Boolean = trainParams.executionParams.useSingleDatasetMode
   val matrixType: String = trainParams.executionParams.matrixType
 
   lazy val denseAggregatedColumns: BaseDenseAggregatedColumns = new DenseSyncAggregatedColumns(chunkSize)
 
   lazy val sparseAggregatedColumns: BaseSparseAggregatedColumns = new SparseSyncAggregatedColumns(chunkSize)
 
-  def getArrayType(rowsIter: Iterator[Row], matrixType: String): (Iterator[Row], Boolean) = {
-    if (matrixType == "auto") {
-      sampleRowsForArrayType(rowsIter, columnParams)
-    } else if (matrixType == "sparse") {
-      (rowsIter: Iterator[Row], true)
-    } else if (matrixType == "dense") {
-      (rowsIter: Iterator[Row], false)
-    } else {
-      throw new Exception(s"Invalid parameter matrix type specified: ${matrixType}")
-    }
-  }
-
   def prep(iter: Iterator[Row]): BaseChunkedColumns = {
     val (concatRowsIter: Iterator[Row], isSparseHere: Boolean) = getArrayType(iter, matrixType)
     val peekableIter = new PeekingIterator(concatRowsIter)
     // Note: the first worker sets "is sparse", other workers read it
-    linkIsSparse(isSparseHere)
+    sharedState.linkIsSparse(isSparseHere)
 
-    if (!isSparse.get) {
+    if (!sharedState.isSparse.get) {
       new DenseChunkedColumns(peekableIter, columnParams, schema, chunkSize)
     } else {
       new SparseChunkedColumns(peekableIter, columnParams, schema, chunkSize, useSingleDataset)
     }
   }
 
   def merge(ts: BaseChunkedColumns): BaseAggregatedColumns = {
-    val isSparseVal = isSparse.get
+    val isSparseVal = sharedState.isSparse.get
     val aggregatedColumns = if (!isSparseVal) {
       if (useSingleDataset) denseAggregatedColumns
       else new DenseAggregatedColumns(chunkSize)
@@ -68,6 +56,41 @@ class SharedState(columnParams: ColumnParams,
     aggregatedColumns
   }
 
+  @volatile var arrayProcessedSignal: CountDownLatch = new CountDownLatch(0)
+
+  def incrementArrayProcessedSignal(log: Logger): Int = {
+    this.synchronized {
+      val count = arrayProcessedSignal.getCount.toInt + 1
+      arrayProcessedSignal = new CountDownLatch(count)
+      log.info(s"Task incrementing ArrayProcessedSignal to $count")
+      count
+    }
+  }
+
+  def getArrayType(rowsIter: Iterator[Row], matrixType: String): (Iterator[Row], Boolean) = {
+    if (matrixType == "auto") {
+      sampleRowsForArrayType(rowsIter, columnParams)
+    } else if (matrixType == "sparse") {
+      (rowsIter: Iterator[Row], true)
+    } else if (matrixType == "dense") {
+      (rowsIter: Iterator[Row], false)
+    } else {
+      throw new Exception(s"Invalid parameter matrix type specified: ${matrixType}")
+    }
+  }
+}
+
+class SharedState(columnParams: ColumnParams,
+                  schema: StructType,
+                  trainParams: TrainParams) {
+  val mainExecutorWorker: Long = LightGBMUtils.getTaskId
+  val useSingleDataset: Boolean = trainParams.executionParams.useSingleDatasetMode
+  val chunkSize: Int = trainParams.executionParams.chunkSize
+  val matrixType: String = trainParams.executionParams.matrixType
+
+  val datasetState: SharedDatasetState = new SharedDatasetState(columnParams, schema, trainParams, this)
+  val validationDatasetState: SharedDatasetState = new SharedDatasetState(columnParams, schema, trainParams, this)
+
   @volatile var isSparse: Option[Boolean] = None
 
   def linkIsSparse(isSparse: Boolean): Unit = {
@@ -80,15 +103,9 @@ class SharedState(columnParams: ColumnParams,
     }
   }
 
-  @volatile var arrayProcessedSignal: CountDownLatch = new CountDownLatch(0)
-
   def incrementArrayProcessedSignal(log: Logger): Int = {
-    this.synchronized {
-      val count = arrayProcessedSignal.getCount.toInt + 1
-      arrayProcessedSignal = new CountDownLatch(count)
-      log.info(s"Task incrementing ArrayProcessedSignal to $count")
-      count
-    }
+    datasetState.incrementArrayProcessedSignal(log)
+    validationDatasetState.incrementArrayProcessedSignal(log)
   }
 
   @volatile var doneSignal: CountDownLatch = new CountDownLatch(0)
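Summary of this file's changes (an editorial reading of the diff above, not text from the PR): the per-dataset machinery, namely chunking, aggregation, the getArrayType dispatch, and the arrayProcessedSignal latch, moves into the new SharedDatasetState class, while SharedState becomes a thin executor-wide holder of one SharedDatasetState for training data and one for validation data, keeping only the shared is-sparse decision and the doneSignal latch. The sketch below is a simplified, hypothetical reconstruction of that shape; the names SharedStateSketch and SharedDatasetStateSketch and the omission of the Spark/LightGBM types are assumptions made to keep it self-contained, so it is not the project's actual source.

import java.util.concurrent.CountDownLatch

// Per-dataset state: each dataset (train or validation) owns its own latch and,
// in the real code, its own chunked and aggregated column buffers.
class SharedDatasetStateSketch(shared: SharedStateSketch) {
  @volatile var arrayProcessedSignal: CountDownLatch = new CountDownLatch(0)

  def incrementArrayProcessedSignal(): Int = this.synchronized {
    val count = arrayProcessedSignal.getCount.toInt + 1
    arrayProcessedSignal = new CountDownLatch(count)
    count
  }
}

// Executor-wide state: the sparse/dense decision stays shared, while training and
// validation data are tracked by two separate per-dataset state objects.
class SharedStateSketch {
  val datasetState = new SharedDatasetStateSketch(this)
  val validationDatasetState = new SharedDatasetStateSketch(this)

  @volatile var isSparse: Option[Boolean] = None

  def linkIsSparse(sparse: Boolean): Unit = this.synchronized {
    if (isSparse.isEmpty) isSparse = Some(sparse)
  }

  // Incrementing the signal fans out to both dataset states, mirroring the
  // delegation added to SharedState in the last hunk above.
  def incrementArrayProcessedSignal(): Int = {
    datasetState.incrementArrayProcessedSignal()
    validationDatasetState.incrementArrayProcessedSignal()
  }
}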
@@ -38,13 +38,12 @@ object TaskTrainingMethods {
                       validationData: Option[Broadcast[Array[Row]]],
                       sharedState: SharedState): (BaseAggregatedColumns, Option[BaseAggregatedColumns]) = {
     val aggregatedColumns = {
-      val prepAggregatedColumns = sharedState.prep(inputRows)
-      sharedState.merge(prepAggregatedColumns)
+      val prepAggregatedColumns = sharedState.datasetState.prep(inputRows)
+      sharedState.datasetState.merge(prepAggregatedColumns)
     }
-
     val aggregatedValidationColumns = validationData.map { data =>
-      val prepAggregatedColumns = sharedState.prep(data.value.toIterator)
-      sharedState.merge(prepAggregatedColumns)
+      val prepAggregatedColumns = sharedState.validationDatasetState.prep(data.value.toIterator)
+      sharedState.validationDatasetState.merge(prepAggregatedColumns)
     }
     (aggregatedColumns, aggregatedValidationColumns)
   }
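A reading of this change (an inference from the diff, not text from the PR): previously both the training iterator and the broadcast validation rows were fed through the same sharedState.prep and sharedState.merge, so in single-dataset mode the validation rows could be merged into the same aggregated buffers as the training rows. Routing training data through sharedState.datasetState and validation data through sharedState.validationDatasetState keeps the two native datasets independent, which appears to be the "fix validation dataset handling" part of the PR title.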
@@ -7,6 +7,7 @@ import com.microsoft.azure.synapse.ml.lightgbm.dataset.DatasetUtils.getRowAsDoub
 import com.microsoft.azure.synapse.ml.lightgbm.swig._
 import com.microsoft.azure.synapse.ml.lightgbm.{ColumnParams, LightGBMUtils}
 import com.microsoft.ml.lightgbm.{SWIGTYPE_p_int, lightgbmlib, lightgbmlibConstants}
+import org.apache.spark.internal.Logging
 import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector}
 import org.apache.spark.sql.Row
@@ -181,7 +182,7 @@ private[lightgbm] final class DenseChunkedColumns(rowsIter: PeekingIterator[Row]
 
 }
 
-private[lightgbm] abstract class BaseAggregatedColumns(val chunkSize: Int) {
+private[lightgbm] abstract class BaseAggregatedColumns(val chunkSize: Int) extends Logging {
   protected var labels: FloatSwigArray = _
   protected var weights: Option[FloatSwigArray] = None
   protected var initScores: Option[DoubleSwigArray] = None
@@ -327,14 +328,17 @@ private[lightgbm] abstract class BaseDenseAggregatedColumns(chunkSize: Int) exte
 
   def getFeatures: DoubleSwigArray = features
 
-  def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset = {
+  def generateDataset(referenceDataset: Option[LightGBMDataset],
+                      datasetParams: String): LightGBMDataset = {
     val pointer = lightgbmlib.voidpp_handle()
     try {
+      val numRows = rowCount.get().toInt
+      logInfo(s"LightGBM task generating dense dataset with $numRows rows and $numCols columns")
       // Generate the dataset for features
       LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromMat(
         lightgbmlib.double_to_voidp_ptr(features.array),
         lightgbmlibConstants.C_API_DTYPE_FLOAT64,
-        rowCount.get().toInt,
+        numRows,
         numCols,
         1,
         datasetParams,
@@ -434,9 +438,12 @@ private[lightgbm] abstract class BaseSparseAggregatedColumns(chunkSize: Int)
     }
   }
 
-  def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset = {
+  def generateDataset(referenceDataset: Option[LightGBMDataset],
+                      datasetParams: String): LightGBMDataset = {
     indexPointerArrayIncrement(getIndexPointers.array)
     val pointer = lightgbmlib.voidpp_handle()
+    val numRows = indptrCount.get() - 1
+    logInfo(s"LightGBM task generating sparse dataset with $numRows rows and $numCols columns")
     // Generate the dataset for features
     LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromCSR(
       lightgbmlib.int_to_voidp_ptr(indexPointers.array),
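The logging half of the PR title lands here: BaseAggregatedColumns now mixes in Spark's Logging trait, and both the dense and the sparse generateDataset paths log the dataset shape before calling into native LightGBM. Below is a minimal, self-contained illustration of the same pattern; DatasetShapeReporter is a hypothetical class invented for this example and is not part of the project.

import org.apache.spark.internal.Logging

// Mixing in org.apache.spark.internal.Logging provides logInfo, the same hook
// the aggregated-columns classes use in the diff above.
class DatasetShapeReporter extends Logging {
  def report(numRows: Long, numCols: Int, sparse: Boolean): Unit = {
    val kind = if (sparse) "sparse" else "dense"
    logInfo(s"LightGBM task generating $kind dataset with $numRows rows and $numCols columns")
  }
}

// Example usage with made-up dimensions:
new DatasetShapeReporter().report(numRows = 1000L, numCols = 20, sparse = false)

For the sparse path the row count is derived as indptrCount.get() - 1 because a CSR index-pointer array always has one more entry than the matrix has rows.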
@@ -61,7 +61,7 @@ trait LightGBMExecutionParams extends Wrappable {
   val useSingleDatasetMode = new BooleanParam(this, "useSingleDatasetMode",
     "Use single dataset execution mode to create a single native dataset per executor (singleton) " +
     "to reduce memory and communication overhead. Note this is disabled when running spark in local mode.")
-  setDefault(useSingleDatasetMode -> false)
+  setDefault(useSingleDatasetMode -> true)
 
   def getUseSingleDatasetMode: Boolean = $(useSingleDatasetMode)
   def setUseSingleDatasetMode(value: Boolean): this.type = set(useSingleDatasetMode, value)
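With the default flipped to true, single dataset mode (one native dataset per executor) becomes opt-out rather than opt-in. A hedged usage sketch follows; the column names are placeholders, and the LightGBMClassifier class is assumed from the com.microsoft.azure.synapse.ml.lightgbm package used elsewhere in this diff.

import com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier

// useSingleDatasetMode now defaults to true; set it to false explicitly to get
// the previous one-dataset-per-task behaviour.
val classifier = new LightGBMClassifier()
  .setLabelCol("label")           // placeholder column names
  .setFeaturesCol("features")
  .setUseSingleDatasetMode(false)

As the parameter description above notes, this mode is disabled automatically when Spark runs in local mode, regardless of the setting.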
@@ -329,11 +329,14 @@ class VerifyLightGBMClassifier extends Benchmarks with EstimatorFuzzing[LightGBM
       }
     }
     val scoredDF1 = baseModel
+      .setUseSingleDatasetMode(false)
      .fit(pimaDF)
      .transform(pimaDF)
 
     // Note: run for more iterations than non-custom objective to prevent flakiness
+    // Note we intentionally overfit here on the training data and don't do a split
     val scoredDF2 = baseModel
+      .setUseSingleDatasetMode(false)
      .setFObj(new LogLikelihood())
      .setNumIterations(300)
      .fit(pimaDF)
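A note on this test change (an inference, not text from the PR): because useSingleDatasetMode now defaults to true, the custom-objective comparison pins setUseSingleDatasetMode(false) on both models, so the two runs being compared keep executing on the same code path they used before the default changed.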
@@ -27,7 +27,7 @@ class VerifyLightGBMRegressor extends Benchmarks
     verifyLearnerOnRegressionCsvFile("energyefficiency2012_data.train.csv", "Y1", 0,
       Some(Seq("X1", "X2", "X3", "X4", "X5", "X6", "X7", "X8", "Y2")))
     verifyLearnerOnRegressionCsvFile("airfoil_self_noise.train.csv", "Scaled sound pressure level", 1)
-    verifyLearnerOnRegressionCsvFile("Buzz.TomsHardware.train.csv", "Mean Number of display (ND)", -3)
+    verifyLearnerOnRegressionCsvFile("Buzz.TomsHardware.train.csv", "Mean Number of display (ND)", -4)
     verifyLearnerOnRegressionCsvFile("machine.train.csv", "ERP", -2)
     // TODO: Spark doesn't seem to like the column names here because of '.', figure out how to read in the data
     // verifyLearnerOnRegressionCsvFile("slump_test.train.csv", "Compressive Strength (28-day)(Mpa)", 2)