Started re-work of data reader
This commit is contained in:
+57
-329
@@ -1,378 +1,106 @@
|
|||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
|
|
||||||
|
const Inode = @import("../inode.zig");
|
||||||
const PRead = @import("p_read.zig").PRead;
|
const PRead = @import("p_read.zig").PRead;
|
||||||
|
const SfsReader = @import("../reader.zig").SfsReader;
|
||||||
const FragEntry = @import("../fragment.zig").FragEntry;
|
const FragEntry = @import("../fragment.zig").FragEntry;
|
||||||
const BlockSize = @import("../inode/file.zig").BlockSize;
|
const BlockSize = @import("../inode/file.zig").BlockSize;
|
||||||
const Compression = @import("../superblock.zig").Compression;
|
const Compression = @import("../superblock.zig").Compression;
|
||||||
|
|
||||||
const DataReaderError = error{
|
const DataReaderError = error{
|
||||||
EOF,
|
EOF,
|
||||||
ThreadPoolNotSet,
|
|
||||||
InvalidIndex,
|
InvalidIndex,
|
||||||
};
|
};
|
||||||
|
|
||||||
const DataBlock = struct {
|
|
||||||
data: [1024 * 1024]u8, // Blocks can be up to 1MB in size.
|
|
||||||
len: usize,
|
|
||||||
};
|
|
||||||
|
|
||||||
pub fn DataReader(comptime T: type) type {
|
pub fn DataReader(comptime T: type) type {
|
||||||
return struct {
|
return struct {
|
||||||
const Self = @This();
|
const Self = @This();
|
||||||
|
|
||||||
alloc: std.mem.Allocator,
|
alloc: std.mem.Allocator,
|
||||||
pool: ?*std.Thread.Pool = null,
|
|
||||||
|
|
||||||
rdr: PRead(T),
|
rdr: PRead(T),
|
||||||
comp: Compression,
|
comp: Compression,
|
||||||
offsets: []u64,
|
|
||||||
|
|
||||||
file_size: u64,
|
|
||||||
block_size: u32,
|
block_size: u32,
|
||||||
|
|
||||||
sizes: []BlockSize,
|
sizes: []BlockSize,
|
||||||
|
offsets: []u64,
|
||||||
|
file_size: u64,
|
||||||
|
|
||||||
frag: DataBlock = DataBlock{ .data = &[0]u8, .len = 0 },
|
frag: ?[]u8 = null,
|
||||||
|
|
||||||
read_block: DataBlock = DataBlock{ .data = &[0]u8, .len = 0 },
|
pub fn init(rdr: *SfsReader(T), inode: Inode) !Self {
|
||||||
read_offset: u64 = 0,
|
var sizes: []BlockSize = undefined;
|
||||||
read_idx: u32 = 0,
|
var file_size: u64 = 0;
|
||||||
|
var offsets: []u64 = undefined;
|
||||||
pub fn init(
|
switch (inode.data) {
|
||||||
alloc: std.mem.Allocator,
|
.file => |f| {
|
||||||
rdr: PRead(T),
|
sizes = f.block_sizes;
|
||||||
comp: Compression,
|
file_size = f.size;
|
||||||
init_offset: u64,
|
offsets = try rdr.alloc.alloc(u64, sizes.len);
|
||||||
file_size: u64,
|
if (sizes.len > 0) offsets[0] = f.block;
|
||||||
sizes: []BlockSize,
|
},
|
||||||
block_size: u32,
|
.ext_file => |f| {
|
||||||
) !Self {
|
sizes = f.block_sizes;
|
||||||
var cur_offset = init_offset;
|
file_size = f.size;
|
||||||
const offsets = try alloc.alloc(u64, sizes.len);
|
offsets = try rdr.alloc.alloc(u64, sizes.len);
|
||||||
for (0..sizes.len) |i| {
|
if (sizes.len > 0) offsets[0] = f.block;
|
||||||
offsets[i] = cur_offset;
|
},
|
||||||
cur_offset += sizes[i].size;
|
else => unreachable,
|
||||||
|
}
|
||||||
|
for (1..offsets.len) |i| {
|
||||||
|
offsets[i] = offsets[i - 1] + sizes[i - 1].size;
|
||||||
}
|
}
|
||||||
return .{
|
return .{
|
||||||
.alloc = alloc,
|
.alloc = rdr.alloc,
|
||||||
.rdr = rdr,
|
.rdr = rdr.rdr,
|
||||||
.comp = comp,
|
.comp = rdr.super.comp,
|
||||||
.offsets = offsets,
|
.block_size = rdr.super.block_size,
|
||||||
.file_size = file_size,
|
|
||||||
.block_size = block_size,
|
|
||||||
.sizes = sizes,
|
.sizes = sizes,
|
||||||
|
.offsets = offsets,
|
||||||
|
.files_size = file_size,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
pub fn deinit(self: Self) void {
|
pub fn deinit(self: Self) void {
|
||||||
self.alloc.free(self.offsets);
|
self.alloc.free(self.offsets);
|
||||||
self.alloc.free(self.frag);
|
|
||||||
if (self.read_idx < self.sizes.len) self.alloc.free(self.read_block);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn addFragment(self: *Self, entry: FragEntry, offset: u32) !void {
|
pub fn addFragment(self: Self, data: []u8) void {
|
||||||
self.frag.len = self.file_size % self.block_size;
|
self.frag = data;
|
||||||
errdefer self.frag.len = 0;
|
|
||||||
if (entry.size.size == 0) {
|
|
||||||
@memset(self.frag.data, 0);
|
|
||||||
return;
|
|
||||||
} else if (entry.size.uncompressed) {
|
|
||||||
_ = try self.rdr.pread(self.frag.data, entry.block + offset);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const block: [1024 * 1024]u8 = undefined;
|
|
||||||
_ = try self.comp.decompress(
|
|
||||||
1024 * 1024,
|
|
||||||
self.alloc,
|
|
||||||
self.rdr.readerAt(entry.block).reader(),
|
|
||||||
block,
|
|
||||||
);
|
|
||||||
@memcpy(self.frag.data, block[offset..]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setPool(self: *Self, pool: *std.Thread.Pool) void {
|
pub fn writeTo(self: Self, wrt: anytype) !void {
|
||||||
self.pool = pool;
|
comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite"));
|
||||||
}
|
}
|
||||||
|
pub fn writeToNoBlock(self: Self, wrt: anytype, comptime finish: anytype, finish_args: anytype) !void {
|
||||||
fn blockAt(self: Self, idx: usize) !DataBlock {
|
comptime std.debug.assert(std.meta.hasFn(@TypeOf(wrt), "write") or std.meta.hasFn(@TypeOf(wrt), "pwrite"));
|
||||||
if (self.frag.len > 0 and idx == self.sizes.len) return self.frag;
|
|
||||||
if (idx >= self.sizes.len) return DataReaderError.InvalidIndex;
|
|
||||||
const out: DataBlock = undefined;
|
|
||||||
out.len = blk: {
|
|
||||||
if (idx == self.sizes.len - 1 and self.frag.len == 0) {
|
|
||||||
break :blk self.file_size % self.block_size;
|
|
||||||
}
|
|
||||||
break :blk self.block_size;
|
|
||||||
};
|
|
||||||
if (self.sizes[idx].size == 0) {
|
|
||||||
@memset(out.data[0..out.len], 0);
|
|
||||||
return out;
|
|
||||||
} else if (self.sizes[idx].uncompressed) {
|
|
||||||
_ = try self.rdr.pread(out.data[0..out.len], self.offsets[idx]);
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
_ = try self.comp.decompress(
|
|
||||||
1024 * 1024,
|
|
||||||
self.alloc,
|
|
||||||
self.rdr.readerAt(self.offsets[idx]).reader(),
|
|
||||||
out.data[0..out.len],
|
|
||||||
);
|
|
||||||
return out;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn numBlocks(self: Self) usize {
|
fn numBlocks(self: Self) usize {
|
||||||
var out = self.sizes.len;
|
var out = self.sizes.len;
|
||||||
if (self.frag.len > 0) out += 1;
|
if (self.frag != null) out += 1;
|
||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
const Reader = std.io.GenericReader(*Self, anyerror, read);
|
fn blockAt(self: Self, idx: usize) ![]u8 {
|
||||||
|
if (idx >= self.sizes.len) return DataReaderError.InvalidIndex;
|
||||||
pub fn read(self: *Self, buf: []u8) !usize {
|
const block = try self.alloc.alloc(u8, blk: {
|
||||||
var cur_red: usize = 0;
|
if (idx == self.numBlocks() - 1) break :blk self.file_size % self.block_size;
|
||||||
var to_read: usize = 0;
|
break :blk self.block_size;
|
||||||
while (cur_red < buf.len) {
|
});
|
||||||
if (self.read_offset >= self.read_block.len) {
|
if (idx == self.sizes.len and self.frag != null) {
|
||||||
if (self.read_idx == self.sizes.len or (self.frag.len == 0 and self.read_idx == self.sizes.len - 1)) {
|
@memcpy(block, self.frag.?);
|
||||||
self.block_size = self.file_size % self.block_size;
|
|
||||||
}
|
|
||||||
self.read_block = self.blockAt(self.read_idx) catch |err| {
|
|
||||||
if (err == DataReaderError.EOF) return cur_red;
|
|
||||||
return err;
|
|
||||||
};
|
|
||||||
self.read_idx += 1;
|
|
||||||
}
|
|
||||||
to_read = @min(buf.len - cur_red, self.block_size - self.read_offset);
|
|
||||||
@memcpy(buf[cur_red .. cur_red + to_read], self.read_block.data[self.read_offset .. self.read_offset + to_read]);
|
|
||||||
cur_red += to_read;
|
|
||||||
self.read_offset += to_read;
|
|
||||||
}
|
|
||||||
return cur_red;
|
|
||||||
}
|
|
||||||
pub fn reader(self: *Self) Reader {
|
|
||||||
return .{ .context = self };
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Write the entire file's contents to the writer using multiple threads.
|
|
||||||
/// If availble, pwrite will be used.
|
|
||||||
pub fn writeTo(self: Self, writer: anytype) !usize {
|
|
||||||
if (self.pool == null) return DataReaderError.ThreadPoolNotSet;
|
|
||||||
var mut: std.Thread.Mutex = .{};
|
|
||||||
var cur_idx: usize = 0;
|
|
||||||
var wg: std.Thread.WaitGroup = .{};
|
|
||||||
var completed: std.AutoHashMap(usize, DataBlock) = .init(self.alloc);
|
|
||||||
defer completed.deinit();
|
|
||||||
var errs: std.ArrayList(anyerror) = .init(self.alloc);
|
|
||||||
defer errs.deinit();
|
|
||||||
for (0..self.numBlocks()) |i| {
|
|
||||||
wg.start();
|
|
||||||
self.pool.?.spawn(
|
|
||||||
comptime blk: {
|
|
||||||
if (std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
|
||||||
break :blk writeToThreadPWrite;
|
|
||||||
}
|
|
||||||
break :blk writeToThread;
|
|
||||||
},
|
|
||||||
blk: {
|
|
||||||
if (comptime std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
|
||||||
break :blk .{ self, &wg, &errs, i, writer };
|
|
||||||
}
|
|
||||||
break :blk .{ self, &wg, &mut, &cur_idx, &errs, &completed, i, writer };
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
wg.wait();
|
|
||||||
if (errs.items.len > 0) return errs.items[0];
|
|
||||||
return self.file_size;
|
|
||||||
}
|
|
||||||
/// Similiar to writeTo, but does not block until finished.
|
|
||||||
/// Calls on_finish when all blocks have been written.
|
|
||||||
pub fn writeToNoBlock(
|
|
||||||
self: Self,
|
|
||||||
errs: *std.ArrayList(anyerror),
|
|
||||||
writer: anytype,
|
|
||||||
comptime on_finish: anytype,
|
|
||||||
on_finish_args: anytype,
|
|
||||||
) !void {
|
|
||||||
if (self.pool == null) return DataReaderError.ThreadPoolNotSet;
|
|
||||||
if (self.numBlocks() == 0) {
|
|
||||||
@call(.auto, on_finish, on_finish_args);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
var mut: std.Thread.Mutex = .{};
|
if (self.sizes[idx].uncompressed) {
|
||||||
var cur_idx: usize = 0;
|
_ = try self.rdr.pread(block, self.offsets[idx]);
|
||||||
var block_wg = try self.alloc.create(std.Thread.WaitGroup);
|
|
||||||
block_wg.* = .{};
|
|
||||||
const finish_mut = try self.alloc.create(std.Thread.Mutex);
|
|
||||||
finish_mut.* = .{};
|
|
||||||
var completed: ?std.AutoHashMap(usize, DataBlock) = null;
|
|
||||||
if (!comptime std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
|
||||||
completed = std.AutoHashMap(usize, DataBlock).init(self.alloc);
|
|
||||||
}
|
|
||||||
block_wg.startMany(self.numBlocks());
|
|
||||||
for (0..self.numBlocks()) |i| {
|
|
||||||
var thr = try std.Thread.spawn(
|
|
||||||
.{ .allocator = self.alloc },
|
|
||||||
comptime blk: {
|
|
||||||
if (std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
|
||||||
break :blk noBlockThreadPWrite;
|
|
||||||
}
|
|
||||||
break :blk noBlockThread;
|
|
||||||
},
|
|
||||||
blk: {
|
|
||||||
if (comptime std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
|
||||||
break :blk .{ self, block_wg, errs, i, writer, finish_mut, on_finish, on_finish_args };
|
|
||||||
} else {
|
|
||||||
break :blk .{ self, block_wg, &mut, &cur_idx, errs, &completed.?, i, writer, finish_mut, on_finish, on_finish_args };
|
|
||||||
}
|
|
||||||
},
|
|
||||||
);
|
|
||||||
thr.detach();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn writeBlockTo(
|
|
||||||
self: Self,
|
|
||||||
mut: *std.Thread.Mutex,
|
|
||||||
cur_idx: *usize,
|
|
||||||
errs: *std.ArrayList(anyerror),
|
|
||||||
completed: *std.AutoHashMap(usize, DataBlock),
|
|
||||||
idx: usize,
|
|
||||||
writer: anytype,
|
|
||||||
) void {
|
|
||||||
//TODO: We can marginally reduce memory usage if we don't store sparse blocks in completed.
|
|
||||||
if (errs.items.len > 0) return; // Indicates an error has occured in another thread.
|
|
||||||
const block = self.blockAt(idx) catch |err| {
|
|
||||||
errs.append(err) catch {};
|
|
||||||
return;
|
return;
|
||||||
};
|
|
||||||
defer if (idx < self.sizes.len) {
|
|
||||||
self.alloc.free(block);
|
|
||||||
};
|
|
||||||
mut.lock();
|
|
||||||
defer mut.unlock();
|
|
||||||
if (cur_idx.* == idx) {
|
|
||||||
_ = writer.write(block) catch |err| {
|
|
||||||
errs.append(err) catch {};
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
} else {
|
|
||||||
completed.put(idx, block) catch |err| {
|
|
||||||
errs.append(err) catch {};
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
if (completed.count() == 0) return;
|
|
||||||
for (cur_idx.*..self.numBlocks()) |i| {
|
|
||||||
const val = completed.get(i);
|
|
||||||
if (val == null) return;
|
|
||||||
_ = writer.write(val.?) catch |err| {
|
|
||||||
errs.append(err) catch {};
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
_ = completed.remove(i);
|
|
||||||
cur_idx.* += 1;
|
|
||||||
if (completed.count() == 0) return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fn writeBlockToPWrite(
|
|
||||||
self: Self,
|
|
||||||
errs: *std.ArrayList(anyerror),
|
|
||||||
idx: usize,
|
|
||||||
writer: anytype,
|
|
||||||
) void {
|
|
||||||
if (errs.items.len > 0) return;
|
|
||||||
if (idx < self.sizes.len and self.sizes[idx].size == 0) {
|
|
||||||
var pos = idx * self.block_size;
|
|
||||||
if (self.frag.len == 0 and idx == self.sizes.len - 1) {
|
|
||||||
pos += self.file_size % self.block_size;
|
|
||||||
} else {
|
|
||||||
pos += self.block_size;
|
|
||||||
}
|
|
||||||
_ = writer.pwrite(&[1]u8{0}, pos - 1) catch |err| {
|
|
||||||
errs.append(err) catch {};
|
|
||||||
};
|
|
||||||
} else {
|
|
||||||
const block = self.blockAt(idx) catch |err| {
|
|
||||||
errs.append(err) catch {};
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
defer if (idx < self.sizes.len) {
|
|
||||||
self.alloc.free(block);
|
|
||||||
};
|
|
||||||
_ = writer.pwrite(block, idx * self.block_size) catch |err| {
|
|
||||||
errs.append(err) catch {};
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn writeToThread(
|
|
||||||
self: Self,
|
|
||||||
wg: *std.Thread.WaitGroup,
|
|
||||||
mut: *std.Thread.Mutex,
|
|
||||||
cur_idx: *usize,
|
|
||||||
errs: *std.ArrayList(anyerror),
|
|
||||||
completed: *std.AutoHashMap(usize, DataBlock),
|
|
||||||
idx: usize,
|
|
||||||
writer: anytype,
|
|
||||||
) void {
|
|
||||||
self.writeBlockTo(mut, cur_idx, errs, completed, idx, writer);
|
|
||||||
wg.finish();
|
|
||||||
}
|
|
||||||
fn writeToThreadPWrite(
|
|
||||||
self: Self,
|
|
||||||
wg: *std.Thread.WaitGroup,
|
|
||||||
errs: std.ArrayList(anyerror),
|
|
||||||
idx: usize,
|
|
||||||
writer: anytype,
|
|
||||||
) void {
|
|
||||||
self.writeBlockToPWrite(errs, idx, writer);
|
|
||||||
wg.finish();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn noBlockThread(
|
|
||||||
self: Self,
|
|
||||||
block_wg: *std.Thread.WaitGroup,
|
|
||||||
mut: *std.Thread.Mutex,
|
|
||||||
cur_idx: *usize,
|
|
||||||
errs: *std.ArrayList(anyerror),
|
|
||||||
completed: *std.AutoHashMap(usize, DataBlock),
|
|
||||||
idx: usize,
|
|
||||||
writer: anytype,
|
|
||||||
finish_mut: *std.Thread.Mutex,
|
|
||||||
comptime on_finish: anytype,
|
|
||||||
on_finish_args: anytype,
|
|
||||||
) void {
|
|
||||||
self.writeBlockTo(mut, cur_idx, errs, completed, idx, writer);
|
|
||||||
finish_mut.lock();
|
|
||||||
block_wg.finish();
|
|
||||||
defer finish_mut.unlock();
|
|
||||||
if (block_wg.isDone()) {
|
|
||||||
@call(.auto, on_finish, on_finish_args);
|
|
||||||
completed.deinit();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fn noBlockThreadPWrite(
|
|
||||||
self: Self,
|
|
||||||
block_wg: *std.Thread.WaitGroup,
|
|
||||||
errs: *std.ArrayList(anyerror),
|
|
||||||
idx: usize,
|
|
||||||
writer: anytype,
|
|
||||||
finish_mut: *std.Thread.Mutex,
|
|
||||||
comptime on_finish: anytype,
|
|
||||||
on_finish_args: anytype,
|
|
||||||
) void {
|
|
||||||
self.writeBlockToPWrite(errs, idx, writer);
|
|
||||||
finish_mut.lock();
|
|
||||||
block_wg.finish();
|
|
||||||
const isDone = block_wg.isDone();
|
|
||||||
defer {
|
|
||||||
finish_mut.unlock();
|
|
||||||
if (isDone) self.alloc.destroy(finish_mut);
|
|
||||||
}
|
|
||||||
if (isDone) {
|
|
||||||
self.alloc.destroy(block_wg);
|
|
||||||
@call(.auto, on_finish, on_finish_args);
|
|
||||||
}
|
}
|
||||||
|
_ = try self.comp.decompress(
|
||||||
|
1024 * 1024,
|
||||||
|
self.alloc,
|
||||||
|
self.rdr.readerAt(self.offsets[idx]),
|
||||||
|
block,
|
||||||
|
);
|
||||||
|
return block;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user