Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Figure out larger file support (up to 5GB) #39

Merged
merged 9 commits into from
Apr 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/maestro/test/session_SUITE_data/others/a.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
[server]
[server.auth.tls]
# status = "disabled"
port = 8022
port = 8822
certfile = "./others/certs/a.crt"
keyfile = "./others/certs/a.key"
[server.auth.tls.authorized]
Expand Down
4 changes: 2 additions & 2 deletions apps/maestro/test/session_SUITE_data/others/a_switch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
[server]
[server.auth.tls]
# status = "disabled"
port = 8022
port = 8822
certfile = "./others/certs/a.crt"
keyfile = "./others/certs/a.key"
[server.auth.tls.authorized]
Expand All @@ -21,7 +21,7 @@
[peers]
[peers.b]
sync = ["test"]
url = "127.0.0.1:8023"
url = "127.0.0.1:8823"
[peers.b.auth]
type = "tls"
certfile = "./others/certs/a.crt"
Expand Down
2 changes: 1 addition & 1 deletion apps/maestro/test/session_SUITE_data/others/b.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
[peers]
[peers.a]
sync = ["test"]
url = "127.0.0.1:8022"
url = "127.0.0.1:8822"
[peers.a.auth]
type = "tls"
certfile = "./others/certs/b.crt"
Expand Down
4 changes: 2 additions & 2 deletions apps/maestro/test/session_SUITE_data/others/b_switch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
[peers]
[peers.a]
sync = ["test"]
url = "127.0.0.1:8022"
url = "127.0.0.1:8822"
[peers.a.auth]
type = "tls"
certfile = "./others/certs/b.crt"
Expand All @@ -19,7 +19,7 @@
[server]
[server.auth.tls]
# status = "disabled"
port = 8023
port = 8823
certfile = "./others/certs/b.crt"
keyfile = "./others/certs/b.key"
[server.auth.tls.authorized]
Expand Down
3 changes: 2 additions & 1 deletion apps/revault/src/revault.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
aws, jsx
]},
{env, [
{backend, disk}
{backend, disk},
{multipart_size, 52428800} % 50MiB
]},
{modules, []},

