From 23bb19644b169e6ed74d74bafdd1c8b3337a8490 Mon Sep 17 00:00:00 2001
From: "Caleb J. Gardner"
Date: Thu, 15 Jan 2026 11:22:02 -0600
Subject: [PATCH] Finished (?) decompression, metadata reader, and Table.

---
 src/decomp.zig        | 240 ++++++++++++++++++++++++++++++++++++++++++
 src/table.zig         |  11 +-
 src/util/metadata.zig |  87 ++++++++++++++-
 3 files changed, 330 insertions(+), 8 deletions(-)

diff --git a/src/decomp.zig b/src/decomp.zig
index 13b618b..07d0f77 100644
--- a/src/decomp.zig
+++ b/src/decomp.zig
@@ -1,5 +1,20 @@
 const std = @import("std");
 const compress = std.compress;
+const Reader = std.Io.Reader;
+const Thread = std.Thread;
+const Futex = Thread.Futex;
+const Mutex = Thread.Mutex;
+const Condition = Thread.Condition;
+const Node = std.DoublyLinkedList.Node;
+
+const Atomic = std.atomic.Value(u32);
+
+/// Failures reported by the decompression manager and its workers.
+const DecompError = error{
+    ThreadClosed,
+    LzoUnsupported,
+    Lz4Unsupported,
+};
 
 pub const CompressionType = enum(u16) {
     gzig = 1,
@@ -9,3 +24,228 @@ pub const CompressionType = enum(u16) {
     lz4,
     zstd,
 };
+
+/// One worker that decompresses a single block at a time on its own thread.
+///
+/// The submitter and the worker hand control back and forth through `status`,
+/// which is also the futex word both sides sleep on.
+pub const DecompThread = struct {
+    mgr: *DecompMgr,
+
+    /// Current thread status & signal value via Futex.
+    /// 0 - Unstarted, 1 - Waiting, 2 - Working, 3 - Closed,
+    status: Atomic = .{ .raw = 0 },
+    thr: Thread = undefined,
+    node: Node = .{},
+    /// Window buffer handed to the flate/zstd decoders; empty for lzma/xz.
+    buf: []u8,
+
+    /// Compressed input for the current job when no reader is supplied.
+    dat: []u8 = &[0]u8{},
+    /// Compressed input reader for the current job; preferred over `dat`.
+    rdr: ?*Reader = null,
+    /// Destination for the decompressed bytes of the current job.
+    res: []u8 = &[0]u8{},
+    /// Outcome of the last job: bytes written into `res`, or the error hit.
+    res_size: anyerror!usize = 0,
+
+    /// Prepares a worker for `mgr` and allocates its window buffer from
+    /// `mgr.alloc`. The OS thread is only spawned on the first submit.
+    pub fn init(mgr: *DecompMgr) !DecompThread {
+        return .{
+            .mgr = mgr,
+            .buf = switch (mgr.comp_type) {
+                // NOTE(review): `CompressionType` spells this tag `gzig`;
+                // switch to `.gzip` here once the enum itself is renamed.
+                .gzig => try mgr.alloc.alloc(u8, compress.flate.max_window_len),
+                .zstd => try mgr.alloc.alloc(u8, compress.zstd.default_window_len),
+                .lzma, .xz => &[0]u8{},
+                else => unreachable,
+            },
+        };
+    }
+
+    /// Waits for an in-flight job, then stops and joins the worker thread.
+    /// Does nothing if the thread was never started or is already closed.
+    pub fn close(self: *DecompThread) void {
+        const state = self.status.load(.acquire);
+        if (state == 0 or state == 3) return;
+        while (self.status.load(.acquire) == 2) Futex.wait(&self.status, 2);
+        self.status.store(3, .release);
+        Futex.wake(&self.status, 1);
+        self.thr.join();
+    }
+
+    /// Decompresses the in-memory block `dat` into `res`, blocking until the
+    /// worker is done. Returns the number of bytes written to `res`.
+    pub fn submitData(self: *DecompThread, dat: []u8, res: []u8) anyerror!usize {
+        try self.ensureStarted();
+        self.dat = dat;
+        defer self.dat = &[0]u8{};
+        self.res = res;
+        return self.runJob();
+    }
+    /// Same as `submitData`, but the compressed bytes are pulled from `rdr`.
+    pub fn submitReader(self: *DecompThread, rdr: *Reader, res: []u8) anyerror!usize {
+        try self.ensureStarted();
+        self.rdr = rdr;
+        defer self.rdr = null;
+        self.res = res;
+        return self.runJob();
+    }
+
+    /// Spawns the worker on first use; fails once the thread has been closed.
+    fn ensureStarted(self: *DecompThread) !void {
+        switch (self.status.load(.acquire)) {
+            3 => return DecompError.ThreadClosed,
+            0 => {
+                // The worker must see "waiting" from its very first check, so
+                // publish that state before spawning and undo it on failure.
+                self.status.store(1, .release);
+                self.thr = Thread.spawn(.{}, thread, .{self}) catch |err| {
+                    self.status.store(0, .release);
+                    return err;
+                };
+            },
+            else => {},
+        }
+    }
+
+    /// Hands the job stored in `dat`/`rdr`/`res` to the worker and sleeps
+    /// until the worker flips `status` back to "waiting".
+    fn runJob(self: *DecompThread) anyerror!usize {
+        self.status.store(2, .release);
+        Futex.wake(&self.status, 1);
+        while (self.status.load(.acquire) == 2) Futex.wait(&self.status, 2);
+        return self.res_size;
+    }
+
+    /// Worker entry point: sleeps until a job is posted, decompresses it
+    /// into `res`, stores the outcome in `res_size`, and repeats until closed.
+    pub fn thread(self: *DecompThread) void {
+        const comp_type = self.mgr.comp_type;
+        while (true) {
+            while (self.status.load(.acquire) == 1) Futex.wait(&self.status, 1);
+            if (self.status.load(.acquire) == 3) return;
+            // A named local keeps the fallback reader alive for the whole job.
+            var fixed_rdr: Reader = .fixed(self.dat);
+            const rdr: *Reader = self.rdr orelse &fixed_rdr;
+            self.res_size = blk: switch (comp_type) {
+                .gzig => {
+                    var decomp_rdr = compress.flate.Decompress.init(rdr, .zlib, self.buf);
+                    // A block may inflate to fewer than `res.len` bytes, so
+                    // read "up to" that many and report the actual count.
+                    break :blk decomp_rdr.reader.readSliceShort(self.res);
+                },
+                .lzma => {
+                    var decomp_rdr = compress.lzma.decompress(self.mgr.alloc, rdr.adaptToOldInterface()) catch |err| {
+                        break :blk err;
+                    };
+                    defer decomp_rdr.deinit();
+                    break :blk decomp_rdr.read(self.res);
+                },
+                .xz => {
+                    var decomp_rdr = compress.xz.decompress(self.mgr.alloc, rdr.adaptToOldInterface()) catch |err| {
+                        break :blk err;
+                    };
+                    defer decomp_rdr.deinit();
+                    break :blk decomp_rdr.read(self.res);
+                },
+                .zstd => {
+                    var decomp_rdr = compress.zstd.Decompress.init(rdr, self.buf, .{});
+                    break :blk decomp_rdr.reader.readSliceShort(self.res);
+                },
+                else => unreachable,
+            };
+            // Publish the result, then wake every sleeper on `status`: the
+            // submitter, and possibly a concurrent `close`.
+            const orig = self.status.swap(1, .release);
+            Futex.wake(&self.status, std.math.maxInt(u32));
+            if (orig == 3) return;
+        }
+    }
+};
+
+const DecompMgr = @This();
+
+alloc: std.mem.Allocator,
+comp_type: CompressionType,
+
+/// Worker slots; only entries at index >= `to_start` have been initialised.
+threads: []DecompThread,
+/// Idle workers waiting for a job.
+queue: std.DoublyLinkedList = .{},
+mut: Mutex = .{},
+cond: Condition = .{},
+/// Number of worker slots that have not been initialised yet.
+to_start: usize,
+
+/// Creates a manager for `comp_type` blocks backed by at most `threads`
+/// workers, which are created lazily as jobs arrive.
+/// Fails for LZO and LZ4, which are not supported.
+pub fn init(alloc: std.mem.Allocator, comp_type: CompressionType, threads: usize) !DecompMgr {
+    return switch (comp_type) {
+        .lzo => DecompError.LzoUnsupported,
+        .lz4 => DecompError.Lz4Unsupported,
+        else => .{
+            .alloc = alloc,
+            .comp_type = comp_type,
+            .threads = try alloc.alloc(DecompThread, threads),
+            .to_start = threads,
+        },
+    };
+}
+
+/// Stops every worker that was created, frees its window buffer, and then
+/// frees the worker table. Workers point into that table, so they have to
+/// be joined before it goes away.
+pub fn deinit(self: DecompMgr) void {
+    for (self.threads[self.to_start..]) |*thr| {
+        thr.close();
+        self.alloc.free(thr.buf);
+    }
+    self.alloc.free(self.threads);
+}
+
+/// Decompresses the in-memory block `dat` into `res` on an idle worker,
+/// blocking until one is available. Returns the number of bytes written.
+pub fn decompSlice(self: *DecompMgr, dat: []u8, res: []u8) !usize {
+    const thr = try self.acquireThread();
+    defer self.releaseThread(thr);
+    return thr.submitData(dat, res);
+}
+/// Same as `decompSlice`, but the compressed bytes are pulled from `rdr`.
+pub fn decompReader(self: *DecompMgr, rdr: *Reader, res: []u8) !usize {
+    const thr = try self.acquireThread();
+    defer self.releaseThread(thr);
+    return thr.submitReader(rdr, res);
+}
+
+/// Takes an idle worker off the queue, initialises a fresh slot if any are
+/// left, or otherwise waits for a worker to be returned.
+fn acquireThread(self: *DecompMgr) !*DecompThread {
+    self.mut.lock();
+    defer self.mut.unlock();
+    while (true) {
+        if (self.queue.popFirst()) |node| {
+            const thr: *DecompThread = @fieldParentPtr("node", node);
+            return thr;
+        }
+        if (self.to_start > 0) {
+            const idx = self.to_start - 1;
+            self.threads[idx] = try DecompThread.init(self);
+            self.to_start = idx;
+            return &self.threads[idx];
+        }
+        self.cond.wait(&self.mut);
+    }
+}
+
+/// Puts `thr` back on the idle queue and wakes one waiter. The queue is
+/// shared between callers, so it is only touched with `mut` held.
+fn releaseThread(self: *DecompMgr, thr: *DecompThread) void {
+    self.mut.lock();
+    defer self.mut.unlock();
+    self.queue.append(&thr.node);
+    self.cond.signal();
+}
diff --git a/src/table.zig b/src/table.zig
index c8b6264..8ad3092 100644
--- a/src/table.zig
+++ b/src/table.zig
@@ -38,6 +38,12 @@ pub fn Table(T: anytype) type {
         }
 
         pub fn deinit(self: *This) void {
+            // Free every stored slice before tearing down the map itself.
+            // The iterator yields pointers to the values, hence `s.*`.
+            var iter = self.tab.valueIterator();
+            while (iter.next()) |s| {
+                self.alloc.free(s.*);
+            }
             self.tab.deinit();
         }
 
@@ -65,8 +71,7 @@ pub fn Table(T: anytype) type {
             rdr = try self.fil.readerAt(offset, &[0]u8{});
-            var meta: MetadataReader = .init(&rdr.interface, self.decomp);
+            var meta: MetadataReader = .init(self.alloc, &rdr.interface, self.decomp);
             try meta.interface.readSliceAll(@ptrCast(slice));
-            //TODO: read & decompress block.
-            self.tab.put(block_num, slice);
+            try self.tab.put(block_num, slice);
         }
     };
 }
