diff --git a/run_tests.sh b/run_tests.sh index ec0ff6e..b3cbdf4 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -1,3 +1,10 @@ #!/bin/sh -zig test -lc -lzstd src/test.zig +zig test \ + -lc \ + -lz \ + -llzma \ + -lminilzo \ + -llz4 \ + -lzstd \ + src/test.zig diff --git a/src/archive.zig b/src/archive.zig index c273b67..4116097 100644 --- a/src/archive.zig +++ b/src/archive.zig @@ -56,7 +56,6 @@ pub fn init(alloc: std.mem.Allocator, fil: File) !Archive { fil, 0, try std.Thread.getCpuCount(), - @min(DEFAULT_MEM_SIZE, try std.process.totalSystemMemory() / 2), ); } /// Create the Archive dictating the amount of threads & memory used. diff --git a/src/bin/unsquashfs.zig b/src/bin/unsquashfs.zig index a6e79da..ef4640d 100644 --- a/src/bin/unsquashfs.zig +++ b/src/bin/unsquashfs.zig @@ -12,8 +12,11 @@ const help_mgs = \\ \\Options: \\ -d Extract to the given location instead of "squashfs-root" + \\ \\ -o Start reading the archive at the given offset. \\ + \\ -p Specify how many threads to use. If not present, the system's logical cores count is used. + \\ \\ --help Display this messages \\ --version Display the version \\ @@ -24,6 +27,7 @@ const errors = error{InvalidArguments}; var archive: []const u8 = ""; var extLoc: []const u8 = "squashfs-root"; var offset: u64 = 0; +var threads: u32 = 0; pub fn main() !void { const alloc = std.heap.smp_allocator; @@ -38,7 +42,7 @@ pub fn main() !void { } var fil: std.fs.File = try std.fs.cwd().openFile(archive, .{}); //TODO: Handle error gracefully. defer fil.close(); - var arc: squashfs.Archive = try .initAdvanced(alloc, fil, offset, try std.Thread.getCpuCount(), 0); //TODO: Update when memory size matters. //TODO: Handle error gracefully. + var arc: squashfs.Archive = try .initAdvanced(alloc, fil, offset, if (threads == 0) try std.Thread.getCpuCount() else threads); //TODO: Update when memory size matters. //TODO: Handle error gracefully. defer arc.deinit(); try arc.extract(extLoc, .Default); //TODO: Handle error gracefully. 
} @@ -67,6 +71,17 @@ fn handleArgs(alloc: std.mem.Allocator, out: *Writer) !void { } extLoc = nxt.?; continue; + } else if (std.mem.eql(u8, arg, "-p")) { + const nxt = args.next(); + if (nxt == null or nxt.?.len == 0) { + try out.print("-p must be followed by a number\n", .{}); + return errors.InvalidArguments; + } + threads = std.fmt.parseInt(u32, nxt.?, 10) catch { + try out.print("-p must be followed by a number\n", .{}); + return errors.InvalidArguments; + }; + continue; } else if (std.mem.eql(u8, arg, "--version")) { try out.print("zig-unsquashfs version ", .{}); try config.version.format(out); diff --git a/src/file.zig b/src/file.zig index 6963244..7e012a5 100644 --- a/src/file.zig +++ b/src/file.zig @@ -56,7 +56,7 @@ pub fn deinit(self: SfsFile) void { } fn getEntries(self: SfsFile) ![]DirEntry { - return self.inode.dirEntries(self.archive); + return self.inode.dirEntries(self.archive.allocator(), self.archive.*); } pub fn ownerUid(self: SfsFile) !u16 { diff --git a/src/inode.zig b/src/inode.zig index dab4ce5..d4bf349 100644 --- a/src/inode.zig +++ b/src/inode.zig @@ -12,6 +12,7 @@ const dir = @import("inode_data/dir.zig"); const file = @import("inode_data/file.zig"); const misc = @import("inode_data/misc.zig"); const DataReader = @import("util/data.zig"); +const ThreadedDataReader = @import("util/data_threaded.zig"); const MetadataReader = @import("util/metadata.zig"); pub const Ref = packed struct { @@ -109,29 +110,43 @@ pub fn deinit(self: Inode, alloc: std.mem.Allocator) void { } /// Get the data reader for a file inode. 
-pub fn dataReader(self: Inode, archive: *Archive) !DataReader { +pub fn dataReader(self: Inode, alloc: std.mem.Allocator, archive: *Archive) !DataReader { return switch (self.hdr.inode_type) { - .file => readerFromData(archive, self.data.file), - .ext_file => readerFromData(archive, self.data.ext_file), + .file => readerFromData(alloc, archive, self.data.file), + .ext_file => readerFromData(alloc, archive, self.data.ext_file), else => error.NotRegularFile, }; } -fn readerFromData(archive: *Archive, data: anytype) !DataReader { - var out: DataReader = .init(archive, data.block_sizes, data.block_start, data.size); +fn readerFromData(alloc: std.mem.Allocator, archive: *Archive, data: anytype) !DataReader { + var out: DataReader = .init(alloc, archive.*, data.block_sizes, data.block_start, data.size); + if (data.frag_idx != 0xFFFFFFFF) + out.addFragment(try archive.frag(data.frag_idx), data.frag_block_offset); + return out; +} +/// Get a threaded data reader for a file inode. +pub fn threadedDataReader(self: Inode, alloc: std.mem.Allocator, archive: *Archive) !ThreadedDataReader { + return switch (self.hdr.inode_type) { + .file => threadedReaderFromData(alloc, archive, self.data.file), + .ext_file => threadedReaderFromData(alloc, archive, self.data.ext_file), + else => error.NotRegularFile, + }; +} +fn threadedReaderFromData(alloc: std.mem.Allocator, archive: *Archive, data: anytype) !ThreadedDataReader { + var out: ThreadedDataReader = .init(alloc, archive.*, data.block_sizes, data.block_start, data.size); if (data.frag_idx != 0xFFFFFFFF) out.addFragment(try archive.frag(data.frag_idx), data.frag_block_offset); return out; } /// Get the directory entries for a directory inode. 
-pub fn dirEntries(self: Inode, alloc: std.mem.Allocator, archive: *Archive) ![]DirEntry { +pub fn dirEntries(self: Inode, alloc: std.mem.Allocator, archive: Archive) ![]DirEntry { return switch (self.hdr.inode_type) { .dir => entriesFromData(alloc, archive, self.data.dir), .ext_dir => entriesFromData(alloc, archive, self.data.ext_dir), else => error.NotDirectory, }; } -fn entriesFromData(alloc: std.mem.Allocator, archive: *Archive, data: anytype) ![]DirEntry { +fn entriesFromData(alloc: std.mem.Allocator, archive: Archive, data: anytype) ![]DirEntry { var rdr = try archive.fil.readerAt(archive.super.dir_start + data.block_start, &[0]u8{}); var meta: MetadataReader = .init(alloc, &rdr.interface, archive.decomp); try meta.interface.discardAll(data.block_offset); @@ -148,7 +163,7 @@ pub fn extractTo(self: Inode, archive: *Archive, path: []const u8, options: Extr if (err != std.fs.Dir.MakeError.PathAlreadyExists) return err; }; var alloc = archive.allocator(); - const entries = try self.dirEntries(archive); + const entries = try self.dirEntries(alloc, archive.*); defer { for (entries) |entry| entry.deinit(alloc); alloc.free(entries); @@ -160,12 +175,12 @@ pub fn extractTo(self: Inode, archive: *Archive, path: []const u8, options: Extr new_path[path.len] = '/'; defer alloc.free(new_path); - var inode: Inode = try readFromEntry(archive, entry); + var inode: Inode = try readFromEntry(alloc, archive, entry); defer inode.deinit(alloc); try inode.extractTo(archive, new_path, options); } }, - .file, .ext_file => try self.extractRegFile(archive, path, options), + .file, .ext_file => try self.extractRegFile(archive.allocator(), archive, path, options), .symlink, .ext_symlink => try self.extractSymlink(path), else => try self.extractDevice(archive, path, options), } @@ -186,59 +201,174 @@ pub fn extractToThreaded(self: Inode, archive: *Archive, path: []const u8, optio if (threads <= 1) return self.extractTo(archive, path, options); switch (self.hdr.inode_type) { .dir, .ext_dir 
=> { + // Removing any trailing separators since that's the easiest path forward. + if (path.len > 0 and path[path.len - 1] == '/') return self.extractToThreaded(archive, path[0 .. path.len - 1], options, threads); + var arena_alloc: std.heap.ArenaAllocator = .init(archive.allocator()); defer arena_alloc.deinit(); - var alloc = arena_alloc.allocator(); + var ts_alloc: std.heap.ThreadSafeAllocator = .{ .child_allocator = arena_alloc.allocator() }; // pool workers allocate concurrently; the arena itself is not thread-safe + const alloc = ts_alloc.allocator(); var wg: WaitGroup = .{}; var perms: ?std.ArrayList(Perms) = if (options.ignore_permissions) null else try .initCapacity(alloc, 100); // defer if(!options.ignore_permissions) perms.?.deinit(alloc); We don't need to do this due to ArenaAllocator var pool: Pool = undefined; - try pool.init(.{ .n_jobs = threads }); + try pool.init(.{ .allocator = alloc, .n_jobs = threads - 1 }); + defer pool.deinit(); + var out_err: ?anyerror = null; + + wg.start(); + self.extractThread(alloc, archive, path, options, &wg, &pool, &out_err, &perms); + pool.waitAndWork(&wg); + if (out_err != null) return out_err.?; - const entries = try self.dirEntries(archive); - var files: std.ArrayList(*DirEntry) = try .initCapacity(alloc, 100); - // defer files.deinit(alloc); We don't need to do this due to ArenaAllocator - try self.extractThread(alloc, archive, path, options, &wg, &pool, if (perms == null) null else &perms); - wg.wait(); if (perms != null) { - for (perms.items) |p| { + var i = perms.?.items.len; + while (i > 0) : (i -= 1) { + const p = perms.?.items[i - 1]; var fil = try std.fs.cwd().openFile(p.path, .{}); + defer fil.close(); try fil.chmod(p.perm); try fil.chown(p.uid, p.gid); } } }, .file, .ext_file => { - return error.TODO; + const alloc = archive.allocator(); + + var pool: Pool = undefined; + try pool.init(.{ .allocator = alloc, .n_jobs = threads }); + defer pool.deinit(); + + try self.extractRegFileThreaded(alloc, archive, path, options, &pool); + + if (!options.ignore_permissions) { + var fil = try std.fs.cwd().openFile(path, .{}); + defer fil.close(); + try fil.chmod(self.hdr.permissions); + try fil.chown(try archive.id(self.hdr.uid_idx), try 
archive.id(self.hdr.gid_idx)); + } }, .symlink, .ext_symlink => try self.extractSymlink(path), else => try self.extractDevice(archive, path, options), } - return error.TODO; +} + +fn extractThreadEntry( + entry: DirEntry, + alloc: std.mem.Allocator, + archive: *Archive, + path: []const u8, + options: ExtractionOptions, + wg: *WaitGroup, + pool: *Pool, + out_err: *?anyerror, + perms: *?std.ArrayList(Perms), +) void { + var new_path = alloc.alloc(u8, path.len + entry.name.len + 1) catch |err| { + out_err.* = err; // record the error before releasing the slot so the waiter cannot miss it + wg.finish(); + return; + }; + @memcpy(new_path[0..path.len], path); + @memcpy(new_path[path.len + 1 ..], entry.name); + new_path[path.len] = '/'; + var inode = readFromEntry(alloc, archive, entry) catch |err| { + out_err.* = err; + wg.finish(); + return; + }; + inode.extractThread(alloc, archive, new_path, options, wg, pool, out_err, perms); } /// Extract threadedly the inode to the path. -fn extractThread(self: Inode, alloc: std.mem.Allocator, archive: *Archive, path: []const u8, options: ExtractionOptions, wg: *WaitGroup, pool: *Pool, perms: ?*std.ArrayList(Perms)) !void { - _ = pool; - _ = perms; - _ = archive; +fn extractThread( + self: Inode, + alloc: std.mem.Allocator, + archive: *Archive, + path: []const u8, + options: ExtractionOptions, + wg: *WaitGroup, + pool: *Pool, + out_err: *?anyerror, + perms: *?std.ArrayList(Perms), +) void { + defer wg.finish(); + if (out_err.* != null) return; switch (self.hdr.inode_type) { .dir, .ext_dir => { - //TOOD - return error.TODO; + std.fs.cwd().makeDir(path) catch |err| { + if (err != std.fs.Dir.MakeError.PathAlreadyExists) { + out_err.* = err; + return; + } + }; + + const entries = self.dirEntries(alloc, archive.*) catch |err| { + out_err.* = err; + return; + }; + wg.startMany(entries.len); + // entries is not freed here (arena-backed alloc). One wg slot per child, released by the child itself. + for (entries) |entry| { + if (entry.inode_type == .dir) { + extractThreadEntry(entry, alloc, archive, path, options, wg, pool, out_err, perms); 
+ continue; + } + pool.spawn( + extractThreadEntry, + .{ + entry, + alloc, + archive, + path, + options, + wg, + pool, + out_err, + perms, + }, + ) catch |err| { + wg.finish(); + out_err.* = err; + continue; + }; + } + if (!options.ignore_permissions) { + const new_val = perms.*.?.addOne(alloc) catch |err| { + out_err.* = err; + return; + }; + new_val.* = .{ + .path = path, + .uid = archive.id(self.hdr.uid_idx) catch |err| { + out_err.* = err; + return; + }, + .gid = archive.id(self.hdr.gid_idx) catch |err| { + out_err.* = err; + return; + }, + .perm = self.hdr.permissions, + }; + } }, .file, .ext_file => { - //TOOD - return error.TODO; + self.extractRegFileThreaded(alloc, archive, path, options, pool) catch |err| { + out_err.* = err; + return; + }; }, .symlink, .ext_symlink => { - defer wg.finish(); - try self.extractSymlink(path); + self.extractSymlink(path) catch |err| { + out_err.* = err; + }; }, else => { - defer wg.finish(); - try self.extractDevice(path, options.ignore_permissions); + self.extractDevice(archive, path, options) catch |err| { + out_err.* = err; + return; + }; }, } } @@ -246,12 +376,11 @@ fn extractThread(self: Inode, alloc: std.mem.Allocator, archive: *Archive, path: /// Optionally set owner & permissions. /// /// Assumes the inode is a file or ext_file type. 
-fn extractRegFile(self: Inode, archive: *Archive, path: []const u8, options: ExtractionOptions) !void { +fn extractRegFile(self: Inode, alloc: std.mem.Allocator, archive: *Archive, path: []const u8, options: ExtractionOptions) !void { var fil = try std.fs.cwd().createFile(path, .{}); defer fil.close(); - var buf: [8192]u8 = undefined; - var wrt = fil.writer(&buf); - var dat_rdr = try self.dataReader(archive); + var wrt = fil.writer(&[0]u8{}); + var dat_rdr = try self.dataReader(alloc, archive); defer dat_rdr.deinit(); _ = try dat_rdr.interface.streamRemaining(&wrt.interface); try wrt.interface.flush(); @@ -262,24 +391,21 @@ fn extractRegFile(self: Inode, archive: *Archive, path: []const u8, options: Ext try fil.chmod(self.hdr.permissions); try fil.chown(try archive.id(self.hdr.uid_idx), try archive.id(self.hdr.gid_idx)); } - if (!options.ignore_xattr) { - // TODO - } } -/// TODO: not implemented -/// Extract the inode file contents to the given path. -/// The extraction will be done threaded using pool for threads and will call wg.finish() when done. +/// Extract the inode file contents to the given path threadedly. +/// pool is used to spawn threads. /// /// Optionally set owner & permissions. /// Assumes the inode is a file or ext_file type. 
-fn extractRegFileThreaded(self: Inode, archive: *Archive, path: []const u8, options: ExtractionOptions, pool: *Pool, wg: *WaitGroup) !void { - _ = self; - _ = archive; - _ = path; - _ = options; - _ = pool; - _ = wg; - return error.TODO; +fn extractRegFileThreaded(self: Inode, alloc: std.mem.Allocator, archive: *Archive, path: []const u8, options: ExtractionOptions, pool: *Pool) !void { + var fil = try std.fs.cwd().createFile(path, .{}); + defer fil.close(); + var data = try self.threadedDataReader(alloc, archive); + try data.extractThreaded(fil, pool); + if (!options.ignore_permissions) { + try fil.chmod(self.hdr.permissions); + try fil.chown(try archive.id(self.hdr.uid_idx), try archive.id(self.hdr.gid_idx)); + } } /// Creates the symlink described by the inode. /// diff --git a/src/util/data.zig b/src/util/data.zig index 2cde849..177e7c1 100644 --- a/src/util/data.zig +++ b/src/util/data.zig @@ -29,9 +29,9 @@ interface: Reader, cur_offset: u64, block_idx: u32 = 0, -pub fn init(archive: *Archive, blocks: []BlockSize, start: u64, size: u64) DataReader { +pub fn init(alloc: std.mem.Allocator, archive: Archive, blocks: []BlockSize, start: u64, size: u64) DataReader { return .{ - .alloc = archive.allocator(), + .alloc = alloc, .fil = archive.fil, .decomp = archive.decomp, .block_size = archive.super.block_size, diff --git a/src/util/data_threaded.zig b/src/util/data_threaded.zig new file mode 100644 index 0000000..68ec14f --- /dev/null +++ b/src/util/data_threaded.zig @@ -0,0 +1,196 @@ +//! Similar to DataReader, but set-up for threaded writing to files. 
+ +const std = @import("std"); +const Reader = std.Io.Reader; +const Writer = std.Io.Writer; +const Limit = std.Io.Limit; +const WaitGroup = std.Thread.WaitGroup; +const Pool = std.Thread.Pool; + +const Archive = @import("../archive.zig"); +const FragEntry = Archive.FragEntry; +const DecompFn = @import("../decomp.zig").DecompFn; +const BlockSize = @import("../inode_data/file.zig").BlockSize; +const OffsetFile = @import("offset_file.zig"); + +const ThreadedDataReader = @This(); + +alloc: std.mem.Allocator, +fil: OffsetFile, +decomp: DecompFn, +block_size: u32, + +blocks: []BlockSize, + +frag: ?FragEntry = null, // TODO: do something better? +frag_offset: u32 = 0, +size: u64, + +start_offset: u64, + +pub fn init(alloc: std.mem.Allocator, archive: Archive, blocks: []BlockSize, start: u64, size: u64) ThreadedDataReader { + return .{ + .alloc = alloc, + .fil = archive.fil, + .decomp = archive.decomp, + .block_size = archive.super.block_size, + .blocks = blocks, + .size = size, + .start_offset = start, + }; +} + +pub fn addFragment(self: *ThreadedDataReader, entry: FragEntry, frag_offset: u32) void { + self.frag = entry; + self.frag_offset = frag_offset; +} + +fn numBlocks(self: ThreadedDataReader) usize { + var res = self.blocks.len; + if (self.frag != null) res += 1; + return res; +} + +/// Extract the data to the file threadedly, using pool to spawn threads. +/// If multiple errors occur, thread spawning errors take priority, followed by the last error reported by a worker. +/// +/// The function must be called from an unused DataReader. The DataReader is still usable afterwards. +/// If only extractThreaded is used, there is no need to call deinit() afterwards. +/// +/// The file will always be written to starting at 0. 
+pub fn extractThreaded(self: ThreadedDataReader, file: std.fs.File, pool: *Pool) !void { + var wg: WaitGroup = .{}; + var out_err: ?anyerror = null; + // Workers keep pointers to wg and out_err, so a failed spawn must not return before they are done. + var spawn_err: ?anyerror = null; + + // Size of a trailing partial block; 0 means the data ends exactly on a block boundary. + const tail_size = self.size % self.block_size; + var cur_write_offset: u64 = 0; + var cur_read_offset: u64 = self.start_offset; + for (0..self.blocks.len) |i| { + const cur_block_size = if (i == self.numBlocks() - 1 and tail_size != 0) tail_size else self.block_size; + wg.start(); + pool.spawn(workThreadBlocks, .{ self, file, cur_write_offset, cur_read_offset, self.blocks[i], cur_block_size, &wg, &out_err }) catch |err| { + wg.finish(); + spawn_err = err; + break; + }; + cur_write_offset += cur_block_size; + cur_read_offset += self.blocks[i].size; + } + if (self.frag != null and spawn_err == null) { + wg.start(); + pool.spawn(workThreadFragment, .{ self, file, cur_write_offset, &wg, &out_err }) catch |err| { + wg.finish(); + spawn_err = err; + }; + } + pool.waitAndWork(&wg); + if (spawn_err) |err| return err; + if (out_err) |err| return err; +} + +fn workThreadBlocks(self: ThreadedDataReader, fil: std.fs.File, write_offset: u64, read_offset: u64, block: BlockSize, cur_block_size: u64, wg: *WaitGroup, out_err: *?anyerror) void { + defer wg.finish(); + var wrt = fil.writer(&[0]u8{}); + wrt.seekTo(write_offset) catch |err| { + out_err.* = err; + return; + }; + defer wrt.interface.flush() catch |err| { + out_err.* = err; + }; + if (block.size == 0) { + wrt.interface.splatByteAll(0, cur_block_size) catch |err| { + out_err.* = err; + return; + }; + return; + } + var rdr = self.fil.readerAt(read_offset, &[0]u8{}) catch |err| { + out_err.* = err; + return; + }; + if (block.uncompressed) { + rdr.interface.streamExact(&wrt.interface, block.size) catch |err| { + out_err.* = err; + return; + }; + return; + } + // TODO: shared buffers + const read_buf = self.alloc.alloc(u8, block.size) catch |err| { + out_err.* = err; + return; + }; + defer self.alloc.free(read_buf); + rdr.interface.readSliceAll(read_buf) catch |err| { + out_err.* = err; + return; + }; + // TODO: shared buffers + const res_buf = self.alloc.alloc(u8, cur_block_size) catch |err| { + out_err.* = err; + return; + }; + defer self.alloc.free(res_buf); + _ = 
self.decomp(self.alloc, read_buf, res_buf) catch |err| { + out_err.* = err; + return; + }; + wrt.interface.writeAll(res_buf) catch |err| { + out_err.* = err; + return; + }; +} +fn workThreadFragment(self: ThreadedDataReader, fil: std.fs.File, write_offset: u64, wg: *WaitGroup, out_err: *?anyerror) void { + defer wg.finish(); + + var wrt = fil.writer(&[0]u8{}); + wrt.seekTo(write_offset) catch |err| { + out_err.* = err; + return; + }; + defer wrt.interface.flush() catch |err| { + out_err.* = err; + }; + + var rdr = self.fil.readerAt(self.frag.?.start, &[0]u8{}) catch |err| { + out_err.* = err; + return; + }; + if (self.frag.?.size.uncompressed) { + rdr.interface.discardAll(self.frag_offset) catch |err| { + out_err.* = err; + return; + }; + rdr.interface.streamExact(&wrt.interface, self.size % self.block_size) catch |err| { + out_err.* = err; + return; + }; + return; + } + const tmp_buf = self.alloc.alloc(u8, self.frag.?.size.size) catch |err| { + out_err.* = err; + return; + }; + defer self.alloc.free(tmp_buf); + rdr.interface.readSliceAll(tmp_buf) catch |err| { + out_err.* = err; + return; + }; + const needed_block = self.alloc.alloc(u8, self.block_size) catch |err| { + out_err.* = err; + return; + }; + defer self.alloc.free(needed_block); + _ = self.decomp(self.alloc, tmp_buf, needed_block) catch |err| { + out_err.* = err; + return; + }; + wrt.interface.writeAll(needed_block[self.frag_offset .. self.frag_offset + (self.size % self.block_size)]) catch |err| { + out_err.* = err; + return; + }; +}