Messing around with optimizations.

This commit is contained in:
Caleb Gardner
2022-12-14 13:48:22 -06:00
parent 56fdba2f28
commit 1b5078c7bd
3 changed files with 81 additions and 66 deletions
+80 -64
View File
@@ -2,6 +2,7 @@ package data
import ( import (
"io" "io"
"sync"
"github.com/CalebQ42/squashfs/internal/decompress" "github.com/CalebQ42/squashfs/internal/decompress"
"github.com/CalebQ42/squashfs/internal/toreader" "github.com/CalebQ42/squashfs/internal/toreader"
@@ -26,9 +27,9 @@ func NewFullReader(r io.ReaderAt, start uint64, d decompress.Decompressor, block
} }
} }
func (r *FullReader) AddFragment(rdr func() (io.Reader, error)) { func (r *FullReader) AddFragment(rdr func() (io.Reader, error), size uint32) {
r.fragRdr = rdr r.fragRdr = rdr
r.sizes = append(r.sizes, 0) r.sizes = append(r.sizes, size)
} }
type outDat struct { type outDat struct {
@@ -37,50 +38,51 @@ type outDat struct {
i int i int
} }
func (r FullReader) process(index int, offset int64, out chan outDat) { func (r FullReader) process(index int, offset int64, od *outDat, out chan *outDat) {
var err error defer func() {
var dat []byte out <- od
var rdr io.ReadCloser }()
od.i = index
size := realSize(r.sizes[index]) size := realSize(r.sizes[index])
if size == 0 { if size == 0 {
out <- outDat{ od.err = nil
i: index, od.data = make([]byte, r.blockSize)
err: nil,
data: make([]byte, r.blockSize),
}
return return
} }
// rdr := io.LimitReader(toreader.NewReader(r.r, offset), int64(size)) // rdr := io.LimitReader(toreader.NewReader(r.r, offset), int64(size))
if size == r.sizes[index] { if size == r.sizes[index] {
//Special workaround for zstd for increased performancce. //Special workaround for zstd for increased performancce.
if zstd, ok := r.d.(*decompress.Zstd); ok { if zstd, ok := r.d.(*decompress.Zstd); ok {
dat = make([]byte, size) od.data = make([]byte, size)
_, err = r.r.ReadAt(dat, offset) _, od.err = r.r.ReadAt(od.data, offset)
if err == nil { if od.err == nil {
dat, err = zstd.Decode(dat) od.data, od.err = zstd.Decode(od.data)
} }
} else { } else {
rdr, err = r.d.Reader(io.LimitReader(toreader.NewReader(r.r, offset), int64(size))) var rdr io.ReadCloser
if err == nil { rdr, od.err = r.d.Reader(io.LimitReader(toreader.NewReader(r.r, offset), int64(size)))
dat, err = io.ReadAll(rdr) if od.err != nil {
return
} }
od.data = make([]byte, r.blockSize)
var read int
read, od.err = rdr.Read(od.data)
od.data = od.data[:read]
rdr.Close()
} }
} else { } else {
dat = make([]byte, size) od.data = make([]byte, size)
_, err = r.r.ReadAt(dat, offset) _, od.err = r.r.ReadAt(od.data, offset)
}
out <- outDat{
i: index,
err: err,
data: dat,
}
if clr, ok := rdr.(io.Closer); ok {
clr.Close()
} }
} }
func (r FullReader) ReadAt(p []byte, off int64) (n int, err error) { func (r FullReader) ReadAt(p []byte, off int64) (n int, err error) {
out := make(chan outDat, len(r.sizes)) pol := &sync.Pool{
New: func() any {
return new(outDat)
},
}
out := make(chan *outDat, len(r.sizes))
offset := r.start offset := r.start
num := len(r.sizes) num := len(r.sizes)
start := off / int64(r.blockSize) start := off / int64(r.blockSize)
@@ -100,40 +102,42 @@ func (r FullReader) ReadAt(p []byte, off int64) (n int, err error) {
offset += uint64(realSize(r.sizes[i])) offset += uint64(realSize(r.sizes[i]))
continue continue
} }
od := pol.Get().(*outDat)
if i == num-1 && r.fragRdr != nil { if i == num-1 && r.fragRdr != nil {
go func() { go func() {
defer func() {
out <- od
}()
rdr, e := r.fragRdr() rdr, e := r.fragRdr()
if err != nil { if err != nil {
out <- outDat{ od.i = num - 1
i: num - 1, od.err = e
err: e,
}
return return
} }
dat, e := io.ReadAll(rdr) od.data = make([]byte, r.sizes[num-1])
out <- outDat{ _, e = rdr.Read(od.data)
i: num - 1, od.i = num - 1
err: e, od.err = e
data: dat,
}
if clr, ok := rdr.(io.Closer); ok { if clr, ok := rdr.(io.Closer); ok {
clr.Close() clr.Close()
} }
}() }()
continue continue
} }
go r.process(i, int64(offset), out) go r.process(i, int64(offset), od, out)
offset += uint64(realSize(r.sizes[i])) offset += uint64(realSize(r.sizes[i]))
} }
cur := start
cache := make(map[int]outDat) cache := make(map[int]outDat)
for cur := start; cur < int64(end); { for dat := range out {
dat := <-out
if dat.err != nil { if dat.err != nil {
err = dat.err err = dat.err
pol.Put(dat)
return return
} }
if dat.i != int(cur) { if dat.i != int(cur) {
cache[dat.i] = dat cache[dat.i] = *dat
pol.Put(dat)
continue continue
} }
if cur == start { if cur == start {
@@ -144,16 +148,18 @@ func (r FullReader) ReadAt(p []byte, off int64) (n int, err error) {
} }
n += len(dat.data) n += len(dat.data)
cur++ cur++
pol.Put(dat)
var ok bool var ok bool
var curDat outDat
for { for {
dat, ok = cache[int(cur)] curDat, ok = cache[int(cur)]
if !ok { if !ok {
break break
} }
for i := range dat.data { for i := range curDat.data {
p[n+i] = dat.data[i] p[n+i] = curDat.data[i]
} }
n += len(dat.data) n += len(curDat.data)
cur++ cur++
delete(cache, int(cur)) delete(cache, int(cur))
} }
@@ -165,60 +171,70 @@ func (r FullReader) ReadAt(p []byte, off int64) (n int, err error) {
} }
func (r FullReader) WriteTo(w io.Writer) (n int64, err error) { func (r FullReader) WriteTo(w io.Writer) (n int64, err error) {
out := make(chan outDat, len(r.sizes)) pol := &sync.Pool{
New: func() any {
return new(outDat)
},
}
out := make(chan *outDat, len(r.sizes))
offset := r.start offset := r.start
num := len(r.sizes) num := len(r.sizes)
for i := 0; i < num; i++ { for i := 0; i < num; i++ {
od := pol.Get().(*outDat)
if i == num-1 && r.fragRdr != nil { if i == num-1 && r.fragRdr != nil {
go func() { go func() {
defer func() {
out <- od
}()
rdr, e := r.fragRdr() rdr, e := r.fragRdr()
if err != nil { if err != nil {
out <- outDat{ od.i = num - 1
i: num - 1, od.err = e
err: e,
}
return return
} }
dat, e := io.ReadAll(rdr) buf := make([]byte, r.sizes[num-1])
out <- outDat{ _, e = rdr.Read(buf)
i: num - 1, od.i = num - 1
err: e, od.err = e
data: dat, od.data = buf
}
if clr, ok := rdr.(io.Closer); ok { if clr, ok := rdr.(io.Closer); ok {
clr.Close() clr.Close()
} }
}() }()
continue continue
} }
go r.process(i, int64(offset), out) go r.process(i, int64(offset), od, out)
offset += uint64(realSize(r.sizes[i])) offset += uint64(realSize(r.sizes[i]))
} }
var cur int
cache := make(map[int]outDat) cache := make(map[int]outDat)
var tmpN int var tmpN int
for cur := 0; cur < num; { for dat := range out {
dat := <-out
if dat.err != nil { if dat.err != nil {
err = dat.err err = dat.err
pol.Put(dat)
return return
} }
if dat.i != cur { if dat.i != cur {
cache[dat.i] = dat cache[dat.i] = *dat
pol.Put(dat)
continue continue
} }
tmpN, err = w.Write(dat.data) tmpN, err = w.Write(dat.data)
n += int64(tmpN) n += int64(tmpN)
pol.Put(dat)
if err != nil { if err != nil {
return return
} }
cur++ cur++
var ok bool var ok bool
var curDat outDat
for { for {
dat, ok = cache[cur] curDat, ok = cache[cur]
if !ok { if !ok {
break break
} }
tmpN, err = w.Write(dat.data) tmpN, err = w.Write(curDat.data)
n += int64(tmpN) n += int64(tmpN)
if err != nil { if err != nil {
return return
+1 -1
View File
@@ -70,7 +70,7 @@ func (r Reader) getReaders(i inode.Inode) (full *data.FullReader, rdr *data.Read
} }
fragRdr = io.LimitReader(fragRdr, int64(fragSize)) fragRdr = io.LimitReader(fragRdr, int64(fragSize))
return fragRdr, nil return fragRdr, nil
}) }, fragSize)
var fragRdr io.Reader var fragRdr io.Reader
fragRdr, err = r.fragReader(fragInd) fragRdr, err = r.fragReader(fragInd)
if err != nil { if err != nil {
-1
View File
@@ -73,7 +73,6 @@ func TestMisc(t *testing.T) {
} }
func BenchmarkRace(b *testing.B) { func BenchmarkRace(b *testing.B) {
// tmpDir := b.TempDir()
tmpDir := "testing" tmpDir := "testing"
fil, err := preTest(tmpDir) fil, err := preTest(tmpDir)
if err != nil { if err != nil {