Moved to File.MemoryMap instead of direct file I/O
This commit is contained in:
+16
-61
@@ -10,7 +10,7 @@ const OffsetFile = @import("offset_file.zig");
|
||||
|
||||
// const SharedCache = @import("shared_cache.zig");
|
||||
|
||||
pub const Error = error{OutOfMemory} || Io.File.Reader.SeekError || Io.Writer.Error || Io.File.Writer.Error;
|
||||
pub const Error = Decompressor.Error || Io.File.MemoryMap.CreateError || Io.File.WritePositionalError;
|
||||
|
||||
const DataExtractor = @This();
|
||||
|
||||
@@ -51,22 +51,27 @@ fn numBlocks(self: DataExtractor) usize {
|
||||
|
||||
/// Starts extracting the data using the given group to spawn async tasks.
|
||||
pub fn extractAsync(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil: Io.File) Error!void {
|
||||
var map = try fil.createMemoryMap(io, .{ .len = self.file_size, .protection = .{ .write = true } });
|
||||
defer map.destroy(io);
|
||||
|
||||
var group: Io.Group = .init;
|
||||
defer group.cancel(io);
|
||||
var err: ?Error = null;
|
||||
|
||||
var read_offset: u64 = self.start;
|
||||
for (0..self.blocks.len) |idx| {
|
||||
group.async(io, blockThread, .{ self, alloc, io, fil, read_offset, idx, &err });
|
||||
group.async(io, blockThread, .{ self, alloc, fil, read_offset, idx, &err });
|
||||
read_offset += self.blocks[idx].size;
|
||||
}
|
||||
if (self.frag_block != null)
|
||||
group.async(io, fragThread, .{ self, io, fil, &err });
|
||||
group.async(io, fragThread, .{ self, map });
|
||||
|
||||
group.await(io) catch |cancel| return err orelse cancel;
|
||||
|
||||
try map.write(io);
|
||||
}
|
||||
|
||||
fn blockThread(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil: Io.File, read_offset: u64, idx: usize, ret_err: *?Error) Io.Cancelable!void {
|
||||
fn blockThread(self: DataExtractor, alloc: std.mem.Allocator, map: Io.File.MemoryMap, read_offset: u64, idx: usize, ret_err: *?Error) Io.Cancelable!void {
|
||||
const block = self.blocks[idx];
|
||||
|
||||
const cur_block_size = if (idx == self.numBlocks() - 1)
|
||||
@@ -74,78 +79,28 @@ fn blockThread(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil: Io.Fi
|
||||
else
|
||||
self.block_size;
|
||||
|
||||
var wrt = fil.writer(io, &[0]u8{});
|
||||
wrt.seekTo(self.block_size * idx) catch |err| {
|
||||
ret_err.* = err;
|
||||
if (err == error.Canceled) io.recancel();
|
||||
return Io.Cancelable.Canceled;
|
||||
};
|
||||
defer wrt.flush() catch {};
|
||||
const write_offset = self.block_size * idx;
|
||||
|
||||
if (block.size == 0) {
|
||||
wrt.interface.splatByteAll(0, cur_block_size) catch |err| {
|
||||
ret_err.* = err;
|
||||
if (err == error.Canceled) io.recancel();
|
||||
return Io.Cancelable.Canceled;
|
||||
};
|
||||
@memset(map.memory[write_offset .. write_offset + cur_block_size], 0);
|
||||
return;
|
||||
}
|
||||
|
||||
var rdr = self.fil.readerAt(io, read_offset, &[0]u8{}) catch |err| {
|
||||
ret_err.* = err;
|
||||
if (err == error.Canceled) io.recancel();
|
||||
return Io.Cancelable.Canceled;
|
||||
};
|
||||
if (block.uncompressed) {
|
||||
rdr.interface.streamExact(&wrt.interface, cur_block_size) catch |err| {
|
||||
ret_err.* = err;
|
||||
if (err == error.Canceled) io.recancel();
|
||||
return Io.Cancelable.Canceled;
|
||||
};
|
||||
return;
|
||||
@memcpy(map.memory[write_offset .. write_offset + cur_block_size], self.fil.map.memory[read_offset .. read_offset + cur_block_size]);
|
||||
} else {
|
||||
@branchHint(.likely);
|
||||
|
||||
var cache: [1024 * 1024]u8 = undefined;
|
||||
var tmp: [1024 * 1024]u8 = undefined;
|
||||
|
||||
rdr.interface.readSliceAll(cache[0..block.size]) catch |err| {
|
||||
_ = self.decomp.Decompress(alloc, self.fil.map.memory[read_offset .. read_offset + block.size], map.memory[write_offset .. write_offset + cur_block_size]) catch |err| {
|
||||
ret_err.* = err;
|
||||
if (err == error.Canceled) io.recancel();
|
||||
return Io.Cancelable.Canceled;
|
||||
};
|
||||
_ = self.decomp.Decompress(alloc, cache[0..block.size], tmp[0..cur_block_size]) catch |err| {
|
||||
ret_err.* = err;
|
||||
if (err == error.Canceled) io.recancel();
|
||||
return Io.Cancelable.Canceled;
|
||||
};
|
||||
wrt.interface.writeAll(tmp[0..cur_block_size]) catch |err| {
|
||||
ret_err.* = err;
|
||||
if (err == error.Canceled) io.recancel();
|
||||
return Io.Cancelable.Canceled;
|
||||
};
|
||||
}
|
||||
}
|
||||
fn fragThread(self: DataExtractor, io: Io, fil: Io.File, ret_err: *?Error) Io.Cancelable!void {
|
||||
fn fragThread(self: DataExtractor, map: Io.File.MemoryMap) Io.Cancelable!void {
|
||||
const cur_block_size = self.file_size % self.block_size;
|
||||
|
||||
var write_buf: [10 * 1024]u8 = undefined;
|
||||
var wrt = fil.writer(io, &write_buf);
|
||||
wrt.seekTo(self.blocks.len * self.block_size) catch |err| {
|
||||
ret_err.* = err;
|
||||
if (err == error.Canceled) io.recancel();
|
||||
return Io.Cancelable.Canceled;
|
||||
};
|
||||
const write_offset = self.blocks.len * self.block_size;
|
||||
|
||||
wrt.interface.writeAll(self.frag_block.?[self.frag_offset .. self.frag_offset + cur_block_size]) catch |err| {
|
||||
ret_err.* = err;
|
||||
if (err == error.Canceled) io.recancel();
|
||||
return Io.Cancelable.Canceled;
|
||||
};
|
||||
|
||||
wrt.flush() catch |err| {
|
||||
ret_err.* = err;
|
||||
if (err == error.Canceled) io.recancel();
|
||||
return Io.Cancelable.Canceled;
|
||||
};
|
||||
@memcpy(map.memory[write_offset .. write_offset + cur_block_size], self.frag_block.?[self.frag_offset .. self.frag_offset + cur_block_size]);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user