-
Notifications
You must be signed in to change notification settings - Fork 834
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add streaming API for MVAD #1893
Conversation
Hey @serena-ruan 👋! We use semantic commit messages to streamline the release process. Examples of commit messages with semantic prefixes:
To test your commit locally, please follow our guild on building from source. |
/azp run |
Azure Pipelines successfully started running 1 pipeline(s). |
Codecov Report
@@ Coverage Diff @@
## master #1893 +/- ##
=======================================
Coverage 85.83% 85.83%
=======================================
Files 301 301
Lines 15622 15677 +55
Branches 813 815 +2
=======================================
+ Hits 13409 13457 +48
- Misses 2213 2220 +7
... and 3 files with indirect coverage changes Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
/azp run |
Azure Pipelines successfully started running 1 pipeline(s). |
/azp run |
Azure Pipelines successfully started running 1 pipeline(s). |
MADUtils.checkModelStatus(getUrl, getModelId, getSubscriptionKey) | ||
|
||
val convertTimeFormatUdf = UDFUtils.oldUdf( | ||
{ value: String => convertTimeFormat("Timestamp column", value) }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the purpose of "Timestamp column" here doesent look like its necessary
val window = Window.partitionBy("group").rowsBetween(-getBatchSize, 0) | ||
var collectedDF = formattedDF | ||
var columnNames = Array(getTimestampCol) ++ getInputVariablesCols | ||
for (columnName <- columnNames) { | ||
collectedDF = collectedDF.withColumn(s"${columnName}_list", collect_list(columnName).over(window)) | ||
} | ||
collectedDF = collectedDF.drop("group") | ||
columnNames = columnNames.map(name => s"${name}_list") | ||
|
||
val testDF = getInternalTransformer(collectedDF.schema).transform(collectedDF) | ||
|
||
testDF | ||
.withColumn("isAnomaly", when(col(getOutputCol).isNotNull, | ||
col(s"$getOutputCol.results.value.isAnomaly")(0)).otherwise(null)) | ||
.withColumn("DetectDataTimestamp", when(col(getOutputCol).isNotNull, | ||
col(s"$getOutputCol.results.timestamp")(0)).otherwise(null)) | ||
.drop(columnNames: _*) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like alot of the column names here are hard-coded. Will there be any instances where this doesent work with a given input df or setting of the params?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hard-coded part "{name}_list" is because I use collect_list to aggregate those values in different rows to a single row with a list of values. Only the suffix '_list' is hard-coded, and finally those columns will be dropped and values will be mapped back to the original dataframe.
override protected def getInternalTransformer(schema: StructType): PipelineModel = { | ||
val dynamicParamColName = DatasetExtensions.findUnusedColumnName("dynamic", schema) | ||
val lambda = Lambda(_.withColumn(dynamicParamColName, struct( | ||
s"${getTimestampCol}_list", getInputVariablesCols.map(name => s"${name}_list"): _*))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
likewise here
lazy val df: DataFrame = spark.read.format("csv") | ||
.option("header", "true").schema(fileSchema).load(fileLocation) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this not infer proper schema here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes... I don't know why but it seems fails to infer the schema :(
val hc: Configuration = spark.sparkContext.hadoopConfiguration | ||
hc.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") | ||
hc.set(s"fs.azure.account.keyprovider.$storageAccount.blob.core.windows.net", | ||
"org.apache.hadoop.fs.azure.SimpleKeyProvider") | ||
hc.set(s"fs.azure.account.key.$storageAccount.blob.core.windows.net", storageKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this lazy then print it in test you need to trigger it
val sfma: SimpleFitMultivariateAnomaly = new SimpleFitMultivariateAnomaly() | ||
.setSubscriptionKey(anomalyKey) | ||
.setLocation(anomalyLocation) | ||
.setOutputCol("result") | ||
.setStartTime(startTime) | ||
.setEndTime(endTime) | ||
.setIntermediateSaveDir(intermediateSaveDir) | ||
.setTimestampCol(timestampColumn) | ||
.setInputCols(inputColumns) | ||
.setSlidingWindow(50) | ||
|
||
val model: SimpleDetectMultivariateAnomaly = sfma.fit(df) | ||
val modelId: String = model.getModelId | ||
|
||
MADUtils.CreatedModels += modelId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this stuff lazy, add the
lazy val modelId = {
val model = sfma.fit(df)
MADUtils.CreatedModels += model.getModelId
model.getModelId
}
test("Error if batch size is smaller than sliding window") { | ||
val result = dlma.setBatchSize(10).transform(df.limit(50)) | ||
result.show(50, truncate = false) | ||
assert(result.collect().head.getAs[StringType](dlma.getErrorCol).toString.contains("NotEnoughData")) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you provide more details on why this is a failure? Im a little confused about whats going on here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the number of data points we use to infer should be larger than sliding window, say your sliding window is 30, then you could only use the first 30 data points to infer the 31st data point. In this test case, batchSize is 10, but the model sfma has sliding window 50 (.setSlidingWindow(50)), so it couldn't infer any datapoints using 10 data points.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lovely work! A few open questions for you
/azp run |
Azure Pipelines successfully started running 1 pipeline(s). |
Related Issues/PRs
#xxx
What changes are proposed in this pull request?
Briefly describe the changes included in this Pull Request.
How is this patch tested?
Does this PR change any dependencies?
Does this PR add a new feature? If so, have you added samples on website?
website/docs/documentation
folder.Make sure you choose the correct class
estimators/transformers
and namespace.DocTable
points to correct API link.yarn run start
to make sure the website renders correctly.<!--pytest-codeblocks:cont-->
before each python code blocks to enable auto-tests for python samples.WebsiteSamplesTests
job pass in the pipeline.