Some fixes

This commit is contained in:
Caleb J. Gardner
2026-01-17 05:30:19 -06:00
parent f3fb8a128f
commit 7aed59b5b1
9 changed files with 70 additions and 48 deletions
+15 -12
View File
@@ -16,7 +16,7 @@ const DecompError = error{
};
pub const CompressionType = enum(u16) {
gzig = 1,
gzip = 1,
lzma,
lzo,
xz,
@@ -44,7 +44,7 @@ pub const DecompThread = struct {
.mgr = mgr,
.buf = switch (mgr.comp_type) {
.gzip => try mgr.alloc.alloc(u8, compress.flate.max_window_len),
.zstd => try mgr.alloc.alloc(u8, compress.zstd.default_window_len),
.zstd => try mgr.alloc.alloc(u8, compress.zstd.default_window_len + compress.zstd.block_size_max),
.lzma, .xz => &[0]u8{},
else => unreachable,
},
@@ -62,24 +62,24 @@ pub const DecompThread = struct {
pub fn submitData(self: *DecompThread, dat: []u8, res: []u8) anyerror!usize {
if (self.status.raw == 3) return DecompError.ThreadClosed;
if (self.status.raw == 0) {
self.status.raw = 1;
self.thr = try .spawn(.{}, thread, .{self});
}
self.dat = dat;
defer self.dat = &[0]u8{};
self.res = res;
self.status.raw = 2;
while (self.status.raw == 2) Futex.wait(&self.status, 2);
return self.res_size;
}
pub fn submitReader(self: *DecompThread, rdr: *Reader, res: []u8) anyerror!usize {
if (self.status.raw == 3) return DecompError.ThreadClosed;
if (self.status.raw == 0) {
self.status.raw = 1;
self.thr = try .spawn(.{}, thread, .{self});
}
self.rdr = rdr;
defer self.rdr = null;
self.res = res;
self.status.raw = 2;
while (self.status.raw == 2) Futex.wait(&self.status, 2);
return self.res_size;
}
@@ -89,11 +89,12 @@ pub const DecompThread = struct {
while (self.status.raw != 3) {
while (self.status.raw == 1) Futex.wait(&self.status, 1);
if (self.status.raw == 3) return;
var rdr: *Reader = if (self.rdr != null) self.rdr.? else &Reader.fixed(self.dat);
var dat_rdr: Reader = .fixed(self.dat);
var rdr: *Reader = if (self.rdr != null) self.rdr.? else &dat_rdr;
self.res_size = blk: switch (comp_type) {
.gzip => {
var decomp_rdr = compress.flate.Decompress.init(rdr, .zlib, self.buf);
break :blk decomp_rdr.reader.readSliceEndian(u8, self.res, .little);
break :blk decomp_rdr.reader.readSliceShort(self.res);
},
.lzma => {
var decomp_rdr = compress.lzma.decompress(self.mgr.alloc, rdr.adaptToOldInterface()) catch |err| {
@@ -109,12 +110,12 @@ pub const DecompThread = struct {
},
.zstd => {
var decomp_rdr = compress.zstd.Decompress.init(rdr, self.buf, .{});
break :blk decomp_rdr.reader.readSliceEndian(u8, self.res, .little);
break :blk decomp_rdr.reader.readSliceShort(self.res);
},
else => unreachable,
};
const orig = self.status.swap(1, .release);
Futex.wake(&self.status);
Futex.wake(&self.status, 1);
if (orig == 3) return;
}
}
@@ -124,6 +125,7 @@ const DecompMgr = @This();
alloc: std.mem.Allocator,
comp_type: CompressionType,
block_size: u32,
threads: []DecompThread,
queue: std.DoublyLinkedList = .{},
@@ -131,14 +133,15 @@ mut: Mutex = .{},
cond: Condition = .{},
to_start: usize,
pub fn init(alloc: std.mem.Allocator, comp_type: CompressionType, threads: usize) !DecompMgr {
pub fn init(alloc: std.mem.Allocator, comp_type: CompressionType, block_size: u32, threads: usize) !DecompMgr {
return switch (comp_type) {
.lzo => DecompError.LzoUnsupported,
.lz4 => DecompError.Lz4Unsupported,
else => .{
.alloc = alloc,
.comp_type = comp_type,
.threads = try alloc.alloc(threads),
.block_size = block_size,
.threads = try alloc.alloc(DecompThread, threads),
.to_start = threads,
},
};
@@ -179,13 +182,13 @@ pub fn decompReader(self: *DecompMgr, rdr: *Reader, res: []u8) !usize {
self.mut.lock();
var thr: *DecompThread = undefined;
var node = self.queue.popFirst();
if (self.node != null) {
if (node != null) {
self.mut.unlock();
thr = @fieldParentPtr("node", node.?);
} else blk: {
defer self.mut.unlock();
if (self.to_start > 0) {
self.threads[self.to_start - 1] = .init(self);
self.threads[self.to_start - 1] = try .init(self);
thr = &self.threads[self.to_start - 1];
self.to_start -= 1;
break :blk;