Re-doing extraction with learning about Io

This commit is contained in:
Caleb Gardner
2026-05-17 12:32:58 -05:00
parent 10e9b66ac6
commit 69ce562b6c
2 changed files with 183 additions and 333 deletions
+88 -30
View File
@@ -14,7 +14,6 @@ const DataExtractor = @This();
fil: OffsetFile,
decomp: *const Decompressor,
cache: *Io.Queue([1024 * 1024]u8),
block_size: u32,
file_size: u64,
@@ -24,11 +23,12 @@ blocks: []BlockSize,
frag_offset: u32 = 0,
frag_entry: ?FragEntry = null,
pub fn init(fil: OffsetFile, decomp: *const Decompressor, cache: *Io.Queue([1024 * 1024]u8), block_size: u32, file_size: u64, data_start: u64, blocks: []BlockSize) DataExtractor {
err: ?anyerror = null,
pub fn init(fil: OffsetFile, decomp: *const Decompressor, block_size: u32, file_size: u64, data_start: u64, blocks: []BlockSize) DataExtractor {
return .{
.fil = fil,
.decomp = decomp,
.cache = cache,
.block_size = block_size,
.file_size = file_size,
@@ -48,17 +48,23 @@ fn numBlocks(self: DataExtractor) usize {
}
/// Starts extracting the data using the given group to spawn async tasks.
// NOTE(review): this span is a commit diff whose +/- markers were stripped, so
// removed and added lines appear back to back. Where two near-identical lines
// follow each other, the first is the old line and the second its replacement.
// The doc comment above matches the old signature only; the new one creates
// its own group.
// Old signature: the caller supplies the group and nothing is returned.
pub fn extractAsync(self: DataExtractor, alloc: std.mem.Allocator, io: Io, group: *Io.Group, fil: Io.File) void {
// New signature: no group parameter; the result is an error union.
pub fn extractAsync(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil: Io.File) !void {
var group: Io.Group = .init;
// Runs on every exit from this scope, including after a successful await.
// presumably harmless once the group has finished — confirm against Io.Group.
defer group.cancel(io);
// Single error slot shared by every spawned task; each task receives `&err`
// and stores its failure there. No synchronization is visible in this span,
// so a later failure may overwrite an earlier one — TODO confirm this is safe.
var err: ?anyerror = null;
// Running read position, starting at `self.start` and advanced by each
// block's `size` field after its task is spawned.
var read_offset: u64 = self.start;
for (0..self.blocks.len) |idx| {
// Old call (no error slot), then new call (error slot appended).
group.async(io, blockThread, .{ self, alloc, io, fil, read_offset, idx });
group.async(io, blockThread, .{ self, alloc, io, fil, read_offset, idx, &err });
read_offset += self.blocks[idx].size;
}
// One extra task is spawned only when a fragment entry is set.
// Old call first, then its replacement carrying the error slot.
if (self.frag_entry != null)
group.async(io, fragThread, .{ self, alloc, io, fil });
group.async(io, fragThread, .{ self, alloc, io, fil, &err });
// If awaiting fails, prefer the error a task recorded; fall back to the
// await error itself when the slot is still null.
// NOTE(review): `err` is only inspected on the failure path. If await can
// succeed while a task has recorded an error, that error is dropped — confirm.
group.await(io) catch |cancel| return err orelse cancel;
}
// Task body for one data block: positions the output at `block_size * idx`
// and fills that region from the block at `read_offset` in `self.fil`.
// NOTE(review): this span is a commit diff whose +/- markers were stripped.
// A `try ...` line followed by the same call with a `catch |err| { ... }`
// body is the old line followed by its replacement.
// Old signature: inferred error set, no error slot.
fn blockThread(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil: Io.File, read_offset: u64, idx: usize) !void {
// New signature: only `Io.Cancelable` can be returned; the concrete failure
// is written to `ret_err` instead.
fn blockThread(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil: Io.File, read_offset: u64, idx: usize, ret_err: *?anyerror) Io.Cancelable!void {
const block = self.blocks[idx];
// The last block gets a different size from the others; the hunk header
// below elides the lines that compute it, so the expression is incomplete here.
const cur_block_size = if (idx == self.numBlocks() - 1)
@@ -67,54 +73,106 @@ fn blockThread(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil: Io.Fi
self.block_size;
// Writer over the output file, created with a zero-length buffer.
var wrt = fil.writer(io, &[0]u8{});
try wrt.seekTo(self.block_size * idx);
// Failure handling used for every fallible step in the new code: record the
// error in the shared slot, call `io.recancel()` when the failure was itself
// `error.Canceled`, then return `Canceled` as the only error this task reports.
wrt.seekTo(self.block_size * idx) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
// Flush errors are discarded here.
defer wrt.flush() catch {};
// A stored size of zero is handled without reading: the region is filled
// with `cur_block_size` zero bytes.
if (block.size == 0) {
try wrt.interface.splatByteAll(0, cur_block_size);
wrt.interface.splatByteAll(0, cur_block_size) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
return;
}
// Reader over the source at this block's offset, also with a zero-length buffer.
var rdr = try self.fil.readerAt(io, read_offset, &[0]u8{});
var rdr = self.fil.readerAt(io, read_offset, &[0]u8{}) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
// Uncompressed blocks are streamed straight from reader to writer.
if (block.uncompressed) {
try rdr.interface.streamExact(&wrt.interface, cur_block_size);
rdr.interface.streamExact(&wrt.interface, cur_block_size) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
return;
} else {
@branchHint(.likely);
// Old code: both scratch buffers are borrowed from `self.cache` and
// returned on scope exit.
var cache = try self.cache.getOne(io);
defer self.cache.putOne(io, cache) catch {};
var tmp = try self.cache.getOne(io);
defer self.cache.putOne(io, tmp) catch {};
// New code: two 1 MiB local arrays replace the borrowed buffers.
// NOTE(review): that is 2 MiB of locals per task, and the slices below
// assume `block.size` and `cur_block_size` are at most 1 MiB — confirm.
var cache: [1024 * 1024]u8 = undefined;
var tmp: [1024 * 1024]u8 = undefined;
try rdr.interface.readSliceAll(cache[0..block.size]);
_ = try self.decomp.Decompress(alloc, cache[0..block.size], tmp[0..cur_block_size]);
try wrt.interface.writeAll(tmp[0..cur_block_size]);
// Read the stored bytes, decompress them into `tmp`, then write the result.
rdr.interface.readSliceAll(cache[0..block.size]) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
// The value returned by `Decompress` is discarded.
_ = self.decomp.Decompress(alloc, cache[0..block.size], tmp[0..cur_block_size]) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
wrt.interface.writeAll(tmp[0..cur_block_size]) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
}
}
// Task body for the trailing fragment: writes `file_size % block_size` bytes
// at output offset `blocks.len * block_size`, sourced from the fragment entry.
// NOTE(review): this span is a commit diff whose +/- markers were stripped.
// A `try ...` line followed by the same call with a `catch |err| { ... }`
// body is the old line followed by its replacement.
// Old signature: inferred error set, no error slot.
fn fragThread(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil: Io.File) !void {
// New signature: only `Io.Cancelable` can be returned; the concrete failure
// is written to `ret_err` instead.
fn fragThread(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil: Io.File, ret_err: *?anyerror) Io.Cancelable!void {
// Unwraps unconditionally; extractAsync only spawns this task when
// `frag_entry` is non-null.
const frag = self.frag_entry.?;
// Number of bytes left over after the whole blocks.
const cur_block_size = self.file_size % self.block_size;
// Writer over the output file, created with a zero-length buffer.
var wrt = fil.writer(io, &[0]u8{});
try wrt.seekTo(self.blocks.len * self.block_size);
// Failure handling used for every fallible step in the new code: record the
// error in the shared slot, call `io.recancel()` when the failure was itself
// `error.Canceled`, then return `Canceled` as the only error this task reports.
wrt.seekTo(self.blocks.len * self.block_size) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
// Flush errors are discarded here.
defer wrt.flush() catch {};
// Reader over the source positioned at the fragment's `start`.
var rdr = try self.fil.readerAt(io, frag.start, &[0]u8{});
var rdr = self.fil.readerAt(io, frag.start, &[0]u8{}) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
// Uncompressed fragment: skip `frag_offset` bytes, then stream the tail
// straight to the output.
if (frag.size.uncompressed) {
try rdr.interface.discardAll(self.frag_offset);
try rdr.interface.streamExact(&wrt.interface, cur_block_size);
rdr.interface.discardAll(self.frag_offset) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
rdr.interface.streamExact(&wrt.interface, cur_block_size) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
return;
} else {
@branchHint(.likely);
// Old code: both scratch buffers are borrowed from `self.cache` and
// returned on scope exit.
var cache = try self.cache.getOne(io);
defer self.cache.putOne(io, cache) catch {};
var tmp = try self.cache.getOne(io);
defer self.cache.putOne(io, tmp) catch {};
// New code: two 1 MiB local arrays replace the borrowed buffers.
// NOTE(review): the slices below assume `frag.size.size` and
// `self.block_size` are at most 1 MiB — confirm.
var cache: [1024 * 1024]u8 = undefined;
var tmp: [1024 * 1024]u8 = undefined;
try rdr.interface.readSliceAll(cache[0..frag.size.size]);
_ = try self.decomp.Decompress(alloc, cache[0..frag.size.size], tmp[0..self.block_size]);
try wrt.interface.writeAll(tmp[0..cur_block_size]);
// Read the stored fragment bytes and decompress into a block-sized window.
rdr.interface.readSliceAll(cache[0..frag.size.size]) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
// The value returned by `Decompress` is discarded.
_ = self.decomp.Decompress(alloc, cache[0..frag.size.size], tmp[0..self.block_size]) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
// NOTE(review): this writes from the start of `tmp`. The uncompressed
// branch skips `self.frag_offset` bytes first, but this branch never uses
// `frag_offset` — looks like the slice should start there; confirm.
wrt.interface.writeAll(tmp[0..cur_block_size]) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
}
}