From 712c4d0a1913bed5fc5cadcbc703250a356512cb Mon Sep 17 00:00:00 2001 From: Caleb Gardner Date: Sun, 24 May 2026 02:50:24 -0500 Subject: [PATCH] Fixed issues when using Threaded.single_threaded --- build.zig | 7 ++- src/archive.zig | 32 ++++++++-- src/decomp.zig | 2 +- src/decomp/c_zstd.zig | 24 ++++---- src/decomp/zig_zlib.zig | 2 +- src/inode.zig | 118 +++++++++++++++++++----------------- src/util/data_extractor.zig | 17 ++++++ 7 files changed, 125 insertions(+), 77 deletions(-) diff --git a/build.zig b/build.zig index 9f6ee51..1db7e3f 100644 --- a/build.zig +++ b/build.zig @@ -15,9 +15,12 @@ pub fn build(b: *std.Build) !void { zig_squashfs_options.addOption(bool, "allow_lzo", allow_lzo); const target = b.standardTargetOptions(.{}); - const optimize = b.standardOptimizeOption(.{}); + var optimize = b.standardOptimizeOption(.{}); - if (optimize == .Debug) debug = true; + if (debug == true) + optimize = .Debug; + if (optimize == .Debug) + debug = true; const c = b.addTranslateC(.{ .optimize = optimize, diff --git a/src/archive.zig b/src/archive.zig index a09df6c..49f945a 100644 --- a/src/archive.zig +++ b/src/archive.zig @@ -227,14 +227,34 @@ test "ExtractCompleteArchiveSingleThreaded" { std.debug.print("Starting test: ExtractCompleteArchive...\n", .{}); const alloc = std.testing.allocator; - const io = Io.Threaded.global_single_threaded.io(); + var threaded: Io.Evented = undefined; + try threaded.init(alloc, .{}); + defer threaded.deinit(); + const io = threaded.io(); + var signal: u32 = 0; Io.Dir.cwd().deleteTree(io, TestFullExtractLocation) catch {}; - var fil = try Io.Dir.cwd().openFile(io, TestArchive, .{}); - defer fil.close(io); - var sfs: Archive = try .init(io, fil, 0); - defer sfs.deinit(io); - try sfs.extract(alloc, io, TestFullExtractLocation, .default); + + const tmp = struct { + fn singleThreadedExtract(sig: *u32) !void { + var fil = try Io.Dir.cwd().openFile(Io.Threaded.global_single_threaded.io(), TestArchive, .{}); + defer fil.close(Io.Threaded.global_single_threaded.io()); + var sfs: Archive = try .init(Io.Threaded.global_single_threaded.io(), fil, 0); + defer sfs.deinit(Io.Threaded.global_single_threaded.io()); + try sfs.extract(std.testing.allocator, Io.Threaded.global_single_threaded.io(), TestFullExtractLocation, .default); + sig.* = 1; + } + }; + var ret = try io.concurrent(tmp.singleThreadedExtract, .{&signal}); + try io.futexWaitTimeout( + u32, + &signal, + 0, + .{ .deadline = .fromNow(io, .{ .raw = .fromSeconds(10), .clock = .awake }) }, + ); + if (ret.any_future == null) return ret.result; + try ret.cancel(io); + return error.TestTimeout; } const LinuxPATestCorrectSuperblock: Superblock = .{ diff --git a/src/decomp.zig b/src/decomp.zig index 40c3b74..52e3858 100644 --- a/src/decomp.zig +++ b/src/decomp.zig @@ -67,7 +67,7 @@ pub const Decomp = union(enum) { .gzip => .{ .gzip = if (options.use_zig_decomp) try zlib.init(alloc, io, block_size) else try zlib.init(alloc, io) }, .lzma => .{ .lzma = if (options.use_zig_decomp) try lzma.init(alloc, io, block_size) else .{} }, .lzo => if (options.use_zig_decomp or !options.allow_lzo) error.LzoUnsupported else .{ .lzo = .{} }, - .xz => .{ .xz = if (options.use_zig_decomp) try zlib.init(alloc, io, block_size) else .{} }, + .xz => .{ .xz = if (options.use_zig_decomp) try xz.init(alloc, io, block_size) else .{} }, .lz4 => if (options.use_zig_decomp) error.Lz4Unsupported else .{ .lz4 = .{} }, .zstd => .{ .zstd = if (options.use_zig_decomp) try zstd.init(alloc, io, block_size) else try zstd.init(alloc, io) }, >>>>>>> dfbfbda (Build is working again (on Zig master branch)) diff --git a/src/decomp/c_zstd.zig b/src/decomp/c_zstd.zig index 005080b..d6b161c 100644 --- a/src/decomp/c_zstd.zig +++ b/src/decomp/c_zstd.zig @@ -41,20 +41,22 @@ pub fn deinit(self: *Self, alloc: std.mem.Allocator) void { } fn decomp(d: ?*const Decompressor, alloc: std.mem.Allocator, in: []u8, out: []u8) Error!usize { - if (d == null) { - return statelessDecomp(d, alloc, in, out); - } - var self: *Self = @fieldParentPtr("interface", @constCast(d.?)); + // TODO: Fix + // + // if (d == null) { + return statelessDecomp(d, alloc, in, out); + // } + // var self: *Self = @fieldParentPtr("interface", @constCast(d.?)); - const ctx = self.ctx_queue.getOne(self.io) catch return Error.ReadFailed; - defer self.ctx_queue.putOne(self.io, ctx) catch {}; + // const ctx = self.ctx_queue.getOne(self.io) catch return Error.ReadFailed; + // defer self.ctx_queue.putOne(self.io, ctx) catch {}; - _ = c.ZSTD_DCtx_reset(ctx, c.ZSTD_reset_session_only); + // _ = c.ZSTD_DCtx_reset(ctx, c.ZSTD_reset_session_only); - const res = c.ZSTD_decompressDCtx(ctx, out.ptr, out.len, in.ptr, in.len); - if (c.ZSTD_isError(res) != 0) - return Error.ReadFailed; - return res; + // const res = c.ZSTD_decompressDCtx(ctx, out.ptr, out.len, in.ptr, in.len); + // if (c.ZSTD_isError(res) != 0) + // return Error.ReadFailed; + // return res; } // Stateless diff --git a/src/decomp/zig_zlib.zig b/src/decomp/zig_zlib.zig index 2f70c89..e5c9b0c 100644 --- a/src/decomp/zig_zlib.zig +++ b/src/decomp/zig_zlib.zig @@ -56,7 +56,7 @@ fn decomp(d: ?*const Decompressor, alloc: std.mem.Allocator, in: []u8, out: []u8 const buf = self.buf_queue.getOne(self.io) catch return Error.ReadFailed; defer self.buf_queue.putOne(self.io, buf) catch {}; - return zlibDecomp(buf.buf, in, out); + return zlibDecomp(buf, in, out); } inline fn zlibDecomp(buffer: []u8, in: []u8, out: []u8) !usize { diff --git a/src/inode.zig b/src/inode.zig index 87938b0..303d9a8 100644 --- a/src/inode.zig +++ b/src/inode.zig @@ -276,7 +276,7 @@ pub fn extract( const path = std.mem.trimEnd(u8, filepath, "/"); var decomp_base: Decomp = try .init(super.compression, alloc, io, super.block_size); - defer decomp_base.deinit(alloc); + decomp_base.deinit(alloc); const decomp = decomp_base.decompressor(); var frag_mgr: FragManager = try .init(alloc, fil, decomp, super.frag_start, super.frag_count, super.block_size); @@ -286,64 +286,13 @@ pub fn extract( var sel: Io.Select(ExtractReturnUnion) = .init(io, &sel_buf); defer sel.cancelDiscard(); + var loop = io.async(finishLoop, .{ alloc, io, fil, decomp, super, options, &sel }); + sel.async(.path_ret, extractReal, .{ self, alloc, io, fil, super, decomp, &sel, &frag_mgr, path, true }); - var id_table: CachedTable(u16) = .init(alloc, fil, decomp, super.id_start, super.id_count); - defer id_table.deinit(io); - - var xattr_table: ?XattrTable = if (super.flags.xattr_never or options.ignore_xattr or !@hasField(std.os, "linux")) - null - else - try .init(alloc, fil, decomp, super.xattr_start); - defer if (xattr_table != null) xattr_table.?.deinit(io); - - var dir_queue: std.PriorityDequeue(PathRet, void, DirCompare) = .empty; - defer dir_queue.deinit(alloc); - - while (true) { - if (sel.group.token.load(.unordered) == null) break; - - const ret = try sel.await(); - const path_ret = try ret.path_ret; - - if (options.ignore_permissions and xattr_table == null) { - path_ret.deinit(alloc); - continue; - } - - if (path_ret.inode.hdr.inode_type == .dir or path_ret.inode.hdr.inode_type == .ext_dir) { - try dir_queue.push(alloc, path_ret); - continue; - } - - defer path_ret.deinit(alloc); - try path_ret.setMetadata(alloc, io, &id_table, if (xattr_table == null) null else &xattr_table.?, options); - } - - while (sel.cancel()) |ret| { - const path_ret = try ret.path_ret; - - if (options.ignore_permissions and xattr_table == null) { - path_ret.deinit(alloc); - continue; - } - - if (path_ret.inode.hdr.inode_type == .dir or path_ret.inode.hdr.inode_type == .ext_dir) { - try dir_queue.push(alloc, path_ret); - continue; - } - - defer path_ret.deinit(alloc); - try path_ret.setMetadata(alloc, io, &id_table, if (xattr_table == null) null else &xattr_table.?, options); - } - - var iter = dir_queue.iterator(); - while (iter.next()) |path_ret| { - defer path_ret.deinit(alloc); - try path_ret.setMetadata(alloc, io, &id_table, if (xattr_table == null) null else &xattr_table.?, options); - } + try loop.await(io); } -pub fn extractReal( +fn extractReal( self: Inode, alloc: std.mem.Allocator, io: Io, @@ -455,3 +404,60 @@ pub fn extractReal( .origin = origin, }; } + +fn finishLoop(alloc: std.mem.Allocator, io: Io, fil: OffsetFile, decomp: *const Decompressor, super: Archive.Superblock, options: ExtractionOptions, sel: *Io.Select(ExtractReturnUnion)) !void { + var id_table: CachedTable(u16) = .init(alloc, fil, decomp, super.id_start, super.id_count); + defer id_table.deinit(io); + + var xattr_table: ?XattrTable = if (super.flags.xattr_never or options.ignore_xattr or !@hasField(std.os, "linux")) + null + else + try .init(alloc, fil, decomp, super.xattr_start); + defer if (xattr_table != null) xattr_table.?.deinit(io); + + var dir_queue: std.PriorityDequeue(PathRet, void, DirCompare) = .empty; + defer dir_queue.deinit(alloc); + + while (true) { + if (sel.group.token.load(.unordered) == null) break; + + const ret = try sel.await(); + const path_ret = try ret.path_ret; + + if (options.ignore_permissions and xattr_table == null) { + path_ret.deinit(alloc); + continue; + } + + if (path_ret.inode.hdr.inode_type == .dir or path_ret.inode.hdr.inode_type == .ext_dir) { + try dir_queue.push(alloc, path_ret); + continue; + } + + defer path_ret.deinit(alloc); + try path_ret.setMetadata(alloc, io, &id_table, if (xattr_table == null) null else &xattr_table.?, options); + } + + while (sel.cancel()) |ret| { + const path_ret = try ret.path_ret; + + if (options.ignore_permissions and xattr_table == null) { + path_ret.deinit(alloc); + continue; + } + + if (path_ret.inode.hdr.inode_type == .dir or path_ret.inode.hdr.inode_type == .ext_dir) { + try dir_queue.push(alloc, path_ret); + continue; + } + + defer path_ret.deinit(alloc); + try path_ret.setMetadata(alloc, io, &id_table, if (xattr_table == null) null else &xattr_table.?, options); + } + + var iter = dir_queue.iterator(); + while (iter.next()) |path_ret| { + defer path_ret.deinit(alloc); + try path_ret.setMetadata(alloc, io, &id_table, if (xattr_table == null) null else &xattr_table.?, options); + } +} diff --git a/src/util/data_extractor.zig b/src/util/data_extractor.zig index a7c9333..2eb3b2f 100644 --- a/src/util/data_extractor.zig +++ b/src/util/data_extractor.zig @@ -49,6 +49,23 @@ fn numBlocks(self: DataExtractor) usize { return num; } +/// Starts extracting the data using the given group to spawn async tasks. +pub fn extractConcurrent(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil: Io.File) (Error || Io.ConcurrentError)!void { + var group: Io.Group = .init; + defer group.cancel(io); + var err: ?Error = null; + + var read_offset: u64 = self.start; + for (0..self.blocks.len) |idx| { + try group.concurrent(io, blockThread, .{ self, alloc, io, fil, read_offset, idx, &err }); + read_offset += self.blocks[idx].size; + } + if (self.frag_block != null) + try group.concurrent(io, fragThread, .{ self, io, fil, &err }); + + group.await(io) catch |cancel| return err orelse cancel; +} + /// Starts extracting the data using the given group to spawn async tasks. pub fn extractAsync(self: DataExtractor, alloc: std.mem.Allocator, io: Io, fil: Io.File) Error!void { var group: Io.Group = .init;