Merge branch 'JUPY-567' of https://github.com/tarungavara/qds-sdk-py into JUPY-567
Gavara Tarun committed Mar 4, 2020
2 parents 2bff905 + 669c3d2 commit f685aa9
Showing 5 changed files with 261 additions and 183 deletions.
44 changes: 36 additions & 8 deletions qds_sdk/cluster_info_v22.py
@@ -68,13 +68,15 @@ def set_cluster_info_from_arguments(self, arguments):
min_spot_percentage=arguments.min_spot_percentage,
min_maximum_bid_price_percentage=arguments.min_maximum_bid_price_percentage,
min_timeout_for_request=arguments.min_timeout_for_request,
+ min_spot_allocation_strategy=arguments.min_spot_allocation_strategy,
min_spot_fallback=arguments.min_spot_fallback,
autoscaling_ondemand_percentage=arguments.autoscaling_ondemand_percentage,
autoscaling_spot_block_percentage=arguments.autoscaling_spot_block_percentage,
autoscaling_spot_percentage=arguments.autoscaling_spot_percentage,
autoscaling_spot_block_duration=arguments.autoscaling_spot_block_duration,
autoscaling_maximum_bid_price_percentage=arguments.autoscaling_maximum_bid_price_percentage,
autoscaling_timeout_for_request=arguments.autoscaling_timeout_for_request,
+ autoscaling_spot_allocation_strategy=arguments.autoscaling_spot_allocation_strategy,
autoscaling_spot_fallback=arguments.autoscaling_spot_fallback)

def set_cluster_info(self,
@@ -237,13 +239,15 @@ def set_composition(self,
min_spot_percentage=None,
min_maximum_bid_price_percentage=None,
min_timeout_for_request=None,
+ min_spot_allocation_strategy=None,
min_spot_fallback=None,
autoscaling_ondemand_percentage=None,
autoscaling_spot_block_percentage=None,
autoscaling_spot_percentage=None,
autoscaling_spot_block_duration=None,
autoscaling_maximum_bid_price_percentage=None,
autoscaling_timeout_for_request=None,
+ autoscaling_spot_allocation_strategy=None,
autoscaling_spot_fallback=None):

self.cluster_info["composition"] = {}
@@ -260,6 +264,7 @@ def set_composition(self,
min_spot_percentage,
min_maximum_bid_price_percentage,
min_timeout_for_request,
+ min_spot_allocation_strategy,
min_spot_fallback)

self.set_autoscaling_config(autoscaling_ondemand_percentage,
@@ -268,6 +273,7 @@ def set_composition(self,
autoscaling_spot_percentage,
autoscaling_maximum_bid_price_percentage,
autoscaling_timeout_for_request,
+ autoscaling_spot_allocation_strategy,
autoscaling_spot_fallback)

def set_master_config(self,
@@ -293,6 +299,7 @@ def set_min_config(self,
min_spot_percentage,
min_maximum_bid_price_percentage,
min_timeout_for_request,
+ min_spot_allocation_strategy,
min_spot_fallback):
self.cluster_info["composition"]["min_nodes"] = {"nodes": []}
if not min_ondemand_percentage and not min_spot_block_percentage and not min_spot_percentage:
@@ -305,7 +312,8 @@ def set_min_config(self,
min_spot_block_percentage, min_spot_block_duration)
if min_spot_percentage:
self.set_min_spot(min_spot_percentage, min_maximum_bid_price_percentage,
- min_timeout_for_request, min_spot_fallback)
+ min_timeout_for_request, min_spot_allocation_strategy,
+ min_spot_fallback)

def set_autoscaling_config(self,
autoscaling_ondemand_percentage,
@@ -314,20 +322,24 @@ def set_autoscaling_config(self,
autoscaling_spot_percentage,
autoscaling_maximum_bid_price_percentage,
autoscaling_timeout_for_request,
+ autoscaling_spot_allocation_strategy,
autoscaling_spot_fallback):
self.cluster_info["composition"]["autoscaling_nodes"] = {"nodes": []}
if not autoscaling_ondemand_percentage and not autoscaling_spot_block_percentage and not autoscaling_spot_percentage:
self.set_autoscaling_ondemand(50)
- self.set_autoscaling_spot(50, 100, 1, 'ondemand')
+ self.set_autoscaling_spot(50, 100, 1, None, 'ondemand')
else:
if autoscaling_ondemand_percentage:
self.set_autoscaling_ondemand(autoscaling_ondemand_percentage)
if autoscaling_spot_block_percentage:
self.set_autoscaling_spot_block(autoscaling_spot_block_percentage,
autoscaling_spot_block_duration)
if autoscaling_spot_percentage:
- self.set_autoscaling_spot(autoscaling_spot_percentage, autoscaling_maximum_bid_price_percentage,
- autoscaling_timeout_for_request, autoscaling_spot_fallback)
+ self.set_autoscaling_spot(autoscaling_spot_percentage,
+ autoscaling_maximum_bid_price_percentage,
+ autoscaling_timeout_for_request,
+ autoscaling_spot_allocation_strategy,
+ autoscaling_spot_fallback)

def set_master_ondemand(self, master_ondemand_percentage=None):
ondemand = {"percentage": master_ondemand_percentage, "type": "ondemand"}
@@ -360,11 +372,13 @@ def set_min_spot_block(self, min_spot_block_percentage=None, min_spot_block_dura
self.cluster_info["composition"]["min_nodes"]["nodes"].append(spot_block)

def set_min_spot(self, min_spot_percentage=None, min_maximum_bid_price_percentage=100,
- min_timeout_for_request=1, min_spot_fallback=None):
+ min_timeout_for_request=1, min_spot_allocation_strategy=None,
+ min_spot_fallback=None):
spot = {"percentage": min_spot_percentage,
"type": "spot",
"maximum_bid_price_percentage": min_maximum_bid_price_percentage,
"timeout_for_request": min_timeout_for_request,
+ "allocation_strategy": min_spot_allocation_strategy,
"fallback": min_spot_fallback
}
self.cluster_info["composition"]["min_nodes"]["nodes"].append(spot)
@@ -380,12 +394,16 @@ def set_autoscaling_spot_block(self, autoscaling_spot_block_percentage=None, aut
"timeout": autoscaling_spot_block_duration}
self.cluster_info["composition"]["autoscaling_nodes"]["nodes"].append(spot_block)

- def set_autoscaling_spot(self, autoscaling_spot_percentage=None, autoscaling_maximum_bid_price_percentage=100,
- autoscaling_timeout_for_request=1, autoscaling_spot_fallback=None):
+ def set_autoscaling_spot(self, autoscaling_spot_percentage=None,
+ autoscaling_maximum_bid_price_percentage=100,
+ autoscaling_timeout_for_request=1,
+ autoscaling_spot_allocation_strategy=None,
+ autoscaling_spot_fallback=None):
spot = {"percentage": autoscaling_spot_percentage,
"type": "spot",
"maximum_bid_price_percentage": autoscaling_maximum_bid_price_percentage,
"timeout_for_request": autoscaling_timeout_for_request,
+ "allocation_strategy": autoscaling_spot_allocation_strategy,
"fallback": autoscaling_spot_fallback
}
self.cluster_info["composition"]["autoscaling_nodes"]["nodes"].append(spot)
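For orientation, here is a minimal sketch of the node entry the two spot setters above now build. The concrete values are illustrative, not SDK defaults; only the dict shape is taken from the code in this hunk:

# Illustrative only: shape of a spot-node entry after this change.
spot_node = {
    "percentage": 50,
    "type": "spot",
    "maximum_bid_price_percentage": 100,
    "timeout_for_request": 1,
    "allocation_strategy": "capacityOptimized",  # new field in this commit
    "fallback": "ondemand",
}
composition = {"autoscaling_nodes": {"nodes": [spot_node]}}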
@@ -653,7 +671,11 @@ def cluster_info_parser(argparser, action):
default=None,
help="whether to fallback to on-demand instances for min nodes" +
" if spot instances aren't available")

+ composition_group.add_argument("--min-spot-allocation-strategy",
+ dest="min_spot_allocation_strategy",
+ choices=["lowestPrice", "capacityOptimized", None],
+ default=None,
+ help="allocation strategy for min spot nodes")
composition_group.add_argument("--autoscaling-ondemand-percentage",
dest="autoscaling_ondemand_percentage",
type=int,
@@ -689,6 +711,12 @@ def cluster_info_parser(argparser, action):
default=None,
help="whether to fallback to on-demand instances for autoscaling nodes" +
" if spot instances aren't available")
+ composition_group.add_argument("--autoscaling-spot-allocation-strategy",
+ dest="autoscaling_spot_allocation_strategy",
+ choices=["lowestPrice", "capacityOptimized", None],
+ default=None,
+ help="allocation strategy for autoscaling" +
+ " spot nodes")

# monitoring settings
monitoring_group = argparser.add_argument_group("monitoring settings")
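As a hedged end-to-end sketch of the new keyword arguments: it assumes cluster_info is an instance of the ClusterInfoV22 class patched above, and shows only the composition-related arguments.

# Sketch under the assumption above; not a definitive invocation.
cluster_info.set_composition(
    min_spot_percentage=100,
    min_spot_allocation_strategy="capacityOptimized",    # new in this commit
    autoscaling_spot_percentage=50,
    autoscaling_spot_allocation_strategy="lowestPrice",  # new in this commit
    autoscaling_spot_fallback="ondemand",
)
print(cluster_info.cluster_info["composition"])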
10 changes: 5 additions & 5 deletions qds_sdk/connection.py
@@ -22,9 +22,9 @@
"""


- class MyAdapter(HTTPAdapter):
+ class RequestAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
- super(MyAdapter, self).__init__(*args, **kwargs)
+ super(RequestAdapter, self).__init__(*args, **kwargs)

def init_poolmanager(self, connections, maxsize,block=False):
self.poolmanager = PoolManager(num_pools=connections,
@@ -49,11 +49,11 @@ def __init__(self, auth, rest_url, skip_ssl_cert_check,
self.base_retry_delay = base_retry_delay
if reuse:
self.session = requests.Session()
- self.session.mount('https://', MyAdapter())
+ self.session.mount('https://', RequestAdapter())

# retries for get requests
self.session_with_retries = requests.Session()
- self.session_with_retries.mount('https://', MyAdapter(max_retries=3))
+ self.session_with_retries.mount('https://', RequestAdapter(max_retries=3))

def retry(ExceptionToCheck, tries=5, delay=10, backoff=2):
def deco_retry(f):
@@ -107,7 +107,7 @@ def _api_call_raw(self, req_type, path, data=None, params=None):
else:
x = requests
x_with_retries = requests.Session()
- x_with_retries.mount('https://', MyAdapter(max_retries=3))
+ x_with_retries.mount('https://', RequestAdapter(max_retries=3))

kwargs = {'headers': self._headers, 'auth': self.auth, 'verify': not self.skip_ssl_cert_check}

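For reference, a standalone sketch of the adapter-mounting pattern used in this file, written against the public requests and urllib3 APIs; the retry count mirrors the GET sessions above, and the pool sizes come from requests' defaults.

import requests
from requests.adapters import HTTPAdapter
from urllib3.poolmanager import PoolManager

class RequestAdapter(HTTPAdapter):
    # Pin a custom PoolManager, mirroring init_poolmanager above.
    def init_poolmanager(self, connections, maxsize, block=False):
        self.poolmanager = PoolManager(num_pools=connections,
                                       maxsize=maxsize,
                                       block=block)

session = requests.Session()
session.mount('https://', RequestAdapter(max_retries=3))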
46 changes: 32 additions & 14 deletions qds_sdk/engine.py
@@ -13,8 +13,9 @@ def __init__(self, flavour=None):
self.hadoop_settings = {}
self.presto_settings = {}
self.spark_settings = {}
- self.airflow_settings ={}
+ self.airflow_settings = {}
self.engine_config = {}
+ self.mlflow_settings = {}

def set_engine_config(self,
custom_hadoop_config=None,
@@ -31,7 +32,8 @@ def set_engine_config(self,
airflow_version=None,
airflow_python_version=None,
is_ha=None,
- enable_rubix=None):
+ enable_rubix=None,
+ mlflow_version=None):
'''
Args:
@@ -68,13 +70,16 @@ def set_engine_config(self,
is_ha: Enabling HA config for cluster
is_deeplearning : this is a deeplearning cluster config
enable_rubix: Enable rubix on the cluster
+ mlflow_version : this is the version of the mlflow cluster
'''

- self.set_hadoop_settings(custom_hadoop_config, use_qubole_placement_policy, is_ha, fairscheduler_config_xml, default_pool, enable_rubix)
+ self.set_hadoop_settings(custom_hadoop_config, use_qubole_placement_policy, is_ha, fairscheduler_config_xml,
+ default_pool, enable_rubix)
self.set_presto_settings(presto_version, custom_presto_config)
self.set_spark_settings(spark_version, custom_spark_config)
self.set_airflow_settings(dbtap_id, fernet_key, overrides, airflow_version, airflow_python_version)
+ self.set_mlflow_settings(mlflow_version)

def set_fairscheduler_settings(self,
fairscheduler_config_xml=None,
@@ -121,11 +126,15 @@ def set_airflow_settings(self,
self.airflow_settings['version'] = airflow_version
self.airflow_settings['airflow_python_version'] = airflow_python_version

+ def set_mlflow_settings(self,
+ mlflow_version="1.5"):
+ self.mlflow_settings['version'] = mlflow_version

def set_engine_config_settings(self, arguments):
custom_hadoop_config = util._read_file(arguments.custom_hadoop_config_file)
fairscheduler_config_xml = util._read_file(arguments.fairscheduler_config_xml_file)
custom_presto_config = util._read_file(arguments.presto_custom_config_file)
- is_deeplearning=False
+ is_deeplearning = False

self.set_engine_config(custom_hadoop_config=custom_hadoop_config,
use_qubole_placement_policy=arguments.use_qubole_placement_policy,
@@ -140,14 +149,16 @@ def set_engine_config_settings(self, arguments):
overrides=arguments.overrides,
airflow_version=arguments.airflow_version,
airflow_python_version=arguments.airflow_python_version,
- enable_rubix=arguments.enable_rubix)
+ enable_rubix=arguments.enable_rubix,
+ mlflow_version=arguments.mlflow_version)

@staticmethod
def engine_parser(argparser):
engine_group = argparser.add_argument_group("engine settings")
engine_group.add_argument("--flavour",
dest="flavour",
- choices=["hadoop", "hadoop2", "hs2", "hive", "presto", "spark", "sparkstreaming", "hbase", "airflow", "deeplearning"],
+ choices=["hadoop", "hadoop2", "hs2", "hive", "presto", "spark", "sparkstreaming",
+ "hbase", "airflow", "deeplearning", "mlflow"],
default=None,
help="Set engine flavour")

@@ -172,15 +183,15 @@ def engine_parser(argparser):
" for clusters with spot nodes", )
enable_rubix_group = hadoop_settings_group.add_mutually_exclusive_group()
enable_rubix_group.add_argument("--enable-rubix",
dest="enable_rubix",
action="store_true",
default=None,
help="Enable rubix for cluster", )
enable_rubix_group.add_argument("--no-enable-rubix",
dest="enable_rubix",
action="store_false",
default=None,
help="Do not enable rubix for cluster", )

fairscheduler_group = argparser.add_argument_group(
"fairscheduler configuration options")
@@ -236,3 +247,10 @@ def engine_parser(argparser):
default=None,
help="python environment version for airflow cluster", )

+ mlflow_settings_group = argparser.add_argument_group("mlflow settings")
+
+ mlflow_settings_group.add_argument("--mlflow-version",
+ dest="mlflow_version",
+ default=None,
+ help="mlflow version for mlflow cluster", )
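A hedged sketch of driving the new option programmatically; it assumes the qds_sdk package is importable and that Engine is the class patched above.

from qds_sdk.engine import Engine

# Sketch: configure an mlflow-flavoured engine with the new version knob.
engine = Engine(flavour="mlflow")
engine.set_engine_config(mlflow_version="1.5")
print(engine.mlflow_settings)  # expected: {'version': '1.5'}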

