
Commit

Reverted in memory tasks cache
Elia Palme committed Dec 20, 2024
1 parent 89e94d8 commit b376402
Showing 8 changed files with 95 additions and 108 deletions.
5 changes: 2 additions & 3 deletions deploy/docker/storage/Dockerfile
@@ -36,7 +36,6 @@ ENV F7T_SSL_KEY /ssl/f7t_internal.key
ENV F7T_GUNICORN_LOG --error-logfile ${F7T_LOG_PATH}/storage.gunicorn.log
ENV F7T_GUNICORN_SSL --ciphers TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,TLS_AES_128_GCM_SHA256,DHE-RSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-RSA-AES128-GCM-SHA256 \
--ssl-version TLSv1_2 --keyfile $F7T_SSL_KEY --certfile $F7T_SSL_CRT
ENV F7T_GUNICORN_WORKER_STORAGE --workers=1 --threads=1

# Note: storage workers are kept to 1 to reduce load on /taskslist endpoint

ENTRYPOINT /usr/local/bin/gunicorn ${F7T_GUNICORN_SSL} --workers=1 --threads=1 --bind :${F7T_STORAGE_PORT} ${F7T_GUNICORN_LOG} storage:app
ENTRYPOINT /usr/local/bin/gunicorn ${F7T_GUNICORN_SSL} ${F7T_GUNICORN_WORKER_STORAGE} --bind :${F7T_STORAGE_PORT} ${F7T_GUNICORN_LOG} storage:app
7 changes: 4 additions & 3 deletions deploy/docker/tasks/Dockerfile
@@ -27,6 +27,7 @@ ENV F7T_SSL_KEY /ssl/f7t_internal.key
ENV F7T_GUNICORN_LOG --error-logfile ${F7T_LOG_PATH}/tasks.gunicorn.log
ENV F7T_GUNICORN_SSL --ciphers TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,TLS_AES_128_GCM_SHA256,DHE-RSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-RSA-AES128-GCM-SHA256 \
--ssl-version TLSv1_2 --keyfile $F7T_SSL_KEY --certfile $F7T_SSL_CRT
ENV F7T_GUNICORN_WORKER --workers=1 --threads=1

ENTRYPOINT /usr/local/bin/gunicorn ${F7T_GUNICORN_SSL} ${F7T_GUNICORN_WORKER} --bind :${F7T_TASKS_PORT} ${F7T_GUNICORN_LOG} ${F7T_GUNICORN_LOG} tasks:app

# Note: The tasks microservice stores all live tasks in-memory.
# The code is not thread safe, hence workers and threads are enforced to 1
ENTRYPOINT /usr/local/bin/gunicorn ${F7T_GUNICORN_SSL} --workers=1 --threads=1 --bind :${F7T_TASKS_PORT} ${F7T_GUNICORN_LOG} ${F7T_GUNICORN_LOG} tasks:app
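The note above is the crux of this commit: the tasks microservice keeps its live-task cache in a module-level dict, so gunicorn must run with `--workers=1 --threads=1`. The sketch below illustrates why, under the assumption (stated in the diff, not verified against the full codebase) that each gunicorn worker is a separate process holding its own copy of that dict; the names here are illustrative stand-ins, not the real FirecREST API.

```python
# Simplified stand-in for the tasks microservice's module-level cache.
# Each gunicorn worker is a separate process, so after fork every worker
# holds an independent copy of this dict; a PUT applied in one worker is
# invisible to the others, which is why workers/threads are pinned to 1.
import copy

cache_at_fork = {"task-1": {"status": "100"}}

def simulate_worker(cache):
    # each worker process effectively starts from its own copy
    return copy.deepcopy(cache)

worker_a = simulate_worker(cache_at_fork)
worker_b = simulate_worker(cache_at_fork)

# a PUT /tasks/task-1 handled by worker A updates only A's copy
worker_a["task-1"]["status"] = "200"

print(worker_a["task-1"]["status"])  # 200
print(worker_b["task-1"]["status"])  # 100 -> stale read if B serves the GET
```

With more than one worker, a client could observe different task states depending on which worker answers, which is the inconsistency the single-worker constraint avoids.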
3 changes: 2 additions & 1 deletion deploy/k8s/config/templates/cm.common.yaml
@@ -1,7 +1,8 @@
apiVersion: v1
data:
F7T_DEBUG_MODE: {{ .Values.global.F7T_DEBUG_MODE | default "False" | quote }}
F7T_GUNICORN_WORKER: {{ .Values.global.F7T_GUNICORN_WORKER | default "--workers=1 --threads=1" | quote }}
F7T_GUNICORN_WORKER: {{ .Values.global.F7T_GUNICORN_WORKER | default "--workers=3 --threads=1" | quote }}
F7T_GUNICORN_WORKER_STORAGE: {{ .Values.global.F7T_GUNICORN_WORKER | default "--workers=1 --threads=1" | quote }}
F7T_AUTH_HEADER_NAME: {{ .Values.F7T_AUTH_HEADER_NAME | default "Authorization" | quote }}
F7T_AUTH_REQUIRED_SCOPE: {{ .Values.F7T_AUTH_REQUIRED_SCOPE | default "" | quote }}
F7T_AUTH_ROLE: {{ .Values.F7T_AUTH_ROLE | default "" | quote }}
2 changes: 1 addition & 1 deletion deploy/k8s/tasks/templates/deploy.tasks.yaml
@@ -32,7 +32,7 @@ spec:
command: ["/bin/sh", "-c", "sleep 10"]
resources:
limits:
memory: {{ .Values.F7T_REDIS_MEMORY | default "128Mi" | quote }}
memory: {{ .Values.F7T_REDIS_MEMORY | default "512Mi" | quote }}
cpu: {{ .Values.F7T_REDIS_CPU | default "500m" | quote }}
ports:
- containerPort: {{ .Values.F7T_PERSIST_PORT | default 6379 | atoi }}
5 changes: 4 additions & 1 deletion doc/configuration.md
@@ -79,7 +79,10 @@ The most complete way of installing is to setup 3 hosts:
|`F7T_LOG_TYPE` | NO | `file` | Type of logs. Valid values are `file` and `stdout` | `Backend` |
|`F7T_GUNICORN_LOG` | NO | `--error-logfile ${F7T_LOG_PATH}/<service>.gunicorn.log` | Logs configuration for Gunicorn Web Server (the server used to expose microservice's containers), Set to empty for stdout output. For more information please refer to [Gunicorn Settings](https://docs.gunicorn.org/en/stable/settings.html) | `Backend`|
|`F7T_GUNICORN_SSL` | NO | `--ciphers TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,TLS_AES_128_GCM_SHA256,DHE-RSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-RSA-AES128-GCM-SHA256 --ssl-version TLSv1_2 --keyfile $F7T_SSL_KEY --certfile $F7T_SSL_CRT` |SSL configuration for Gunicorn Web Server (the server used to expose microservice's containers). For more information please refer to [Gunicorn Settings](https://docs.gunicorn.org/en/stable/settings.html) | `Backend`|
|`F7T_GUNICORN_WORKER` | NO | `--workers=1 --threads=1` | Worker configuration for Gunicorn Web Server (the server used to expose microservice's containers). For more information please refer to [Gunicorn Settings](https://docs.gunicorn.org/en/stable/settings.html) | `Backend`|
|`F7T_GUNICORN_WORKER` | NO | `--workers=1 --threads=1` | Worker configuration for Gunicorn Web Server (the server used to expose microservice's containers). For more information please refer to [Gunicorn Settings](https://docs.gunicorn.org/en/stable/settings.html) | `Backend`|
|`F7T_GUNICORN_WORKER_STORAGE` | NO | `--workers=1 --threads=1` | Storage Worker configuration for Gunicorn Web Server (the server used to expose microservice's containers). For more information please refer to [Gunicorn Settings](https://docs.gunicorn.org/en/stable/settings.html) | `Backend`|
|`F7T_JAEGER_AGENT` | NO | `''` | Set this value to the Hostname (IP or DNS) of the [Jaeger](https://www.jaegertracing.io/docs/1.54/getting-started/) tracing instance. Port is fixed at `6831/UDP`, no need to set it up (example: `F7T_JAEGER_AGENT=192.168.220.50`) | `Backend`, `Certificator`|
|`F7T_OPA_ENABLED` | NO | `False` | Set this value to `True` if the [OPA](https://www.openpolicyagent.org/docs/latest/) instance is needed for whitelisting which users can execute commands through FirecREST | `Backend`, `Certificator`|
|`F7T_OPA_URL` | NO | `http://localhost:8181` | Set this variable with the form `<schema>://host:port` where the [OPA](https://www.openpolicyagent.org/docs/latest/) instance is running. Only used if `F7T_OPA_ENABLED=True` | `Backend`, `Certificator`|
16 changes: 0 additions & 16 deletions src/common/async_task.py
@@ -83,22 +83,6 @@ def __init__(self,task_id,user,service=None,system=None,data=None,created_at=Non
self.created_at = created_at
self.updated_at = self.created_at

@classmethod
def deserialise(cls, value):

status = value["status"]
user = value["user"]
data = value["data"]
service = value["service"]
system = value["system"]
created_at = value["created_at"]
task_id = value["task_id"]

task = AsyncTask(task_id,user,service=service,system=system,created_at=created_at)
task.set_status(status,data)

return task

# create hash_id as user-task_id MD5 encoded string
# used for public access to info in Queue
def get_hashid(self,task_id,user):
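The `deserialise` classmethod removed above rebuilt an `AsyncTask` from the dict stored in Redis; the commit inlines that reconstruction at the call sites instead. A minimal sketch of what the removed method did, using a simplified stand-in class rather than the real `src/common/async_task.py`:

```python
# Simplified AsyncTask stand-in; field names match the dict keys the
# removed deserialise() read, but this is a sketch, not the real class.
class AsyncTask:
    def __init__(self, task_id, user, service=None, system=None, created_at=None):
        self.task_id = task_id
        self.user = user
        self.service = service
        self.system = system
        self.created_at = created_at
        self.status = None
        self.data = None

    def set_status(self, status, data):
        self.status = status
        self.data = data

def deserialise(value):
    # rebuild a task object from the persisted dict, as the removed
    # classmethod did: constructor fields first, then status/data
    task = AsyncTask(value["task_id"], value["user"],
                     service=value["service"], system=value["system"],
                     created_at=value["created_at"])
    task.set_status(value["status"], value["data"])
    return task

stored = {"task_id": "42", "user": "alice", "service": "storage",
          "system": "cluster", "created_at": "2024-12-20",
          "status": "100", "data": "queued"}
t = deserialise(stored)
print(t.user, t.status)  # alice 100
```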
5 changes: 0 additions & 5 deletions src/common/tasks_persistence.py
@@ -246,7 +246,6 @@ def get_user_tasks(r,user,task_list=None, status_code=None) -> Union[dict,None]:
# logging.info(json_task)
# decode because redis stores it in Bytes not string
task = json.loads(json_task.decode('latin-1'))
task["task_id"]=key_parts(task_id.decode('latin-1'))[2]

try:
_user = task["user"]
@@ -268,10 +267,6 @@
continue

task_dict[task["hash_id"]] = task

# check if all tasks have been found
if (task_list != None) and (len(task_list) == len(task_dict)):
break

return task_dict

160 changes: 82 additions & 78 deletions src/tasks/tasks.py
@@ -41,7 +41,8 @@

DEBUG_MODE = get_boolean_var(os.environ.get("F7T_DEBUG_MODE", False))


# task dict, key is the task_id
tasks = {}


app = Flask(__name__)
@@ -80,6 +81,24 @@ def init_queue():
app.logger.error("Tasks microservice cannot be started")
return

# dictionary: [task_id] = {hash_id,status_code,user,data}
task_list = persistence.get_all_tasks(r)

# key = task_id ; values = {status_code,user,data}
for task_id, value in task_list.items():


status = value["status"]
user = value["user"]
data = value["data"]
service = value["service"]
system = value["system"]
created_at = value["created_at"]

t = async_task.AsyncTask(task_id,user,service=service,system=system,created_at=created_at)
t.set_status(status,data)
tasks[t.hash_id] = t

# init Redis connection
init_queue()
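The rehydration loop added to `init_queue()` above warms the in-memory cache at startup from whatever Redis holds, keyed by each task's `hash_id` (which, per the comment in `async_task.py`, is the MD5 of `user-task_id`). A self-contained sketch of that loop, with `get_all_tasks` and `AsyncTask` as simplified stand-ins for the real FirecREST code:

```python
# Sketch of the startup rehydration this commit adds: every persisted
# task is rebuilt into the module-level `tasks` dict, keyed by hash_id.
import hashlib

class AsyncTask:
    def __init__(self, task_id, user, service=None, system=None, created_at=None):
        self.task_id = task_id
        self.user = user
        self.service = service
        self.system = system
        self.created_at = created_at
        # public identifier: MD5 of "user-task_id", as described in async_task.py
        self.hash_id = hashlib.md5(f"{user}-{task_id}".encode()).hexdigest()
        self.status = None
        self.data = None

    def set_status(self, status, data):
        self.status = status
        self.data = data

def get_all_tasks():
    # stand-in for persistence.get_all_tasks(r): task_id -> stored fields
    return {
        "1": {"status": "200", "user": "alice", "data": "done",
              "service": "compute", "system": "cluster", "created_at": "t0"},
    }

tasks = {}
for task_id, value in get_all_tasks().items():
    t = AsyncTask(task_id, value["user"], service=value["service"],
                  system=value["system"], created_at=value["created_at"])
    t.set_status(value["status"], value["data"])
    tasks[t.hash_id] = t

print(len(tasks))  # 1
```

After this loop runs, handlers can serve reads from `tasks` without a round trip to Redis, which is the behavior the commit restores.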

@@ -170,6 +189,7 @@ def create_task():

t = async_task.AsyncTask(task_id=str(task_id), user=username, service=service, system=system,data=init_data)

tasks[t.hash_id] = t
if JAEGER_AGENT != "":
try:
span = tracing.get_span(request)
@@ -217,28 +237,20 @@ def get_task(id):
# for better knowledge of what this id is
hash_id = id





try:
global r
current_task = async_task.AsyncTask.deserialise( persistence.get_user_tasks(r,username,task_list=[hash_id])[hash_id])

#if not user_tasks[hash_id].is_owner(username):
# return jsonify(description="Operation not permitted. Invalid task owner."), 403
#task_status=user_tasks[hash_id].get_status()
#task_status["task_url"] = f"{KONG_URL}/tasks/{hash_id}"
current_task = current_task.get_status()
current_task["task_url"] = f"/tasks/{hash_id}"
data = jsonify(task=current_task)
if not tasks[hash_id].is_owner(username):
return jsonify(description="Operation not permitted. Invalid task owner."), 403

task_status=tasks[hash_id].get_status()
task_status["task_url"] = f"/tasks/{hash_id}"
data = jsonify(task=task_status)
return data, 200

except KeyError:
data = jsonify(error=f"Task {id} does not exist")
return data, 404
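The restored `get_task()` shape above reads straight from the in-memory dict, enforces ownership, and maps a missing key to a 404. A framework-free sketch of that pattern, with `tasks` and the return tuples as simplified stand-ins for the Flask handler:

```python
# Sketch of the lookup pattern get_task() restores: in-memory read,
# ownership check, KeyError -> 404. Illustrative data, not the real API.
tasks = {"abc123": {"user": "alice", "status": "200"}}

def get_task(hash_id, username):
    try:
        task = tasks[hash_id]
    except KeyError:
        # unknown hash_id: report 404 rather than leaking existence info
        return {"error": f"Task {hash_id} does not exist"}, 404
    if task["user"] != username:
        # requester is not the task owner
        return {"description": "Operation not permitted. Invalid task owner."}, 403
    return {"task": dict(task, task_url=f"/tasks/{hash_id}")}, 200

print(get_task("abc123", "alice")[1])  # 200
print(get_task("abc123", "bob")[1])    # 403
print(get_task("zzz", "alice")[1])     # 404
```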


# update status of the task with task_id = id
@app.route("/<id>",methods=["PUT"])
def update_task(id):
@@ -265,66 +277,52 @@ def update_task(id):

# for better knowledge of what this id is
hash_id = id
auth_header = request.headers[AUTH_HEADER_NAME]

# getting username from auth_header
is_username_ok = get_username(auth_header)

if not is_username_ok["result"]:
app.logger.error(f"Couldn't extract username from JWT token: {is_username_ok['reason']}")
return jsonify(description=f"Couldn't retrieve task. Reason: {is_username_ok['reason']}"), 401

username = is_username_ok["username"]


# check if task exist
try:
global r
current_task = async_task.AsyncTask.deserialise(persistence.get_user_tasks(r,username,task_list=[hash_id])[hash_id])


current_task=tasks[hash_id]
except KeyError:
data = jsonify(error=f"Task {hash_id} does not exist")
return data, 404

if JAEGER_AGENT != "":
try:
span = tracing.get_span(request)
span.set_tag('f7t_task_id', hash_id)
except Exception as e:
app.logger.info(e)
if JAEGER_AGENT != "":
try:
span = tracing.get_span(request)
span.set_tag('f7t_task_id', hash_id)
except Exception as e:
app.logger.info(e)


# checks if status request is valid:
if status not in async_task.status_codes:
data = jsonify(error="Status code error",status=status)
app.logger.error(data)
return data, 400
# checks if status request is valid:
if status not in async_task.status_codes:
data = jsonify(error="Status code error",status=status)
app.logger.error(data)
return data, 400


# if no msg on request, default status msg:
if msg == None:
msg = async_task.status_codes[status]
# if no msg on request, default status msg:
if msg == None:
msg = async_task.status_codes[status]

# update task in memory
current_task.set_status(status=status, data=msg)
# update task in memory
tasks[hash_id].set_status(status=status, data=msg)

# getting service from task, to set exp_time according to the service
service = current_task.get_internal_status()["service"]
# getting service from task, to set exp_time according to the service
service = tasks[hash_id].get_internal_status()["service"]


exp_time = STORAGE_TASK_EXP_TIME
global r
exp_time = STORAGE_TASK_EXP_TIME

if service == "compute":
exp_time = COMPUTE_TASK_EXP_TIME
if service == "compute":
exp_time = COMPUTE_TASK_EXP_TIME

#update task in persistence server
if not persistence.save_task(r,current_task.task_id, task=current_task.get_internal_status(), exp_time=exp_time):
app.logger.error("Error saving task")
app.logger.error(current_task.get_internal_status())
return jsonify(description="Couldn't update task"), 400
#update task in persistence server
if not persistence.save_task(r,tasks[hash_id].task_id, task=tasks[hash_id].get_internal_status(), exp_time=exp_time):
app.logger.error("Error saving task")
app.logger.error(tasks[hash_id].get_internal_status())
return jsonify(description="Couldn't update task"), 400

app.logger.info(f"New status for task {hash_id}: {status}")
except KeyError:
data = jsonify(error=f"Task {hash_id} does not exist")
return data, 404
app.logger.info(f"New status for task {hash_id}: {status}")

data = jsonify(success="task updated")
return data, 200
@@ -349,20 +347,23 @@ def delete_task(id):
hash_id = id

# if username isn't task owner, then deny access
try:
if not tasks[hash_id].is_owner(username):
return jsonify(description="Operation not permitted. Invalid task owner."), 403
except KeyError:
data = jsonify(error=f"Task {id} does not exist")
return data, 404

try:
global r
current_task = async_task.AsyncTask.deserialise( persistence.get_user_tasks(r,username,task_list=[hash_id])[hash_id])


if not persistence.set_expire_task(r,current_task.task_id,current_task.get_internal_status(),secs=300):
if not persistence.set_expire_task(r,tasks[hash_id].task_id,tasks[hash_id].get_internal_status(),secs=300):
return jsonify(error=f"Failed to delete task {hash_id} on persistence server"), 400

data = jsonify(success=f"Task {hash_id} deleted")
tasks[hash_id].set_status(status=async_task.INVALID, data="")
return data, 204

except KeyError:
data = jsonify(error=f"Task {id} does not exist")
return data, 404
except Exception as e:
app.logger.error(f"Failed to delete task {hash_id} on persistence server")
app.logger.error(f"Error: {type(e)}")
@@ -402,27 +403,30 @@ def expire_task(id):

# if username isn't task owner, then deny access
try:
global r
current_task = async_task.AsyncTask.deserialise( persistence.get_user_tasks(r,username,task_list=[hash_id])[hash_id])

if not tasks[hash_id].is_owner(username):
return jsonify(description="Operation not permitted. Invalid task owner."), 403
except KeyError:
data = jsonify(error=f"Task {id} does not exist")
return data, 404


exp_time = STORAGE_TASK_EXP_TIME
exp_time = STORAGE_TASK_EXP_TIME

if service == "compute":
exp_time = COMPUTE_TASK_EXP_TIME

if service == "compute":
exp_time = COMPUTE_TASK_EXP_TIME
try:
global r

app.logger.info(f"Set expiration for task {current_task.task_id} - {exp_time} secs")
if not persistence.set_expire_task(r,current_task.task_id,current_task.get_internal_status(),secs=exp_time):
app.logger.info(f"Set expiration for task {tasks[hash_id].task_id} - {exp_time} secs")
if not persistence.set_expire_task(r,tasks[hash_id].task_id,tasks[hash_id].get_internal_status(),secs=exp_time):
app.logger.warning(f"Task couldn't be marked as expired")
return jsonify(error="Failed to set expiration time on task in persistence server"), 400

data = jsonify(success=f"Task expiration time set to {exp_time} secs.")

return data, 200
except KeyError:
data = jsonify(error=f"Task {id} does not exist")
return data, 404

except Exception:
data = jsonify(Error="Failed to set expiration time on task in persistence server")
return data, 400
