Potential workaround for poor zstd performance
Performance is still not great, but better
This commit is contained in:
@@ -40,6 +40,7 @@ type outDat struct {
|
|||||||
func (r FullReader) process(index int, offset int64, out chan outDat) {
|
func (r FullReader) process(index int, offset int64, out chan outDat) {
|
||||||
var err error
|
var err error
|
||||||
var dat []byte
|
var dat []byte
|
||||||
|
var rdr io.ReadCloser
|
||||||
size := realSize(r.sizes[index])
|
size := realSize(r.sizes[index])
|
||||||
if size == 0 {
|
if size == 0 {
|
||||||
out <- outDat{
|
out <- outDat{
|
||||||
@@ -49,13 +50,25 @@ func (r FullReader) process(index int, offset int64, out chan outDat) {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rdr := io.LimitReader(toreader.NewReader(r.r, offset), int64(size))
|
// rdr := io.LimitReader(toreader.NewReader(r.r, offset), int64(size))
|
||||||
if size == r.sizes[index] {
|
if size == r.sizes[index] {
|
||||||
rdr, err = r.d.Reader(rdr)
|
//Special workaround for zstd for increased performancce.
|
||||||
|
if zstd, ok := r.d.(*decompress.Zstd); ok {
|
||||||
|
dat = make([]byte, size)
|
||||||
|
_, err = r.r.ReadAt(dat, offset)
|
||||||
|
if err == nil {
|
||||||
|
dat, err = zstd.Decode(dat)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
rdr, err = r.d.Reader(io.LimitReader(toreader.NewReader(r.r, offset), int64(size)))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
dat, err = io.ReadAll(rdr)
|
dat, err = io.ReadAll(rdr)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
dat = make([]byte, size)
|
||||||
|
_, err = r.r.ReadAt(dat, offset)
|
||||||
|
}
|
||||||
out <- outDat{
|
out <- outDat{
|
||||||
i: index,
|
i: index,
|
||||||
err: err,
|
err: err,
|
||||||
|
|||||||
+21
-6
@@ -12,17 +12,20 @@ type Reader struct {
|
|||||||
cur io.Reader
|
cur io.Reader
|
||||||
fragRdr io.Reader
|
fragRdr io.Reader
|
||||||
d decompress.Decompressor
|
d decompress.Decompressor
|
||||||
|
comRdr io.Reader
|
||||||
blockSizes []uint32
|
blockSizes []uint32
|
||||||
blockSize uint32
|
blockSize uint32
|
||||||
|
resetable bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewReader(r io.Reader, d decompress.Decompressor, blockSizes []uint32, blockSize uint32) *Reader {
|
func NewReader(r io.Reader, d decompress.Decompressor, blockSizes []uint32, blockSize uint32) *Reader {
|
||||||
var out Reader
|
return &Reader{
|
||||||
out.d = d
|
d: d,
|
||||||
out.master = r
|
master: r,
|
||||||
out.blockSizes = blockSizes
|
blockSizes: blockSizes,
|
||||||
out.blockSize = blockSize
|
blockSize: blockSize,
|
||||||
return &out
|
resetable: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reader) AddFragment(rdr io.Reader) {
|
func (r *Reader) AddFragment(rdr io.Reader) {
|
||||||
@@ -50,7 +53,19 @@ func (r *Reader) advance() (err error) {
|
|||||||
} else {
|
} else {
|
||||||
r.cur = io.LimitReader(r.master, int64(size))
|
r.cur = io.LimitReader(r.master, int64(size))
|
||||||
if size == r.blockSizes[0] {
|
if size == r.blockSizes[0] {
|
||||||
|
if r.d.Resetable() {
|
||||||
|
if r.comRdr == nil {
|
||||||
r.cur, err = r.d.Reader(r.cur)
|
r.cur, err = r.d.Reader(r.cur)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = r.d.Reset(r.comRdr, r.cur)
|
||||||
|
r.cur = r.comRdr
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
r.cur, err = r.d.Reader(r.cur)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,3 +11,9 @@ type GZip struct{}
|
|||||||
func (g GZip) Reader(src io.Reader) (io.ReadCloser, error) {
|
func (g GZip) Reader(src io.Reader) (io.ReadCloser, error) {
|
||||||
return zlib.NewReader(src)
|
return zlib.NewReader(src)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g GZip) Resetable() bool { return true }
|
||||||
|
|
||||||
|
func (g GZip) Reset(old, src io.Reader) error {
|
||||||
|
return old.(zlib.Resetter).Reset(src, nil)
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,7 +1,19 @@
|
|||||||
package decompress
|
package decompress
|
||||||
|
|
||||||
import "io"
|
import (
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ErrNotResetable = errors.New("decompressor not resetable")
|
||||||
|
|
||||||
type Decompressor interface {
|
type Decompressor interface {
|
||||||
|
//Creates a new decompressor reading from src.
|
||||||
Reader(src io.Reader) (io.ReadCloser, error)
|
Reader(src io.Reader) (io.ReadCloser, error)
|
||||||
|
//Reports whether Reset will work or not.
|
||||||
|
Resetable() bool
|
||||||
|
//Reset attempts to re-use an old decompressor with new data.
|
||||||
|
//Will return ErrNotResetable if not Resetable().
|
||||||
|
//Must ALWAYS be provided with a reader created with Reader.
|
||||||
|
Reset(old, src io.Reader) error
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,3 +11,10 @@ type Lz4 struct{}
|
|||||||
func (l Lz4) Reader(r io.Reader) (io.ReadCloser, error) {
|
func (l Lz4) Reader(r io.Reader) (io.ReadCloser, error) {
|
||||||
return io.NopCloser(lz4.NewReader(r)), nil
|
return io.NopCloser(lz4.NewReader(r)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l Lz4) Resetable() bool { return true }
|
||||||
|
|
||||||
|
func (l Lz4) Reset(old, src io.Reader) error {
|
||||||
|
old.(*lz4.Reader).Reset(src)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -12,3 +12,7 @@ func (l Lzma) Reader(r io.Reader) (io.ReadCloser, error) {
|
|||||||
rdr, err := lzma.NewReader(r)
|
rdr, err := lzma.NewReader(r)
|
||||||
return io.NopCloser(rdr), err
|
return io.NopCloser(rdr), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l Lzma) Resetable() bool { return false }
|
||||||
|
|
||||||
|
func (l Lzma) Reset(old, src io.Reader) error { return ErrNotResetable }
|
||||||
|
|||||||
@@ -16,3 +16,7 @@ func (l Lzo) Reader(r io.Reader) (io.ReadCloser, error) {
|
|||||||
}
|
}
|
||||||
return io.NopCloser(bytes.NewReader(cache)), nil
|
return io.NopCloser(bytes.NewReader(cache)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l Lzo) Resetable() bool { return false }
|
||||||
|
|
||||||
|
func (l Lzo) Reset(old, src io.Reader) error { return ErrNotResetable }
|
||||||
|
|||||||
@@ -12,3 +12,9 @@ func (x Xz) Reader(r io.Reader) (io.ReadCloser, error) {
|
|||||||
rdr, err := xz.NewReader(r, 0)
|
rdr, err := xz.NewReader(r, 0)
|
||||||
return io.NopCloser(rdr), err
|
return io.NopCloser(rdr), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x Xz) Resetable() bool { return true }
|
||||||
|
|
||||||
|
func (x Xz) Reset(old, src io.Reader) error {
|
||||||
|
return old.(*xz.Reader).Reset(src)
|
||||||
|
}
|
||||||
|
|||||||
+11
-13
@@ -1,31 +1,29 @@
|
|||||||
package decompress
|
package decompress
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/klauspost/compress/zstd"
|
"github.com/klauspost/compress/zstd"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Zstd struct{}
|
type Zstd struct {
|
||||||
|
writeToReader *zstd.Decoder
|
||||||
|
}
|
||||||
|
|
||||||
func (z Zstd) Reader(src io.Reader) (io.ReadCloser, error) {
|
func (z Zstd) Reader(src io.Reader) (io.ReadCloser, error) {
|
||||||
r, err := zstd.NewReader(src)
|
r, err := zstd.NewReader(src)
|
||||||
return r.IOReadCloser(), err
|
return r.IOReadCloser(), err
|
||||||
}
|
}
|
||||||
|
|
||||||
type ZstdDecodeAll struct {
|
func (z Zstd) Resetable() bool { return true }
|
||||||
rdr *zstd.Decoder
|
|
||||||
|
func (z Zstd) Reset(old, src io.Reader) error {
|
||||||
|
return old.(*zstd.Decoder).Reset(src)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *ZstdDecodeAll) Reader(src io.Reader) (io.ReadCloser, error) {
|
func (z *Zstd) Decode(in []byte) (out []byte, err error) {
|
||||||
if z.rdr == nil {
|
if z.writeToReader == nil {
|
||||||
z.rdr, _ = zstd.NewReader(nil)
|
z.writeToReader, _ = zstd.NewReader(nil)
|
||||||
}
|
}
|
||||||
data, err := io.ReadAll(src)
|
return z.writeToReader.DecodeAll(in, nil)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
out, err := z.rdr.DecodeAll(data, nil)
|
|
||||||
return io.NopCloser(bytes.NewReader(out)), err
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ type Reader struct {
|
|||||||
master io.Reader
|
master io.Reader
|
||||||
cur io.Reader
|
cur io.Reader
|
||||||
d decompress.Decompressor
|
d decompress.Decompressor
|
||||||
|
comRdr io.Reader
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewReader(master io.Reader, d decompress.Decompressor) *Reader {
|
func NewReader(master io.Reader, d decompress.Decompressor) *Reader {
|
||||||
@@ -25,9 +26,11 @@ func realSize(siz uint16) uint16 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reader) advance() (err error) {
|
func (r *Reader) advance() (err error) {
|
||||||
|
if !r.d.Resetable() {
|
||||||
if clr, ok := r.cur.(io.Closer); ok {
|
if clr, ok := r.cur.(io.Closer); ok {
|
||||||
clr.Close()
|
clr.Close()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
var raw uint16
|
var raw uint16
|
||||||
err = binary.Read(r.master, binary.LittleEndian, &raw)
|
err = binary.Read(r.master, binary.LittleEndian, &raw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -36,7 +39,19 @@ func (r *Reader) advance() (err error) {
|
|||||||
size := realSize(raw)
|
size := realSize(raw)
|
||||||
r.cur = io.LimitReader(r.master, int64(size))
|
r.cur = io.LimitReader(r.master, int64(size))
|
||||||
if size == raw {
|
if size == raw {
|
||||||
|
if r.d.Resetable() {
|
||||||
|
if r.comRdr == nil {
|
||||||
r.cur, err = r.d.Reader(r.cur)
|
r.cur, err = r.d.Reader(r.cur)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = r.d.Reset(r.comRdr, r.cur)
|
||||||
|
r.cur = r.comRdr
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
r.cur, err = r.d.Reader(r.cur)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -75,7 +75,7 @@ func NewReader(r io.ReaderAt) (*Reader, error) {
|
|||||||
case LZ4Compression:
|
case LZ4Compression:
|
||||||
squash.d = decompress.Lz4{}
|
squash.d = decompress.Lz4{}
|
||||||
case ZSTDCompression:
|
case ZSTDCompression:
|
||||||
squash.d = decompress.Zstd{}
|
squash.d = &decompress.Zstd{}
|
||||||
default:
|
default:
|
||||||
return nil, errors.New("uh, I need to do this, OR something if very wrong")
|
return nil, errors.New("uh, I need to do this, OR something if very wrong")
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user