Expirementation with a new way to finish threads. Currently not working.

This commit is contained in:
Caleb J. Gardner
2026-03-04 06:39:44 -06:00
parent 4515610082
commit edfe919c1b
4 changed files with 224 additions and 136 deletions
+2 -3
View File
@@ -30,7 +30,6 @@ Sets the version of `unsquashfs` shown when `--version` is passed.
Most features are present except for the following:
* xattrs are not applied on extraction
* When using Zig decompression libraries then lzo and lz4 compression types are unavailable. I don't _currently_ plan on spending the time to find and validate a library since neither is popular.
* When using C decompression libraries, lzo is not supported by default due to [some issues](#build-considerations). If it's needed it's trivial to fix, but it's easiest to just leave it disabled.
@@ -39,10 +38,10 @@ Most features are present except for the following:
This is some basic observation's I've made about this library's performance when compared to `unsquashfs`. Unless otherwise stated, most observations were made when extracting my test archive (which is fairly small and uses zstd compression) and with `--release=fast`.
* Under ideal circumstances, my library is ~70% slower (.11s vs .18s)
* Mutli-threading on small archives noticably increases extraction times (when using C libraries) (.18s vs .57s). This should theoretically reverse on larger archives with many inodes, but I haven't tested that yet.
* Mutli-threading on small archives significantly increases extraction times (when using C libraries) (.18s vs .57s). This should theoretically reverse on larger archives with many inodes, but I haven't tested that yet.
* Using Zig libraries *significantly* increases decompression time by ~600% under ideal circumstances.
Times:
Example Times:
* *unsquashfs*: .11s
* *C-libs, single-threaded*: .18s
+84 -85
View File
@@ -14,8 +14,9 @@ const file = @import("inode_data/file.zig");
const misc = @import("inode_data/misc.zig");
const DataReader = @import("util/data.zig");
const ThreadedDataReader = @import("util/data_threaded.zig");
const InodeFinish = @import("util/inode_finish.zig");
const FinishUnion = InodeFinish.FinishUnion;
const MetadataReader = @import("util/metadata.zig");
const XattrTable = @import("xattr.zig");
pub const Ref = packed struct {
block_offset: u16,
@@ -155,7 +156,7 @@ fn entriesFromData(alloc: std.mem.Allocator, archive: Archive, data: anytype) ![
return DirEntry.readDir(alloc, &meta.interface, data.size);
}
/// Returns the xattr index for the given inode. If the inode isn't an extended variant or doesn't have any, the returned value is the max u32 value (0xFFFFFFFF)
/// Returns the xattr index for the given inode. If the inode isn't an extended variant or doesn't have any, the u32 max is returned (0xFFFFFFFF).
pub fn xattrIdx(self: Inode) u32 {
return switch (self.data) {
.ext_dir => |d| d.xattr_id,
@@ -167,7 +168,9 @@ pub fn xattrIdx(self: Inode) u32 {
};
}
inline fn setPermissionAndXattr(self: Inode, alloc: std.mem.Allocator, archive: *Archive, fil: std.fs.File, options: ExtractionOptions) !void {
/// Applies the Inode's metadata to the given File.
/// Mod time is always set, but permissions and xattrs are set based on the given ExtractionOptions.
pub fn setMetadata(self: Inode, alloc: std.mem.Allocator, archive: *Archive, fil: std.fs.File, options: ExtractionOptions) !void {
const time = @as(i128, self.hdr.mod_time) * 1000000000;
try fil.updateTimes(time, time);
if (!options.ignore_permissions) {
@@ -223,7 +226,7 @@ pub fn extractTo(self: Inode, alloc: std.mem.Allocator, archive: *Archive, path:
var fil = try std.fs.cwd().openFile(path, .{});
defer fil.close();
try self.setPermissionAndXattr(alloc, archive, fil, options);
try self.setMetadata(alloc, archive, fil, options);
},
.file, .ext_file => try self.extractRegFile(alloc, archive, path, options),
.symlink, .ext_symlink => try self.extractSymlink(path),
@@ -231,46 +234,6 @@ pub fn extractTo(self: Inode, alloc: std.mem.Allocator, archive: *Archive, path:
}
}
const Parent = struct {
alloc: std.mem.Allocator,
inode: Inode,
path: []const u8,
archive: *Archive,
options: ExtractionOptions,
wg: WaitGroup = .{},
mut: Mutex = .{},
fn create(alloc: std.mem.Allocator, inode: Inode, path: []const u8, archive: *Archive, options: ExtractionOptions, dir_size: usize) !*Parent {
const out = try alloc.create(Parent);
errdefer alloc.destroy(out);
out.* = .{
.alloc = alloc,
.inode = inode,
.path = path,
.archive = archive,
.options = options,
};
out.wg.startMany(dir_size);
return out;
}
fn finish(p: *Parent) !void {
p.mut.lock();
{
defer p.mut.unlock();
p.wg.finish();
if (!p.wg.isDone()) return;
}
defer p.alloc.destroy(p);
var fil = try std.fs.cwd().openFile(p.path, .{});
defer fil.close();
try p.inode.setPermissionAndXattr(p.alloc, p.archive, fil, p.options);
}
};
/// Extract the inode to the given path. Multi-threaded.
/// Functions identically to extractTo on all but regular files and directories.
fn extractToThreaded(self: Inode, allocator: std.mem.Allocator, archive: *Archive, path: []const u8, options: ExtractionOptions) !void {
@@ -294,13 +257,13 @@ fn extractToThreaded(self: Inode, allocator: std.mem.Allocator, archive: *Archiv
var out_err: ?anyerror = null;
wg.start();
self.extractThread(alloc, archive, path, options, &wg, &pool, &out_err, null);
self.extractThread(alloc, archive, path, options, .{ .wg = &wg }, &pool, &out_err);
pool.waitAndWork(&wg);
if (out_err != null) return out_err.?;
var fil = try std.fs.cwd().openFile(path, .{});
defer fil.close();
try self.setPermissionAndXattr(alloc, archive, fil, options);
try self.setMetadata(alloc, archive, fil, options);
},
.file, .ext_file => {
var pool: Pool = undefined;
@@ -314,11 +277,17 @@ fn extractToThreaded(self: Inode, allocator: std.mem.Allocator, archive: *Archiv
var thread_alloc: std.heap.ThreadSafeAllocator = .{ .child_allocator = arena_alloc.allocator() };
const alloc = thread_alloc.allocator();
try self.extractRegFileThreaded(alloc, archive, path, options, &pool);
var wg: WaitGroup = .{};
var out_err: ?anyerror = null;
self.extractThread(alloc, archive, path, options, .{ .wg = &wg }, &pool, &out_err);
pool.waitAndWork(&wg);
if (out_err != null) return out_err.?;
var fil = try std.fs.cwd().openFile(path, .{});
defer fil.close();
try self.setPermissionAndXattr(alloc, archive, fil, options);
try self.setMetadata(alloc, archive, fil, options);
},
.symlink, .ext_symlink => try self.extractSymlink(path),
else => try self.extractDevice(allocator, archive, path, options),
@@ -331,13 +300,12 @@ fn extractThreadEntry(
archive: *Archive,
path: []const u8,
options: ExtractionOptions,
wg: *WaitGroup,
finish: FinishUnion,
pool: *Pool,
out_err: *?anyerror,
parent: ?*Parent,
) void {
var new_path = alloc.alloc(u8, path.len + entry.name.len + 1) catch |err| {
wg.finish();
finish.finish();
out_err.* = err;
return;
};
@@ -346,10 +314,10 @@ fn extractThreadEntry(
new_path[path.len] = '/';
var inode = readFromEntry(alloc, archive, entry) catch |err| {
out_err.* = err;
wg.finish();
finish.finish();
return;
};
inode.extractThread(alloc, archive, new_path, options, wg, pool, out_err, parent);
inode.extractThread(alloc, archive, new_path, options, finish, pool, out_err);
}
/// Extract threadedly the inode to the path.
@@ -359,21 +327,13 @@ fn extractThread(
archive: *Archive,
path: []const u8,
options: ExtractionOptions,
wg: *WaitGroup,
finish: FinishUnion,
pool: *Pool,
out_err: *?anyerror,
parent: ?*Parent,
) void {
if (options.verbose)
options.verbose_writer.?.print("Extracting inode #{} to {s}\n", .{ self.hdr.num, path }) catch {};
defer {
if (parent != null) parent.?.finish() catch |err| {
if (options.verbose)
options.verbose_writer.?.print("Error setting folder permission to {s}: {}\n", .{ path, err }) catch {};
out_err.* = err;
};
wg.finish();
}
defer finish.finish();
if (out_err.* != null) return;
switch (self.hdr.inode_type) {
.dir, .ext_dir => {
@@ -390,15 +350,35 @@ fn extractThread(
out_err.* = err;
return;
};
const p = Parent.create(alloc, self, path, archive, options, entries.len) catch |err| {
const fin = InodeFinish.create(
alloc,
self,
path,
archive,
options,
finish,
out_err,
null,
entries.len,
) catch |err| {
if (options.verbose)
options.verbose_writer.?.print("Error allocating memory\n", .{}) catch {};
out_err.* = err;
return;
};
wg.startMany(entries.len);
// defer files.deinit(alloc); We don't need to do this due to ArenaAllocator
for (entries) |entry| {
if (entry.inode_type == .dir) {
extractThreadEntry(entry, alloc, archive, path, options, wg, pool, out_err, p);
extractThreadEntry(
entry,
alloc,
archive,
path,
options,
.{ .fin = fin },
pool,
out_err,
);
continue;
}
pool.spawn(
@@ -409,13 +389,12 @@ fn extractThread(
archive,
path,
options,
wg,
FinishUnion{ .fin = fin },
pool,
out_err,
p,
},
) catch |err| {
wg.finish();
fin.finish();
if (options.verbose)
options.verbose_writer.?.print("Error starting extraction thread: {}\n", .{err}) catch {};
out_err.* = err;
@@ -424,10 +403,42 @@ fn extractThread(
}
},
.file, .ext_file => {
self.extractRegFileThreaded(alloc, archive, path, options, pool) catch |err| {
const fil = std.fs.cwd().createFile(path, .{}) catch |err| {
if (options.verbose)
options.verbose_writer.?.print("Error extracting file inode #{} to {s}: {}\n", .{ self.hdr.num, path, err }) catch {};
options.verbose_writer.?.print("Error creating {s}: {}\n", .{ path, err }) catch {};
out_err.* = err;
return;
};
var data = self.threadedDataReader(alloc, archive) catch |err| {
if (options.verbose)
options.verbose_writer.?.print(
"Error creating data reader for inode #{} (extracting to {s}): {}\n",
.{ self.hdr.num, path, err },
) catch {};
out_err.* = err;
return;
};
const fin = InodeFinish.create(
alloc,
self,
path,
archive,
options,
finish,
out_err,
fil,
data.num_blocks,
) catch |err| {
if (options.verbose)
options.verbose_writer.?.print("Error allocating memory\n", .{}) catch {};
out_err.* = err;
return;
};
data.extractThreaded(fil, pool, fin) catch |err| {
if (options.verbose)
options.verbose_writer.?.print("Error spawning threads: {}\n", .{err}) catch {};
out_err.* = err;
return;
};
},
.symlink, .ext_symlink => {
@@ -459,19 +470,7 @@ fn extractRegFile(self: Inode, alloc: std.mem.Allocator, archive: *Archive, path
_ = try dat_rdr.interface.streamRemaining(&wrt.interface);
try wrt.interface.flush();
try self.setPermissionAndXattr(alloc, archive, fil, options);
}
/// Extract the inode file contents to the given path threadedly.
/// pool is used to spawn threads.
///
/// Assumes the inode is a file or ext_file type.
fn extractRegFileThreaded(self: Inode, alloc: std.mem.Allocator, archive: *Archive, path: []const u8, options: ExtractionOptions, pool: *Pool) !void {
var fil = try std.fs.cwd().createFile(path, .{});
defer fil.close();
var data = try self.threadedDataReader(alloc, archive);
try data.extractThreaded(fil, pool);
try self.setPermissionAndXattr(alloc, archive, fil, options);
try self.setMetadata(alloc, archive, fil, options);
}
/// Creates the symlink described by the inode.
///
@@ -533,5 +532,5 @@ fn extractDevice(self: Inode, alloc: std.mem.Allocator, archive: *Archive, path:
}
var fil = try std.fs.cwd().openFile(path, .{});
defer fil.close();
try self.setPermissionAndXattr(alloc, archive, fil, options);
try self.setMetadata(alloc, archive, fil, options);
}
+45 -48
View File
@@ -11,6 +11,7 @@ const Archive = @import("../archive.zig");
const FragEntry = Archive.FragEntry;
const DecompFn = @import("../decomp.zig").DecompFn;
const BlockSize = @import("../inode_data/file.zig").BlockSize;
const InodeFinish = @import("inode_finish.zig");
const OffsetFile = @import("offset_file.zig");
const ThreadedDataReader = @This();
@@ -25,6 +26,7 @@ blocks: []BlockSize,
frag: ?FragEntry = null, // TODO: do something better?
frag_offset: u32 = 0,
size: u64,
num_blocks: usize,
start_offset: u64,
@@ -34,8 +36,12 @@ pub fn init(alloc: std.mem.Allocator, archive: Archive, blocks: []BlockSize, sta
.fil = archive.fil,
.decomp = archive.decomp,
.block_size = archive.super.block_size,
.blocks = blocks,
.size = size,
.num_blocks = blocks.len,
.start_offset = start,
};
}
@@ -43,141 +49,132 @@ pub fn init(alloc: std.mem.Allocator, archive: Archive, blocks: []BlockSize, sta
pub fn addFragment(self: *ThreadedDataReader, entry: FragEntry, frag_offset: u32) void {
self.frag = entry;
self.frag_offset = frag_offset;
}
fn numBlocks(self: ThreadedDataReader) usize {
var res = self.blocks.len;
if (self.frag != null) res += 1;
return res;
self.num_blocks = self.blocks.len + 1;
}
/// Extract the data to the file threadedly, using pool to spawn threads.
/// If multiple errors occur, thread spawning errors will have, then the last decompression error that occurs;
///
/// The function must be called from an unused DataReader. The DataReader is still usable afterwards.
/// If only extractThreaded is used, there is no need to call deinit() afterwards.
///
/// The file will always be written to starting at 0.
pub fn extractThreaded(self: ThreadedDataReader, file: std.fs.File, pool: *Pool) !void {
var wg: WaitGroup = .{};
wg.startMany(self.numBlocks());
var out_err: ?anyerror = null;
/// This function only returns an error if pool.spawn fails. For actual extraction errors finish.out_err will be set.
pub fn extractThreaded(self: ThreadedDataReader, file: std.fs.File, pool: *Pool, finish: *InodeFinish) !void {
var cur_write_offset: u64 = 0;
var cur_read_offset: u64 = self.start_offset;
for (0..self.blocks.len) |i| {
const cur_block_size = if (i == self.numBlocks() - 1) self.size % self.block_size else self.block_size;
try pool.spawn(workThreadBlocks, .{ self, file, cur_write_offset, cur_read_offset, self.blocks[i], cur_block_size, &wg, &out_err });
const cur_block_size = if (i == self.num_blocks - 1) self.size % self.block_size else self.block_size;
try pool.spawn(workThreadBlocks, .{ self, file, cur_write_offset, cur_read_offset, self.blocks[i], cur_block_size, finish });
cur_write_offset += cur_block_size;
cur_read_offset += self.blocks[i].size;
}
if (self.frag != null) {
try pool.spawn(workThreadFragment, .{ self, file, cur_write_offset, &wg, &out_err });
}
pool.waitAndWork(&wg);
if (out_err != null) return out_err.?;
if (self.frag != null)
try pool.spawn(workThreadFragment, .{ self, file, cur_write_offset, finish });
}
fn workThreadBlocks(self: ThreadedDataReader, fil: std.fs.File, write_offset: u64, read_offset: u64, block: BlockSize, cur_block_size: u64, wg: *WaitGroup, out_err: *?anyerror) void {
defer wg.finish();
fn workThreadBlocks(
self: ThreadedDataReader,
fil: std.fs.File,
write_offset: u64,
read_offset: u64,
block: BlockSize,
cur_block_size: u64,
finish: *InodeFinish,
) void {
defer finish.finish();
var wrt = fil.writer(&[0]u8{});
wrt.seekTo(write_offset) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
defer wrt.interface.flush() catch |err| {
out_err.* = err;
finish.out_err.* = err;
};
if (block.size == 0) {
wrt.interface.splatByteAll(0, cur_block_size) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
return;
}
var rdr = self.fil.readerAt(read_offset, &[0]u8{}) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
if (block.uncompressed) {
rdr.interface.streamExact(&wrt.interface, block.size) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
return;
}
// TODO: shared buffers
const read_buf = self.alloc.alloc(u8, block.size) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
defer self.alloc.free(read_buf);
rdr.interface.readSliceAll(read_buf) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
// TODO: shared buffers
const res_buf = self.alloc.alloc(u8, cur_block_size) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
defer self.alloc.free(res_buf);
_ = self.decomp(self.alloc, read_buf, res_buf) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
wrt.interface.writeAll(res_buf) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
}
fn workThreadFragment(self: ThreadedDataReader, fil: std.fs.File, write_offset: u64, wg: *WaitGroup, out_err: *?anyerror) void {
defer wg.finish();
fn workThreadFragment(self: ThreadedDataReader, fil: std.fs.File, write_offset: u64, finish: *InodeFinish) void {
defer finish.finish();
var wrt = fil.writer(&[0]u8{});
wrt.seekTo(write_offset) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
defer wrt.interface.flush() catch |err| {
out_err.* = err;
finish.out_err.* = err;
};
var rdr = self.fil.readerAt(self.frag.?.start, &[0]u8{}) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
if (self.frag.?.size.uncompressed) {
rdr.interface.discardAll(self.frag_offset) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
rdr.interface.streamExact(&wrt.interface, self.size % self.block_size) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
return;
}
const tmp_buf = self.alloc.alloc(u8, self.frag.?.size.size) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
defer self.alloc.free(tmp_buf);
rdr.interface.readSliceAll(tmp_buf) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
const needed_block = self.alloc.alloc(u8, self.block_size) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
defer self.alloc.free(needed_block);
_ = self.decomp(self.alloc, tmp_buf, needed_block) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
wrt.interface.writeAll(needed_block[self.frag_offset .. self.frag_offset + (self.size % self.block_size)]) catch |err| {
out_err.* = err;
finish.out_err.* = err;
return;
};
}
+93
View File
@@ -0,0 +1,93 @@
const std = @import("std");
const WaitGroup = std.Thread.WaitGroup;
const Mutex = std.Thread.Mutex;
const Archive = @import("../archive.zig");
const Inode = @import("../inode.zig");
const ExtractionOptions = @import("../options.zig");
const InodeFinish = @This();
const FinishEnum = enum {
wg,
fin,
};
pub const FinishUnion = union(FinishEnum) {
wg: *WaitGroup,
fin: *InodeFinish,
pub fn finish(self: FinishUnion) void {
switch (self) {
.wg => |wg| wg.finish(),
.fin => |fin| fin.finish(),
}
}
};
alloc: std.mem.Allocator,
inode: Inode,
path: []const u8,
archive: *Archive,
options: ExtractionOptions,
parent_finish: FinishUnion,
fil: ?std.fs.File,
out_err: *?anyerror,
wg: WaitGroup = .{},
mut: Mutex = .{},
pub fn create(
alloc: std.mem.Allocator,
inode: Inode,
path: []const u8,
archive: *Archive,
options: ExtractionOptions,
parent_finish: FinishUnion,
out_err: *?anyerror,
fil: ?std.fs.File,
work_size: usize,
) !*InodeFinish {
const out = try alloc.create(InodeFinish);
errdefer alloc.destroy(out);
out.* = .{
.alloc = alloc,
.inode = inode,
.path = path,
.archive = archive,
.options = options,
.parent_finish = parent_finish,
.out_err = out_err,
.fil = fil,
};
out.wg.startMany(work_size);
return out;
}
pub fn finish(self: *InodeFinish) void {
self.mut.lock();
{
defer self.mut.unlock();
self.wg.finish();
if (!self.wg.isDone()) return;
}
defer {
self.parent_finish.finish();
self.alloc.destroy(self);
}
if (self.fil == null)
self.fil = std.fs.cwd().openFile(self.path, .{}) catch |err| {
if (self.options.verbose)
self.options.verbose_writer.?.print("Error opening {s} to set metadata: {}\n", .{ self.path, err }) catch {};
self.out_err.* = err;
return;
};
defer self.fil.?.close();
self.inode.setMetadata(self.alloc, self.archive, self.fil.?, self.options) catch |err| {
if (self.options.verbose)
self.options.verbose_writer.?.print("Error setting metadata to {s}: {}\n", .{ self.path, err }) catch {};
self.out_err.* = err;
return;
};
}