Skip to content

Commit

Permalink
Fix disk usage bug (#4473)
Browse files Browse the repository at this point in the history
* Add back in update_user_disk_used computation from scratch

* add in more updates

* Final fixes. Should work now

* Fix issue with data size being updated after upload failure

* formatting

* add in new disk test at end of other tests since that seems to produce errors elsewhere for kill and run

---------

Co-authored-by: AndrewJGaut <[email protected]>
  • Loading branch information
AndrewJGaut and AndrewJGaut authored May 23, 2023
1 parent 944ad60 commit d2f13d9
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 46 deletions.
9 changes: 3 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ jobs:
strategy:
matrix:
test:
- disk
- unittest gen-rest-docs gen-cli-docs gen-readthedocs basic auth status batch anonymous competition unicode rest1 upload1 upload2 upload3 upload4 download
- unittest gen-rest-docs gen-cli-docs gen-readthedocs basic auth status batch anonymous competition unicode rest1 upload1 upload2 upload3 upload4 download disk
- refs binary rm make worksheet_search worksheet_tags bundle_freeze_unfreeze worksheet_freeze_unfreeze detach perm search_time groups
- worker_manager service
- run time
Expand Down Expand Up @@ -290,8 +289,7 @@ jobs:
strategy:
matrix:
test:
- disk
- basic status batch anonymous unicode rest1 upload1 download
- basic status batch anonymous unicode rest1 upload1 download disk
- refs binary rm make worksheet_search worksheet_tags bundle_freeze_unfreeze worksheet_freeze_unfreeze detach perm search_time groups
- run
- search read kill write mimic workers
Expand Down Expand Up @@ -503,8 +501,7 @@ jobs:
strategy:
matrix:
test:
- disk
- unittest gen-rest-docs gen-cli-docs gen-readthedocs basic auth status batch anonymous competition unicode rest1 upload1 upload2 upload3 upload4 download
- unittest gen-rest-docs gen-cli-docs gen-readthedocs basic auth status batch anonymous competition unicode rest1 upload1 upload2 upload3 upload4 download disk
- refs binary rm make worksheet_search worksheet_tags bundle_freeze_unfreeze worksheet_freeze_unfreeze detach perm search_time groups
- worker_manager service
- run time
Expand Down
15 changes: 8 additions & 7 deletions codalab/lib/upload_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,18 +416,19 @@ def upload_to_bundle_store(

def has_contents(self, bundle):
# TODO: make this non-fs-specific.
return os.path.exists(self._bundle_store.get_bundle_location(bundle.uuid))
bundle_location = self._bundle_store.get_bundle_location(bundle.uuid)
return (
os.path.lexists(bundle_location)
or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value)
or bundle_location.startswith(StorageURLScheme.GCS_STORAGE.value)
)

def cleanup_existing_contents(self, bundle):
data_size = int(
self._bundle_model.get_bundle_metadata([bundle.uuid], 'data_size')[bundle.uuid]
)
bundle_location = self._bundle_store.get_bundle_location(bundle.uuid)
removed = self._bundle_store.cleanup(bundle_location, dry_run=False)
self._bundle_store.cleanup(bundle_location, dry_run=False)
bundle_update = {'metadata': {'data_size': 0}}
self._bundle_model.update_bundle(bundle, bundle_update)
if removed:
self._bundle_model.increment_user_disk_used(bundle.owner_id, -data_size)
self._bundle_model.update_user_disk_used(bundle.owner_id)

def get_bundle_sas_token(self, path, **kwargs):
"""
Expand Down
32 changes: 14 additions & 18 deletions codalab/model/bundle_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,8 +1010,10 @@ def transition_bundle_running(self, bundle, worker_run, row, user_id, worker_id,
'remote': worker_run.remote,
'cpu_usage': cpu_usage,
'memory_usage': memory_usage,
'data_size': worker_run.disk_utilization,
}
if self.get_bundle_state(bundle.uuid) != State.FAILED:
# If the bundle state is failed, it means it failed on uploading_results and data_size was wiped.
metadata_update['data_size'] = worker_run.disk_utilization

# Increment user time and disk as we go to ensure user doesn't go over quota.
# time increment is the change in running time for this bundle since the last checkin.
Expand Down Expand Up @@ -1100,12 +1102,6 @@ def transition_bundle_finalizing(self, bundle, worker_run, user_id, connection):
if failure_message is None and exitcode is not None and exitcode != 0:
failure_message = 'Exit code %d' % exitcode

if user_id == self.root_user_id:
time_increment = worker_run.container_time_total - bundle.metadata.time
self.increment_user_time_used(bundle.owner_id, time_increment)
disk_increment = worker_run.disk_utilization - bundle.metadata.data_size
self.increment_user_disk_used(bundle.owner_id, disk_increment)

# Build metadata
metadata = {}
if failure_message is not None:
Expand Down Expand Up @@ -1175,19 +1171,9 @@ def update_disk_metadata(self, bundle, bundle_location):
Only used by bundle_manager when creating make bundles.
"""
data_size = self.get_data_size(bundle_location)
try:
if 'data_size' in bundle.metadata.__dict__:
current_data_size = bundle.metadata.data_size
else:
current_data_size = int(
self.get_bundle_metadata([bundle.uuid], 'data_size')[bundle.uuid]
)
except Exception:
current_data_size = 0
disk_increment = data_size - current_data_size
bundle_update = {'metadata': {'data_size': data_size}}
self.update_bundle(bundle, bundle_update)
self.increment_user_disk_used(bundle.owner_id, disk_increment)
self.update_user_disk_used(bundle.owner_id)

def bundle_checkin(self, bundle, worker_run, user_id, worker_id):
"""
Expand Down Expand Up @@ -2788,6 +2774,16 @@ def get_user_disk_quota_left(self, user_id, user_info=None):
user_info = self.get_user_info(user_id)
return user_info['disk_quota'] - user_info['disk_used']

def _get_disk_used(self, user_id):
# TODO(Ashwin): don't include linked bundles
return self.search_bundles(user_id, ['size=.sum', 'owner_id=' + user_id])['result'] or 0

def update_user_disk_used(self, user_id):
user_info = self.get_user_info(user_id)
# Compute from scratch for simplicity
user_info['disk_used'] = self._get_disk_used(user_id)
self.update_user_info(user_info)

def get_user_parallel_run_quota_left(self, user_id, user_info=None):
if not user_info:
user_info = self.get_user_info(user_id)
Expand Down
28 changes: 14 additions & 14 deletions codalab/rest/bundles.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ def _update_bundle_state(bundle_uuid: str):
local.model.update_bundle(
bundle, {'state': state_on_failure, 'metadata': {'failure_message': error_msg},},
)
local.model.update_disk_metadata(bundle, bundle_location)
local.model.enforce_disk_quota(bundle, bundle_location)
bundles_dict = get_bundle_infos([bundle_uuid])
return BundleSchema(many=True).dump([bundles_dict]).data

Expand Down Expand Up @@ -1202,12 +1204,11 @@ def _update_bundle_contents_blob(uuid):
)
bundle_link_url = getattr(bundle.metadata, "link_url", None)
bundle_location = bundle_link_url or local.bundle_store.get_bundle_location(bundle.uuid)
local.model.update_disk_metadata(bundle, bundle_location)
local.model.enforce_disk_quota(bundle, bundle_location)

except UsageError as err:
# This is a user error (most likely disk quota overuser) so raise a client HTTP error
if local.upload_manager.has_contents(bundle):
local.upload_manager.cleanup_existing_contents(bundle)
msg = "Upload failed: %s" % err
local.model.update_bundle(
bundle,
Expand All @@ -1216,13 +1217,12 @@ def _update_bundle_contents_blob(uuid):
'metadata': {'failure_message': msg, 'error_traceback': traceback.format_exc()},
},
)
if local.upload_manager.has_contents(bundle):
local.upload_manager.cleanup_existing_contents(bundle)
abort(http.client.BAD_REQUEST, msg)

except Exception as e:
# Upload failed: cleanup, update state if desired, and return HTTP error
if local.upload_manager.has_contents(bundle):
local.upload_manager.cleanup_existing_contents(bundle)

msg = "Upload failed: %s" % e

# The client may not want to finalize the bundle on failure, to keep
Expand All @@ -1242,9 +1242,12 @@ def _update_bundle_contents_blob(uuid):
else:
local.model.update_bundle(
bundle,
{'metadata': {'failure_message': msg, 'error_traceback': traceback.format_exc()},},
{'metadata': {'failure_message': msg, 'error_traceback': traceback.format_exc()}},
)

if local.upload_manager.has_contents(bundle):
local.upload_manager.cleanup_existing_contents(bundle)

abort(http.client.INTERNAL_SERVER_ERROR, msg)

else:
Expand Down Expand Up @@ -1349,13 +1352,14 @@ def delete_bundles(uuids, force, recursive, data_only, dry_run):
)

# cache these so we have them even after the metadata for the bundle has been deleted
bundle_data_sizes = local.model.get_bundle_metadata(relevant_uuids, 'data_size')
bundle_locations = {
uuid: local.bundle_store.get_bundle_location(uuid) for uuid in relevant_uuids
}

# Delete the actual bundle
if not dry_run:
for bundle in bundles:
local.model.update_bundle(bundle, {'metadata': {'data_size': 0}})
if not data_only:
# Delete bundle metadata.
local.model.delete_bundles(relevant_uuids)
Expand All @@ -1371,19 +1375,15 @@ def delete_bundles(uuids, force, recursive, data_only, dry_run):
bundle_location = bundle_locations[uuid]

# Remove bundle
removed = False
if (
os.path.lexists(bundle_location)
or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value)
or bundle_location.startswith(StorageURLScheme.GCS_STORAGE.value)
):
removed = local.bundle_store.cleanup(bundle_location, dry_run)
local.bundle_store.cleanup(bundle_location, dry_run)

# Update user disk used.
if removed and uuid in bundle_data_sizes:
local.model.increment_user_disk_used(
request.user.user_id, -int(bundle_data_sizes[uuid])
)
# Update user disk used.
local.model.update_user_disk_used(request.user.user_id)
return relevant_uuids


Expand Down
2 changes: 1 addition & 1 deletion tests/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1516,7 +1516,7 @@ def test_disk(ctx):
disk_used = _run_command([cl, 'uinfo', 'codalab', '-f', 'disk_used'])
_run_command([cl, 'uedit', 'codalab', '--disk-quota', f'{int(disk_used) + 10}'])
uuid = _run_command(
[cl, 'run', 'head -c 1000 /dev/zero > test.txt',], request_disk=None, request_memory=None,
[cl, 'run', 'head -c 1000 /dev/zero > test.txt',], request_disk=None, request_memory='10m',
)
wait_until_state(uuid, State.FAILED)
_run_command([cl, 'uedit', 'codalab', '--disk-quota', ctx.disk_quota]) # reset disk quota
Expand Down

0 comments on commit d2f13d9

Please sign in to comment.