feat: add chunk size parameter for copying java data to native #1041

Merged · 1 commit · May 6, 2021

Summary: this change adds a chunkSize parameter to the LightGBM learners, controlling the chunk size used when copying Java data to native memory. It threads the value through the classifier, ranker, and regressor train params, replaces the previously hardcoded chunk size in TrainUtils, and frees the native chunk-sizes array after dataset creation.
@@ -49,7 +49,8 @@ class LightGBMClassifier(override val uid: String)
       getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numTasks, getObjective, modelStr,
       getIsUnbalance, getVerbosity, categoricalIndexes, actualNumClasses, getBoostFromAverage,
       getBoostingType, getLambdaL1, getLambdaL2, getIsProvideTrainingMetric,
-      getMetric, getMinGainToSplit, getMaxDeltaStep, getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate)
+      getMetric, getMinGainToSplit, getMaxDeltaStep, getMaxBinByFeature, getMinDataInLeaf, getSlotNames,
+      getDelegate, getChunkSize)
   }

   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMClassificationModel = {
@@ -77,6 +77,15 @@ trait LightGBMExecutionParams extends Wrappable {

   def getNumTasks: Int = $(numTasks)
   def setNumTasks(value: Int): this.type = set(numTasks, value)
+
+  val chunkSize = new IntParam(this, "chunkSize",
+    "Advanced parameter to specify the chunk size for copying Java data to native. " +
+      "If set too high, memory may be wasted, but if set too low, performance may be reduced during data copy. " +
+      "If dataset size is known beforehand, set to the number of rows in the dataset.")
+  setDefault(chunkSize -> 10000)
+
+  def getChunkSize: Int = $(chunkSize)
+  def setChunkSize(value: Int): this.type = set(chunkSize, value)
 }

 /** Defines common parameters across all LightGBM learners related to learning score evolution.
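For orientation, a minimal usage sketch of the new setter (not part of this diff; the import path is an assumption based on the repo's package layout at the time, and the column names are illustrative):

  // Hypothetical usage sketch, assuming mmlspark's package layout of this era;
  // setChunkSize is the setter added in this PR.
  import com.microsoft.ml.spark.lightgbm.LightGBMClassifier

  val classifier = new LightGBMClassifier()
    .setLabelCol("label")
    .setFeaturesCol("features")
    .setChunkSize(50000) // per the doc string: if the row count is known beforehand, use it here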
@@ -56,7 +56,7 @@ class LightGBMRanker(override val uid: String)
       getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numTasks, modelStr,
       getVerbosity, categoricalIndexes, getBoostingType, getLambdaL1, getLambdaL2, getMaxPosition, getLabelGain,
       getIsProvideTrainingMetric, getMetric, getEvalAt, getMinGainToSplit, getMaxDeltaStep,
-      getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate)
+      getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getChunkSize)
   }

   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRankerModel = {
@@ -63,7 +63,7 @@ class LightGBMRegressor(override val uid: String)
       getEarlyStoppingRound, getImprovementTolerance, getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf,
       numTasks, modelStr, getVerbosity, categoricalIndexes, getBoostFromAverage, getBoostingType, getLambdaL1,
       getLambdaL2, getIsProvideTrainingMetric, getMetric, getMinGainToSplit, getMaxDeltaStep,
-      getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate)
+      getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getChunkSize)
   }

   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRegressionModel = {
@@ -209,9 +209,9 @@ object LightGBMUtils {
     (lightgbmlib.double_to_voidp_ptr(data), data)
   }

-  def getNumRowsForChunksArray(numRows: Int, numCols: Int, defaultChunkSize: Int): SWIGTYPE_p_int = {
-    var numChunks = Math.floorDiv(numRows, defaultChunkSize)
-    var leftoverChunk = numRows % defaultChunkSize
+  def getNumRowsForChunksArray(numRows: Int, chunkSize: Int): SWIGTYPE_p_int = {
+    var numChunks = Math.floorDiv(numRows, chunkSize)
+    var leftoverChunk = numRows % chunkSize
     if (leftoverChunk > 0) {
       numChunks += 1
     }
@@ -220,7 +220,7 @@ object LightGBMUtils {
       if (index == numChunks - 1 && leftoverChunk > 0) {
         lightgbmlib.intArray_setitem(numRowsForChunks, index, leftoverChunk)
       } else {
-        lightgbmlib.intArray_setitem(numRowsForChunks, index, defaultChunkSize)
+        lightgbmlib.intArray_setitem(numRowsForChunks, index, chunkSize)
       }
     })
     numRowsForChunks
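As a worked example of the chunking arithmetic, a plain-Scala sketch that returns the per-chunk row counts as an Array[Int] instead of a SWIG native array (the helper name is hypothetical):

  // Hypothetical pure-Scala equivalent of getNumRowsForChunksArray's arithmetic:
  // every chunk holds chunkSize rows, except a smaller final chunk for the remainder.
  def chunkRowCounts(numRows: Int, chunkSize: Int): Array[Int] = {
    val leftover = numRows % chunkSize
    val numChunks = numRows / chunkSize + (if (leftover > 0) 1 else 0)
    Array.tabulate(numChunks) { index =>
      if (index == numChunks - 1 && leftover > 0) leftover else chunkSize
    }
  }
  // chunkRowCounts(25000, 10000) == Array(10000, 10000, 5000)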
@@ -229,7 +229,7 @@ object LightGBMUtils {
   def generateDenseDataset(numRows: Int, numCols: Int, featuresArray: doubleChunkedArray,
                            referenceDataset: Option[LightGBMDataset],
                            featureNamesOpt: Option[Array[String]],
-                           trainParams: TrainParams, defaultChunkSize: Int): LightGBMDataset = {
+                           trainParams: TrainParams, chunkSize: Int): LightGBMDataset = {
     val isRowMajor = 1
     val datasetOutPtr = lightgbmlib.voidpp_handle()
     val datasetParams = s"max_bin=${trainParams.maxBin} is_pre_partition=True " +
@@ -238,7 +238,7 @@ object LightGBMUtils {
       else s"categorical_feature=${trainParams.categoricalFeatures.mkString(",")}")
     val data64bitType = lightgbmlibConstants.C_API_DTYPE_FLOAT64
     var data: Option[(SWIGTYPE_p_void, SWIGTYPE_p_double)] = None
-    val numRowsForChunks = getNumRowsForChunksArray(numRows, numCols, defaultChunkSize)
+    val numRowsForChunks = getNumRowsForChunksArray(numRows, chunkSize)
     try {
       // Generate the dataset for features
       featuresArray.get_last_chunk_add_count()
@@ -249,6 +249,7 @@ object LightGBMUtils {
         "Dataset create")
     } finally {
       featuresArray.release()
+      lightgbmlib.delete_intArray(numRowsForChunks)
     }
     val dataset = new LightGBMDataset(lightgbmlib.voidpp_value(datasetOutPtr))
     dataset.setFeatureNames(featureNamesOpt, numCols)
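The added delete_intArray call frees the SWIG-allocated chunk-sizes array in the finally block, so the native memory is reclaimed even when dataset creation throws. The pattern in miniature (a sketch, assuming SWIG's standard new_intArray allocator that pairs with delete_intArray):

  // Illustrative allocate/use/free pattern for a SWIG native int array;
  // native allocations are invisible to the JVM GC and must be freed manually.
  val arr = lightgbmlib.new_intArray(numChunks)
  try {
    lightgbmlib.intArray_setitem(arr, 0, chunkSize)
    // ... pass arr to the LightGBM C API ...
  } finally {
    lightgbmlib.delete_intArray(arr)
  }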
@@ -39,6 +39,7 @@ abstract class TrainParams extends Serializable {
   def minDataInLeaf: Int
   def featureNames: Array[String]
   def delegate: Option[LightGBMDelegate]
+  def chunkSize: Int

   override def toString: String = {
     // Since passing `isProvideTrainingMetric` to LightGBM as a config parameter won't work,
@@ -71,7 +72,8 @@ case class ClassifierTrainParams(parallelism: String, topK: Int, numIterations: Int,
                                  boostingType: String, lambdaL1: Double, lambdaL2: Double,
                                  isProvideTrainingMetric: Boolean, metric: String, minGainToSplit: Double,
                                  maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
-                                 featureNames: Array[String], delegate: Option[LightGBMDelegate])
+                                 featureNames: Array[String], delegate: Option[LightGBMDelegate],
+                                 chunkSize: Int)
   extends TrainParams {
   override def toString(): String = {
     val extraStr =
@@ -95,7 +97,8 @@ case class RegressorTrainParams(parallelism: String, topK: Int, numIterations: Int,
                                 boostingType: String, lambdaL1: Double, lambdaL2: Double,
                                 isProvideTrainingMetric: Boolean, metric: String, minGainToSplit: Double,
                                 maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
-                                featureNames: Array[String], delegate: Option[LightGBMDelegate])
+                                featureNames: Array[String], delegate: Option[LightGBMDelegate],
+                                chunkSize: Int)
   extends TrainParams {
   override def toString(): String = {
     s"alpha=$alpha tweedie_variance_power=$tweedieVariancePower boost_from_average=${boostFromAverage.toString} " +
@@ -116,7 +119,8 @@ case class RankerTrainParams(parallelism: String, topK: Int, numIterations: Int,
                              labelGain: Array[Double], isProvideTrainingMetric: Boolean,
                              metric: String, evalAt: Array[Int], minGainToSplit: Double,
                              maxDeltaStep: Double, maxBinByFeature: Array[Int], minDataInLeaf: Int,
-                             featureNames: Array[String], delegate: Option[LightGBMDelegate])
+                             featureNames: Array[String], delegate: Option[LightGBMDelegate],
+                             chunkSize: Int)
   extends TrainParams {
   override def toString(): String = {
     val labelGainStr =
@@ -112,10 +112,10 @@ private object TrainUtils extends Serializable {
                            referenceDataset: Option[LightGBMDataset], schema: StructType,
                            log: Logger, trainParams: TrainParams): Option[LightGBMDataset] = {
     var numRows = 0
-    val defaultChunkSize = 1000
-    val labelsChunkedArray = new floatChunkedArray(defaultChunkSize)
-    val weightChunkedArrayOpt = columnParams.weightColumn.map { _ => new floatChunkedArray(defaultChunkSize) }
-    val initScoreChunkedArrayOpt = columnParams.initScoreColumn.map { _ => new doubleChunkedArray(defaultChunkSize) }
+    val chunkSize = trainParams.chunkSize
+    val labelsChunkedArray = new floatChunkedArray(chunkSize)
+    val weightChunkedArrayOpt = columnParams.weightColumn.map { _ => new floatChunkedArray(chunkSize) }
+    val initScoreChunkedArrayOpt = columnParams.initScoreColumn.map { _ => new doubleChunkedArray(chunkSize) }
     var featuresChunkedArrayOpt: Option[doubleChunkedArray] = None
     val groupColumnValues: ListBuffer[Row] = new ListBuffer[Row]()
     try {
@@ -130,7 +130,7 @@ private object TrainUtils extends Serializable {
         val rowAsDoubleArray = getRowAsDoubleArray(row, columnParams, schema)
         numCols = rowAsDoubleArray.length
         if (featuresChunkedArrayOpt.isEmpty) {
-          featuresChunkedArrayOpt = Some(new doubleChunkedArray(numCols * defaultChunkSize))
+          featuresChunkedArrayOpt = Some(new doubleChunkedArray(numCols * chunkSize))
         }
         addFeaturesToChunkedArray(featuresChunkedArrayOpt, numCols, rowAsDoubleArray)
         addInitScoreColumnRow(initScoreChunkedArrayOpt, row, columnParams, schema)
@@ -140,7 +140,7 @@ private object TrainUtils extends Serializable {
       val slotNames = getSlotNames(schema, columnParams.featuresColumn, numCols, trainParams)
       log.info(s"LightGBM task generating dense dataset with $numRows rows and $numCols columns")
       val datasetPtr = Some(LightGBMUtils.generateDenseDataset(numRows, numCols, featuresChunkedArrayOpt.get,
-        referenceDataset, slotNames, trainParams, defaultChunkSize))
+        referenceDataset, slotNames, trainParams, chunkSize))
       datasetPtr.get.addFloatField(labelsChunkedArray, "label", numRows)

       weightChunkedArrayOpt.foreach(datasetPtr.get.addFloatField(_, "weight", numRows))
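For intuition on the sizing above: each chunked array grows in native buffers of chunkSize elements, and the features array stores numCols values per row, so numCols * chunkSize reserves room for chunkSize rows per chunk. A schematic plain-Scala stand-in for the SWIG chunked arrays (hypothetical, illustration only):

  // Hypothetical model of a chunked array: append-only storage that grows
  // one fixed-size chunk at a time instead of reallocating one large buffer.
  class ChunkedBuffer(chunkElems: Int) {
    private val chunks = scala.collection.mutable.ArrayBuffer(new Array[Double](chunkElems))
    private var used = 0
    def add(v: Double): Unit = {
      if (used == chunks.length * chunkElems) chunks += new Array[Double](chunkElems)
      chunks(used / chunkElems)(used % chunkElems) = v
      used += 1
    }
  }
  // Labels would use chunks of chunkSize floats; features use numCols * chunkSize
  // doubles, i.e. chunkSize rows per chunk. An oversized chunkSize wastes memory in
  // the mostly-empty final chunk; an undersized one costs many small allocations.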