diff --git a/src/reader/data.zig b/src/reader/data.zig index 0edc749..8f6325e 100644 --- a/src/reader/data.zig +++ b/src/reader/data.zig @@ -10,19 +10,48 @@ const Compression = @import("../superblock.zig").Compression; const DataReaderError = error{ EOF, InvalidIndex, + ExtractionActive, }; -const CompletionMap = struct{ +const DecompCompletion = struct { errs: std.ArrayList(anyerror), map: std.ArrayHashMap(usize, []u8), mut: std.Thread.Mutex = .{}, cond: std.Thread.Condition = .{}, - fn init(alloc: std.mem.Allocator) CompletionMap{ + fn init(alloc: std.mem.Allocator) DecompCompletion { return .{ .errs = .init(alloc), - .map = .init(alloc) - } + .map = .init(alloc), + }; + } + fn deinit(self: *DecompCompletion) !void { + self.active = false; + self.errs.deinit(); + self.map.deinit(); + } + + fn clear(self: *DecompCompletion) void { + self.errs.clearAndFree(); + self.map.clearAndFree(); + } + + fn add(self: *DecompCompletion, idx: usize, data: []u8) !void { + self.mut.lock(); + defer self.mut.unlock(); + try self.map.put(idx, data); + } + fn addErr(self: *DecompCompletion, err: anyerror) void { + self.errs.append(err) catch {}; + } + + fn getBlock(self: *DecompCompletion, idx: usize) ?[]u8 { + const res = self.map.fetchSwapRemove(idx); + if(res == null) return null; + return res.?.value; + } + fn hasErrs(self: DecompCompletion) bool{ + return self.errs.items.len > 0; } }; @@ -41,7 +70,7 @@ pub fn DataReader(comptime T: type) type { frag: ?[]u8 = null, - mut: std.Thread.Mutex = .{}, + completion: DecompCompletion, pub fn init(rdr: *SfsReader(T), inode: Inode) !Self { var sizes: []BlockSize = undefined; @@ -73,46 +102,43 @@ pub fn DataReader(comptime T: type) type { .sizes = sizes, .offsets = offsets, .files_size = file_size, + .completion = .init(rdr.alloc), }; } - pub fn deinit(self: Self) void { + pub fn deinit(self: *Self) void { self.alloc.free(self.offsets); + self.completion.deinit(); } - pub fn addFragment(self: Self, data: []u8) void { + pub fn addFragment(self: *Self, data: []u8) void { self.frag = data; } - pub fn writeTo(self: Self, wrt: anytype) !void { + pub fn writeTo(self: *Self, wrt: anytype) !void { comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite")); - var map: CompletionMap = .init(self.alloc); - defer map.deinit(); - var mut: std.Thread.Mutex = .{}; - var cond: std.Thread.Condition = .{}; - var errs: std.ArrayList(anyerror) = .init(self.alloc); - defer errs.deinit(); var write_thr = try std.Thread.spawn( .{ .allocator = self.alloc }, writeThread, - .{ self, wrt, &errs, &map, &mut, &cond, null, null }, + .{ self, wrt, null, null }, ); + defer self.completion.clear(); for (0..self.numBlocks()) |i| { var thr = std.Thread.spawn( .{ .allocator = self.alloc }, decompThread, - .{ self, i, &errs, &map, &mut, &cond }, + .{ self, i }, ) catch |err| { - errs.append(err) catch {}; + self.completion.addErr(err); }; thr.detach(); } write_thr.join(); - if (errs.items.len > 0) return errs.items[0]; + if () return errs.items[0]; } pub fn writeToNoBlock(self: Self, wrt: anytype, comptime finish: anytype, finish_args: anytype) !void { comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite")); - var map: CompletionMap = .init(self.alloc); + var map: DecompCompletion = .init(self.alloc); errdefer map.deinit(); var mut = try self.alloc.create(std.Thread.Mutex); errdefer self.alloc.destroy(mut); @@ -139,7 +165,6 @@ pub fn DataReader(comptime T: type) type { thr.detach(); } } - fn writeToNoBlockFinish(comptime finish: anytype, finish_args: anytype) void {} fn numBlocks(self: Self) usize { var out = self.sizes.len; @@ -177,7 +202,7 @@ pub fn DataReader(comptime T: type) type { self: Self, wrt: anytype, errs: *std.ArrayList(anyerror), - map: *CompletionMap, + map: *DecompCompletion, mut: *std.Thread.Mutex, cond: *std.Thread.Condition, comptime finish: anytype, @@ -188,6 +213,7 @@ pub fn DataReader(comptime T: type) type { defer mut.unlock(); while (cur_idx < self.numBlocks() and errs.items.len == 0) { cond.wait(mut); + if (errs.items.len > 0) break; if (comptime std.meta.hasFn(@TypeOf(wrt), "pwrite")) { for (map.keys()) |k| { const blk = map.fetchSwapRemove(k).?.value; @@ -231,7 +257,7 @@ pub fn DataReader(comptime T: type) type { self: Self, idx: usize, errs: *std.ArrayList(anyerror), - map: *CompletionMap, + map: *DecompCompletion, mut: *std.Thread.Mutex, cond: *std.Thread.Condition, ) void {