From fbfcbcaa0715f77a08f195d39107ea193fce7d98 Mon Sep 17 00:00:00 2001 From: Brian Broll Date: Thu, 14 Nov 2019 17:11:53 -0600 Subject: [PATCH] WIP #1264 Clear intermediate data on (non-debug) pipeline complete --- .../ExecutePipeline/ExecutePipeline.js | 101 +++++++++++------- src/plugins/GenerateJob/GenerateJob.js | 12 ++- 2 files changed, 71 insertions(+), 42 deletions(-) diff --git a/src/plugins/ExecutePipeline/ExecutePipeline.js b/src/plugins/ExecutePipeline/ExecutePipeline.js index ecd9fd9f5..66baf686f 100644 --- a/src/plugins/ExecutePipeline/ExecutePipeline.js +++ b/src/plugins/ExecutePipeline/ExecutePipeline.js @@ -4,6 +4,7 @@ define([ 'plugin/CreateExecution/CreateExecution/CreateExecution', 'plugin/ExecuteJob/ExecuteJob/ExecuteJob', + 'deepforge/storage/index', 'common/storage/constants', 'common/core/constants', 'deepforge/Constants', @@ -13,6 +14,7 @@ define([ ], function ( CreateExecution, ExecuteJob, + Storage, STORAGE_CONSTANTS, GME_CONSTANTS, CONSTANTS, @@ -443,50 +445,69 @@ define([ msg += 'finished!'; } - return this.isDeleted().then(isDeleted => { - this.stopExecHeartBeat(); - if (!isDeleted) { - - this.logger.debug(`Pipeline "${name}" complete!`); - this.setAttribute(this.activeNode, 'endTime', Date.now()); - this.setAttribute(this.activeNode, 'status', - (this.pipelineError ? 'failed' : - (this.canceled ? 'canceled' : 'success') - ) - ); - this.delAttribute(this.activeNode, 'executionId'); - - this._finished = true; - this.resultMsg(msg); - this.save('Pipeline execution finished') - .then(() => { - this.result.setSuccess(!this.pipelineError); - this._callback(this.pipelineError || null, this.result); - }) - .catch(e => this.logger.error(e)); - } else { // deleted! - this.logger.debug('Execution has been deleted!'); - this.result.setSuccess(!this.pipelineError); - this._callback(this.pipelineError || null, this.result); - } - }); + if (!this.isDebugMode()) { + await this.deleteIntermediateData(); + } + + const isDeleted = await this.isDeleted(); + this.stopExecHeartBeat(); + + if (!isDeleted) { + this.logger.debug(`Pipeline "${name}" complete!`); + this.core.setAttribute(this.activeNode, 'endTime', Date.now()); + this.core.setAttribute(this.activeNode, 'status', + (this.pipelineError ? 'failed' : + (this.canceled ? 'canceled' : 'success') + ) + ); + this.core.delAttribute(this.activeNode, 'executionId'); + + this._finished = true; + this.resultMsg(msg); + await this.save('Pipeline execution finished'); + } else { // deleted! + this.logger.debug('Execution has been deleted!'); + } + this.result.setSuccess(!this.pipelineError); + this._callback(this.pipelineError || null, this.result); + }; + + ExecutePipeline.prototype.isDebugMode = function() { + return !this.core.getAttribute(this.activeNode, 'snapshot'); + }; + + ExecutePipeline.prototype.getStorageConfig = function () { + const storage = this.getCurrentConfig().storage || {}; + storage.id = storage.id || 'gme'; + storage.config = storage.config || {}; + return storage; + }; + + ExecutePipeline.prototype.deleteIntermediateData = async function() { + const storageDir = this.getStorageDir(); + const config = this.getStorageConfig(); + const client = await Storage.getClient(config.id, this.logger, config.config); + return client.deleteDir(storageDir); + }; + + ExecutePipeline.prototype.getStorageDir = function () { + const execName = this.core.getAttribute(this.activeNode, 'name').replace(/\//g, '_'); + return `${this.projectId}/executions/${execName}/`; }; - ExecutePipeline.prototype.isDeleted = function () { - var activeId = this.core.getPath(this.activeNode); + ExecutePipeline.prototype.isDeleted = async function () { + const activeId = this.core.getPath(this.activeNode); // Check if the current execution has been deleted - return this.project.getBranchHash(this.branchName) - .then(hash => this.updateNodes(hash)) - .then(() => this.core.loadByPath(this.rootNode, activeId)) - .then(node => { - var deleted = node === null, - msg = `Verified that execution is ${deleted ? '' : 'not '}deleted`; - - this.logger.debug(msg); - return deleted; - }) - .fail(err => this.logger.error(err)); + const hash = await this.project.getBranchHash(this.branchName); + await this.updateNodes(hash); + + const node = await this.core.loadByPath(this.rootNode, activeId); + const isDeleted = node === null; + const msg = `Verified that execution is ${isDeleted ? '' : 'not '}deleted`; + + this.logger.debug(msg); + return isDeleted; }; ExecutePipeline.prototype.onPipelineDeleted = function () { diff --git a/src/plugins/GenerateJob/GenerateJob.js b/src/plugins/GenerateJob/GenerateJob.js index 8fbf348db..0da9325ad 100644 --- a/src/plugins/GenerateJob/GenerateJob.js +++ b/src/plugins/GenerateJob/GenerateJob.js @@ -297,6 +297,15 @@ define([ return configs; }; + GenerateJob.prototype.getStorageDir = function () { + const operation = this.activeNode; + const execution = this.core.getParent(this.core.getParent(operation)); + + const execName = this.core.getAttribute(execution, 'name').replace(/\//g, '_'); + const jobId = this.core.getPath(this.activeNode).replace(/\//g, '_'); + return `${this.projectId}/executions/${execName}/${jobId}/`; + }; + GenerateJob.prototype.createInputs = async function (node, files) { this.logger.info('Retrieving inputs and deserialize fns...'); const allInputs = await this.getInputs(node); @@ -309,8 +318,7 @@ define([ .filter(pair => !!this.getAttribute(pair[2], 'data')); // remove empty inputs const storage = this.getStorageConfig(); - const jobId = this.core.getPath(this.activeNode).replace(/\//g, '_'); - const storageDir = `${this.projectId}/executions/${jobId}/`; + const storageDir = this.getStorageDir(); const startJS = _.template(Templates.START)({ storageDir,