From 13f92f2e83c7156ff8f42a16559428fdb6f52354 Mon Sep 17 00:00:00 2001 From: Caleb Gardner Date: Wed, 30 Jul 2025 20:54:11 -0500 Subject: [PATCH] Re-writing data reader. Again. --- src/reader/data.zig | 91 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 74 insertions(+), 17 deletions(-) diff --git a/src/reader/data.zig b/src/reader/data.zig index 84b7565..62eee6f 100644 --- a/src/reader/data.zig +++ b/src/reader/data.zig @@ -71,23 +71,60 @@ pub fn DataReader(comptime T: type) type { pub fn writeTo(self: Self, wrt: anytype) !void { comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite")); - var wg: std.Thread.WaitGroup = .{}; - wg.startMany(self.numBlocks()); var map: CompletionMap = .init(self.alloc); defer map.deinit(); var mut: std.Thread.Mutex = .{}; var cond: std.Thread.Condition = .{}; - std.Thread.spawn(.{ .allocator = self.alloc }, writeThread, .{ self, wrt, &map, &mut, &cond, null, null }); - for (0..self.numBlocks()) |i| {} - wg.wait(); + var errs: std.ArrayList(anyerror) = .init(self.alloc); + defer errs.deinit(); + var write_thr = try std.Thread.spawn( + .{ .allocator = self.alloc }, + writeThread, + .{ self, wrt, &errs, &map, &mut, &cond, null, null }, + ); + for (0..self.numBlocks()) |i| { + var thr = std.Thread.spawn( + .{ .allocator = self.alloc }, + decompThread, + .{ self, i, &errs, &map, &mut, &cond }, + ) catch |err| { + errs.append(err) catch {}; + }; + thr.detach(); + } + write_thr.join(); + if (errs.items.len > 0) return errs.items[0]; } pub fn writeToNoBlock(self: Self, wrt: anytype, comptime finish: anytype, finish_args: anytype) !void { comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite")); - _ = self; - _ = finish; - _ = finish_args; - return error{TODO}.TODO; + var map: CompletionMap = .init(self.alloc); + errdefer map.deinit(); + var mut = try self.alloc.create(std.Thread.Mutex); + errdefer self.alloc.destroy(mut); + mut.* = .{}; + var cond = try self.alloc.create(std.Thread.Condition); + errdefer self.alloc.destroy(cond); + cond.* = .{}; + var errs: std.ArrayList(anyerror) = .init(self.alloc); + errdefer errs.deinit(); + var write_thr = try std.Thread.spawn( + .{ .allocator = self.alloc }, + writeThread, + .{ self, wrt, &errs, &map, &mut, &cond, finish, finish_args }, + ); + write_thr.detach(); + for (0..self.numBlocks()) |i| { + var thr = std.Thread.spawn( + .{ .allocator = self.alloc }, + decompThread, + .{ self, i, &errs, &map, &mut, &cond }, + ) catch |err| { + errs.append(err) catch {}; + }; + thr.detach(); + } } + fn writeToNoBlockFinish(comptime finish: anytype, finish_args: anytype) void {} fn numBlocks(self: Self) usize { var out = self.sizes.len; @@ -124,6 +161,7 @@ pub fn DataReader(comptime T: type) type { fn writeThread( self: Self, wrt: anytype, + errs: *std.ArrayList(anyerror), map: *CompletionMap, mut: *std.Thread.Mutex, cond: *std.Thread.Condition, @@ -133,7 +171,7 @@ pub fn DataReader(comptime T: type) type { var cur_idx: usize = 0; mut.lock(); defer mut.unlock(); - while (cur_idx < self.numBlocks()) { + while (cur_idx < self.numBlocks() and errs.items.len == 0) { cond.wait(mut); if (comptime std.meta.hasFn(@TypeOf(wrt), "pwrite")) { for (map.keys()) |k| { @@ -141,13 +179,13 @@ pub fn DataReader(comptime T: type) type { defer self.alloc.free(blk); if (blk.len > 0) { _ = wrt.pwrite(map.fetchSwapRemove(k).?.value, self.block_size * k) catch |err| { - std.debug.print("ERROR: {}\n", .{err}); - //TODO: handle properly. + errs.append(err) catch {}; + break; }; } else { _ = wrt.pwrite(&[1]u8{0}, (self.block_size * (k + 1)) - 1) catch |err| { - std.debug.print("ERROR: {}\n", .{err}); - //TODO: handle properly. + errs.append(err) catch {}; + break; }; } cur_idx += 1; @@ -159,8 +197,14 @@ pub fn DataReader(comptime T: type) type { defer self.alloc.free(blk); if (blk.len > 0) { _ = wrt.write(blk) catch |err| { - std.debug.print("ERROR: {}\n", .{err}); - //TODO: handle properly. + errs.append(err) catch {}; + break; + }; + } else { + const blank: [1024 * 1024]u8 = [1]u8{0} ** (1024 * 1024); + _ = wrt.write(blank[0..self.block_size]) catch |err| { + errs.append(err) catch {}; + break; }; } cur_idx += 1; @@ -171,9 +215,22 @@ pub fn DataReader(comptime T: type) type { fn decompThread( self: Self, idx: usize, + errs: *std.ArrayList(anyerror), map: *CompletionMap, mut: *std.Thread.Mutex, cond: *std.Thread.Condition, - ) void {} + ) void { + if (errs.items.len > 0) return; + const block = self.blockAt(idx) catch |err| { + errs.append(err) catch {}; + return; + }; + mut.lock(); + defer mut.unlock(); + map.put(idx, block) catch |err| { + errs.append(err) catch {}; + }; + cond.signal(); + } }; }