diff --git a/src/reader/data.zig b/src/reader/data.zig index ce0826f..84b7565 100644 --- a/src/reader/data.zig +++ b/src/reader/data.zig @@ -7,6 +7,8 @@ const FragEntry = @import("../fragment.zig").FragEntry; const BlockSize = @import("../inode/file.zig").BlockSize; const Compression = @import("../superblock.zig").Compression; +const CompletionMap = std.ArrayHashMap(usize, []u8); + const DataReaderError = error{ EOF, InvalidIndex, @@ -69,9 +71,22 @@ pub fn DataReader(comptime T: type) type { pub fn writeTo(self: Self, wrt: anytype) !void { comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite")); + var wg: std.Thread.WaitGroup = .{}; + wg.startMany(self.numBlocks()); + var map: CompletionMap = .init(self.alloc); + defer map.deinit(); + var mut: std.Thread.Mutex = .{}; + var cond: std.Thread.Condition = .{}; + std.Thread.spawn(.{ .allocator = self.alloc }, writeThread, .{ self, wrt, &map, &mut, &cond, null, null }); + for (0..self.numBlocks()) |i| {} + wg.wait(); } pub fn writeToNoBlock(self: Self, wrt: anytype, comptime finish: anytype, finish_args: anytype) !void { comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite")); + _ = self; + _ = finish; + _ = finish_args; + return error{TODO}.TODO; } fn numBlocks(self: Self) usize { @@ -79,9 +94,12 @@ pub fn DataReader(comptime T: type) type { if (self.frag != null) out += 1; return out; } - + /// Returns the decompressed data block at the given idx. + /// If the block is sparse (filled with 0s), a zero length slice is returned. fn blockAt(self: Self, idx: usize) ![]u8 { - if (idx >= self.sizes.len) return DataReaderError.InvalidIndex; + if (idx >= self.numBlocks()) return DataReaderError.InvalidIndex; + const size = self.sizes[idx]; + if (size.size == 0) return &[0]u8{}; const block = try self.alloc.alloc(u8, blk: { if (idx == self.numBlocks() - 1) break :blk self.file_size % self.block_size; break :blk self.block_size; @@ -90,7 +108,7 @@ pub fn DataReader(comptime T: type) type { @memcpy(block, self.frag.?); return; } - if (self.sizes[idx].uncompressed) { + if (size.uncompressed) { _ = try self.rdr.pread(block, self.offsets[idx]); return; } @@ -102,5 +120,60 @@ pub fn DataReader(comptime T: type) type { ); return block; } + + fn writeThread( + self: Self, + wrt: anytype, + map: *CompletionMap, + mut: *std.Thread.Mutex, + cond: *std.Thread.Condition, + comptime finish: anytype, + finish_args: anytype, + ) void { + var cur_idx: usize = 0; + mut.lock(); + defer mut.unlock(); + while (cur_idx < self.numBlocks()) { + cond.wait(mut); + if (comptime std.meta.hasFn(@TypeOf(wrt), "pwrite")) { + for (map.keys()) |k| { + const blk = map.fetchSwapRemove(k).?.value; + defer self.alloc.free(blk); + if (blk.len > 0) { + _ = wrt.pwrite(map.fetchSwapRemove(k).?.value, self.block_size * k) catch |err| { + std.debug.print("ERROR: {}\n", .{err}); + //TODO: handle properly. + }; + } else { + _ = wrt.pwrite(&[1]u8{0}, (self.block_size * (k + 1)) - 1) catch |err| { + std.debug.print("ERROR: {}\n", .{err}); + //TODO: handle properly. + }; + } + cur_idx += 1; + } + continue; + } + while (map.contains(cur_idx)) { + const blk = map.fetchSwapRemove(cur_idx).?.value; + defer self.alloc.free(blk); + if (blk.len > 0) { + _ = wrt.write(blk) catch |err| { + std.debug.print("ERROR: {}\n", .{err}); + //TODO: handle properly. + }; + } + cur_idx += 1; + } + } + if (comptime @TypeOf(finish) != @TypeOf(null) and @TypeOf(finish_args) != @TypeOf(null)) @call(.auto, finish, finish_args); + } + fn decompThread( + self: Self, + idx: usize, + map: *CompletionMap, + mut: *std.Thread.Mutex, + cond: *std.Thread.Condition, + ) void {} }; }