Enso_File integration update: Multi-part upload and presigned URL for download #11440

Merged (18 commits) on Oct 31, 2024. The diff below shows changes from 5 of the 18 commits.

@@ -350,7 +350,10 @@ type Enso_File
asset = Existing_Enso_Asset.get_asset_reference_for self
response = case asset.asset_type of
Enso_Asset_Type.File ->
Utils.http_request HTTP_Method.Get (asset.internal_uri + "/contents")
presigned_url = asset.get_file_description |> get_required_field "url" expected_type=Text
# TODO: once #11342 is integrated, add cache_policy=..No_Cache because the presigned URLs are unlikely to be reused between requests,
# but we could implement a custom cloud cache that relies on checking the asset version.
HTTP.fetch presigned_url HTTP_Method.Get
Enso_Asset_Type.Data_Link ->
Runtime.assert (open_options.contains Data_Link_Access.No_Follow)
Utils.http_request HTTP_Method.Get asset.internal_uri
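
In the File branch above, the download now goes through a short-lived presigned URL taken from the asset's file description instead of the backend's "/contents" endpoint, so the bytes are fetched directly from the underlying storage. The only assumption the new code makes about that description is that it contains a "url" field holding the presigned link, roughly of this (hypothetical) shape:

{ "url": "https://<bucket>.s3.<region>.amazonaws.com/<key>?X-Amz-Expires=...&X-Amz-Signature=..." }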

@@ -18,12 +18,16 @@ import project.Error.Error
import project.Errors.File_Error.File_Error
import project.Errors.Illegal_Argument.Illegal_Argument
import project.Errors.Illegal_State.Illegal_State
import project.Network.HTTP.HTTP
import project.Network.HTTP.HTTP_Method.HTTP_Method
import project.Network.HTTP.Request_Body.Request_Body
import project.Network.HTTP.Request_Error
import project.Network.URI.URI
import project.Nothing.Nothing
import project.Random.Random
import project.Runtime
import project.System.File.File
import project.System.File.File_Access.File_Access
import project.System.Output_Stream.Output_Stream
from project.Data.Boolean import Boolean, False, True
from project.Data.Text.Extensions import all

@@ -73,26 +77,53 @@ generic_create_asset (destination : Enso_File) (allow_existing : Boolean) (creat

## PRIVATE
`generate_request_body_and_result` should return a pair,
where the first element is the request body and the second element is the result to be returned.
where the first element is the file to be uploaded and the second element is the result to be returned.
It is executed lazily, only after all pre-conditions are successfully met.
perform_upload (destination : Enso_File) (allow_existing : Boolean) (~generate_request_body_and_result) =
generic_create_asset destination allow_existing parent_directory_asset-> existing_asset-> error_handlers->
if existing_asset.is_nothing.not && existing_asset.asset_type != Enso_Asset_Type.File then Error.throw (Illegal_Argument.Error "The destination must be a path to a file, not "+existing_asset.asset_type.to_text+".") else
existing_asset_id = existing_asset.if_not_nothing <| existing_asset.id
file_name = destination.name
base_uri = URI.from Utils.files_api
. add_query_argument "parent_directory_id" parent_directory_asset.id
. add_query_argument "file_name" file_name
full_uri = case existing_asset_id of
Nothing -> base_uri
_ -> base_uri . add_query_argument "file_id" existing_asset_id
base_uri = (URI.from Utils.files_api) / "upload"

pair = generate_request_body_and_result
payload = pair.first : File
result = pair.second
file_size = payload.size
# Based on MAX_CHUNK_SIZE in s3.rs
chunk_size = 10000000
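# For example, the 42 MB file uploaded in the test suite below needs ceil (44040192 / 10000000) = 5 parts, each sent to its own presigned URL.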

Asset_Cache.invalidate destination
response = Utils.http_request_as_json HTTP_Method.Post full_uri pair.first error_handlers=error_handlers
response.if_not_error <|
id = get_required_field "id" response expected_type=Text
Asset_Cache.update destination (Existing_Enso_Asset.new id file_name) . if_not_error <|
pair.second
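# Ask the cloud to start a multipart upload; the response carries an upload id and one presigned URL per part.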
upload_start_payload = JS_Object.from_pairs [["fileName", file_name], ["size", file_size]]
upload_setup = Utils.http_request_as_json HTTP_Method.Post (base_uri / "start") upload_start_payload error_handlers=error_handlers

upload_setup.if_not_error <|
presigned_urls = get_required_field "presignedUrls" upload_setup expected_type=Vector

# Metadata to be passed to `upload/end`
upload_id = get_required_field "uploadId" upload_setup expected_type=Text
source_path = get_required_field "sourcePath" upload_setup expected_type=Text

Runtime.assert (presigned_urls.length * chunk_size >= file_size) "The sum of the chunk sizes must be greater than or equal to the file size."

# Currently we upload chunks one-by-one, in the future this could be done in parallel.
parts = payload.with_input_stream [File_Access.Read] input_stream->
presigned_urls.map_with_index i-> part_url->
chunk_bytes = input_stream.read_n_bytes chunk_size
request_body = Request_Body.Byte_Array chunk_bytes
response = HTTP.post part_url request_body HTTP_Method.Put
e_tag = response.get_header "ETag" if_missing=(Error.throw (Illegal_State.Error "The ETag header is missing in the multipart upload response."))
JS_Object.from_pairs [["partNumber", i + 1], ["eTag", e_tag]]

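# Finalize the upload so the backend can assemble the uploaded parts into a single file. If the destination already exists, its asset id is included so that asset is updated rather than a new one created.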
upload_end_payload = JS_Object.from_pairs <|
[["uploadId", upload_id], ["sourcePath", source_path]]
+ [["parts", parts]]
+ [["fileName", file_name], ["parentDirectoryId", parent_directory_asset.id]]
+ (if existing_asset.is_nothing.not then [["assetId", existing_asset.id]] else [])
response = Utils.http_request_as_json HTTP_Method.Post (base_uri / "end") upload_end_payload error_handlers=error_handlers
response.if_not_error <|
id = get_required_field "id" response expected_type=Text
Asset_Cache.update destination (Existing_Enso_Asset.new id file_name) . if_not_error <|
result
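# For reference, the flow above boils down to three calls against the cloud files API
# (paths relative to Utils.files_api; field names as used in the code above):
#   1. POST  upload/start  with {"fileName", "size"}
#        -> {"uploadId", "sourcePath", "presignedUrls": [url_1 .. url_n]}
#   2. PUT   url_i         with the raw bytes of chunk i, collecting the "ETag" response
#        header of each part as {"partNumber": i, "eTag": ...}
#   3. POST  upload/end    with {"uploadId", "sourcePath", "parts", "fileName",
#        "parentDirectoryId"[, "assetId"]}  -> {"id"} of the resulting file asset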

## PRIVATE
Creates a directory at the given path, also creating parent directories if needed.

@@ -227,6 +227,9 @@ private _resolve_body body:Request_Body hash_function =
## ToDo: Support hashing a file.
hash = if hash_function.is_nothing then "" else Unimplemented.throw "Hashing a file body is not yet supported."
Resolved_Body.Value (body_publishers.ofFile path) Nothing hash
Request_Body.Byte_Array bytes ->
hash = if hash_function.is_nothing then "" else hash_function bytes
Resolved_Body.Value (body_publishers.ofByteArray bytes) Nothing hash
Request_Body.Form_Data form_data url_encoded ->
_resolve_form_body form_data url_encoded hash_function
Request_Body.Empty ->

@@ -43,6 +43,12 @@ type Request_Body
- file: The file to send.
Binary (file:File=(Missing_Argument.throw "file"))

## PRIVATE
ADVANCED
A raw byte array to be sent as binary data.
This is mostly used for internal purposes.

Arguments:
- bytes: The bytes to send.
Byte_Array bytes

## Request body with form data.

Arguments:

@@ -61,10 +67,12 @@
default_content_type_header : Header | Nothing
default_content_type_header self =
case self of
Request_Body.Text _ _ _ -> Header.content_type "text/plain" encoding=Encoding.utf_8
Request_Body.Json _ -> Header.content_type "application/json"
Request_Body.Binary _ -> Header.content_type "application/octet-stream"
Request_Body.Form_Data _ url_encoded -> if url_encoded then Header.application_x_www_form_urlencoded else Nothing
Request_Body.Text _ _ _ -> Header.content_type "text/plain" encoding=Encoding.utf_8
Request_Body.Json _ -> Header.content_type "application/json"
Request_Body.Binary _ -> Header.content_type "application/octet-stream"
Request_Body.Byte_Array _ -> Header.content_type "application/octet-stream"
Request_Body.Form_Data _ url_encoded ->
if url_encoded then Header.application_x_www_form_urlencoded else Nothing
Request_Body.Empty -> Nothing

## PRIVATE
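
Unlike Binary, which streams a whole File from disk, the new Byte_Array constructor wraps an in-memory buffer, which is what the chunked upload above needs; _resolve_body maps it onto the JDK's HttpRequest.BodyPublishers.ofByteArray. A minimal usage sketch, using only calls that already appear elsewhere in this diff (part_url stands in for a presigned URL):

chunk_bytes = "example payload".utf_8
response = HTTP.post part_url (Request_Body.Byte_Array chunk_bytes) HTTP_Method.Put
e_tag = response.get_header "ETag"
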
17 changes: 17 additions & 0 deletions test/Base_Tests/src/Network/Enso_Cloud/Enso_File_Spec.enso
@@ -138,6 +138,23 @@ add_specs suite_builder setup:Cloud_Tests_Setup = suite_builder.group "Enso Clou
"hi!".utf_8.write_bytes f . should_succeed
f.read ..Plain_Text . should_equal "hi!"

group_builder.specify "should be able to upload and download big files" <|
f = test_root.get / "big_file.txt"
# Let's say 42MB is big enough of a stress test
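# (42 MB is also larger than the 10 MB upload chunk size, so this exercises the multi-part code path.)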
kilobytes_to_write = 42 * 1024
one_kilobyte = "A".repeat 1024 . utf_8
r = f.with_output_stream [File_Access.Write] output_stream->
0.up_to kilobytes_to_write . each _->
output_stream.write_bytes one_kilobyte
r.should_succeed
# Check that the uploaded size is as expected
f.size . should_equal (kilobytes_to_write * 1024)

# And now try to download it
v = f.read Bytes
v.should_be_a Vector
v.length . should_equal f.size

group_builder.specify "does not currently support append" <|
f = test_root.get / "written_file3.txt"
Test.expect_panic Unimplemented <|