Finished (?) file extraction
This commit is contained in:
+172
-58
@@ -8,6 +8,7 @@ const Compression = @import("../superblock.zig").Compression;
|
||||
const DataReaderError = error{
|
||||
EOF,
|
||||
ThreadPoolNotSet,
|
||||
InvalidIndex,
|
||||
};
|
||||
|
||||
pub fn DataReader(comptime T: type) type {
|
||||
@@ -85,13 +86,14 @@ pub fn DataReader(comptime T: type) type {
|
||||
self.pool = pool;
|
||||
}
|
||||
|
||||
fn blockAt(self: Self, idx: u32) ![]u8 {
|
||||
fn blockAt(self: Self, idx: usize) ![]u8 {
|
||||
if (self.frag.len > 0 and idx == self.sizes.len) return self.frag;
|
||||
if (idx >= self.sizes.len) return DataReaderError.InvalidIndex;
|
||||
const size = if (idx == self.sizes.len - 1 and self.frag.len == 0) {
|
||||
self.file_size % self.block_size;
|
||||
} else {
|
||||
self.block_size;
|
||||
const size = blk: {
|
||||
if (idx == self.sizes.len - 1 and self.frag.len == 0) {
|
||||
break :blk self.file_size % self.block_size;
|
||||
}
|
||||
break :blk self.block_size;
|
||||
};
|
||||
const block = try self.alloc.alloc(u8, size);
|
||||
errdefer self.alloc.free(block);
|
||||
@@ -143,56 +145,91 @@ pub fn DataReader(comptime T: type) type {
|
||||
return .{ .context = self };
|
||||
}
|
||||
|
||||
/// Write the entire file's contents to the writer.
|
||||
/// Write the entire file's contents to the writer using multiple threads.
|
||||
/// If availble, pwrite will be used.
|
||||
pub fn writeTo(self: Self, writer: anytype) !usize {
|
||||
if (comptime self.pool == null) return DataReaderError.ThreadPoolNotSet;
|
||||
const mut: std.Thread.Mutex = .{};
|
||||
if (self.pool == null) return DataReaderError.ThreadPoolNotSet;
|
||||
var mut: std.Thread.Mutex = .{};
|
||||
var cur_idx: usize = 0;
|
||||
const wg: std.Thread.WaitGroup = .{};
|
||||
const completed = comptime if (std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
||||
std.ArrayList(anyerror).init(self.alloc);
|
||||
} else {
|
||||
std.AutoArrayHashMap(usize, anyerror![]u8).init(self.alloc);
|
||||
};
|
||||
var wg: std.Thread.WaitGroup = .{};
|
||||
var completed: std.AutoArrayHashMap(usize, anyerror![]u8) = .init(self.alloc);
|
||||
defer completed.deinit();
|
||||
var errs: std.ArrayList(anyerror) = .init(self.alloc);
|
||||
defer errs.deinit();
|
||||
for (0..self.numBlocks()) |i| {
|
||||
wg.start();
|
||||
self.pool.?.spawn(
|
||||
comptime if (std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
||||
extractThreadedPWrite;
|
||||
} else {
|
||||
extractThreaded;
|
||||
comptime blk: {
|
||||
if (std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
||||
break :blk writeToThreadPWrite;
|
||||
}
|
||||
break :blk writeToThread;
|
||||
},
|
||||
comptime if (std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
||||
.{ self, &wg, &completed, i, writer };
|
||||
} else {
|
||||
.{ self, &mut, &cur_idx, &wg, &completed, i, writer };
|
||||
blk: {
|
||||
if (comptime std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
||||
break :blk .{ self, &wg, &errs, i, writer };
|
||||
}
|
||||
break :blk .{ self, &wg, &mut, &cur_idx, &completed, i, writer };
|
||||
},
|
||||
);
|
||||
}
|
||||
std.Thread.yield();
|
||||
wg.wait();
|
||||
if (completed.items.len > 0) {
|
||||
return completed.items.get(0);
|
||||
}
|
||||
if (errs.items.len > 0) return errs.items[0];
|
||||
return self.file_size;
|
||||
}
|
||||
pub fn writeToThreaded(self: Self, errs: *std.ArrayList(anyerror), wg: *std.Thread.WaitGroup, writer: anytype) void {}
|
||||
fn extractThreaded(
|
||||
/// Similiar to writeTo, but does not block until finished.
|
||||
/// When all blocks have been written, on_finish and wg.finish() (in that order) will be called.
|
||||
/// NOTE: wg.start() is not called;
|
||||
pub fn writeToNoBlock(
|
||||
self: Self,
|
||||
errs: *std.ArrayList(anyerror),
|
||||
writer: anytype,
|
||||
wg: *std.Thread.WaitGroup,
|
||||
comptime on_finish: anytype,
|
||||
on_finish_args: anytype,
|
||||
) !void {
|
||||
if (self.pool == null) return DataReaderError.ThreadPoolNotSet;
|
||||
const mut: std.Thread.Mutex = .{};
|
||||
var cur_idx: usize = 0;
|
||||
var block_wg: std.Thread.WaitGroup = .{};
|
||||
var finish_mut: std.Thread.Mutex = .{};
|
||||
var completed: ?std.AutoHashMap(usize, []u8) = null;
|
||||
if (!comptime std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
||||
completed = std.AutoHashMap(usize, []u8).init(self.alloc);
|
||||
}
|
||||
block_wg.startMany(self.numBlocks());
|
||||
for (0..self.numBlocks()) |i| {
|
||||
try self.pool.?.spawn(
|
||||
comptime blk: {
|
||||
if (std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
||||
break :blk noBlockThreadPWrite;
|
||||
}
|
||||
break :blk noBlockThread;
|
||||
},
|
||||
blk: {
|
||||
if (comptime std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
||||
break :blk .{ self, &block_wg, errs, i, writer, wg, &finish_mut, on_finish, on_finish_args };
|
||||
} else {
|
||||
break :blk .{ self, &block_wg, &mut, &cur_idx, errs, &completed.?, i, writer, wg, &finish_mut, on_finish, on_finish_args };
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn writeBlockTo(
|
||||
self: Self,
|
||||
mut: *std.Thread.Mutex,
|
||||
cur_idx: *usize,
|
||||
wg: *std.Thread.WaitGroup,
|
||||
completed: *std.AutoArrayHashMap(usize, anyerror![]u8),
|
||||
errs: *std.ArrayList(anyerror),
|
||||
completed: *std.AutoHashMap(usize, []u8),
|
||||
idx: usize,
|
||||
writer: anytype,
|
||||
) void {
|
||||
if (cur_idx.* >= self.sizes.len + 1) return;
|
||||
defer wg.finish();
|
||||
) !void {
|
||||
//TODO: We can marginally reduce memory usage if we don't store sparse blocks in completed.
|
||||
if (errs.items.len > 0) return; // Indicates an error has occured in another thread.
|
||||
const block = self.blockAt(idx) catch |err| {
|
||||
cur_idx.* = self.sizes.len + 1;
|
||||
completed.put(idx, err) catch {};
|
||||
errs.append(err) catch {};
|
||||
return;
|
||||
};
|
||||
defer if (idx < self.sizes.len) {
|
||||
@@ -202,14 +239,12 @@ pub fn DataReader(comptime T: type) type {
|
||||
defer mut.unlock();
|
||||
if (cur_idx.* == idx) {
|
||||
_ = writer.write(block) catch |err| {
|
||||
cur_idx.* = self.sizes.len + 1;
|
||||
completed.put(idx, err) catch {};
|
||||
errs.append(err) catch {};
|
||||
return;
|
||||
};
|
||||
} else {
|
||||
completed.put(idx, block) catch |err| {
|
||||
cur_idx.* = self.sizes.len + 1;
|
||||
completed.put(idx, err) catch {};
|
||||
errs.append(err) catch {};
|
||||
return;
|
||||
};
|
||||
}
|
||||
@@ -217,35 +252,114 @@ pub fn DataReader(comptime T: type) type {
|
||||
for (cur_idx.*..self.numBlocks()) |i| {
|
||||
const val = completed.get(i);
|
||||
if (val == null) return;
|
||||
_ = writer.write(block) catch |err| {
|
||||
cur_idx.* = self.sizes.len + 1;
|
||||
completed.put(i, err) catch {};
|
||||
_ = writer.write(val.?) catch |err| {
|
||||
errs.append(err) catch {};
|
||||
return;
|
||||
};
|
||||
_ = completed.remove(i);
|
||||
cur_idx.* += 1;
|
||||
if (completed.count() == 0) return;
|
||||
}
|
||||
}
|
||||
fn extractThreadedPWrite(
|
||||
fn writeBlockToPWrite(
|
||||
self: Self,
|
||||
wg: *std.Thread.WaitGroup,
|
||||
completed: *std.ArrayList(anyerror),
|
||||
errs: *std.ArrayList(anyerror),
|
||||
idx: usize,
|
||||
writer: anytype,
|
||||
) void {
|
||||
if (completed.items.len > 0) return;
|
||||
defer wg.finish();
|
||||
const block = self.blockAt(idx) catch |err| {
|
||||
completed.append(err) catch {};
|
||||
return;
|
||||
};
|
||||
defer if (idx < self.sizes.len) {
|
||||
self.alloc.free(block);
|
||||
};
|
||||
_ = writer.pwrite(idx * self.block_size, block) catch |err| {
|
||||
completed.append(err) catch {};
|
||||
return;
|
||||
};
|
||||
if (errs.items.len > 0) return;
|
||||
if (self.sizes[idx].size == 0) {
|
||||
var pos = idx * self.block_size;
|
||||
if (self.frag.len == 0 and idx == self.sizes.len - 1) {
|
||||
pos += self.file_size % self.block_size;
|
||||
} else {
|
||||
pos += self.block_size;
|
||||
}
|
||||
_ = writer.pwrite(&[1]u8{0}, pos - 1) catch |err| {
|
||||
errs.append(err) catch {};
|
||||
};
|
||||
} else {
|
||||
const block = self.blockAt(idx) catch |err| {
|
||||
errs.append(err) catch {};
|
||||
return;
|
||||
};
|
||||
defer if (idx < self.sizes.len) {
|
||||
self.alloc.free(block);
|
||||
};
|
||||
_ = writer.pwrite(block, idx * self.block_size) catch |err| {
|
||||
errs.append(err) catch {};
|
||||
return;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn writeToThread(
|
||||
self: Self,
|
||||
wg: *std.Thread.WaitGroup,
|
||||
mut: *std.Thread.Mutex,
|
||||
cur_idx: *usize,
|
||||
errs: *std.ArrayList(anyerror),
|
||||
completed: *std.AutoArrayHashMap(usize, anyerror![]u8),
|
||||
idx: usize,
|
||||
writer: anytype,
|
||||
) void {
|
||||
self.writeBlockTo(mut, cur_idx, errs, completed, idx, writer);
|
||||
wg.finish();
|
||||
}
|
||||
fn writeToThreadPWrite(
|
||||
self: Self,
|
||||
wg: *std.Thread.WaitGroup,
|
||||
errs: *std.ArrayList(anyerror),
|
||||
idx: usize,
|
||||
writer: anytype,
|
||||
) void {
|
||||
self.writeBlockToPWrite(errs, idx, writer);
|
||||
wg.finish();
|
||||
}
|
||||
|
||||
fn noBlockThread(
|
||||
self: Self,
|
||||
block_wg: *std.Thread.WaitGroup,
|
||||
mut: *std.Thread.Mutex,
|
||||
cur_idx: *usize,
|
||||
errs: *std.ArrayList(anyerror),
|
||||
completed: *std.AutoArrayHashMap(usize, anyerror![]u8),
|
||||
idx: usize,
|
||||
writer: anytype,
|
||||
finish_wg: *std.Thread.WaitGroup,
|
||||
finish_mut: *std.Thread.Mutex,
|
||||
comptime on_finish: anytype,
|
||||
on_finish_args: anytype,
|
||||
) void {
|
||||
self.writeBlockTo(mut, cur_idx, errs, completed, idx, writer);
|
||||
block_wg.finish();
|
||||
finish_mut.lock();
|
||||
defer finish_mut.unlock();
|
||||
if (block_wg.isDone()) {
|
||||
@call(.auto, on_finish, on_finish_args);
|
||||
finish_wg.finish();
|
||||
completed.deinit();
|
||||
}
|
||||
}
|
||||
fn noBlockThreadPWrite(
|
||||
self: Self,
|
||||
block_wg: *std.Thread.WaitGroup,
|
||||
errs: *std.ArrayList(anyerror),
|
||||
idx: usize,
|
||||
writer: anytype,
|
||||
finish_wg: *std.Thread.WaitGroup,
|
||||
finish_mut: *std.Thread.Mutex,
|
||||
comptime on_finish: anytype,
|
||||
on_finish_args: anytype,
|
||||
) void {
|
||||
self.writeBlockToPWrite(errs, idx, writer);
|
||||
block_wg.finish();
|
||||
finish_mut.lock();
|
||||
defer finish_mut.unlock();
|
||||
if (block_wg.isDone()) {
|
||||
@call(.auto, on_finish, on_finish_args);
|
||||
finish_wg.finish();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user