diff --git a/src/util/metadata.zig b/src/util/metadata.zig
index 97a8553..ac10431 100644
--- a/src/util/metadata.zig
+++ b/src/util/metadata.zig
@@ -6,8 +6,21 @@ const StreamError = std.Io.Reader.StreamError;
 
 const DecompMgr = @import("../decomp.zig");
 
+/// Two-byte header in front of every metadata block.
+///
+/// Packed-struct fields are laid out from the least significant bit upward,
+/// so `size` takes bits 0..14 and `uncompressed` is the top bit.
+/// NOTE(review): this assumes the SquashFS on-disk layout, where bit 15 is
+/// the "stored uncompressed" flag, and a little-endian host -- confirm.
+const BlockHeader = packed struct(u16) {
+    size: u15,
+    uncompressed: bool,
+};
+
 const This = @This();
 
-rdr: Reader,
+alloc: std.mem.Allocator,
+/// Underlying source of raw blocks; borrowed from the caller, not owned.
+rdr: *Reader,
 decomp: *DecompMgr,
 
@@ -16,8 +29,11 @@ buf: [8192]u8 = undefined,
 interface: Reader,
 err: anyerror = 0,
 
-pub fn init(rdr: Reader, decomp: *DecompMgr) This {
+/// Builds a metadata reader that pulls raw blocks from `rdr` and inflates
+/// compressed ones through `decomp`.
+pub fn init(alloc: std.mem.Allocator, rdr: *Reader, decomp: *DecompMgr) This {
     return .{
+        .alloc = alloc,
         .rdr = rdr,
         .decomp = decomp,
         .interface = .{
@@ -26,14 +42,75 @@ pub fn init(rdr: Reader, decomp: *DecompMgr) This {
             .seek = 0,
             .vtable = &.{
                 .stream = stream,
+                .discard = discard,
+                .readVec = readVec,
             },
         },
     };
 }
 
-fn advance(self: *This) !void {}
+/// Loads the next metadata block into `buf` and points `interface` at it.
+fn advance(self: *This) !void {
+    self.interface.seek = 0;
+    var hdr: BlockHeader = undefined;
+    try self.rdr.readSliceAll(std.mem.asBytes(&hdr));
+    if (hdr.uncompressed) {
+        // `size` can encode more than `buf` holds; reject such a block
+        // instead of slicing out of bounds.
+        if (hdr.size > self.buf.len) return error.BlockTooLarge;
+        try self.rdr.readSliceAll(self.buf[0..hdr.size]);
+        self.interface.end = hdr.size;
+        self.interface.buffer = self.buf[0..hdr.size];
+        return;
+    }
+    // Fence the source so the decoder cannot read past this block.
+    var limit_rdr = self.rdr.limited(.limited(hdr.size), &.{});
+    self.interface.end = try self.decomp.decompReader(&limit_rdr.interface, &self.buf);
+    self.interface.buffer = self.buf[0..self.interface.end];
+}
+
+/// Refills `buf` via `advance`, translating failures into the error set the
+/// `Reader` vtable allows. The original error is kept in `err`.
+fn refill(self: *This) error{ EndOfStream, ReadFailed }!void {
+    self.advance() catch |err| switch (err) {
+        error.EndOfStream => return error.EndOfStream,
+        else => {
+            self.err = err;
+            return error.ReadFailed;
+        },
+    };
+}
 
 fn stream(rdr: *Reader, wrt: *Writer, limit: Limit) StreamError!usize {
-    const this: *This = @fieldParentPtr("interface", rdr);
-    if (rdr.end == rdr.seek) try this.advance();
+    const self: *This = @fieldParentPtr("interface", rdr);
+    if (rdr.end == rdr.seek) try self.refill();
+    if (@intFromEnum(limit) == 0) return 0;
+    const to_write = @min(rdr.end - rdr.seek, @intFromEnum(limit));
+    const wrote = try wrt.write(self.buf[rdr.seek .. rdr.seek + to_write]);
+    self.interface.seek += wrote;
+    return wrote;
+}
+/// Skips up to `limit` bytes of the current block without copying them.
+fn discard(rdr: *Reader, limit: Limit) error{ EndOfStream, ReadFailed }!usize {
+    const self: *This = @fieldParentPtr("interface", rdr);
+    if (rdr.end == rdr.seek) try self.refill();
+    if (@intFromEnum(limit) == 0) return 0;
+    const to_skip = @min(rdr.end - rdr.seek, @intFromEnum(limit));
+    rdr.seek += to_skip;
+    return to_skip;
+}
+/// Copies bytes from the current block into the buffers of `vec`, in order,
+/// stopping when the block is exhausted. Returns the total copied.
+fn readVec(rdr: *Reader, vec: [][]u8) error{ EndOfStream, ReadFailed }!usize {
+    const self: *This = @fieldParentPtr("interface", rdr);
+    if (rdr.end == rdr.seek) try self.refill();
+    var cur_red: usize = 0;
+    for (vec) |s| {
+        const to_copy: usize = @min(rdr.end - rdr.seek, s.len);
+        @memcpy(s[0..to_copy], self.buf[rdr.seek .. rdr.seek + to_copy]);
+        rdr.seek += to_copy;
+        cur_red += to_copy;
+        if (rdr.end == rdr.seek) break;
+    }
+    return cur_red;
 }