Re-writing data reader. Again.

This commit is contained in:
Caleb Gardner
2025-07-30 20:54:11 -05:00
parent 3fb95dd3fa
commit 13f92f2e83
+74 -17
View File
@@ -71,23 +71,60 @@ pub fn DataReader(comptime T: type) type {
pub fn writeTo(self: Self, wrt: anytype) !void { pub fn writeTo(self: Self, wrt: anytype) !void {
comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite")); comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite"));
var wg: std.Thread.WaitGroup = .{};
wg.startMany(self.numBlocks());
var map: CompletionMap = .init(self.alloc); var map: CompletionMap = .init(self.alloc);
defer map.deinit(); defer map.deinit();
var mut: std.Thread.Mutex = .{}; var mut: std.Thread.Mutex = .{};
var cond: std.Thread.Condition = .{}; var cond: std.Thread.Condition = .{};
std.Thread.spawn(.{ .allocator = self.alloc }, writeThread, .{ self, wrt, &map, &mut, &cond, null, null }); var errs: std.ArrayList(anyerror) = .init(self.alloc);
for (0..self.numBlocks()) |i| {} defer errs.deinit();
wg.wait(); var write_thr = try std.Thread.spawn(
.{ .allocator = self.alloc },
writeThread,
.{ self, wrt, &errs, &map, &mut, &cond, null, null },
);
for (0..self.numBlocks()) |i| {
var thr = std.Thread.spawn(
.{ .allocator = self.alloc },
decompThread,
.{ self, i, &errs, &map, &mut, &cond },
) catch |err| {
errs.append(err) catch {};
};
thr.detach();
}
write_thr.join();
if (errs.items.len > 0) return errs.items[0];
} }
pub fn writeToNoBlock(self: Self, wrt: anytype, comptime finish: anytype, finish_args: anytype) !void { pub fn writeToNoBlock(self: Self, wrt: anytype, comptime finish: anytype, finish_args: anytype) !void {
comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite")); comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite"));
_ = self; var map: CompletionMap = .init(self.alloc);
_ = finish; errdefer map.deinit();
_ = finish_args; var mut = try self.alloc.create(std.Thread.Mutex);
return error{TODO}.TODO; errdefer self.alloc.destroy(mut);
mut.* = .{};
var cond = try self.alloc.create(std.Thread.Condition);
errdefer self.alloc.destroy(cond);
cond.* = .{};
var errs: std.ArrayList(anyerror) = .init(self.alloc);
errdefer errs.deinit();
var write_thr = try std.Thread.spawn(
.{ .allocator = self.alloc },
writeThread,
.{ self, wrt, &errs, &map, &mut, &cond, finish, finish_args },
);
write_thr.detach();
for (0..self.numBlocks()) |i| {
var thr = std.Thread.spawn(
.{ .allocator = self.alloc },
decompThread,
.{ self, i, &errs, &map, &mut, &cond },
) catch |err| {
errs.append(err) catch {};
};
thr.detach();
}
} }
fn writeToNoBlockFinish(comptime finish: anytype, finish_args: anytype) void {}
fn numBlocks(self: Self) usize { fn numBlocks(self: Self) usize {
var out = self.sizes.len; var out = self.sizes.len;
@@ -124,6 +161,7 @@ pub fn DataReader(comptime T: type) type {
fn writeThread( fn writeThread(
self: Self, self: Self,
wrt: anytype, wrt: anytype,
errs: *std.ArrayList(anyerror),
map: *CompletionMap, map: *CompletionMap,
mut: *std.Thread.Mutex, mut: *std.Thread.Mutex,
cond: *std.Thread.Condition, cond: *std.Thread.Condition,
@@ -133,7 +171,7 @@ pub fn DataReader(comptime T: type) type {
var cur_idx: usize = 0; var cur_idx: usize = 0;
mut.lock(); mut.lock();
defer mut.unlock(); defer mut.unlock();
while (cur_idx < self.numBlocks()) { while (cur_idx < self.numBlocks() and errs.items.len == 0) {
cond.wait(mut); cond.wait(mut);
if (comptime std.meta.hasFn(@TypeOf(wrt), "pwrite")) { if (comptime std.meta.hasFn(@TypeOf(wrt), "pwrite")) {
for (map.keys()) |k| { for (map.keys()) |k| {
@@ -141,13 +179,13 @@ pub fn DataReader(comptime T: type) type {
defer self.alloc.free(blk); defer self.alloc.free(blk);
if (blk.len > 0) { if (blk.len > 0) {
_ = wrt.pwrite(map.fetchSwapRemove(k).?.value, self.block_size * k) catch |err| { _ = wrt.pwrite(map.fetchSwapRemove(k).?.value, self.block_size * k) catch |err| {
std.debug.print("ERROR: {}\n", .{err}); errs.append(err) catch {};
//TODO: handle properly. break;
}; };
} else { } else {
_ = wrt.pwrite(&[1]u8{0}, (self.block_size * (k + 1)) - 1) catch |err| { _ = wrt.pwrite(&[1]u8{0}, (self.block_size * (k + 1)) - 1) catch |err| {
std.debug.print("ERROR: {}\n", .{err}); errs.append(err) catch {};
//TODO: handle properly. break;
}; };
} }
cur_idx += 1; cur_idx += 1;
@@ -159,8 +197,14 @@ pub fn DataReader(comptime T: type) type {
defer self.alloc.free(blk); defer self.alloc.free(blk);
if (blk.len > 0) { if (blk.len > 0) {
_ = wrt.write(blk) catch |err| { _ = wrt.write(blk) catch |err| {
std.debug.print("ERROR: {}\n", .{err}); errs.append(err) catch {};
//TODO: handle properly. break;
};
} else {
const blank: [1024 * 1024]u8 = [1]u8{0} ** (1024 * 1024);
_ = wrt.write(blank[0..self.block_size]) catch |err| {
errs.append(err) catch {};
break;
}; };
} }
cur_idx += 1; cur_idx += 1;
@@ -171,9 +215,22 @@ pub fn DataReader(comptime T: type) type {
fn decompThread( fn decompThread(
self: Self, self: Self,
idx: usize, idx: usize,
errs: *std.ArrayList(anyerror),
map: *CompletionMap, map: *CompletionMap,
mut: *std.Thread.Mutex, mut: *std.Thread.Mutex,
cond: *std.Thread.Condition, cond: *std.Thread.Condition,
) void {} ) void {
if (errs.items.len > 0) return;
const block = self.blockAt(idx) catch |err| {
errs.append(err) catch {};
return;
};
mut.lock();
defer mut.unlock();
map.put(idx, block) catch |err| {
errs.append(err) catch {};
};
cond.signal();
}
}; };
} }