fix: add logging for number of columns and rows when creating datasets, set useSingleDatasetMode=True by default
imatiach-msft committed Nov 17, 2021
1 parent 7bfd0d7 commit f128d13
Showing 7 changed files with 71 additions and 43 deletions.
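Because this commit both flips useSingleDatasetMode to true by default and logs the row and column counts whenever a native dataset is created, jobs that relied on the previous one-dataset-per-task behavior now have to opt out explicitly. A minimal opt-out sketch, assuming the SynapseML LightGBMClassifier API of this period; the import path and column names are illustrative, not taken from this commit:

import com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier  // package path assumed

val classifier = new LightGBMClassifier()
  .setLabelCol("label")               // illustrative column names
  .setFeaturesCol("features")
  .setUseSingleDatasetMode(false)     // restores the pre-commit default of one native dataset per task

The same setter is used in the classifier test changes below to pin those tests to non-single-dataset mode.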
@@ -21,6 +21,7 @@ import org.apache.spark.ml.param.shared.{HasFeaturesCol => HasFeaturesColSpark,
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.slf4j.Logger

import java.net.{ServerSocket, Socket}
import java.util.concurrent.Executors
@@ -277,9 +278,10 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
private def generateDataset(ac: BaseAggregatedColumns,
referenceDataset: Option[LightGBMDataset],
schema: StructType,
datasetParams: String): LightGBMDataset = {
datasetParams: String,
log: Logger): LightGBMDataset = {
val dataset = try {
val datasetInner = ac.generateDataset(referenceDataset, datasetParams)
val datasetInner = ac.generateDataset(referenceDataset, datasetParams, log)
getOptGroupCol.foreach(_ => datasetInner.addGroupColumn(ac.getGroups))
datasetInner.setFeatureNames(getSlotNamesWithMetadata(schema(getFeaturesCol)), ac.getNumCols)
datasetInner
@@ -301,13 +303,13 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
val columnParams = getColumnParams
val datasetParams = getDatasetParams(trainParams.categoricalFeatures, trainParams.executionParams.numThreads)
beforeGenerateTrainDataset(batchIndex, columnParams, schema, log, trainParams)
val trainDataset = generateDataset(aggregatedColumns, None, schema, datasetParams)
val trainDataset = generateDataset(aggregatedColumns, None, schema, datasetParams, log)
try {
afterGenerateTrainDataset(batchIndex, columnParams, schema, log, trainParams)

val validDatasetOpt = validationData.map { vd =>
beforeGenerateValidDataset(batchIndex, columnParams, schema, log, trainParams)
val out = generateDataset(vd, Some(trainDataset), schema, datasetParams)
val out = generateDataset(vd, Some(trainDataset), schema, datasetParams, log)
afterGenerateValidDataset(batchIndex, columnParams, schema, log, trainParams)
out
}
@@ -12,45 +12,33 @@ import org.slf4j.Logger

import java.util.concurrent.CountDownLatch

class SharedState(columnParams: ColumnParams,
schema: StructType,
trainParams: TrainParams) {
val mainExecutorWorker: Long = LightGBMUtils.getTaskId
val useSingleDataset: Boolean = trainParams.executionParams.useSingleDatasetMode
class SharedDatasetState(columnParams: ColumnParams,
schema: StructType,
trainParams: TrainParams,
sharedState: SharedState) {
val chunkSize: Int = trainParams.executionParams.chunkSize
val useSingleDataset: Boolean = trainParams.executionParams.useSingleDatasetMode
val matrixType: String = trainParams.executionParams.matrixType

lazy val denseAggregatedColumns: BaseDenseAggregatedColumns = new DenseSyncAggregatedColumns(chunkSize)

lazy val sparseAggregatedColumns: BaseSparseAggregatedColumns = new SparseSyncAggregatedColumns(chunkSize)

def getArrayType(rowsIter: Iterator[Row], matrixType: String): (Iterator[Row], Boolean) = {
if (matrixType == "auto") {
sampleRowsForArrayType(rowsIter, columnParams)
} else if (matrixType == "sparse") {
(rowsIter: Iterator[Row], true)
} else if (matrixType == "dense") {
(rowsIter: Iterator[Row], false)
} else {
throw new Exception(s"Invalid parameter matrix type specified: ${matrixType}")
}
}

def prep(iter: Iterator[Row]): BaseChunkedColumns = {
val (concatRowsIter: Iterator[Row], isSparseHere: Boolean) = getArrayType(iter, matrixType)
val peekableIter = new PeekingIterator(concatRowsIter)
// Note: the first worker sets "is sparse", other workers read it
linkIsSparse(isSparseHere)
sharedState.linkIsSparse(isSparseHere)

if (!isSparse.get) {
if (!sharedState.isSparse.get) {
new DenseChunkedColumns(peekableIter, columnParams, schema, chunkSize)
} else {
new SparseChunkedColumns(peekableIter, columnParams, schema, chunkSize, useSingleDataset)
}
}

def merge(ts: BaseChunkedColumns): BaseAggregatedColumns = {
val isSparseVal = isSparse.get
val isSparseVal = sharedState.isSparse.get
val aggregatedColumns = if (!isSparseVal) {
if (useSingleDataset) denseAggregatedColumns
else new DenseAggregatedColumns(chunkSize)
@@ -68,6 +56,41 @@ class SharedState(columnParams: ColumnParams,
aggregatedColumns
}

@volatile var arrayProcessedSignal: CountDownLatch = new CountDownLatch(0)

def incrementArrayProcessedSignal(log: Logger): Int = {
this.synchronized {
val count = arrayProcessedSignal.getCount.toInt + 1
arrayProcessedSignal = new CountDownLatch(count)
log.info(s"Task incrementing ArrayProcessedSignal to $count")
count
}
}

def getArrayType(rowsIter: Iterator[Row], matrixType: String): (Iterator[Row], Boolean) = {
if (matrixType == "auto") {
sampleRowsForArrayType(rowsIter, columnParams)
} else if (matrixType == "sparse") {
(rowsIter: Iterator[Row], true)
} else if (matrixType == "dense") {
(rowsIter: Iterator[Row], false)
} else {
throw new Exception(s"Invalid parameter matrix type specified: ${matrixType}")
}
}
}

class SharedState(columnParams: ColumnParams,
schema: StructType,
trainParams: TrainParams) {
val mainExecutorWorker: Long = LightGBMUtils.getTaskId
val useSingleDataset: Boolean = trainParams.executionParams.useSingleDatasetMode
val chunkSize: Int = trainParams.executionParams.chunkSize
val matrixType: String = trainParams.executionParams.matrixType

val datasetState: SharedDatasetState = new SharedDatasetState(columnParams, schema, trainParams, this)
val validationDatasetState: SharedDatasetState = new SharedDatasetState(columnParams, schema, trainParams, this)

@volatile var isSparse: Option[Boolean] = None

def linkIsSparse(isSparse: Boolean): Unit = {
@@ -80,15 +103,9 @@
}
}

@volatile var arrayProcessedSignal: CountDownLatch = new CountDownLatch(0)

def incrementArrayProcessedSignal(log: Logger): Int = {
this.synchronized {
val count = arrayProcessedSignal.getCount.toInt + 1
arrayProcessedSignal = new CountDownLatch(count)
log.info(s"Task incrementing ArrayProcessedSignal to $count")
count
}
datasetState.incrementArrayProcessedSignal(log)
validationDatasetState.incrementArrayProcessedSignal(log)
}

@volatile var doneSignal: CountDownLatch = new CountDownLatch(0)
@@ -38,13 +38,12 @@ object TaskTrainingMethods {
validationData: Option[Broadcast[Array[Row]]],
sharedState: SharedState): (BaseAggregatedColumns, Option[BaseAggregatedColumns]) = {
val aggregatedColumns = {
val prepAggregatedColumns = sharedState.prep(inputRows)
sharedState.merge(prepAggregatedColumns)
val prepAggregatedColumns = sharedState.datasetState.prep(inputRows)
sharedState.datasetState.merge(prepAggregatedColumns)
}

val aggregatedValidationColumns = validationData.map { data =>
val prepAggregatedColumns = sharedState.prep(data.value.toIterator)
sharedState.merge(prepAggregatedColumns)
val prepAggregatedColumns = sharedState.validationDatasetState.prep(data.value.toIterator)
sharedState.validationDatasetState.merge(prepAggregatedColumns)
}
(aggregatedColumns, aggregatedValidationColumns)
}
@@ -11,6 +11,7 @@ import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.ml.linalg.{DenseVector, SparseVector}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.slf4j.Logger

import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ListBuffer
@@ -211,7 +212,7 @@ private[lightgbm] abstract class BaseAggregatedColumns(val chunkSize: Int) {
initScores.foreach(_.delete())
}

def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset
def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String, log: Logger): LightGBMDataset

def incrementCount(chunkedCols: BaseChunkedColumns): Unit = {
rowCount.addAndGet(chunkedCols.rowCount)
@@ -327,14 +328,17 @@ private[lightgbm] abstract class BaseDenseAggregatedColumns(chunkSize: Int) exte

def getFeatures: DoubleSwigArray = features

def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset = {
def generateDataset(referenceDataset: Option[LightGBMDataset],
datasetParams: String, log: Logger): LightGBMDataset = {
val pointer = lightgbmlib.voidpp_handle()
try {
val numRows = rowCount.get().toInt
log.info(s"LightGBM task generating dense dataset with $numRows rows and $numCols columns")
// Generate the dataset for features
LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromMat(
lightgbmlib.double_to_voidp_ptr(features.array),
lightgbmlibConstants.C_API_DTYPE_FLOAT64,
rowCount.get().toInt,
numRows,
numCols,
1,
datasetParams,
@@ -434,9 +438,12 @@ private[lightgbm] abstract class BaseSparseAggregatedColumns(chunkSize: Int)
}
}

def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset = {
def generateDataset(referenceDataset: Option[LightGBMDataset],
datasetParams: String, log: Logger): LightGBMDataset = {
indexPointerArrayIncrement(getIndexPointers.array)
val pointer = lightgbmlib.voidpp_handle()
val numRows = indptrCount.get() - 1
log.info(s"LightGBM task generating sparse dataset with $numRows rows and $numCols columns")
// Generate the dataset for features
LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromCSR(
lightgbmlib.int_to_voidp_ptr(indexPointers.array),
@@ -61,7 +61,7 @@ trait LightGBMExecutionParams extends Wrappable {
val useSingleDatasetMode = new BooleanParam(this, "useSingleDatasetMode",
"Use single dataset execution mode to create a single native dataset per executor (singleton) " +
"to reduce memory and communication overhead. Note this is disabled when running spark in local mode.")
setDefault(useSingleDatasetMode -> false)
setDefault(useSingleDatasetMode -> true)

def getUseSingleDatasetMode: Boolean = $(useSingleDatasetMode)
def setUseSingleDatasetMode(value: Boolean): this.type = set(useSingleDatasetMode, value)
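Since the default is inverted rather than the parameter removed, existing pipelines silently pick up single-dataset execution; the effective value can still be read back and pinned through the getter and setter above. A short sketch under the same assumptions as the example near the top of this page:

val estimator = new LightGBMClassifier()   // any LightGBM estimator mixing in LightGBMExecutionParams
estimator.getUseSingleDatasetMode          // now returns true out of the box
estimator.setUseSingleDatasetMode(false)   // explicit opt-out; per the description above, the mode is disabled anyway when running Spark in local mode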
@@ -329,11 +329,14 @@ class VerifyLightGBMClassifier extends Benchmarks with EstimatorFuzzing[LightGBM
}
}
val scoredDF1 = baseModel
.setUseSingleDatasetMode(false)
.fit(pimaDF)
.transform(pimaDF)

// Note: run for more iterations than non-custom objective to prevent flakiness
// Note we intentionally overfit here on the training data and don't do a split
val scoredDF2 = baseModel
.setUseSingleDatasetMode(false)
.setFObj(new LogLikelihood())
.setNumIterations(300)
.fit(pimaDF)
@@ -27,7 +27,7 @@ class VerifyLightGBMRegressor extends Benchmarks
verifyLearnerOnRegressionCsvFile("energyefficiency2012_data.train.csv", "Y1", 0,
Some(Seq("X1", "X2", "X3", "X4", "X5", "X6", "X7", "X8", "Y2")))
verifyLearnerOnRegressionCsvFile("airfoil_self_noise.train.csv", "Scaled sound pressure level", 1)
verifyLearnerOnRegressionCsvFile("Buzz.TomsHardware.train.csv", "Mean Number of display (ND)", -3)
verifyLearnerOnRegressionCsvFile("Buzz.TomsHardware.train.csv", "Mean Number of display (ND)", -4)
verifyLearnerOnRegressionCsvFile("machine.train.csv", "ERP", -2)
// TODO: Spark doesn't seem to like the column names here because of '.', figure out how to read in the data
// verifyLearnerOnRegressionCsvFile("slump_test.train.csv", "Compressive Strength (28-day)(Mpa)", 2)