
Commit ad10270
feat: added chunk size parameter for copying java data to native
imatiach-msft committed May 3, 2021
1 parent aad223e commit ad10270
Showing 7 changed files with 33 additions and 18 deletions.
@@ -49,7 +49,8 @@ class LightGBMClassifier(override val uid: String)
     getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numTasks, getObjective, modelStr,
     getIsUnbalance, getVerbosity, categoricalIndexes, actualNumClasses, getBoostFromAverage,
     getBoostingType, getLambdaL1, getLambdaL2, getIsProvideTrainingMetric,
-    getMetric, getMinGainToSplit, getMaxDeltaStep, getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate)
+    getMetric, getMinGainToSplit, getMaxDeltaStep, getMaxBinByFeature, getMinDataInLeaf, getSlotNames,
+    getDelegate, getChunkSize)
   }

   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMClassificationModel = {
@@ -77,6 +77,15 @@ trait LightGBMExecutionParams extends Wrappable {

   def getNumTasks: Int = $(numTasks)
   def setNumTasks(value: Int): this.type = set(numTasks, value)
+
+  val chunkSize = new IntParam(this, "chunkSize",
+    "Advanced parameter to specify the chunk size for copying Java data to native. " +
+    "If set too high, memory may be wasted, but if set too low, performance may be reduced during data copy. " +
+    "If the dataset size is known beforehand, set this to the number of rows in the dataset.")
+  setDefault(chunkSize -> 10000)
+
+  def getChunkSize: Int = $(chunkSize)
+  def setChunkSize(value: Int): this.type = set(chunkSize, value)
 }

 /** Defines common parameters across all LightGBM learners related to learning score evolution.
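For reference, a minimal sketch of how a caller might set the new parameter (the import path matches the package paths shown below; setLabelCol and setFeaturesCol are the standard Spark ML setters, and the column names and chunk value are illustrative):

  import com.microsoft.ml.spark.lightgbm.LightGBMClassifier

  // If the dataset's row count is roughly known up front, sizing chunks to
  // match avoids both over-allocation (too high) and extra copy overhead
  // (too low).
  val classifier = new LightGBMClassifier()
    .setLabelCol("label")
    .setFeaturesCol("features")
    .setChunkSize(100000) // overrides the 10000 default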
@@ -56,7 +56,7 @@ class LightGBMRanker(override val uid: String)
     getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numTasks, modelStr,
     getVerbosity, categoricalIndexes, getBoostingType, getLambdaL1, getLambdaL2, getMaxPosition, getLabelGain,
     getIsProvideTrainingMetric, getMetric, getEvalAt, getMinGainToSplit, getMaxDeltaStep,
-    getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate)
+    getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getChunkSize)
   }

   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRankerModel = {
@@ -63,7 +63,7 @@ class LightGBMRegressor(override val uid: String)
     getEarlyStoppingRound, getImprovementTolerance, getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf,
     numTasks, modelStr, getVerbosity, categoricalIndexes, getBoostFromAverage, getBoostingType, getLambdaL1,
     getLambdaL2, getIsProvideTrainingMetric, getMetric, getMinGainToSplit, getMaxDeltaStep,
-    getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate)
+    getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getChunkSize)
   }

   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRegressionModel = {
@@ -209,9 +209,9 @@ object LightGBMUtils {
     (lightgbmlib.double_to_voidp_ptr(data), data)
   }

-  def getNumRowsForChunksArray(numRows: Int, numCols: Int, defaultChunkSize: Int): SWIGTYPE_p_int = {
-    var numChunks = Math.floorDiv(numRows, defaultChunkSize)
-    var leftoverChunk = numRows % defaultChunkSize
+  def getNumRowsForChunksArray(numRows: Int, chunkSize: Int): SWIGTYPE_p_int = {
+    var numChunks = Math.floorDiv(numRows, chunkSize)
+    var leftoverChunk = numRows % chunkSize
     if (leftoverChunk > 0) {
       numChunks += 1
     }
@@ -220,7 +220,7 @@
       if (index == numChunks - 1 && leftoverChunk > 0) {
         lightgbmlib.intArray_setitem(numRowsForChunks, index, leftoverChunk)
       } else {
-        lightgbmlib.intArray_setitem(numRowsForChunks, index, defaultChunkSize)
+        lightgbmlib.intArray_setitem(numRowsForChunks, index, chunkSize)
       }
     })
     numRowsForChunks
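To make the resulting layout concrete, here is a JVM-only sketch of the same arithmetic, with a plain Array[Int] standing in for the SWIG intArray (the helper name is illustrative):

  // Every chunk holds chunkSize rows except possibly the last,
  // which holds the remainder.
  def rowsPerChunk(numRows: Int, chunkSize: Int): Array[Int] = {
    val fullChunks = numRows / chunkSize
    val leftover = numRows % chunkSize
    val sizes = Array.fill(fullChunks)(chunkSize)
    if (leftover > 0) sizes :+ leftover else sizes
  }

  // rowsPerChunk(25000, 10000) == Array(10000, 10000, 5000)
  // rowsPerChunk(20000, 10000) == Array(10000, 10000)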
@@ -229,7 +229,7 @@
   def generateDenseDataset(numRows: Int, numCols: Int, featuresArray: doubleChunkedArray,
                            referenceDataset: Option[LightGBMDataset],
                            featureNamesOpt: Option[Array[String]],
-                           trainParams: TrainParams, defaultChunkSize: Int): LightGBMDataset = {
+                           trainParams: TrainParams, chunkSize: Int): LightGBMDataset = {
     val isRowMajor = 1
     val datasetOutPtr = lightgbmlib.voidpp_handle()
     val datasetParams = s"max_bin=${trainParams.maxBin} is_pre_partition=True " +
@@ -238,7 +238,7 @@
       else s"categorical_feature=${trainParams.categoricalFeatures.mkString(",")}")
     val data64bitType = lightgbmlibConstants.C_API_DTYPE_FLOAT64
     var data: Option[(SWIGTYPE_p_void, SWIGTYPE_p_double)] = None
-    val numRowsForChunks = getNumRowsForChunksArray(numRows, numCols, defaultChunkSize)
+    val numRowsForChunks = getNumRowsForChunksArray(numRows, chunkSize)
     try {
       // Generate the dataset for features
       featuresArray.get_last_chunk_add_count()
@@ -249,6 +249,7 @@
         "Dataset create")
     } finally {
       featuresArray.release()
+      lightgbmlib.delete_intArray(numRowsForChunks)
     }
     val dataset = new LightGBMDataset(lightgbmlib.voidpp_value(datasetOutPtr))
     dataset.setFeatureNames(featureNamesOpt, numCols)
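The delete_intArray call added to the finally block releases the natively allocated chunk-row array even when dataset creation throws; SWIG-allocated memory is invisible to the JVM garbage collector. As a sketch of the pattern (assuming the SWIG-generated new_intArray allocator that pairs with the delete_intArray call above, with the bindings in scope as in LightGBMUtils; the helper itself is illustrative, not part of this codebase):

  // Allocate a native int array, run the body, and free the array on all
  // paths, including exceptions thrown inside the body.
  def withIntArray[T](size: Int)(body: SWIGTYPE_p_int => T): T = {
    val arr = lightgbmlib.new_intArray(size)
    try body(arr)
    finally lightgbmlib.delete_intArray(arr)
  }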
10 changes: 7 additions & 3 deletions src/main/scala/com/microsoft/ml/spark/lightgbm/TrainParams.scala
@@ -39,6 +39,7 @@ abstract class TrainParams extends Serializable {
   def minDataInLeaf: Int
   def featureNames: Array[String]
   def delegate: Option[LightGBMDelegate]
+  def chunkSize: Int

   override def toString: String = {
     // Since passing `isProvideTrainingMetric` to LightGBM as a config parameter won't work,
@@ -71,7 +72,8 @@ case class ClassifierTrainParams(parallelism: String, topK: Int, numIterations:
                                  boostingType: String, lambdaL1: Double, lambdaL2: Double,
                                  isProvideTrainingMetric: Boolean, metric: String, minGainToSplit: Double,
                                  maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
-                                 featureNames: Array[String], delegate: Option[LightGBMDelegate])
+                                 featureNames: Array[String], delegate: Option[LightGBMDelegate],
+                                 chunkSize: Int)
   extends TrainParams {
   override def toString(): String = {
     val extraStr =
@@ -95,7 +97,8 @@ case class RegressorTrainParams(parallelism: String, topK: Int, numIterations: I
                                 boostingType: String, lambdaL1: Double, lambdaL2: Double,
                                 isProvideTrainingMetric: Boolean, metric: String, minGainToSplit: Double,
                                 maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
-                                featureNames: Array[String], delegate: Option[LightGBMDelegate])
+                                featureNames: Array[String], delegate: Option[LightGBMDelegate],
+                                chunkSize: Int)
   extends TrainParams {
   override def toString(): String = {
     s"alpha=$alpha tweedie_variance_power=$tweedieVariancePower boost_from_average=${boostFromAverage.toString} " +
@@ -116,7 +119,8 @@ case class RankerTrainParams(parallelism: String, topK: Int, numIterations: Int,
                              labelGain: Array[Double], isProvideTrainingMetric: Boolean,
                              metric: String, evalAt: Array[Int], minGainToSplit: Double,
                              maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
-                             featureNames: Array[String], delegate: Option[LightGBMDelegate])
+                             featureNames: Array[String], delegate: Option[LightGBMDelegate],
+                             chunkSize: Int)
   extends TrainParams {
   override def toString(): String = {
     val labelGainStr =
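Each case class satisfies the new abstract member def chunkSize: Int through a constructor parameter, the usual Scala idiom for threading a value through a family of parameter classes. In miniature (illustrative names):

  abstract class Settings extends Serializable {
    def chunkSize: Int // abstract: each concrete params class must supply it
  }

  // A case-class constructor parameter is a val, and a val is a valid
  // implementation of an abstract def.
  case class TaskSettings(chunkSize: Int) extends Settings

  val s: Settings = TaskSettings(chunkSize = 10000)
  assert(s.chunkSize == 10000)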
12 changes: 6 additions & 6 deletions src/main/scala/com/microsoft/ml/spark/lightgbm/TrainUtils.scala
@@ -112,10 +112,10 @@ private object TrainUtils extends Serializable {
                        referenceDataset: Option[LightGBMDataset], schema: StructType,
                        log: Logger, trainParams: TrainParams): Option[LightGBMDataset] = {
     var numRows = 0
-    val defaultChunkSize = 1000
-    val labelsChunkedArray = new floatChunkedArray(defaultChunkSize)
-    val weightChunkedArrayOpt = columnParams.weightColumn.map { _ => new floatChunkedArray(defaultChunkSize) }
-    val initScoreChunkedArrayOpt = columnParams.initScoreColumn.map { _ => new doubleChunkedArray(defaultChunkSize) }
+    val chunkSize = trainParams.chunkSize
+    val labelsChunkedArray = new floatChunkedArray(chunkSize)
+    val weightChunkedArrayOpt = columnParams.weightColumn.map { _ => new floatChunkedArray(chunkSize) }
+    val initScoreChunkedArrayOpt = columnParams.initScoreColumn.map { _ => new doubleChunkedArray(chunkSize) }
     var featuresChunkedArrayOpt: Option[doubleChunkedArray] = None
     val groupColumnValues: ListBuffer[Row] = new ListBuffer[Row]()
     try {
@@ -130,7 +130,7 @@
         val rowAsDoubleArray = getRowAsDoubleArray(row, columnParams, schema)
         numCols = rowAsDoubleArray.length
         if (featuresChunkedArrayOpt.isEmpty) {
-          featuresChunkedArrayOpt = Some(new doubleChunkedArray(numCols * defaultChunkSize))
+          featuresChunkedArrayOpt = Some(new doubleChunkedArray(numCols * chunkSize))
         }
         addFeaturesToChunkedArray(featuresChunkedArrayOpt, numCols, rowAsDoubleArray)
         addInitScoreColumnRow(initScoreChunkedArrayOpt, row, columnParams, schema)
@@ -140,7 +140,7 @@
       val slotNames = getSlotNames(schema, columnParams.featuresColumn, numCols, trainParams)
       log.info(s"LightGBM task generating dense dataset with $numRows rows and $numCols columns")
       val datasetPtr = Some(LightGBMUtils.generateDenseDataset(numRows, numCols, featuresChunkedArrayOpt.get,
-        referenceDataset, slotNames, trainParams, defaultChunkSize))
+        referenceDataset, slotNames, trainParams, chunkSize))
       datasetPtr.get.addFloatField(labelsChunkedArray, "label", numRows)

       weightChunkedArrayOpt.foreach(datasetPtr.get.addFloatField(_, "weight", numRows))
