diff --git a/src/reader/data.zig b/src/reader/data.zig index d71d924..2d67237 100644 --- a/src/reader/data.zig +++ b/src/reader/data.zig @@ -5,6 +5,11 @@ const FragEntry = @import("../fragment.zig").FragEntry; const BlockSize = @import("../inode/file.zig").BlockSize; const Compression = @import("../superblock.zig").Compression; +const DataReaderError = error{ + EOF, + ThreadPoolNotSet, +}; + pub fn DataReader(comptime T: type) type { return struct { const Self = @This(); @@ -77,11 +82,14 @@ pub fn DataReader(comptime T: type) type { ); @memcpy(self.frag, block[offset..]); } + pub fn setPool(self: *Self, pool: *std.Thread.Pool) void { self.pool = pool; } fn blockAt(self: Self, idx: u32) ![]u8 { + if (self.frag.len > 0 and idx == self.sizes.len) return self.frag; + if (idx >= self.sizes.len) return DataReaderError.InvalidIndex; const size = if (idx == self.sizes.len - 1 and self.frag.len == 0) { self.file_size % self.block_size; } else { @@ -107,20 +115,135 @@ pub fn DataReader(comptime T: type) type { return block; } + fn numBlocks(self: Self) usize { + var out = self.sizes.len; + if (self.frag.len > 0) out += 1; + return out; + } + pub fn read(self: *Self, buf: []u8) !usize { var cur_red: usize = 0; + var to_read: usize = 0; while (cur_red < buf.len) { if (self.read_offset >= self.read_block.len) { - //TODO: + if (self.read_idx == self.sizes.len or (self.frag.len == 0 and self.read_idx == self.sizes.len - 1)) { + self.block_size = self.file_size % self.block_size; + } + self.read_block = self.blockAt(self.read_idx) catch |err| { + if (err == DataReaderError.EOF) return cur_red; + return err; + }; + self.read_idx += 1; } - //TODO: + to_read = @min(buf.len - cur_red, self.block_size - self.read_offset); + @memcpy(buf[cur_red .. cur_red + to_read], self.read_block[self.read_offset .. self.read_offset + to_read]); + cur_red += to_read; + self.read_offset += to_read; } return cur_red; } /// Write the entire file's contents to the writer. /// If availble, pwrite will be used. - /// If a thread pool is not set via setPool, one is created based on cpu thread count. - pub fn writeTo(self: Self, writer: anytype) !usize {} + pub fn writeTo(self: Self, writer: anytype) !usize { + if (comptime self.pool == null) return DataReaderError.ThreadPoolNotSet; + const mut: std.Thread.Mutex = .{}; + var cur_idx: usize = 0; + const wg: std.Thread.WaitGroup = .{}; + const completed = comptime if (std.meta.hasFn(@TypeOf(writer), "pwrite")) { + std.ArrayList(anyerror).init(self.alloc); + } else { + std.AutoArrayHashMap(usize, anyerror![]u8).init(self.alloc); + }; + defer completed.deinit(); + for (0..self.numBlocks()) |i| { + wg.start(); + self.pool.?.spawn( + comptime if (std.meta.hasFn(@TypeOf(writer), "pwrite")) { + extractThreadedPWrite; + } else { + extractThreaded; + }, + comptime if (std.meta.hasFn(@TypeOf(writer), "pwrite")) { + .{ self, &wg, &completed, i, writer }; + } else { + .{ self, &mut, &cur_idx, &wg, &completed, i, writer }; + }, + ); + } + wg.wait(); + if (completed.items.len > 0) { + return completed.items.get(0); + } + return self.file_size; + } + fn extractThreaded( + self: Self, + mut: *std.Thread.Mutex, + cur_idx: *usize, + wg: *std.Thread.WaitGroup, + completed: *std.AutoArrayHashMap(usize, anyerror![]u8), + idx: usize, + writer: anytype, + ) void { + if (cur_idx.* >= self.sizes.len + 1) return; + defer wg.finish(); + const block = self.blockAt(idx) catch |err| { + cur_idx.* = self.sizes.len + 1; + completed.put(idx, err) catch {}; + return; + }; + defer if (idx < self.sizes.len) { + self.alloc.free(block); + }; + mut.lock(); + defer mut.unlock(); + if (cur_idx.* == idx) { + _ = writer.write(block) catch |err| { + cur_idx.* = self.sizes.len + 1; + completed.put(idx, err) catch {}; + return; + }; + } else { + completed.put(idx, block) catch |err| { + cur_idx.* = self.sizes.len + 1; + completed.put(idx, err) catch {}; + return; + }; + } + if (completed.count() == 0) return; + for (cur_idx.*..self.numBlocks()) |i| { + const val = completed.get(i); + if (val == null) return; + _ = writer.write(block) catch |err| { + cur_idx.* = self.sizes.len + 1; + completed.put(i, err) catch {}; + return; + }; + cur_idx.* += 1; + if (completed.count() == 0) return; + } + } + fn extractThreadedPWrite( + self: Self, + wg: *std.Thread.WaitGroup, + completed: *std.ArrayList(anyerror), + idx: usize, + writer: anytype, + ) void { + if (completed.items.len > 0) return; + defer wg.finish(); + const block = self.blockAt(idx) catch |err| { + completed.append(err) catch {}; + return; + }; + defer if (idx < self.sizes.len) { + self.alloc.free(block); + }; + _ = writer.pwrite(idx * self.block_size, block) catch |err| { + completed.append(err) catch {}; + return; + }; + } }; }