Fixes and optimizations

Added FragManager so each frag block only gets decompressed once
Returned to C for decompression (only zstd stateless ATM)
This commit is contained in:
Caleb Gardner
2026-05-22 06:09:06 -05:00
parent 84a9cf17b9
commit 2b49395ab2
10 changed files with 163 additions and 100 deletions
+16 -45
View File
@@ -10,7 +10,7 @@ const OffsetFile = @import("offset_file.zig");
// const SharedCache = @import("shared_cache.zig");
pub const Error = error{OutOfMemory} || Io.File.Reader.SeekError || Io.Writer.Error;
pub const Error = error{OutOfMemory} || Io.File.Reader.SeekError || Io.Writer.Error || Io.File.Writer.Error;
const DataExtractor = @This();
@@ -23,7 +23,7 @@ start: u64,
blocks: []BlockSize,
frag_offset: u32 = 0,
frag_entry: ?FragEntry = null,
frag_block: ?[]u8 = null,
err: ?Error = null,
@@ -38,14 +38,14 @@ pub fn init(fil: OffsetFile, decomp: *const Decompressor, block_size: u32, file_
.blocks = blocks,
};
}
pub fn addFrag(self: *DataExtractor, frag_offset: u32, entry: FragEntry) void {
pub fn addFrag(self: *DataExtractor, frag_offset: u32, block: []u8) void {
self.frag_offset = frag_offset;
self.frag_entry = entry;
self.frag_block = block;
}
fn numBlocks(self: DataExtractor) usize {
var num = self.blocks.len;
if (self.frag_entry != null) num += 1;
if (self.frag_block != null) num += 1;
return num;
}
@@ -60,8 +60,8 @@ pub fn extractAsync(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil:
group.async(io, blockThread, .{ self, alloc, io, fil, read_offset, idx, &err });
read_offset += self.blocks[idx].size;
}
if (self.frag_entry != null)
group.async(io, fragThread, .{ self, alloc, io, fil, &err });
if (self.frag_block != null)
group.async(io, fragThread, .{ self, io, fil, &err });
group.await(io) catch |cancel| return err orelse cancel;
}
@@ -126,55 +126,26 @@ fn blockThread(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil: Io.Fi
};
}
}
fn fragThread(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil: Io.File, ret_err: *?Error) Io.Cancelable!void {
const frag = self.frag_entry.?;
fn fragThread(self: DataExtractor, io: Io, fil: Io.File, ret_err: *?Error) Io.Cancelable!void {
const cur_block_size = self.file_size % self.block_size;
var wrt = fil.writer(io, &[0]u8{});
var write_buf: [10 * 1024]u8 = undefined;
var wrt = fil.writer(io, &write_buf);
wrt.seekTo(self.blocks.len * self.block_size) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
defer wrt.flush() catch {};
var rdr = self.fil.readerAt(io, frag.start, &[0]u8{}) catch |err| {
wrt.interface.writeAll(self.frag_block.?[self.frag_offset .. self.frag_offset + cur_block_size]) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
if (frag.size.uncompressed) {
rdr.interface.discardAll(self.frag_offset) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
rdr.interface.streamExact(&wrt.interface, cur_block_size) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
return;
} else {
@branchHint(.likely);
var cache: [1024 * 1024]u8 = undefined;
var tmp: [1024 * 1024]u8 = undefined;
rdr.interface.readSliceAll(cache[0..frag.size.size]) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
_ = self.decomp.Decompress(alloc, cache[0..frag.size.size], tmp[0..self.block_size]) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
wrt.interface.writeAll(tmp[self.frag_offset .. self.frag_offset + cur_block_size]) catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
}
wrt.flush() catch |err| {
ret_err.* = err;
if (err == error.Canceled) io.recancel();
return Io.Cancelable.Canceled;
};
}