More work on data reader

This commit is contained in:
Caleb Gardner
2025-08-07 20:47:04 -05:00
parent b3f4a02b72
commit 1269d3e30d
3 changed files with 64 additions and 91 deletions
+6 -24
View File
@@ -63,15 +63,7 @@ pub fn File(comptime T: type) type {
out.entries = try dir.readDirectory(rdr.alloc, &meta, d.size);
},
.file => |f| {
out.data_reader = try .init(
rdr.alloc,
rdr.rdr,
rdr.super.comp,
f.block,
f.size,
f.block_sizes,
rdr.super.block_size,
);
out.data_reader = try .init(rdr, inode);
if (f.hasFragment()) {
try out.data_reader.?.addFragment(
try rdr.frag_table.get(f.frag_idx),
@@ -80,15 +72,7 @@ pub fn File(comptime T: type) type {
}
},
.ext_file => |f| {
out.data_reader = try .init(
rdr.alloc,
rdr.rdr,
rdr.super.comp,
f.block,
f.size,
f.block_sizes,
rdr.super.block_size,
);
out.data_reader = try .init(rdr, inode);
if (f.hasFragment()) {
try out.data_reader.?.addFragment(
try rdr.frag_table.get(f.frag_idx),
@@ -112,7 +96,7 @@ pub fn File(comptime T: type) type {
const inode: Inode = try .init(&meta, rdr.alloc, rdr.super.block_size);
return .init(rdr, inode, ent.name);
}
pub fn deinit(self: Self) void {
pub fn deinit(self: *Self) void {
self.rdr.alloc.free(self.name);
self.inode.deinit(self.rdr.alloc);
if (self.entries != null) {
@@ -190,7 +174,7 @@ pub fn File(comptime T: type) type {
pub const ExtractError = error{FileExists};
pub fn extract(self: Self, op: ExtractionOptions, path: []const u8) !void {
pub fn extract(self: *Self, op: ExtractionOptions, path: []const u8) !void {
var exists = true;
var stat: ?std.fs.File.Stat = null;
if (std.fs.cwd().statFile(path)) |s| {
@@ -224,7 +208,7 @@ pub fn File(comptime T: type) type {
if (errs.items.len > 0) return errs.items[0];
}
fn extractReal(
self: Self,
self: *Self,
op: ExtractionOptions,
path: []const u8,
errs: *std.ArrayList(anyerror),
@@ -365,9 +349,7 @@ pub fn File(comptime T: type) type {
defer if (!complete) self.rdr.alloc.destroy(fil_errs);
fil_errs.* = .init(self.rdr.alloc);
defer if (!complete) fil_errs.deinit();
@constCast(&self.data_reader.?).setPool(pol);
self.data_reader.?.writeToNoBlock(
errs,
ext_fil,
extractRegFinish,
.{
@@ -409,7 +391,7 @@ pub fn File(comptime T: type) type {
};
}
fn extractDirWait(
self: Self,
self: *Self,
op: ExtractionOptions,
path: []const u8,
dir_wg: *WaitGroup,
+55 -64
View File
@@ -15,7 +15,7 @@ const DataReaderError = error{
const DecompCompletion = struct {
errs: std.ArrayList(anyerror),
map: std.ArrayHashMap(usize, []u8),
map: std.AutoArrayHashMap(usize, []u8),
mut: std.Thread.Mutex = .{},
cond: std.Thread.Condition = .{},
@@ -25,13 +25,14 @@ const DecompCompletion = struct {
.map = .init(alloc),
};
}
fn deinit(self: *DecompCompletion) !void {
self.active = false;
fn deinit(self: *DecompCompletion) void {
self.errs.deinit();
self.map.deinit();
}
fn clear(self: *DecompCompletion) void {
self.mut.lock();
defer self.mut.unlock();
self.errs.clearAndFree();
self.map.clearAndFree();
}
@@ -39,20 +40,29 @@ const DecompCompletion = struct {
fn add(self: *DecompCompletion, idx: usize, data: []u8) !void {
self.mut.lock();
defer self.mut.unlock();
defer self.cond.signal();
try self.map.put(idx, data);
}
fn addErr(self: *DecompCompletion, err: anyerror) void {
self.mut.lock();
defer self.mut.unlock();
defer self.cond.signal();
self.errs.append(err) catch {};
}
fn getBlock(self: *DecompCompletion, idx: usize) ?[]u8 {
self.mut.lock();
defer self.mut.unlock();
const res = self.map.fetchSwapRemove(idx);
if(res == null) return null;
if (res == null) return null;
return res.?.value;
}
fn hasErrs(self: DecompCompletion) bool{
fn hasErrs(self: DecompCompletion) bool {
return self.errs.items.len > 0;
}
fn condWait(self: *DecompCompletion) void {
self.cond.wait(self.mut);
}
};
pub fn DataReader(comptime T: type) type {
@@ -101,7 +111,7 @@ pub fn DataReader(comptime T: type) type {
.block_size = rdr.super.block_size,
.sizes = sizes,
.offsets = offsets,
.files_size = file_size,
.file_size = file_size,
.completion = .init(rdr.alloc),
};
}
@@ -133,34 +143,25 @@ pub fn DataReader(comptime T: type) type {
thr.detach();
}
write_thr.join();
if () return errs.items[0];
if (self.completion.hasErrs()) return self.completion.errs.items[0];
}
pub fn writeToNoBlock(self: Self, wrt: anytype, comptime finish: anytype, finish_args: anytype) !void {
pub fn writeToNoBlock(self: *Self, wrt: anytype, comptime finish: anytype, finish_args: anytype) !void {
comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite"));
var map: DecompCompletion = .init(self.alloc);
errdefer map.deinit();
var mut = try self.alloc.create(std.Thread.Mutex);
errdefer self.alloc.destroy(mut);
mut.* = .{};
var cond = try self.alloc.create(std.Thread.Condition);
errdefer self.alloc.destroy(cond);
cond.* = .{};
var errs: std.ArrayList(anyerror) = .init(self.alloc);
errdefer errs.deinit();
errdefer self.completions.clear();
var write_thr = try std.Thread.spawn(
.{ .allocator = self.alloc },
writeThread,
.{ self, wrt, &errs, &map, &mut, &cond, finish, finish_args },
.{ self, wrt, finish, finish_args },
);
write_thr.detach();
for (0..self.numBlocks()) |i| {
var thr = std.Thread.spawn(
.{ .allocator = self.alloc },
decompThread,
.{ self, i, &errs, &map, &mut, &cond },
.{ self, i },
) catch |err| {
errs.append(err) catch {};
self.completion.addErr(err) catch {};
};
thr.detach();
}
@@ -199,79 +200,69 @@ pub fn DataReader(comptime T: type) type {
}
fn writeThread(
self: Self,
self: *Self,
wrt: anytype,
errs: *std.ArrayList(anyerror),
map: *DecompCompletion,
mut: *std.Thread.Mutex,
cond: *std.Thread.Condition,
comptime finish: anytype,
finish_args: anytype,
) void {
var cur_idx: usize = 0;
mut.lock();
defer mut.unlock();
while (cur_idx < self.numBlocks() and errs.items.len == 0) {
cond.wait(mut);
if (errs.items.len > 0) break;
self.completion.mut.lock();
defer self.completion.mut.unlock();
while (cur_idx < self.numBlocks() and !self.completion.hasErrs()) {
self.completion.condWait();
if (self.completion.hasErrs()) break;
if (comptime std.meta.hasFn(@TypeOf(wrt), "pwrite")) {
for (map.keys()) |k| {
const blk = map.fetchSwapRemove(k).?.value;
for (self.completion.map.keys()) |k| {
const blk = self.completion.getBlock(k).?;
defer self.alloc.free(blk);
if (blk.len > 0) {
_ = wrt.pwrite(map.fetchSwapRemove(k).?.value, self.block_size * k) catch |err| {
errs.append(err) catch {};
_ = wrt.pwrite(blk, self.block_size * k) catch |err| {
self.completion.addErr(err);
break;
};
} else {
_ = wrt.pwrite(&[1]u8{0}, (self.block_size * (k + 1)) - 1) catch |err| {
errs.append(err) catch {};
self.completion.addErr(err);
break;
};
}
cur_idx += 1;
}
continue;
}
while (map.contains(cur_idx)) {
const blk = map.fetchSwapRemove(cur_idx).?.value;
defer self.alloc.free(blk);
if (blk.len > 0) {
_ = wrt.write(blk) catch |err| {
errs.append(err) catch {};
break;
};
} else {
const blank: [1024 * 1024]u8 = [1]u8{0} ** (1024 * 1024);
_ = wrt.write(blank[0..self.block_size]) catch |err| {
errs.append(err) catch {};
break;
};
} else {
while (self.completion.getBlock(cur_idx)) |blk| {
defer self.alloc.free(blk);
if (blk.len > 0) {
_ = wrt.write(blk) catch |err| {
self.completion.addErr(err);
break;
};
} else {
const blank: [1024 * 1024]u8 = [1]u8{0} ** (1024 * 1024);
_ = wrt.write(blank[0..self.block_size]) catch |err| {
self.completion.addErr(err);
break;
};
}
cur_idx += 1;
}
cur_idx += 1;
}
}
if (comptime @TypeOf(finish) != @TypeOf(null) and @TypeOf(finish_args) != @TypeOf(null)) @call(.auto, finish, finish_args);
}
fn decompThread(
self: Self,
self: *Self,
idx: usize,
errs: *std.ArrayList(anyerror),
map: *DecompCompletion,
mut: *std.Thread.Mutex,
cond: *std.Thread.Condition,
) void {
if (errs.items.len > 0) return;
if (self.completion.hasErrs()) return;
defer self.completion.cond.signal();
const block = self.blockAt(idx) catch |err| {
errs.append(err) catch {};
self.completion.addErr(err);
return;
};
mut.lock();
defer mut.unlock();
map.put(idx, block) catch |err| {
errs.append(err) catch {};
self.completion.add(idx, block) catch |err| {
self.completion.addErr(err);
};
cond.signal();
}
};
}
+3 -3
View File
@@ -16,10 +16,10 @@ test "OpenFile" {
_ = try rdr.id_table.get(rdr.super.id_count - 1);
_ = try rdr.export_table.get(rdr.super.inode_count - 1);
std.debug.print("{}\n", .{rdr.super});
const root = try rdr.root();
var root = try rdr.root();
defer root.deinit();
var iter = root.iterate();
while (try iter.next()) |f| {
while (try iter.next()) |*f| {
defer f.deinit();
std.debug.print("{s}\n", .{f.name});
}
@@ -34,7 +34,7 @@ test "ExtractSingleFile" {
defer sfs_fil.close();
var rdr: SfsFile = try .init(std.testing.allocator, sfs_fil, 0);
defer rdr.deinit();
const fil = try rdr.open(single_file);
var fil = try rdr.open(single_file);
defer fil.deinit();
var op: ExtractionOptions = try .init();
op.verbose = true;