diff --git a/src/reader/data.zig b/src/reader/data.zig index e34cc70..ce0826f 100644 --- a/src/reader/data.zig +++ b/src/reader/data.zig @@ -1,378 +1,106 @@ const std = @import("std"); +const Inode = @import("../inode.zig"); const PRead = @import("p_read.zig").PRead; +const SfsReader = @import("../reader.zig").SfsReader; const FragEntry = @import("../fragment.zig").FragEntry; const BlockSize = @import("../inode/file.zig").BlockSize; const Compression = @import("../superblock.zig").Compression; const DataReaderError = error{ EOF, - ThreadPoolNotSet, InvalidIndex, }; -const DataBlock = struct { - data: [1024 * 1024]u8, // Blocks can be up to 1MB in size. - len: usize, -}; - pub fn DataReader(comptime T: type) type { return struct { const Self = @This(); alloc: std.mem.Allocator, - pool: ?*std.Thread.Pool = null, - rdr: PRead(T), comp: Compression, - offsets: []u64, - - file_size: u64, block_size: u32, + sizes: []BlockSize, + offsets: []u64, + file_size: u64, - frag: DataBlock = DataBlock{ .data = &[0]u8, .len = 0 }, + frag: ?[]u8 = null, - read_block: DataBlock = DataBlock{ .data = &[0]u8, .len = 0 }, - read_offset: u64 = 0, - read_idx: u32 = 0, - - pub fn init( - alloc: std.mem.Allocator, - rdr: PRead(T), - comp: Compression, - init_offset: u64, - file_size: u64, - sizes: []BlockSize, - block_size: u32, - ) !Self { - var cur_offset = init_offset; - const offsets = try alloc.alloc(u64, sizes.len); - for (0..sizes.len) |i| { - offsets[i] = cur_offset; - cur_offset += sizes[i].size; + pub fn init(rdr: *SfsReader(T), inode: Inode) !Self { + var sizes: []BlockSize = undefined; + var file_size: u64 = 0; + var offsets: []u64 = undefined; + switch (inode.data) { + .file => |f| { + sizes = f.block_sizes; + file_size = f.size; + offsets = try rdr.alloc.alloc(u64, sizes.len); + if (sizes.len > 0) offsets[0] = f.block; + }, + .ext_file => |f| { + sizes = f.block_sizes; + file_size = f.size; + offsets = try rdr.alloc.alloc(u64, sizes.len); + if (sizes.len > 0) offsets[0] = f.block; + }, + else => unreachable, + } + for (1..offsets.len) |i| { + offsets[i] = offsets[i - 1] + sizes[i - 1].size; } return .{ - .alloc = alloc, - .rdr = rdr, - .comp = comp, - .offsets = offsets, - .file_size = file_size, - .block_size = block_size, + .alloc = rdr.alloc, + .rdr = rdr.rdr, + .comp = rdr.super.comp, + .block_size = rdr.super.block_size, .sizes = sizes, + .offsets = offsets, + .files_size = file_size, }; } pub fn deinit(self: Self) void { self.alloc.free(self.offsets); - self.alloc.free(self.frag); - if (self.read_idx < self.sizes.len) self.alloc.free(self.read_block); } - pub fn addFragment(self: *Self, entry: FragEntry, offset: u32) !void { - self.frag.len = self.file_size % self.block_size; - errdefer self.frag.len = 0; - if (entry.size.size == 0) { - @memset(self.frag.data, 0); - return; - } else if (entry.size.uncompressed) { - _ = try self.rdr.pread(self.frag.data, entry.block + offset); - return; - } - const block: [1024 * 1024]u8 = undefined; - _ = try self.comp.decompress( - 1024 * 1024, - self.alloc, - self.rdr.readerAt(entry.block).reader(), - block, - ); - @memcpy(self.frag.data, block[offset..]); + pub fn addFragment(self: Self, data: []u8) void { + self.frag = data; } - pub fn setPool(self: *Self, pool: *std.Thread.Pool) void { - self.pool = pool; + pub fn writeTo(self: Self, wrt: anytype) !void { + comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite")); } - - fn blockAt(self: Self, idx: usize) !DataBlock { - if (self.frag.len > 0 and idx == self.sizes.len) return self.frag; - if (idx >= self.sizes.len) return DataReaderError.InvalidIndex; - const out: DataBlock = undefined; - out.len = blk: { - if (idx == self.sizes.len - 1 and self.frag.len == 0) { - break :blk self.file_size % self.block_size; - } - break :blk self.block_size; - }; - if (self.sizes[idx].size == 0) { - @memset(out.data[0..out.len], 0); - return out; - } else if (self.sizes[idx].uncompressed) { - _ = try self.rdr.pread(out.data[0..out.len], self.offsets[idx]); - return out; - } - _ = try self.comp.decompress( - 1024 * 1024, - self.alloc, - self.rdr.readerAt(self.offsets[idx]).reader(), - out.data[0..out.len], - ); - return out; + pub fn writeToNoBlock(self: Self, wrt: anytype, comptime finish: anytype, finish_args: anytype) !void { + comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite")); } fn numBlocks(self: Self) usize { var out = self.sizes.len; - if (self.frag.len > 0) out += 1; + if (self.frag != null) out += 1; return out; } - const Reader = std.io.GenericReader(*Self, anyerror, read); - - pub fn read(self: *Self, buf: []u8) !usize { - var cur_red: usize = 0; - var to_read: usize = 0; - while (cur_red < buf.len) { - if (self.read_offset >= self.read_block.len) { - if (self.read_idx == self.sizes.len or (self.frag.len == 0 and self.read_idx == self.sizes.len - 1)) { - self.block_size = self.file_size % self.block_size; - } - self.read_block = self.blockAt(self.read_idx) catch |err| { - if (err == DataReaderError.EOF) return cur_red; - return err; - }; - self.read_idx += 1; - } - to_read = @min(buf.len - cur_red, self.block_size - self.read_offset); - @memcpy(buf[cur_red .. cur_red + to_read], self.read_block.data[self.read_offset .. self.read_offset + to_read]); - cur_red += to_read; - self.read_offset += to_read; - } - return cur_red; - } - pub fn reader(self: *Self) Reader { - return .{ .context = self }; - } - - /// Write the entire file's contents to the writer using multiple threads. - /// If availble, pwrite will be used. - pub fn writeTo(self: Self, writer: anytype) !usize { - if (self.pool == null) return DataReaderError.ThreadPoolNotSet; - var mut: std.Thread.Mutex = .{}; - var cur_idx: usize = 0; - var wg: std.Thread.WaitGroup = .{}; - var completed: std.AutoHashMap(usize, DataBlock) = .init(self.alloc); - defer completed.deinit(); - var errs: std.ArrayList(anyerror) = .init(self.alloc); - defer errs.deinit(); - for (0..self.numBlocks()) |i| { - wg.start(); - self.pool.?.spawn( - comptime blk: { - if (std.meta.hasFn(@TypeOf(writer), "pwrite")) { - break :blk writeToThreadPWrite; - } - break :blk writeToThread; - }, - blk: { - if (comptime std.meta.hasFn(@TypeOf(writer), "pwrite")) { - break :blk .{ self, &wg, &errs, i, writer }; - } - break :blk .{ self, &wg, &mut, &cur_idx, &errs, &completed, i, writer }; - }, - ); - } - wg.wait(); - if (errs.items.len > 0) return errs.items[0]; - return self.file_size; - } - /// Similiar to writeTo, but does not block until finished. - /// Calls on_finish when all blocks have been written. - pub fn writeToNoBlock( - self: Self, - errs: *std.ArrayList(anyerror), - writer: anytype, - comptime on_finish: anytype, - on_finish_args: anytype, - ) !void { - if (self.pool == null) return DataReaderError.ThreadPoolNotSet; - if (self.numBlocks() == 0) { - @call(.auto, on_finish, on_finish_args); + fn blockAt(self: Self, idx: usize) ![]u8 { + if (idx >= self.sizes.len) return DataReaderError.InvalidIndex; + const block = try self.alloc.alloc(u8, blk: { + if (idx == self.numBlocks() - 1) break :blk self.file_size % self.block_size; + break :blk self.block_size; + }); + if (idx == self.sizes.len and self.frag != null) { + @memcpy(block, self.frag.?); return; } - var mut: std.Thread.Mutex = .{}; - var cur_idx: usize = 0; - var block_wg = try self.alloc.create(std.Thread.WaitGroup); - block_wg.* = .{}; - const finish_mut = try self.alloc.create(std.Thread.Mutex); - finish_mut.* = .{}; - var completed: ?std.AutoHashMap(usize, DataBlock) = null; - if (!comptime std.meta.hasFn(@TypeOf(writer), "pwrite")) { - completed = std.AutoHashMap(usize, DataBlock).init(self.alloc); - } - block_wg.startMany(self.numBlocks()); - for (0..self.numBlocks()) |i| { - var thr = try std.Thread.spawn( - .{ .allocator = self.alloc }, - comptime blk: { - if (std.meta.hasFn(@TypeOf(writer), "pwrite")) { - break :blk noBlockThreadPWrite; - } - break :blk noBlockThread; - }, - blk: { - if (comptime std.meta.hasFn(@TypeOf(writer), "pwrite")) { - break :blk .{ self, block_wg, errs, i, writer, finish_mut, on_finish, on_finish_args }; - } else { - break :blk .{ self, block_wg, &mut, &cur_idx, errs, &completed.?, i, writer, finish_mut, on_finish, on_finish_args }; - } - }, - ); - thr.detach(); - } - } - - fn writeBlockTo( - self: Self, - mut: *std.Thread.Mutex, - cur_idx: *usize, - errs: *std.ArrayList(anyerror), - completed: *std.AutoHashMap(usize, DataBlock), - idx: usize, - writer: anytype, - ) void { - //TODO: We can marginally reduce memory usage if we don't store sparse blocks in completed. - if (errs.items.len > 0) return; // Indicates an error has occured in another thread. - const block = self.blockAt(idx) catch |err| { - errs.append(err) catch {}; + if (self.sizes[idx].uncompressed) { + _ = try self.rdr.pread(block, self.offsets[idx]); return; - }; - defer if (idx < self.sizes.len) { - self.alloc.free(block); - }; - mut.lock(); - defer mut.unlock(); - if (cur_idx.* == idx) { - _ = writer.write(block) catch |err| { - errs.append(err) catch {}; - return; - }; - } else { - completed.put(idx, block) catch |err| { - errs.append(err) catch {}; - return; - }; - } - if (completed.count() == 0) return; - for (cur_idx.*..self.numBlocks()) |i| { - const val = completed.get(i); - if (val == null) return; - _ = writer.write(val.?) catch |err| { - errs.append(err) catch {}; - return; - }; - _ = completed.remove(i); - cur_idx.* += 1; - if (completed.count() == 0) return; - } - } - fn writeBlockToPWrite( - self: Self, - errs: *std.ArrayList(anyerror), - idx: usize, - writer: anytype, - ) void { - if (errs.items.len > 0) return; - if (idx < self.sizes.len and self.sizes[idx].size == 0) { - var pos = idx * self.block_size; - if (self.frag.len == 0 and idx == self.sizes.len - 1) { - pos += self.file_size % self.block_size; - } else { - pos += self.block_size; - } - _ = writer.pwrite(&[1]u8{0}, pos - 1) catch |err| { - errs.append(err) catch {}; - }; - } else { - const block = self.blockAt(idx) catch |err| { - errs.append(err) catch {}; - return; - }; - defer if (idx < self.sizes.len) { - self.alloc.free(block); - }; - _ = writer.pwrite(block, idx * self.block_size) catch |err| { - errs.append(err) catch {}; - return; - }; - } - } - - fn writeToThread( - self: Self, - wg: *std.Thread.WaitGroup, - mut: *std.Thread.Mutex, - cur_idx: *usize, - errs: *std.ArrayList(anyerror), - completed: *std.AutoHashMap(usize, DataBlock), - idx: usize, - writer: anytype, - ) void { - self.writeBlockTo(mut, cur_idx, errs, completed, idx, writer); - wg.finish(); - } - fn writeToThreadPWrite( - self: Self, - wg: *std.Thread.WaitGroup, - errs: std.ArrayList(anyerror), - idx: usize, - writer: anytype, - ) void { - self.writeBlockToPWrite(errs, idx, writer); - wg.finish(); - } - - fn noBlockThread( - self: Self, - block_wg: *std.Thread.WaitGroup, - mut: *std.Thread.Mutex, - cur_idx: *usize, - errs: *std.ArrayList(anyerror), - completed: *std.AutoHashMap(usize, DataBlock), - idx: usize, - writer: anytype, - finish_mut: *std.Thread.Mutex, - comptime on_finish: anytype, - on_finish_args: anytype, - ) void { - self.writeBlockTo(mut, cur_idx, errs, completed, idx, writer); - finish_mut.lock(); - block_wg.finish(); - defer finish_mut.unlock(); - if (block_wg.isDone()) { - @call(.auto, on_finish, on_finish_args); - completed.deinit(); - } - } - fn noBlockThreadPWrite( - self: Self, - block_wg: *std.Thread.WaitGroup, - errs: *std.ArrayList(anyerror), - idx: usize, - writer: anytype, - finish_mut: *std.Thread.Mutex, - comptime on_finish: anytype, - on_finish_args: anytype, - ) void { - self.writeBlockToPWrite(errs, idx, writer); - finish_mut.lock(); - block_wg.finish(); - const isDone = block_wg.isDone(); - defer { - finish_mut.unlock(); - if (isDone) self.alloc.destroy(finish_mut); - } - if (isDone) { - self.alloc.destroy(block_wg); - @call(.auto, on_finish, on_finish_args); } + _ = try self.comp.decompress( + 1024 * 1024, + self.alloc, + self.rdr.readerAt(self.offsets[idx]), + block, + ); + return block; } }; }