Expand Down
54 changes: 51 additions & 3 deletions apps/revault/src/revault_data_wrapper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
%%% protocol here does not need to know where or how.
-module(revault_data_wrapper).
-export([peer/1, peer/2, new/0, ask/0, ok/0, error/1, fork/2]).
-export([manifest/0, manifest/1, send_file/4, send_deleted/2,
send_conflict_file/5, fetch_file/1,
-export([manifest/0, manifest/1,
send_file/4, send_multipart_file/6, send_deleted/2,
send_conflict_file/5, send_conflict_multipart_file/7, fetch_file/1,
sync_complete/0]).
-define(VSN, 1).

-include("revault_data_wrapper.hrl").

peer(Remote) ->
{peer, ?VSN, Remote}.
Expand All @@ -36,12 +38,58 @@ manifest(Data) ->
send_file(Path, Vsn, Hash, Bin) ->
{file, ?VSN, Path, {Vsn, Hash}, Bin}.

%% @private send_multipart_file/6 is special in that it implies an ordering
%% between all calls for breaking down large file uploads.
%%
%% A new multipart transfer is defined with the `PartNum' value `1'. All
%% subsequent calls must get an incrementing `PartNum' in order, with no gap,
%% until `PartNum =:= PartTotal', which signals that the last part of the file
%% has been received and the write can be considered complete.
%%
%% This API is a choice that allows transferring sequentially to disk, or in
%% parallel in some S3 upload. In picking the lowest common denominator, we
%% expect sequential transfers, such that a file on disk can be continuously
%% appended.
%%
%% The `Hash' value sent is expected to be repeated every time, and is the hash
%% of the final transfer. This is because ReVault does not have any sort of
%% file locking mechanism, and expecting the same hash from beginning to end
%% allows us to detect if the source file has been modified during the
%% transfer, and ignore the result upon the final saving.
%%
%% TODO: this interface is different from the conflict file mechanism which uses
%% a decrementing counter. We might want to align the conflict file mechanism
%% at some point.
-spec send_multipart_file(Path, Vsn, Hash, PartNum, PartTotal, binary()) ->
{file, ?VSN, Path, {Vsn, Hash}, PartNum, PartTotal, binary()}
when Path :: file:filename(),
Vsn :: revault_dirmon_tracker:stamp(),
Hash :: revault_file:hash(),
PartNum :: 1..10000,
PartTotal :: 1..10000.
send_multipart_file(Path, Vsn, Hash, M, N, Bin) when M >= 1, M =< N ->
{file, ?VSN, Path, {Vsn, Hash}, M, N, Bin}.

send_deleted(Path, Vsn) ->
{deleted_file, ?VSN, Path, {Vsn, deleted}}.

send_conflict_file(WorkPath, Path, ConflictsLeft, Meta, Bin) ->
{conflict_file, ?VSN, WorkPath, Path, ConflictsLeft, Meta, Bin}.

%% TODO: figure out if we need a way to work around conflict candidate files
%% having hashes modified from under us during the transfer
-spec send_conflict_multipart_file(WorkPath, Path, ConflictsLeft, Meta, PartNum, PartTotal, binary()) ->
{conflict_multipart_file, ?VSN, WorkPath, Path, ConflictsLeft, Meta, PartNum, PartTotal, binary()}
when WorkPath :: file:filename(),
Path :: file:filename(),
ConflictsLeft :: non_neg_integer(),
Meta :: {revault_dirmon_tracker:stamp(),
revault_file:hash() | {conflict, [revault_file:hash()], revault_file:hash()}},
PartNum :: 1..10000,
PartTotal :: 1..10000.
send_conflict_multipart_file(WorkPath, Path, ConflictsLeft, Meta, PartNum, PartTotal, Bin) ->
{conflict_multipart_file, ?VSN, WorkPath, Path, ConflictsLeft, Meta, PartNum, PartTotal, Bin}.

fetch_file(Path) ->
{fetch, ?VSN, Path}.

Expand Down
4 changes: 4 additions & 0 deletions apps/revault/src/revault_data_wrapper.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
% VSN 1: initial protocol
% VSN 2: adds multipart file transfers
% TODO: add test about protocol compatibility
-define(VSN, 2).
7 changes: 6 additions & 1 deletion apps/revault/src/revault_disterl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
%%% converters to deal with various encodings and wire formats.
-module(revault_disterl).
-export([callback/1, mode/1, peer/3, accept_peer/2, unpeer/2, send/2, reply/3, unpack/1]).
-define(VSN, 1).

-include("revault_data_wrapper.hrl").

-type state() :: ?MODULE.
-export_type([state/0]).

Expand Down Expand Up @@ -77,11 +79,14 @@ unpack({error, ?VSN, R}) -> {error, R};
unpack({manifest, ?VSN}) -> manifest;
unpack({manifest, ?VSN, Data}) -> {manifest, Data};
unpack({file, ?VSN, Path, Meta, Bin}) -> {file, Path, Meta, Bin};
unpack({file, ?VSN, Path, Meta, PartNum, PartTotal, Bin}) -> {file, Path, Meta, PartNum, PartTotal, Bin};
unpack({fetch, ?VSN, Path}) -> {fetch, Path};
unpack({sync_complete, ?VSN}) -> sync_complete;
unpack({deleted_file, ?VSN, Path, Meta}) -> {deleted_file, Path, Meta};
unpack({conflict_file, ?VSN, WorkPath, Path, Count, Meta, Bin}) ->
{conflict_file, WorkPath, Path, Count, Meta, Bin};
unpack({conflict_multipart_file, ?VSN, WorkPath, Path, Count, Meta, PartNum, PartTotal, Bin}) ->
{conflict_multipart_file, WorkPath, Path, Count, Meta, PartNum, PartTotal, Bin};
unpack(Term) ->
Term.

Expand Down
47 changes: 45 additions & 2 deletions apps/revault/src/revault_file.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@
make_relative/2,
copy/2,
tmp/0, tmp/1, extension/2,
find_hashes/2,
find_hashes/2, size/1,
read_range/3,
multipart_init/3, multipart_update/6, multipart_final/4,
%% wrappers to file module
delete/1, consult/1, read_file/1, ensure_dir/1, is_regular/1,
write_file/2, write_file/3, rename/2]).

-type hash() :: binary().
-export_type([hash/0]).
-type multipart_state() :: {state,
file:filename(),
non_neg_integer(), pos_integer(),
hash(), term()}.
-export_type([hash/0, multipart_state/0]).

%% @doc takes a file and computes a hash for it as used to track changes
%% in ReVault. This hash is not guaranteed to be stable, but at this time
Expand Down Expand Up @@ -93,6 +99,43 @@ extension(Path, Ext) when is_binary(Path) ->
find_hashes(Dir, Pred) ->
(mod()):find_hashes(Dir, Pred).

-spec size(file:filename()) -> {ok, non_neg_integer()} | {error, term()}.
size(Path) ->
(mod()):size(Path).

-spec read_range(Path, Offset, Bytes) -> {ok, binary()} | {error, term()} when
Path :: file:filename(),
Offset :: non_neg_integer(),
Bytes :: pos_integer().
read_range(Path, Offset, Bytes) ->
(mod()):read_range(Path, Offset, Bytes).

-spec multipart_init(Path, PartsTotal, Hash) -> State when
Path :: file:filename(),
PartsTotal :: pos_integer(),
Hash :: hash(),
State :: multipart_state().
multipart_init(Path, PartsTotal, Hash) ->
(mod()):multipart_init(Path, PartsTotal, Hash).

-spec multipart_update(multipart_state(), Path, PartNum, PartsTotal, Hash, binary()) ->
{ok, multipart_state()} when
Path :: file:filename(),
PartNum :: 1..10000,
PartsTotal :: 1..10000,
Hash :: hash().
multipart_update(State={state, Path, PartsSeen, PartsTotal, Hash, _Term},
Path, PartNum, PartsTotal, Hash, Bin) when PartNum =:= PartsSeen+1 ->
(mod()):multipart_update(State, Path, PartNum, PartsTotal, Hash, Bin).

-spec multipart_final(multipart_state(), Path, PartsTotal, Hash) -> ok when
Path :: file:filename(),
PartsTotal :: pos_integer(),
Hash :: hash().
multipart_final(State={state, Path, PartsSeen, PartsTotal, Hash, _Term},
Path, PartsTotal, Hash) when PartsSeen =:= PartsTotal ->
(mod()):multipart_final(State, Path, PartsTotal, Hash).

%%%%%%%%%%%%%%%%%%%%%
%%% FILE WRAPPERS %%%
%%%%%%%%%%%%%%%%%%%%%
Expand Down
71 changes: 70 additions & 1 deletion apps/revault/src/revault_file_disk.erl
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
-module(revault_file_disk).

-include_lib("kernel/include/file.hrl").

-export([hash/1, hash_bin/1,
copy/2,
tmp/0, tmp/1,
find_hashes/2,
find_hashes/2, size/1,
read_range/3,
multipart_init/3, multipart_update/6, multipart_final/4,
%% wrappers to file module
delete/1, consult/1, read_file/1, ensure_dir/1, is_regular/1,
write_file/2, write_file/3, rename/2]).
Expand All @@ -15,6 +20,7 @@
%% it is SHA256.
-spec hash(file:filename_all()) -> hash().
hash(Path) ->
%% TODO: support large files on this too
{ok, Bin} = read_file(Path),
hash_bin(Bin).

Expand Down Expand Up @@ -82,6 +88,69 @@ find_hashes(Dir, Pred) ->
[]
).

-spec size(file:filename()) -> {ok, non_neg_integer()} | {error, term()}.
size(Path) ->
maybe
{ok, Rec} ?= file:read_file_info(Path),
{ok, Rec#file_info.size}
end.

-spec read_range(Path, Offset, Bytes) -> {ok, binary()} | {error, term()} when
Path :: file:filename(),
Offset :: non_neg_integer(),
Bytes :: pos_integer().
read_range(Path, Offset, Bytes) ->
maybe
{ok, Fd} ?= file:open(Path, [read, raw, binary]),
Res = file:pread(Fd, Offset, Bytes),
file:close(Fd),
case Res of
{ok, Bin} when byte_size(Bin) =:= Bytes -> Res;
{error, _} -> Res;
{ok, Bin} when byte_size(Bin) =/= Bytes -> {error, invalid_range};
eof -> {error, invalid_range}
end
end.

-spec multipart_init(Path, PartsTotal, Hash) -> State when
Path :: file:filename(),
PartsTotal :: pos_integer(),
Hash :: revault_file:hash(),
State :: revault_file:multipart_state().
multipart_init(Path, PartsTotal, Hash) ->
{ok, Fd} = file:open(Path, [raw, write]),
HashState = crypto:hash_init(sha256),
{state, Path, 0, PartsTotal, Hash,
{Fd, HashState}}.

-spec multipart_update(State, Path, PartNum, PartsTotal, Hash, binary()) ->
{ok, revault_file:multipart_state()} when
State :: revault_file:multipart_state(),
Path :: file:filename(),
PartNum :: 1..10000,
PartsTotal :: 1..10000,
Hash :: revault_file:hash().
multipart_update({state, Path, _PartsSeen, PartsTotal, Hash,
{Fd, HashState}},
Path, PartNum, PartsTotal, Hash, Bin) ->
ok = file:write(Fd, Bin),
NewHashState = crypto:hash_update(HashState, Bin),
{ok, {state, Path, PartNum, PartsTotal, Hash, {Fd, NewHashState}}}.

-spec multipart_final(State, Path, PartsTotal, Hash) -> ok when
State :: revault_file:multipart_state(),
Path :: file:filename(),
PartsTotal :: pos_integer(),
Hash :: revault_file:hash().
multipart_final({state, Path, _PartsSeen, PartsTotal, Hash,
{Fd, HashState}},
Path, PartsTotal, Hash) ->
ok = file:close(Fd),
case crypto:hash_final(HashState) of
Hash -> ok;
_BadHash -> error(invalid_hash)
end.

%%%%%%%%%%%%%%%%%%%%%
%%% FILE WRAPPERS %%%
%%%%%%%%%%%%%%%%%%%%%
Expand Down
Loading
Loading