Finished adding multi-threaded extraction.

Added option in unsquashfs to specify the number of threads used.
Changed some functions to accept an allocator instead of just using
Archive's
Fixed run_tests.sh due to new c libraries
This commit is contained in:
Caleb J. Gardner
2026-02-08 15:09:02 -06:00
parent b892adacd7
commit 5ec12b5786
7 changed files with 385 additions and 57 deletions
+8 -1
View File
@@ -1,3 +1,10 @@
#!/bin/sh
zig test -lc -lzstd src/test.zig
zig test \
-lc \
-lz \
-llzma \
-lminilzo \
-llz4 \
-lzstd \
src/test.zig
-1
View File
@@ -56,7 +56,6 @@ pub fn init(alloc: std.mem.Allocator, fil: File) !Archive {
fil,
0,
try std.Thread.getCpuCount(),
@min(DEFAULT_MEM_SIZE, try std.process.totalSystemMemory() / 2),
);
}
/// Create the Archive dictating the amount of threads & memory used.
+16 -1
View File
@@ -12,8 +12,11 @@ const help_mgs =
\\
\\Options:
\\ -d <location> Extract to the given location instead of "squashfs-root"
\\
\\ -o <offset> Start reading the archive at the given offset.
\\
\\ -p <threads> Specify how many threads to use. If no present, the system's logical cores count is used.
\\
\\ --help Display this messages
\\ --version Display the version
\\
@@ -24,6 +27,7 @@ const errors = error{InvalidArguments};
var archive: []const u8 = "";
var extLoc: []const u8 = "squashfs-root";
var offset: u64 = 0;
var threads: u32 = 0;
pub fn main() !void {
const alloc = std.heap.smp_allocator;
@@ -38,7 +42,7 @@ pub fn main() !void {
}
var fil: std.fs.File = try std.fs.cwd().openFile(archive, .{}); //TODO: Handle error gracefully.
defer fil.close();
var arc: squashfs.Archive = try .initAdvanced(alloc, fil, offset, try std.Thread.getCpuCount(), 0); //TODO: Update when memory size matters. //TODO: Handle error gracefully.
var arc: squashfs.Archive = try .initAdvanced(alloc, fil, offset, threads); //TODO: Update when memory size matters. //TODO: Handle error gracefully.
defer arc.deinit();
try arc.extract(extLoc, .Default); //TODO: Handle error gracefully.
}
@@ -67,6 +71,17 @@ fn handleArgs(alloc: std.mem.Allocator, out: *Writer) !void {
}
extLoc = nxt.?;
continue;
} else if (std.mem.eql(u8, arg, "-p")) {
const nxt = args.next();
if (nxt == null or nxt.?.len == 0) {
try out.print("-p must be followed by a number\n", .{});
return errors.InvalidArguments;
}
threads = std.fmt.parseInt(u32, nxt.?, 10) catch {
try out.print("-p must be followed by a number\n", .{});
return errors.InvalidArguments;
};
continue;
} else if (std.mem.eql(u8, arg, "--version")) {
try out.print("zig-unsquashfs version ", .{});
try config.version.format(out);
+1 -1
View File
@@ -56,7 +56,7 @@ pub fn deinit(self: SfsFile) void {
}
fn getEntries(self: SfsFile) ![]DirEntry {
return self.inode.dirEntries(self.archive);
return self.inode.dirEntries(self.archive.allocator(), self.archive.*);
}
pub fn ownerUid(self: SfsFile) !u16 {
+175 -51
View File
@@ -12,6 +12,7 @@ const dir = @import("inode_data/dir.zig");
const file = @import("inode_data/file.zig");
const misc = @import("inode_data/misc.zig");
const DataReader = @import("util/data.zig");
const ThreadedDataReader = @import("util/data_threaded.zig");
const MetadataReader = @import("util/metadata.zig");
pub const Ref = packed struct {
@@ -109,29 +110,43 @@ pub fn deinit(self: Inode, alloc: std.mem.Allocator) void {
}
/// Get the data reader for a file inode.
pub fn dataReader(self: Inode, archive: *Archive) !DataReader {
pub fn dataReader(self: Inode, alloc: std.mem.Allocator, archive: *Archive) !DataReader {
return switch (self.hdr.inode_type) {
.file => readerFromData(archive, self.data.file),
.ext_file => readerFromData(archive, self.data.ext_file),
.file => readerFromData(alloc, archive, self.data.file),
.ext_file => readerFromData(alloc, archive, self.data.ext_file),
else => error.NotRegularFile,
};
}
fn readerFromData(archive: *Archive, data: anytype) !DataReader {
var out: DataReader = .init(archive, data.block_sizes, data.block_start, data.size);
fn readerFromData(alloc: std.mem.Allocator, archive: *Archive, data: anytype) !DataReader {
var out: DataReader = .init(alloc, archive.*, data.block_sizes, data.block_start, data.size);
if (data.frag_idx != 0xFFFFFFFF)
out.addFragment(try archive.frag(data.frag_idx), data.frag_block_offset);
return out;
}
/// Get a threaded data reader for a file inode.
pub fn threadedDataReader(self: Inode, alloc: std.mem.Allocator, archive: *Archive) !ThreadedDataReader {
return switch (self.hdr.inode_type) {
.file => threadedReaderFromData(alloc, archive, self.data.file),
.ext_file => threadedReaderFromData(alloc, archive, self.data.ext_file),
else => error.NotRegularFile,
};
}
fn threadedReaderFromData(alloc: std.mem.Allocator, archive: *Archive, data: anytype) !ThreadedDataReader {
var out: ThreadedDataReader = .init(alloc, archive.*, data.block_sizes, data.block_start, data.size);
if (data.frag_idx != 0xFFFFFFFF)
out.addFragment(try archive.frag(data.frag_idx), data.frag_block_offset);
return out;
}
/// Get the directory entries for a directory inode.
pub fn dirEntries(self: Inode, alloc: std.mem.Allocator, archive: *Archive) ![]DirEntry {
pub fn dirEntries(self: Inode, alloc: std.mem.Allocator, archive: Archive) ![]DirEntry {
return switch (self.hdr.inode_type) {
.dir => entriesFromData(alloc, archive, self.data.dir),
.ext_dir => entriesFromData(alloc, archive, self.data.ext_dir),
else => error.NotDirectory,
};
}
fn entriesFromData(alloc: std.mem.Allocator, archive: *Archive, data: anytype) ![]DirEntry {
fn entriesFromData(alloc: std.mem.Allocator, archive: Archive, data: anytype) ![]DirEntry {
var rdr = try archive.fil.readerAt(archive.super.dir_start + data.block_start, &[0]u8{});
var meta: MetadataReader = .init(alloc, &rdr.interface, archive.decomp);
try meta.interface.discardAll(data.block_offset);
@@ -148,7 +163,7 @@ pub fn extractTo(self: Inode, archive: *Archive, path: []const u8, options: Extr
if (err != std.fs.Dir.MakeError.PathAlreadyExists) return err;
};
var alloc = archive.allocator();
const entries = try self.dirEntries(archive);
const entries = try self.dirEntries(alloc, archive.*);
defer {
for (entries) |entry| entry.deinit(alloc);
alloc.free(entries);
@@ -160,12 +175,12 @@ pub fn extractTo(self: Inode, archive: *Archive, path: []const u8, options: Extr
new_path[path.len] = '/';
defer alloc.free(new_path);
var inode: Inode = try readFromEntry(archive, entry);
var inode: Inode = try readFromEntry(alloc, archive, entry);
defer inode.deinit(alloc);
try inode.extractTo(archive, new_path, options);
}
},
.file, .ext_file => try self.extractRegFile(archive, path, options),
.file, .ext_file => try self.extractRegFile(archive.allocator(), archive, path, options),
.symlink, .ext_symlink => try self.extractSymlink(path),
else => try self.extractDevice(archive, path, options),
}
@@ -186,59 +201,174 @@ pub fn extractToThreaded(self: Inode, archive: *Archive, path: []const u8, optio
if (threads <= 1) return self.extractTo(archive, path, options);
switch (self.hdr.inode_type) {
.dir, .ext_dir => {
// Removing any trailing separators since that's the easiest path forward.
if (path[path.len - 1] == '/') return self.extractToThreaded(archive, path[0 .. path.len - 1], options, threads);
var arena_alloc: std.heap.ArenaAllocator = .init(archive.allocator());
defer arena_alloc.deinit();
var alloc = arena_alloc.allocator();
const alloc = arena_alloc.allocator();
var wg: WaitGroup = .{};
var perms: ?std.ArrayList(Perms) = if (options.ignore_permissions) null else try .initCapacity(alloc, 100);
// defer if(!options.ignore_permissions) perms.?.deinit(alloc); We don't need to do this due to ArenaAllocator
var pool: Pool = undefined;
try pool.init(.{ .n_jobs = threads });
try pool.init(.{ .allocator = alloc, .n_jobs = threads - 1 });
defer pool.deinit();
var out_err: ?anyerror = null;
wg.start();
self.extractThread(alloc, archive, path, options, &wg, &pool, &out_err, &perms);
pool.waitAndWork(&wg);
if (out_err != null) return out_err.?;
const entries = try self.dirEntries(archive);
var files: std.ArrayList(*DirEntry) = try .initCapacity(alloc, 100);
// defer files.deinit(alloc); We don't need to do this due to ArenaAllocator
try self.extractThread(alloc, archive, path, options, &wg, &pool, if (perms == null) null else &perms);
wg.wait();
if (perms != null) {
for (perms.items) |p| {
var i = perms.?.items.len - 1;
while (i >= 0) {
const p = perms.?.items[i];
var fil = try std.fs.cwd().openFile(p.path, .{});
try fil.chmod(p.perm);
try fil.chown(p.uid, p.gid);
i -= 1;
}
}
},
.file, .ext_file => {
return error.TODO;
const alloc = archive.allocator();
var pool: Pool = undefined;
try pool.init(.{ .allocator = alloc, .n_jobs = threads });
defer pool.deinit();
try self.extractRegFileThreaded(alloc, archive, path, options, &pool);
if (!options.ignore_permissions) {
var fil = try std.fs.cwd().openFile(path, .{});
try fil.chmod(self.hdr.permissions);
try fil.chown(try archive.id(self.hdr.uid_idx), try archive.id(self.hdr.gid_idx));
}
},
.symlink, .ext_symlink => try self.extractSymlink(path),
else => try self.extractDevice(archive, path, options),
}
return error.TODO;
}
fn extractThreadEntry(
entry: DirEntry,
alloc: std.mem.Allocator,
archive: *Archive,
path: []const u8,
options: ExtractionOptions,
wg: *WaitGroup,
pool: *Pool,
out_err: *?anyerror,
perms: *?std.ArrayList(Perms),
) void {
var new_path = alloc.alloc(u8, path.len + entry.name.len + 1) catch |err| {
wg.finish();
out_err.* = err;
return;
};
@memcpy(new_path[0..path.len], path);
@memcpy(new_path[path.len + 1 ..], entry.name);
new_path[path.len] = '/';
var inode = readFromEntry(alloc, archive, entry) catch |err| {
out_err.* = err;
wg.finish();
return;
};
inode.extractThread(alloc, archive, new_path, options, wg, pool, out_err, perms);
}
/// Extract threadedly the inode to the path.
fn extractThread(self: Inode, alloc: std.mem.Allocator, archive: *Archive, path: []const u8, options: ExtractionOptions, wg: *WaitGroup, pool: *Pool, perms: ?*std.ArrayList(Perms)) !void {
_ = pool;
_ = perms;
_ = archive;
fn extractThread(
self: Inode,
alloc: std.mem.Allocator,
archive: *Archive,
path: []const u8,
options: ExtractionOptions,
wg: *WaitGroup,
pool: *Pool,
out_err: *?anyerror,
perms: *?std.ArrayList(Perms),
) void {
defer wg.finish();
if (out_err.* != null) return;
switch (self.hdr.inode_type) {
.dir, .ext_dir => {
//TOOD
return error.TODO;
std.fs.cwd().makeDir(path) catch |err| {
if (err != std.fs.Dir.MakeError.PathAlreadyExists) {
out_err.* = err;
return;
}
};
const entries = self.dirEntries(alloc, archive.*) catch |err| {
out_err.* = err;
return;
};
wg.startMany(entries.len);
// defer files.deinit(alloc); We don't need to do this due to ArenaAllocator
for (entries) |entry| {
if (entry.inode_type == .dir) {
extractThreadEntry(entry, alloc, archive, path, options, wg, pool, out_err, perms);
continue;
}
pool.spawn(
extractThreadEntry,
.{
entry,
alloc,
archive,
path,
options,
wg,
pool,
out_err,
perms,
},
) catch |err| {
wg.finish();
out_err.* = err;
continue;
};
}
if (!options.ignore_permissions) {
const new_val = perms.*.?.addOne(alloc) catch |err| {
out_err.* = err;
return;
};
new_val.* = .{
.path = path,
.uid = archive.id(self.hdr.uid_idx) catch |err| {
out_err.* = err;
return;
},
.gid = archive.id(self.hdr.gid_idx) catch |err| {
out_err.* = err;
return;
},
.perm = self.hdr.permissions,
};
}
},
.file, .ext_file => {
//TOOD
return error.TODO;
self.extractRegFileThreaded(alloc, archive, path, options, pool) catch |err| {
out_err.* = err;
return;
};
},
.symlink, .ext_symlink => {
defer wg.finish();
try self.extractSymlink(path);
self.extractSymlink(path) catch |err| {
wg.finish();
out_err.* = err;
};
},
else => {
defer wg.finish();
try self.extractDevice(path, options.ignore_permissions);
self.extractDevice(archive, path, options) catch |err| {
wg.finish();
out_err.* = err;
return;
};
},
}
}
@@ -246,12 +376,11 @@ fn extractThread(self: Inode, alloc: std.mem.Allocator, archive: *Archive, path:
/// Optionally set owner & permissions.
///
/// Assumes the inode is a file or ext_file type.
fn extractRegFile(self: Inode, archive: *Archive, path: []const u8, options: ExtractionOptions) !void {
fn extractRegFile(self: Inode, alloc: std.mem.Allocator, archive: *Archive, path: []const u8, options: ExtractionOptions) !void {
var fil = try std.fs.cwd().createFile(path, .{});
defer fil.close();
var buf: [8192]u8 = undefined;
var wrt = fil.writer(&buf);
var dat_rdr = try self.dataReader(archive);
var wrt = fil.writer(&[0]u8{});
var dat_rdr = try self.dataReader(alloc, archive);
defer dat_rdr.deinit();
_ = try dat_rdr.interface.streamRemaining(&wrt.interface);
try wrt.interface.flush();
@@ -262,24 +391,19 @@ fn extractRegFile(self: Inode, archive: *Archive, path: []const u8, options: Ext
try fil.chmod(self.hdr.permissions);
try fil.chown(try archive.id(self.hdr.uid_idx), try archive.id(self.hdr.gid_idx));
}
if (!options.ignore_xattr) {
// TODO
}
}
/// TODO: not implemented
/// Extract the inode file contents to the given path.
/// The extraction will be done threaded using pool for threads and will call wg.finish() when done.
/// Extract the inode file contents to the given path threadedly.
/// pool is used to spawn threads.
///
/// Optionally set owner & permissions.
/// Assumes the inode is a file or ext_file type.
fn extractRegFileThreaded(self: Inode, archive: *Archive, path: []const u8, options: ExtractionOptions, pool: *Pool, wg: *WaitGroup) !void {
_ = self;
_ = archive;
_ = path;
_ = options;
_ = pool;
_ = wg;
return error.TODO;
fn extractRegFileThreaded(self: Inode, alloc: std.mem.Allocator, archive: *Archive, path: []const u8, options: ExtractionOptions, pool: *Pool) !void {
var fil = try std.fs.cwd().createFile(path, .{});
var data = try self.threadedDataReader(alloc, archive);
try data.extractThreaded(fil, pool);
if (!options.ignore_permissions) {
try fil.chmod(self.hdr.permissions);
try fil.chown(try archive.id(self.hdr.uid_idx), try archive.id(self.hdr.gid_idx));
}
}
/// Creates the symlink described by the inode.
///
+2 -2
View File
@@ -29,9 +29,9 @@ interface: Reader,
cur_offset: u64,
block_idx: u32 = 0,
pub fn init(archive: *Archive, blocks: []BlockSize, start: u64, size: u64) DataReader {
pub fn init(alloc: std.mem.Allocator, archive: Archive, blocks: []BlockSize, start: u64, size: u64) DataReader {
return .{
.alloc = archive.allocator(),
.alloc = alloc,
.fil = archive.fil,
.decomp = archive.decomp,
.block_size = archive.super.block_size,
+183
View File
@@ -0,0 +1,183 @@
//! Similiar to DataReader, but set-up for threaded writing to files.
const std = @import("std");
const Reader = std.Io.Reader;
const Writer = std.Io.Writer;
const Limit = std.Io.Limit;
const WaitGroup = std.Thread.WaitGroup;
const Pool = std.Thread.Pool;
const Archive = @import("../archive.zig");
const FragEntry = Archive.FragEntry;
const DecompFn = @import("../decomp.zig").DecompFn;
const BlockSize = @import("../inode_data/file.zig").BlockSize;
const OffsetFile = @import("offset_file.zig");
const ThreadedDataReader = @This();
alloc: std.mem.Allocator,
fil: OffsetFile,
decomp: DecompFn,
block_size: u32,
blocks: []BlockSize,
frag: ?FragEntry = null, // TODO: do something better?
frag_offset: u32 = 0,
size: u64,
start_offset: u64,
pub fn init(alloc: std.mem.Allocator, archive: Archive, blocks: []BlockSize, start: u64, size: u64) ThreadedDataReader {
return .{
.alloc = alloc,
.fil = archive.fil,
.decomp = archive.decomp,
.block_size = archive.super.block_size,
.blocks = blocks,
.size = size,
.start_offset = start,
};
}
pub fn addFragment(self: *ThreadedDataReader, entry: FragEntry, frag_offset: u32) void {
self.frag = entry;
self.frag_offset = frag_offset;
}
fn numBlocks(self: ThreadedDataReader) usize {
var res = self.blocks.len;
if (self.frag != null) res += 1;
return res;
}
/// Extract the data to the file threadedly, using pool to spawn threads.
/// If multiple errors occur, thread spawning errors will have, then the last decompression error that occurs;
///
/// The function must be called from an unused DataReader. The DataReader is still usable afterwards.
/// If only extractThreaded is used, there is no need to call deinit() afterwards.
///
/// The file will always be written to starting at 0.
pub fn extractThreaded(self: ThreadedDataReader, file: std.fs.File, pool: *Pool) !void {
var wg: WaitGroup = .{};
wg.startMany(self.numBlocks());
var out_err: ?anyerror = null;
var cur_write_offset: u64 = 0;
var cur_read_offset: u64 = self.start_offset;
for (0..self.blocks.len) |i| {
const cur_block_size = if (i == self.numBlocks() - 1) self.size % self.block_size else self.block_size;
try pool.spawn(workThreadBlocks, .{ self, file, cur_write_offset, cur_read_offset, self.blocks[i], cur_block_size, &wg, &out_err });
cur_write_offset += cur_block_size;
cur_read_offset += self.blocks[i].size;
}
if (self.frag != null) {
try pool.spawn(workThreadFragment, .{ self, file, cur_write_offset, &wg, &out_err });
}
pool.waitAndWork(&wg);
if (out_err != null) return out_err.?;
}
fn workThreadBlocks(self: ThreadedDataReader, fil: std.fs.File, write_offset: u64, read_offset: u64, block: BlockSize, cur_block_size: u64, wg: *WaitGroup, out_err: *?anyerror) void {
defer wg.finish();
var wrt = fil.writer(&[0]u8{});
wrt.seekTo(write_offset) catch |err| {
out_err.* = err;
return;
};
defer wrt.interface.flush() catch |err| {
out_err.* = err;
};
if (block.size == 0) {
wrt.interface.splatByteAll(0, cur_block_size) catch |err| {
out_err.* = err;
return;
};
return;
}
var rdr = self.fil.readerAt(read_offset, &[0]u8{}) catch |err| {
out_err.* = err;
return;
};
if (block.uncompressed) {
rdr.interface.streamExact(&wrt.interface, block.size) catch |err| {
out_err.* = err;
return;
};
return;
}
// TODO: shared buffers
const read_buf = self.alloc.alloc(u8, block.size) catch |err| {
out_err.* = err;
return;
};
defer self.alloc.free(read_buf);
rdr.interface.readSliceAll(read_buf) catch |err| {
out_err.* = err;
return;
};
// TODO: shared buffers
const res_buf = self.alloc.alloc(u8, cur_block_size) catch |err| {
out_err.* = err;
return;
};
defer self.alloc.free(res_buf);
_ = self.decomp(self.alloc, read_buf, res_buf) catch |err| {
out_err.* = err;
return;
};
wrt.interface.writeAll(res_buf) catch |err| {
out_err.* = err;
return;
};
}
fn workThreadFragment(self: ThreadedDataReader, fil: std.fs.File, write_offset: u64, wg: *WaitGroup, out_err: *?anyerror) void {
defer wg.finish();
var wrt = fil.writer(&[0]u8{});
wrt.seekTo(write_offset) catch |err| {
out_err.* = err;
return;
};
defer wrt.interface.flush() catch |err| {
out_err.* = err;
};
var rdr = self.fil.readerAt(self.frag.?.start, &[0]u8{}) catch |err| {
out_err.* = err;
return;
};
if (self.frag.?.size.uncompressed) {
rdr.interface.discardAll(self.frag_offset) catch |err| {
out_err.* = err;
return;
};
rdr.interface.streamExact(&wrt.interface, self.size % self.block_size) catch |err| {
out_err.* = err;
return;
};
return;
}
const tmp_buf = self.alloc.alloc(u8, self.frag.?.size.size) catch |err| {
out_err.* = err;
return;
};
defer self.alloc.free(tmp_buf);
rdr.interface.readSliceAll(tmp_buf) catch |err| {
out_err.* = err;
return;
};
const needed_block = self.alloc.alloc(u8, self.block_size) catch |err| {
out_err.* = err;
return;
};
defer self.alloc.free(needed_block);
_ = self.decomp(self.alloc, tmp_buf, needed_block) catch |err| {
out_err.* = err;
return;
};
wrt.interface.writeAll(needed_block[self.frag_offset .. self.frag_offset + (self.size % self.block_size)]) catch |err| {
out_err.* = err;
return;
};
}