Skip to content

Commit

Permalink
Merge branch 'RESTAPI-1378-threadsafe-tasks-microservice' into 'master'
Browse files Browse the repository at this point in the history
Made Tasks microservice scalable

See merge request firecrest/firecrest!332
  • Loading branch information
Elia Palme committed Dec 19, 2024
2 parents 933eb00 + 8c9a1d3 commit f42b6f7
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 83 deletions.
2 changes: 1 addition & 1 deletion deploy/k8s/config/templates/cm.common.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: v1
data:
F7T_DEBUG_MODE: {{ .Values.global.F7T_DEBUG_MODE | default "False" | quote }}
F7T_GUNICORN_WORKER: {{ .Values.global.F7T_GUNICORN_WORKER | default "--workers=3 --threads=3" | quote }}
F7T_GUNICORN_WORKER: {{ .Values.global.F7T_GUNICORN_WORKER | default "--workers=1 --threads=1" | quote }}
F7T_AUTH_HEADER_NAME: {{ .Values.F7T_AUTH_HEADER_NAME | default "Authorization" | quote }}
F7T_AUTH_REQUIRED_SCOPE: {{ .Values.F7T_AUTH_REQUIRED_SCOPE | default "" | quote }}
F7T_AUTH_ROLE: {{ .Values.F7T_AUTH_ROLE | default "" | quote }}
Expand Down
16 changes: 16 additions & 0 deletions src/common/async_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ def __init__(self,task_id,user,service=None,system=None,data=None,created_at=Non
self.created_at = created_at
self.updated_at = self.created_at

@classmethod
def deserialise(cls, value):

status = value["status"]
user = value["user"]
data = value["data"]
service = value["service"]
system = value["system"]
created_at = value["created_at"]
task_id = value["task_id"]

task = AsyncTask(task_id,user,service=service,system=system,created_at=created_at)
task.set_status(status,data)

return task

# create hash_id as user-task_id MD5 encoded string
# used for public access to info in Queue
def get_hashid(self,task_id,user):
Expand Down
1 change: 1 addition & 0 deletions src/common/tasks_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ def get_user_tasks(r,user,task_list=None, status_code=None) -> Union[dict,None]:
# logging.info(json_task)
# decode because redis stores it in Bytes not string
task = json.loads(json_task.decode('latin-1'))
task["task_id"]=key_parts(task_id.decode('latin-1'))[2]

try:
_user = task["user"]
Expand Down
160 changes: 78 additions & 82 deletions src/tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@

DEBUG_MODE = get_boolean_var(os.environ.get("F7T_DEBUG_MODE", False))

# task dict, key is the task_id
tasks = {}



app = Flask(__name__)
Expand Down Expand Up @@ -81,24 +80,6 @@ def init_queue():
app.logger.error("Tasks microservice cannot be started")
return

# dictionary: [task_id] = {hash_id,status_code,user,data}
task_list = persistence.get_all_tasks(r)

# key = task_id ; values = {status_code,user,data}
for task_id, value in task_list.items():


status = value["status"]
user = value["user"]
data = value["data"]
service = value["service"]
system = value["system"]
created_at = value["created_at"]

t = async_task.AsyncTask(task_id,user,service=service,system=system,created_at=created_at)
t.set_status(status,data)
tasks[t.hash_id] = t

# init Redis connection
init_queue()

Expand Down Expand Up @@ -189,7 +170,6 @@ def create_task():

t = async_task.AsyncTask(task_id=str(task_id), user=username, service=service, system=system,data=init_data)

tasks[t.hash_id] = t
if JAEGER_AGENT != "":
try:
span = tracing.get_span(request)
Expand Down Expand Up @@ -237,20 +217,28 @@ def get_task(id):
# for better knowledge of what this id is
hash_id = id





try:
if not tasks[hash_id].is_owner(username):
return jsonify(description="Operation not permitted. Invalid task owner."), 403
global r
current_task = async_task.AsyncTask.deserialise( persistence.get_user_tasks(r,username,task_list=[hash_id])[hash_id])

#if not user_tasks[hash_id].is_owner(username):
# return jsonify(description="Operation not permitted. Invalid task owner."), 403
#task_status=user_tasks[hash_id].get_status()
#task_status["task_url"] = f"{KONG_URL}/tasks/{hash_id}"
current_task = current_task.get_status()
current_task["task_url"] = f"/tasks/{hash_id}"
data = jsonify(task=current_task)

task_status=tasks[hash_id].get_status()
task_status["task_url"] = f"/tasks/{hash_id}"
data = jsonify(task=task_status)
return data, 200

except KeyError:
data = jsonify(error=f"Task {id} does not exist")
return data, 404


# update status of the task with task_id = id
@app.route("/<id>",methods=["PUT"])
def update_task(id):
Expand All @@ -277,52 +265,66 @@ def update_task(id):

# for better knowledge of what this id is
hash_id = id
auth_header = request.headers[AUTH_HEADER_NAME]

# getting username from auth_header
is_username_ok = get_username(auth_header)

if not is_username_ok["result"]:
app.logger.error(f"Couldn't extract username from JWT token: {is_username_ok['reason']}")
return jsonify(description=f"Couldn't retrieve task. Reason: {is_username_ok['reason']}"), 401

username = is_username_ok["username"]


# check if task exist
try:
current_task=tasks[hash_id]
except KeyError:
data = jsonify(error=f"Task {hash_id} does not exist")
return data, 404
global r
current_task = async_task.AsyncTask.deserialise(persistence.get_user_tasks(r,username,task_list=[hash_id])[hash_id])



if JAEGER_AGENT != "":
try:
span = tracing.get_span(request)
span.set_tag('f7t_task_id', hash_id)
except Exception as e:
app.logger.info(e)
if JAEGER_AGENT != "":
try:
span = tracing.get_span(request)
span.set_tag('f7t_task_id', hash_id)
except Exception as e:
app.logger.info(e)


# checks if status request is valid:
if status not in async_task.status_codes:
data = jsonify(error="Status code error",status=status)
app.logger.error(data)
return data, 400
# checks if status request is valid:
if status not in async_task.status_codes:
data = jsonify(error="Status code error",status=status)
app.logger.error(data)
return data, 400


# if no msg on request, default status msg:
if msg == None:
msg = async_task.status_codes[status]
# if no msg on request, default status msg:
if msg == None:
msg = async_task.status_codes[status]

# update task in memory
tasks[hash_id].set_status(status=status, data=msg)
# update task in memory
current_task.set_status(status=status, data=msg)

# getting service from task, to set exp_time according to the service
service = tasks[hash_id].get_internal_status()["service"]
# getting service from task, to set exp_time according to the service
service = current_task.get_internal_status()["service"]

global r
exp_time = STORAGE_TASK_EXP_TIME

exp_time = STORAGE_TASK_EXP_TIME

if service == "compute":
exp_time = COMPUTE_TASK_EXP_TIME
if service == "compute":
exp_time = COMPUTE_TASK_EXP_TIME

#update task in persistence server
if not persistence.save_task(r,tasks[hash_id].task_id, task=tasks[hash_id].get_internal_status(), exp_time=exp_time):
app.logger.error("Error saving task")
app.logger.error(tasks[hash_id].get_internal_status())
return jsonify(description="Couldn't update task"), 400
#update task in persistence server
if not persistence.save_task(r,current_task.task_id, task=current_task.get_internal_status(), exp_time=exp_time):
app.logger.error("Error saving task")
app.logger.error(current_task.get_internal_status())
return jsonify(description="Couldn't update task"), 400

app.logger.info(f"New status for task {hash_id}: {status}")
app.logger.info(f"New status for task {hash_id}: {status}")
except KeyError:
data = jsonify(error=f"Task {hash_id} does not exist")
return data, 404

data = jsonify(success="task updated")
return data, 200
Expand All @@ -347,23 +349,20 @@ def delete_task(id):
hash_id = id

# if username isn't taks owner, then deny access
try:
if not tasks[hash_id].is_owner(username):
return jsonify(description="Operation not permitted. Invalid task owner."), 403
except KeyError:
data = jsonify(error=f"Task {id} does not exist")
return data, 404

try:
global r
current_task = async_task.AsyncTask.deserialise( persistence.get_user_tasks(r,username,task_list=[hash_id])[hash_id])


if not persistence.set_expire_task(r,tasks[hash_id].task_id,tasks[hash_id].get_internal_status(),secs=300):
if not persistence.set_expire_task(r,current_task.task_id,current_task.get_internal_status(),secs=300):
return jsonify(error=f"Failed to delete task {hash_id} on persistence server"), 400

data = jsonify(success=f"Task {hash_id} deleted")
tasks[hash_id].set_status(status=async_task.INVALID, data="")
return data, 204

except KeyError:
data = jsonify(error=f"Task {id} does not exist")
return data, 404
except Exception as e:
app.logger.error(f"Failed to delete task {hash_id} on persistence server")
app.logger.error(f"Error: {type(e)}")
Expand Down Expand Up @@ -403,30 +402,27 @@ def expire_task(id):

# if username isn't taks owner, then deny access
try:
if not tasks[hash_id].is_owner(username):
return jsonify(description="Operation not permitted. Invalid task owner."), 403
except KeyError:
data = jsonify(error=f"Task {id} does not exist")
return data, 404

global r
current_task = async_task.AsyncTask.deserialise( persistence.get_user_tasks(r,username,task_list=[hash_id])[hash_id])


exp_time = STORAGE_TASK_EXP_TIME

if service == "compute":
exp_time = COMPUTE_TASK_EXP_TIME
exp_time = STORAGE_TASK_EXP_TIME

try:
global r
if service == "compute":
exp_time = COMPUTE_TASK_EXP_TIME

app.logger.info(f"Set expiration for task {tasks[hash_id].task_id} - {exp_time} secs")
if not persistence.set_expire_task(r,tasks[hash_id].task_id,tasks[hash_id].get_internal_status(),secs=exp_time):
app.logger.info(f"Set expiration for task {current_task.task_id} - {exp_time} secs")
if not persistence.set_expire_task(r,current_task.task_id,current_task.get_internal_status(),secs=exp_time):
app.logger.warning(f"Task couldn't be marked as expired")
return jsonify(error="Failed to set expiration time on task in persistence server"), 400

data = jsonify(success=f"Task expiration time set to {exp_time} secs.")

return data, 200

except KeyError:
data = jsonify(error=f"Task {id} does not exist")
return data, 404
except Exception:
data = jsonify(Error="Failed to set expiration time on task in persistence server")
return data, 400
Expand Down

0 comments on commit f42b6f7

Please sign in to comment.