Finished (?) data reader
This commit is contained in:
+127
-4
@@ -5,6 +5,11 @@ const FragEntry = @import("../fragment.zig").FragEntry;
|
||||
const BlockSize = @import("../inode/file.zig").BlockSize;
|
||||
const Compression = @import("../superblock.zig").Compression;
|
||||
|
||||
const DataReaderError = error{
|
||||
EOF,
|
||||
ThreadPoolNotSet,
|
||||
};
|
||||
|
||||
pub fn DataReader(comptime T: type) type {
|
||||
return struct {
|
||||
const Self = @This();
|
||||
@@ -77,11 +82,14 @@ pub fn DataReader(comptime T: type) type {
|
||||
);
|
||||
@memcpy(self.frag, block[offset..]);
|
||||
}
|
||||
|
||||
pub fn setPool(self: *Self, pool: *std.Thread.Pool) void {
|
||||
self.pool = pool;
|
||||
}
|
||||
|
||||
fn blockAt(self: Self, idx: u32) ![]u8 {
|
||||
if (self.frag.len > 0 and idx == self.sizes.len) return self.frag;
|
||||
if (idx >= self.sizes.len) return DataReaderError.InvalidIndex;
|
||||
const size = if (idx == self.sizes.len - 1 and self.frag.len == 0) {
|
||||
self.file_size % self.block_size;
|
||||
} else {
|
||||
@@ -107,20 +115,135 @@ pub fn DataReader(comptime T: type) type {
|
||||
return block;
|
||||
}
|
||||
|
||||
fn numBlocks(self: Self) usize {
|
||||
var out = self.sizes.len;
|
||||
if (self.frag.len > 0) out += 1;
|
||||
return out;
|
||||
}
|
||||
|
||||
pub fn read(self: *Self, buf: []u8) !usize {
|
||||
var cur_red: usize = 0;
|
||||
var to_read: usize = 0;
|
||||
while (cur_red < buf.len) {
|
||||
if (self.read_offset >= self.read_block.len) {
|
||||
//TODO:
|
||||
if (self.read_idx == self.sizes.len or (self.frag.len == 0 and self.read_idx == self.sizes.len - 1)) {
|
||||
self.block_size = self.file_size % self.block_size;
|
||||
}
|
||||
self.read_block = self.blockAt(self.read_idx) catch |err| {
|
||||
if (err == DataReaderError.EOF) return cur_red;
|
||||
return err;
|
||||
};
|
||||
self.read_idx += 1;
|
||||
}
|
||||
//TODO:
|
||||
to_read = @min(buf.len - cur_red, self.block_size - self.read_offset);
|
||||
@memcpy(buf[cur_red .. cur_red + to_read], self.read_block[self.read_offset .. self.read_offset + to_read]);
|
||||
cur_red += to_read;
|
||||
self.read_offset += to_read;
|
||||
}
|
||||
return cur_red;
|
||||
}
|
||||
|
||||
/// Write the entire file's contents to the writer.
|
||||
/// If availble, pwrite will be used.
|
||||
/// If a thread pool is not set via setPool, one is created based on cpu thread count.
|
||||
pub fn writeTo(self: Self, writer: anytype) !usize {}
|
||||
pub fn writeTo(self: Self, writer: anytype) !usize {
|
||||
if (comptime self.pool == null) return DataReaderError.ThreadPoolNotSet;
|
||||
const mut: std.Thread.Mutex = .{};
|
||||
var cur_idx: usize = 0;
|
||||
const wg: std.Thread.WaitGroup = .{};
|
||||
const completed = comptime if (std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
||||
std.ArrayList(anyerror).init(self.alloc);
|
||||
} else {
|
||||
std.AutoArrayHashMap(usize, anyerror![]u8).init(self.alloc);
|
||||
};
|
||||
defer completed.deinit();
|
||||
for (0..self.numBlocks()) |i| {
|
||||
wg.start();
|
||||
self.pool.?.spawn(
|
||||
comptime if (std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
||||
extractThreadedPWrite;
|
||||
} else {
|
||||
extractThreaded;
|
||||
},
|
||||
comptime if (std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
||||
.{ self, &wg, &completed, i, writer };
|
||||
} else {
|
||||
.{ self, &mut, &cur_idx, &wg, &completed, i, writer };
|
||||
},
|
||||
);
|
||||
}
|
||||
wg.wait();
|
||||
if (completed.items.len > 0) {
|
||||
return completed.items.get(0);
|
||||
}
|
||||
return self.file_size;
|
||||
}
|
||||
fn extractThreaded(
|
||||
self: Self,
|
||||
mut: *std.Thread.Mutex,
|
||||
cur_idx: *usize,
|
||||
wg: *std.Thread.WaitGroup,
|
||||
completed: *std.AutoArrayHashMap(usize, anyerror![]u8),
|
||||
idx: usize,
|
||||
writer: anytype,
|
||||
) void {
|
||||
if (cur_idx.* >= self.sizes.len + 1) return;
|
||||
defer wg.finish();
|
||||
const block = self.blockAt(idx) catch |err| {
|
||||
cur_idx.* = self.sizes.len + 1;
|
||||
completed.put(idx, err) catch {};
|
||||
return;
|
||||
};
|
||||
defer if (idx < self.sizes.len) {
|
||||
self.alloc.free(block);
|
||||
};
|
||||
mut.lock();
|
||||
defer mut.unlock();
|
||||
if (cur_idx.* == idx) {
|
||||
_ = writer.write(block) catch |err| {
|
||||
cur_idx.* = self.sizes.len + 1;
|
||||
completed.put(idx, err) catch {};
|
||||
return;
|
||||
};
|
||||
} else {
|
||||
completed.put(idx, block) catch |err| {
|
||||
cur_idx.* = self.sizes.len + 1;
|
||||
completed.put(idx, err) catch {};
|
||||
return;
|
||||
};
|
||||
}
|
||||
if (completed.count() == 0) return;
|
||||
for (cur_idx.*..self.numBlocks()) |i| {
|
||||
const val = completed.get(i);
|
||||
if (val == null) return;
|
||||
_ = writer.write(block) catch |err| {
|
||||
cur_idx.* = self.sizes.len + 1;
|
||||
completed.put(i, err) catch {};
|
||||
return;
|
||||
};
|
||||
cur_idx.* += 1;
|
||||
if (completed.count() == 0) return;
|
||||
}
|
||||
}
|
||||
fn extractThreadedPWrite(
|
||||
self: Self,
|
||||
wg: *std.Thread.WaitGroup,
|
||||
completed: *std.ArrayList(anyerror),
|
||||
idx: usize,
|
||||
writer: anytype,
|
||||
) void {
|
||||
if (completed.items.len > 0) return;
|
||||
defer wg.finish();
|
||||
const block = self.blockAt(idx) catch |err| {
|
||||
completed.append(err) catch {};
|
||||
return;
|
||||
};
|
||||
defer if (idx < self.sizes.len) {
|
||||
self.alloc.free(block);
|
||||
};
|
||||
_ = writer.pwrite(idx * self.block_size, block) catch |err| {
|
||||
completed.append(err) catch {};
|
||||
return;
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user