Data reader fixes
Re-started extraction logic
This commit is contained in:
+96
-6
@@ -23,6 +23,7 @@ pub fn File(comptime T: type) type {
|
|||||||
const Self = @This();
|
const Self = @This();
|
||||||
|
|
||||||
rdr: *SfsReader(T),
|
rdr: *SfsReader(T),
|
||||||
|
// parent: *File(T),
|
||||||
|
|
||||||
inode: Inode,
|
inode: Inode,
|
||||||
name: []const u8,
|
name: []const u8,
|
||||||
@@ -220,37 +221,126 @@ pub fn File(comptime T: type) type {
|
|||||||
defer pol.deinit();
|
defer pol.deinit();
|
||||||
var errs: std.ArrayList(anyerror) = .init(self.rdr.alloc);
|
var errs: std.ArrayList(anyerror) = .init(self.rdr.alloc);
|
||||||
defer errs.deinit();
|
defer errs.deinit();
|
||||||
try self.extractReal(op, &errs, &wg, &pol, path, true);
|
try self.extractReal(op, path, &errs, &wg, &pol, true);
|
||||||
wg.wait();
|
wg.wait();
|
||||||
if (errs.items.len > 0) return errs.items[0];
|
if (errs.items.len > 0) return errs.items[0];
|
||||||
}
|
}
|
||||||
fn extractReal(
|
fn extractReal(
|
||||||
self: Self,
|
self: Self,
|
||||||
op: ExtractionOptions,
|
op: ExtractionOptions,
|
||||||
|
path: []const u8,
|
||||||
errs: *std.ArrayList(anyerror),
|
errs: *std.ArrayList(anyerror),
|
||||||
wg: *WaitGroup,
|
wg: *WaitGroup,
|
||||||
pol: *Pool,
|
pol: *Pool,
|
||||||
path: []const u8,
|
|
||||||
first: bool,
|
first: bool,
|
||||||
) !void {
|
) !void {
|
||||||
|
if (errs.items.len > 0) return;
|
||||||
if (op.verbose) {
|
if (op.verbose) {
|
||||||
std.fmt.format(op.verbose_logger, "extracting inode {} \"{s}\" to {s}...\n", .{ self.inode.hdr.num, self.name, path }) catch {};
|
std.fmt.format(op.verbose_logger, "extracting inode {} \"{s}\" to {s}...\n", .{ self.inode.hdr.num, self.name, path }) catch {};
|
||||||
}
|
}
|
||||||
return switch (self.inode.hdr.type) {
|
return switch (self.inode.hdr.type) {
|
||||||
.dir, .ext_dir => {},
|
.dir, .ext_dir => {
|
||||||
.file, .ext_file => {},
|
wg.start();
|
||||||
|
errdefer wg.finish();
|
||||||
|
for (self.entries.?) |ent| {
|
||||||
|
var fil = initFromEntry(self.rdr, ent) catch |err| {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
},
|
||||||
|
.file, .ext_file => {
|
||||||
|
wg.start();
|
||||||
|
errdefer wg.finish();
|
||||||
|
var ext_fil = try std.fs.cwd().createFile(path, .{});
|
||||||
|
errdefer ext_fil.close();
|
||||||
|
var fil_errs: std.ArrayList(anyerror) = .init(self.rdr.alloc);
|
||||||
|
errdefer fil_errs.deinit();
|
||||||
|
@constCast(&self.data_reader.?).setPool(pol);
|
||||||
|
try self.data_reader.?.writeToNoBlock(errs, ext_fil, filExtractFinish, .{
|
||||||
|
self,
|
||||||
|
op,
|
||||||
|
path,
|
||||||
|
&fil_errs,
|
||||||
|
errs,
|
||||||
|
wg,
|
||||||
|
ext_fil,
|
||||||
|
first,
|
||||||
|
});
|
||||||
|
},
|
||||||
.symlink, .ext_symlink => {},
|
.symlink, .ext_symlink => {},
|
||||||
.block_dev, .ext_block_dev, .char_dev, .ext_char_dev, .fifo, .ext_fifo => {},
|
.block_dev, .ext_block_dev, .char_dev, .ext_char_dev, .fifo, .ext_fifo => {
|
||||||
|
//TODO: check for all oses that accept unix permissions.
|
||||||
|
},
|
||||||
else => {
|
else => {
|
||||||
if (op.verbose) {
|
if (op.verbose) {
|
||||||
std.fmt.format(
|
std.fmt.format(
|
||||||
op.verbose_logger,
|
op.verbose_logger,
|
||||||
"inode {} \"{s}\" is a socket file. Ignoring.\n",
|
"inode {} \"{s}\" is a socket file. Ignoring.\n",
|
||||||
.{ self.inode.hdr.num, self.name },
|
.{ self.inode.hdr.num, path },
|
||||||
) catch {};
|
) catch {};
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
fn filExtractFinish(
|
||||||
|
self: Self,
|
||||||
|
op: ExtractionOptions,
|
||||||
|
path: []const u8,
|
||||||
|
fil_errs: *std.ArrayList(anyerror),
|
||||||
|
errs: *std.ArrayList(anyerror),
|
||||||
|
wg: *WaitGroup,
|
||||||
|
fil: std.fs.File,
|
||||||
|
first: bool,
|
||||||
|
) void {
|
||||||
|
defer wg.finish();
|
||||||
|
defer fil.close();
|
||||||
|
defer if (!first) self.deinit();
|
||||||
|
if (fil_errs.items.len > 0) {
|
||||||
|
if (op.verbose) {
|
||||||
|
for (fil_errs.items) |err| {
|
||||||
|
std.fmt.format(
|
||||||
|
op.verbose_logger,
|
||||||
|
"error extracting inode {} to \"{s}\": {}\n",
|
||||||
|
.{ self.inode.hdr.num, path, err },
|
||||||
|
) catch {};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
errs.append(fil_errs.items[0]) catch {};
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (op.ignore_permissions) return;
|
||||||
|
const fil_uid = self.uid() catch |err| {
|
||||||
|
std.fmt.format(
|
||||||
|
op.verbose_logger,
|
||||||
|
"error getting uid for inode {} \"{s}\": {}\n",
|
||||||
|
.{ self.inode.hdr.num, path, err },
|
||||||
|
) catch {};
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
const fil_gid = self.gid() catch |err| {
|
||||||
|
std.fmt.format(
|
||||||
|
op.verbose_logger,
|
||||||
|
"error getting gid for inode {} \"{s}\": {}\n",
|
||||||
|
.{ self.inode.hdr.num, path, err },
|
||||||
|
) catch {};
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
fil.chmod(self.inode.hdr.perm) catch |err| {
|
||||||
|
std.fmt.format(
|
||||||
|
op.verbose_logger,
|
||||||
|
"error setting permissions for inode {} \"{s}\": {}\n",
|
||||||
|
.{ self.inode.hdr.num, path, err },
|
||||||
|
) catch {};
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
fil.chown(fil_uid, fil_gid) catch |err| {
|
||||||
|
std.fmt.format(
|
||||||
|
op.verbose_logger,
|
||||||
|
"error setting owner for inode {} \"{s}\": {}\n",
|
||||||
|
.{ self.inode.hdr.num, path, err },
|
||||||
|
) catch {};
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
+7
-7
@@ -152,7 +152,7 @@ pub fn DataReader(comptime T: type) type {
|
|||||||
var mut: std.Thread.Mutex = .{};
|
var mut: std.Thread.Mutex = .{};
|
||||||
var cur_idx: usize = 0;
|
var cur_idx: usize = 0;
|
||||||
var wg: std.Thread.WaitGroup = .{};
|
var wg: std.Thread.WaitGroup = .{};
|
||||||
var completed: std.AutoArrayHashMap(usize, anyerror![]u8) = .init(self.alloc);
|
var completed: std.AutoHashMap(usize, []u8) = .init(self.alloc);
|
||||||
defer completed.deinit();
|
defer completed.deinit();
|
||||||
var errs: std.ArrayList(anyerror) = .init(self.alloc);
|
var errs: std.ArrayList(anyerror) = .init(self.alloc);
|
||||||
defer errs.deinit();
|
defer errs.deinit();
|
||||||
@@ -169,7 +169,7 @@ pub fn DataReader(comptime T: type) type {
|
|||||||
if (comptime std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
if (comptime std.meta.hasFn(@TypeOf(writer), "pwrite")) {
|
||||||
break :blk .{ self, &wg, &errs, i, writer };
|
break :blk .{ self, &wg, &errs, i, writer };
|
||||||
}
|
}
|
||||||
break :blk .{ self, &wg, &mut, &cur_idx, &completed, i, writer };
|
break :blk .{ self, &wg, &mut, &cur_idx, &errs, &completed, i, writer };
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -188,7 +188,7 @@ pub fn DataReader(comptime T: type) type {
|
|||||||
on_finish_args: anytype,
|
on_finish_args: anytype,
|
||||||
) !void {
|
) !void {
|
||||||
if (self.pool == null) return DataReaderError.ThreadPoolNotSet;
|
if (self.pool == null) return DataReaderError.ThreadPoolNotSet;
|
||||||
const mut: std.Thread.Mutex = .{};
|
var mut: std.Thread.Mutex = .{};
|
||||||
var cur_idx: usize = 0;
|
var cur_idx: usize = 0;
|
||||||
var block_wg: std.Thread.WaitGroup = .{};
|
var block_wg: std.Thread.WaitGroup = .{};
|
||||||
var finish_mut: std.Thread.Mutex = .{};
|
var finish_mut: std.Thread.Mutex = .{};
|
||||||
@@ -224,7 +224,7 @@ pub fn DataReader(comptime T: type) type {
|
|||||||
completed: *std.AutoHashMap(usize, []u8),
|
completed: *std.AutoHashMap(usize, []u8),
|
||||||
idx: usize,
|
idx: usize,
|
||||||
writer: anytype,
|
writer: anytype,
|
||||||
) !void {
|
) void {
|
||||||
//TODO: We can marginally reduce memory usage if we don't store sparse blocks in completed.
|
//TODO: We can marginally reduce memory usage if we don't store sparse blocks in completed.
|
||||||
if (errs.items.len > 0) return; // Indicates an error has occured in another thread.
|
if (errs.items.len > 0) return; // Indicates an error has occured in another thread.
|
||||||
const block = self.blockAt(idx) catch |err| {
|
const block = self.blockAt(idx) catch |err| {
|
||||||
@@ -298,7 +298,7 @@ pub fn DataReader(comptime T: type) type {
|
|||||||
mut: *std.Thread.Mutex,
|
mut: *std.Thread.Mutex,
|
||||||
cur_idx: *usize,
|
cur_idx: *usize,
|
||||||
errs: *std.ArrayList(anyerror),
|
errs: *std.ArrayList(anyerror),
|
||||||
completed: *std.AutoArrayHashMap(usize, anyerror![]u8),
|
completed: *std.AutoHashMap(usize, []u8),
|
||||||
idx: usize,
|
idx: usize,
|
||||||
writer: anytype,
|
writer: anytype,
|
||||||
) void {
|
) void {
|
||||||
@@ -308,7 +308,7 @@ pub fn DataReader(comptime T: type) type {
|
|||||||
fn writeToThreadPWrite(
|
fn writeToThreadPWrite(
|
||||||
self: Self,
|
self: Self,
|
||||||
wg: *std.Thread.WaitGroup,
|
wg: *std.Thread.WaitGroup,
|
||||||
errs: *std.ArrayList(anyerror),
|
errs: std.ArrayList(anyerror),
|
||||||
idx: usize,
|
idx: usize,
|
||||||
writer: anytype,
|
writer: anytype,
|
||||||
) void {
|
) void {
|
||||||
@@ -322,7 +322,7 @@ pub fn DataReader(comptime T: type) type {
|
|||||||
mut: *std.Thread.Mutex,
|
mut: *std.Thread.Mutex,
|
||||||
cur_idx: *usize,
|
cur_idx: *usize,
|
||||||
errs: *std.ArrayList(anyerror),
|
errs: *std.ArrayList(anyerror),
|
||||||
completed: *std.AutoArrayHashMap(usize, anyerror![]u8),
|
completed: *std.AutoHashMap(usize, []u8),
|
||||||
idx: usize,
|
idx: usize,
|
||||||
writer: anytype,
|
writer: anytype,
|
||||||
finish_mut: *std.Thread.Mutex,
|
finish_mut: *std.Thread.Mutex,
|
||||||
|
|||||||
Reference in New Issue
Block a user