diff --git a/extraction_options.go b/extraction_options.go index c55a5ec..3272fa6 100644 --- a/extraction_options.go +++ b/extraction_options.go @@ -4,46 +4,31 @@ import ( "io" "io/fs" "runtime" - - "github.com/CalebQ42/squashfs/internal/routinemanager" + "sync" ) type ExtractionOptions struct { - manager *routinemanager.Manager - LogOutput io.Writer //Where the verbose log should write. - DereferenceSymlink bool //Replace symlinks with the target file. - UnbreakSymlink bool //Try to make sure symlinks remain unbroken when extracted, without changing the symlink. - Verbose bool //Prints extra info to log on an error. - IgnorePerm bool //Ignore file's permissions and instead use Perm. - Perm fs.FileMode //Permission to use when IgnorePerm. Defaults to 0777. - SimultaneousFiles uint16 //Number of files to process in parallel. Default set based on runtime.NumCPU(). - ExtractionRoutines uint16 //Number of goroutines to use for each file's extraction. Only applies to regular files. Default set based on runtime.NumCPU(). + dispatcher chan struct{} // Limits the amount of work being done simultaneously. + fullRdrPool sync.Pool // Pool for data.FullReader results. + LogOutput io.Writer //Where the verbose log should write. + DereferenceSymlink bool //Replace symlinks with the target file. + UnbreakSymlink bool //Try to make sure symlinks remain unbroken when extracted, without changing the symlink. + Verbose bool //Prints extra info to log on an error. + IgnorePerm bool //Ignore file's permissions and instead use Perm. + Perm fs.FileMode //Permission to use when IgnorePerm. Defaults to 0777. + ExtractionRoutines uint16 //The number of threads to use during extraction. Defaults to a number based on runtime.NumCPU(). + SimultaneousFiles uint16 //Depreciated: Only use ExtractionRoutines } // The default extraction options. func DefaultOptions() *ExtractionOptions { - cores := uint16(runtime.NumCPU() / 2) - var files, routines uint16 - if cores <= 4 { - files = 1 - routines = cores - } else { - files = cores - 4 - routines = 4 - } return &ExtractionOptions{ Perm: 0777, - SimultaneousFiles: files, - ExtractionRoutines: routines, - } -} - -// Less limited default options. Can run up 2x faster than DefaultOptions. -// Tends to use all available CPU resources. -func FastOptions() *ExtractionOptions { - return &ExtractionOptions{ - Perm: 0777, - SimultaneousFiles: uint16(runtime.NumCPU()), ExtractionRoutines: uint16(runtime.NumCPU()), } } + +// Depreciated: This just calls DefaultOptions() +func FastOptions() *ExtractionOptions { + return DefaultOptions() +} diff --git a/file.go b/file.go index f595d12..b5c78dc 100644 --- a/file.go +++ b/file.go @@ -10,8 +10,8 @@ import ( "path/filepath" "runtime" "strconv" + "sync" - "github.com/CalebQ42/squashfs/internal/routinemanager" squashfslow "github.com/CalebQ42/squashfs/low" "github.com/CalebQ42/squashfs/low/data" "github.com/CalebQ42/squashfs/low/inode" @@ -216,8 +216,16 @@ func (f File) Extract(folder string) error { // Extract the file to the given folder. If the file is a folder, the folder's contents will be extracted to the folder. // Allows setting various extraction options via ExtractionOptions. func (f File) ExtractWithOptions(path string, op *ExtractionOptions) error { - if op.manager == nil { - op.manager = routinemanager.NewManager(op.SimultaneousFiles) + if op.dispatcher == nil { + op.fullRdrPool = sync.Pool{ + New: func() any { + return &data.BlockResults{} + }, + } + op.dispatcher = make(chan struct{}, op.ExtractionRoutines) + for range op.ExtractionRoutines { + op.dispatcher <- struct{}{} + } if op.LogOutput != nil { log.SetOutput(op.LogOutput) } @@ -231,11 +239,13 @@ func (f File) ExtractWithOptions(path string, op *ExtractionOptions) error { } switch f.Low.Inode.Type { case inode.Dir, inode.EDir: + <-op.dispatcher d, err := f.Low.ToDir(f.r.Low) if err != nil { if op.Verbose { log.Println("Failed to create squashfs.Directory for", path) } + op.dispatcher <- struct{}{} return errors.Join(errors.New("failed to create squashfs.Directory: "+path), err) } errChan := make(chan error, len(d.Entries)) @@ -248,19 +258,21 @@ func (f File) ExtractWithOptions(path string, op *ExtractionOptions) error { return errors.Join(errors.New("failed to get base from entry: "+path), err) } go func(b squashfslow.FileBase, path string) { - i := op.manager.Lock() if b.IsDir() { + <-op.dispatcher extDir := filepath.Join(path, b.Name) err = os.Mkdir(extDir, 0777) - op.manager.Unlock(i) if err != nil { if op.Verbose { log.Println("Failed to create directory", path) } + op.dispatcher <- struct{}{} errChan <- errors.Join(errors.New("failed to create directory: "+path), err) return } - err = f.r.FileFromBase(b, f.r.FSFromDirectory(d, f.parent)).ExtractWithOptions(extDir, op) + fil := f.r.FileFromBase(b, f.r.FSFromDirectory(d, f.parent)) + op.dispatcher <- struct{}{} + err = fil.ExtractWithOptions(extDir, op) if err != nil { if op.Verbose { log.Println("Failed to extract directory", path) @@ -272,12 +284,12 @@ func (f File) ExtractWithOptions(path string, op *ExtractionOptions) error { } else { fil := f.r.FileFromBase(b, f.r.FSFromDirectory(d, f.parent)) err = fil.ExtractWithOptions(path, op) - op.manager.Unlock(i) fil.Close() errChan <- err } }(b, path) } + op.dispatcher <- struct{}{} var errCache []error for range d.Entries { err := <-errChan @@ -289,23 +301,28 @@ func (f File) ExtractWithOptions(path string, op *ExtractionOptions) error { return errors.Join(errors.New("failed to extract folder: "+path), errors.Join(errCache...)) } case inode.Fil, inode.EFil: + <-op.dispatcher path = filepath.Join(path, f.Low.Name) outFil, err := os.Create(path) if err != nil { if op.Verbose { log.Println("Failed to create file", path) } + op.dispatcher <- struct{}{} return errors.Join(errors.New("failed to create file: "+path), err) } defer outFil.Close() full, err := f.Low.GetFullReader(&f.r.Low) + defer full.Close() if err != nil { if op.Verbose { log.Println("Failed to create full reader for", path) } + op.dispatcher <- struct{}{} return errors.Join(errors.New("failed to create full reader: "+path), err) } - full.SetGoroutineLimit(op.ExtractionRoutines) + full.SetDispatcherPool(op.dispatcher, &op.fullRdrPool) + op.dispatcher <- struct{}{} _, err = full.WriteTo(outFil) if err != nil { if op.Verbose { @@ -314,6 +331,8 @@ func (f File) ExtractWithOptions(path string, op *ExtractionOptions) error { return errors.Join(errors.New("failed to write file: "+path), err) } case inode.Sym, inode.ESym: + <-op.dispatcher + defer func() { op.dispatcher <- struct{}{} }() symPath := f.SymlinkPath() if op.DereferenceSymlink { filTmp := f.GetSymlinkFile() @@ -361,6 +380,8 @@ func (f File) ExtractWithOptions(path string, op *ExtractionOptions) error { } } case inode.Char, inode.EChar, inode.Block, inode.EBlock, inode.Fifo, inode.EFifo: + <-op.dispatcher + defer func() { op.dispatcher <- struct{}{} }() if runtime.GOOS == "windows" { if op.Verbose { log.Println(f.path(), "ignored. A device link and can't be created on Windows.") diff --git a/low/data/fullreader.go b/low/data/fullreader.go index b46cceb..f368c41 100644 --- a/low/data/fullreader.go +++ b/low/data/fullreader.go @@ -4,19 +4,21 @@ import ( "errors" "io" "runtime" + "sync" "github.com/CalebQ42/squashfs/internal/decompress" ) type FullReader struct { - fileSize uint64 - blockSize uint32 - goroutineLimit uint16 - rdr io.ReaderAt - decomp decompress.Decompressor - sizes []uint32 - blockOffsets []uint64 - fragDat []byte + fileSize uint64 + blockSize uint32 + dispatcher chan struct{} + pool *sync.Pool + rdr io.ReaderAt + decomp decompress.Decompressor + sizes []uint32 + blockOffsets []uint64 + fragDat []byte } func NewFullReader(rdr io.ReaderAt, decomp decompress.Decompressor, blockSize uint32, size uint64, start uint64, sizes []uint32) FullReader { @@ -56,12 +58,15 @@ func (f *FullReader) AddFragData(blockStart uint64, blockSize uint32, offset uin return err } } - f.fragDat = dat[offset : offset+uint32(f.fileSize%uint64(f.blockSize))] + f.fragDat = make([]byte, f.fileSize%uint64(f.blockSize)) + copy(f.fragDat, dat[offset:]) + dat = nil return nil } -func (f *FullReader) SetGoroutineLimit(limit uint16) { - f.goroutineLimit = limit +func (f *FullReader) SetDispatcherPool(dispatcher chan struct{}, pool *sync.Pool) { + f.dispatcher = dispatcher + f.pool = pool } // The number of blocks, including the fragment block if present @@ -94,53 +99,86 @@ func (f FullReader) Block(i uint32) ([]byte, error) { return nil, err } if realSize == f.sizes[i] { - return f.decomp.Decompress(dat) + dat, err = f.decomp.Decompress(dat) } - return dat, nil + return dat, err } -type blockResults struct { +func (f FullReader) blockFromPool(i uint32) *BlockResults { + out := f.pool.Get().(*BlockResults) + out.idx = i + out.err = nil + if i == uint32(len(f.sizes)) && f.fragDat != nil { + out.block = f.fragDat + return out + } + if i >= uint32(len(f.sizes)) { + out.err = errors.New("invalid block index") + return out + } + realSize := f.sizes[i] &^ (1 << 24) + if realSize == 0 { + if i == uint32(len(f.sizes)-1) && f.fragDat == nil { + out.block = make([]byte, f.fileSize%uint64(f.blockSize)) + return out + } + out.block = make([]byte, f.blockSize) + } + out.block = make([]byte, realSize) + _, out.err = f.rdr.ReadAt(out.block, int64(f.blockOffsets[i])) + if out.err != nil { + return out + } + if realSize == f.sizes[i] { + out.block, out.err = f.decomp.Decompress(out.block) + } + return out +} + +type BlockResults struct { idx uint32 block []byte err error } func (f FullReader) WriteTo(w io.Writer) (wrote int64, err error) { - routineLimit := f.goroutineLimit - if routineLimit == 0 { - routineLimit = uint16(runtime.NumCPU() / 2) + if f.dispatcher == nil { + f.dispatcher = make(chan struct{}, runtime.NumCPU()) + for range runtime.NumCPU() { + f.dispatcher <- struct{}{} + } } - dispatchChan := make(chan struct{}, routineLimit) - for range int(routineLimit) { - dispatchChan <- struct{}{} + if f.pool == nil { + f.pool = &sync.Pool{ + New: func() any { + return &BlockResults{} + }, + } } - resChan := make(chan blockResults, routineLimit) - var results map[uint32]blockResults + open := true + resChan := make(chan *BlockResults, len(f.dispatcher)) + var results map[uint32]*BlockResults if _, is := w.(io.WriterAt); !is { - results = make(map[uint32]blockResults) + results = make(map[uint32]*BlockResults) } for i := range f.BlockNum() { go func(idx uint32) { - _, open := <-dispatchChan + <-f.dispatcher + defer func() { f.dispatcher <- struct{}{} }() if !open { - resChan <- blockResults{} + resChan <- f.pool.Get().(*BlockResults) return } - block, err := f.Block(idx) - resChan <- blockResults{ - idx: idx, - block: block, - err: err, - } - dispatchChan <- struct{}{} + resChan <- f.blockFromPool(idx) }(i) } out := int64(0) errOut := make([]error, 0) for i := uint32(0); i < f.BlockNum(); { res := <-resChan + defer f.pool.Put(res) if res.err != nil { - close(dispatchChan) + open = false errOut = append(errOut, res.err) } if len(errOut) > 0 { @@ -180,6 +218,8 @@ func (f FullReader) WriteTo(w io.Writer) (wrote int64, err error) { out = max(out, int64(res.idx)*int64(f.blockSize)+int64(len(res.block))) } i++ + delete(results, i) + f.pool.Put(res) } else { break } diff --git a/mem.out b/mem.out new file mode 100644 index 0000000..5139801 Binary files /dev/null and b/mem.out differ diff --git a/squashfs.test b/squashfs.test new file mode 100755 index 0000000..e1b65e8 Binary files /dev/null and b/squashfs.test differ diff --git a/squashfs_test.go b/squashfs_test.go index 83c59c8..ee92dab 100644 --- a/squashfs_test.go +++ b/squashfs_test.go @@ -116,8 +116,8 @@ func BenchmarkRace(b *testing.B) { b.Log("Unsquashfs error:", err) } unsquashTime = time.Since(start) - b.Log("Library took:", libTime.Round(time.Millisecond)) - b.Log("unsquashfs took:", unsquashTime.Round(time.Millisecond)) + // b.Log("Library took:", libTime.Round(time.Millisecond)) + // b.Log("unsquashfs took:", unsquashTime.Round(time.Millisecond)) b.Log("unsquashfs is", strconv.FormatFloat(float64(libTime.Milliseconds())/float64(unsquashTime.Milliseconds()), 'f', 2, 64), "times faster") }