Compare commits

...

4 Commits

Author SHA1 Message Date
Caleb Gardner ce2e45ceec Fixed issues with decompress.Decoder 2023-01-05 01:29:23 -06:00
Caleb Gardner 089ef53c8c Revert changes to fullreader 2023-01-04 06:40:57 -06:00
Caleb Gardner 658e5c9e0b Mount is non-blocking again 2023-01-04 06:01:12 -06:00
Caleb Gardner f2d86aff96 Fixed a race condition with mounts that caused them to fail 2023-01-04 05:41:43 -06:00
8 changed files with 125 additions and 141 deletions
+30 -5
View File
@@ -3,6 +3,7 @@ package squashfs
import (
"bytes"
"context"
"errors"
"io"
"github.com/CalebQ42/fuse"
@@ -10,17 +11,41 @@ import (
"github.com/CalebQ42/squashfs/internal/inode"
)
// Creates a fuse mount, then mounts the archive on a seperate goroutine.
// If waiting for the mount to end, simply do <-con.Ready.
func (r *Reader) Mount(mountpoint string) (con *fuse.Conn, err error) {
con, err = fuse.Mount(mountpoint, fuse.ReadOnly())
// Mounts the archive to the given mountpoint using fuse3. Non-blocking.
// If Unmount does not get called, the mount point must be unmounted using umount before the directory can be used again.
func (r *Reader) Mount(mountpoint string) (err error) {
if r.con != nil {
return errors.New("squashfs archive already mounted")
}
r.con, err = fuse.Mount(mountpoint, fuse.ReadOnly())
if err != nil {
return
}
go fs.Serve(con, &squashFuse{r: r})
<-r.con.Ready
r.mountDone = make(chan struct{})
go func() {
fs.Serve(r.con, &squashFuse{r: r})
close(r.mountDone)
}()
return
}
// Blocks until the mount ends.
func (r *Reader) MountWait() {
if r.mountDone != nil {
<-r.mountDone
}
}
// Unmounts the archive.
func (r *Reader) Unmount() error {
if r.con != nil {
defer func() { r.con = nil }()
return r.con.Close()
}
return errors.New("squashfs archive is not mounted")
}
type squashFuse struct {
r *Reader
}
+85 -120
View File
@@ -2,7 +2,6 @@ package data
import (
"io"
"sync"
"github.com/CalebQ42/squashfs/internal/decompress"
"github.com/CalebQ42/squashfs/internal/toreader"
@@ -27,9 +26,9 @@ func NewFullReader(r io.ReaderAt, start uint64, d decompress.Decompressor, block
}
}
func (r *FullReader) AddFragment(rdr func() (io.Reader, error), size uint32) {
func (r *FullReader) AddFragment(rdr func() (io.Reader, error)) {
r.fragRdr = rdr
r.sizes = append(r.sizes, size)
r.sizes = append(r.sizes, 0)
}
type outDat struct {
@@ -38,50 +37,49 @@ type outDat struct {
i int
}
func (r FullReader) process(index int, offset int64, od *outDat, out chan *outDat) {
defer func() {
out <- od
}()
od.i = index
func (r FullReader) process(index int, offset int64, out chan outDat) {
var err error
var dat []byte
var rdr io.ReadCloser
size := realSize(r.sizes[index])
if size == 0 {
od.err = nil
od.data = make([]byte, r.blockSize)
out <- outDat{
i: index,
err: nil,
data: make([]byte, r.blockSize),
}
return
}
// rdr := io.LimitReader(toreader.NewReader(r.r, offset), int64(size))
if size == r.sizes[index] {
if dec, ok := r.d.(decompress.Decoder); ok {
dat := make([]byte, size)
_, od.err = r.r.ReadAt(dat, offset)
if od.err != nil {
return
dat = make([]byte, size)
_, err = r.r.ReadAt(dat, offset)
if err == nil {
dat, err = dec.Decode(dat)
}
} else {
rdr, err = r.d.Reader(io.LimitReader(toreader.NewReader(r.r, offset), int64(size)))
if err == nil {
dat, err = io.ReadAll(rdr)
}
od.data, od.err = dec.Decode(dat, int(r.blockSize))
return
}
var rdr io.ReadCloser
rdr, od.err = r.d.Reader(io.LimitReader(toreader.NewReader(r.r, offset), int64(size)))
if od.err != nil {
return
}
od.data = make([]byte, r.blockSize)
var read int
read, od.err = rdr.Read(od.data)
od.data = od.data[:read]
rdr.Close()
} else {
od.data = make([]byte, size)
_, od.err = r.r.ReadAt(od.data, offset)
dat = make([]byte, size)
_, err = r.r.ReadAt(dat, offset)
}
out <- outDat{
i: index,
err: err,
data: dat,
}
if clr, ok := rdr.(io.Closer); ok {
clr.Close()
}
}
func (r FullReader) ReadAt(p []byte, off int64) (n int, err error) {
pol := &sync.Pool{
New: func() any {
return new(outDat)
},
}
out := make(chan *outDat, len(r.sizes))
out := make(chan outDat, len(r.sizes))
offset := r.start
num := len(r.sizes)
start := off / int64(r.blockSize)
@@ -101,42 +99,40 @@ func (r FullReader) ReadAt(p []byte, off int64) (n int, err error) {
offset += uint64(realSize(r.sizes[i]))
continue
}
od := pol.Get().(*outDat)
if i == num-1 && r.fragRdr != nil {
go func() {
defer func() {
out <- od
}()
rdr, e := r.fragRdr()
if err != nil {
od.i = num - 1
od.err = e
out <- outDat{
i: num - 1,
err: e,
}
return
}
od.data = make([]byte, r.sizes[num-1])
_, e = rdr.Read(od.data)
od.i = num - 1
od.err = e
dat, e := io.ReadAll(rdr)
out <- outDat{
i: num - 1,
err: e,
data: dat,
}
if clr, ok := rdr.(io.Closer); ok {
clr.Close()
}
}()
continue
}
go r.process(i, int64(offset), od, out)
go r.process(i, int64(offset), out)
offset += uint64(realSize(r.sizes[i]))
}
cur := start
cache := make(map[int]outDat)
for dat := range out {
for cur := start; cur < int64(end); {
dat := <-out
if dat.err != nil {
err = dat.err
pol.Put(dat)
return
}
if dat.i != int(cur) {
cache[dat.i] = *dat
pol.Put(dat)
cache[dat.i] = dat
continue
}
if cur == start {
@@ -147,18 +143,16 @@ func (r FullReader) ReadAt(p []byte, off int64) (n int, err error) {
}
n += len(dat.data)
cur++
pol.Put(dat)
var ok bool
var curDat outDat
for {
curDat, ok = cache[int(cur)]
dat, ok = cache[int(cur)]
if !ok {
break
}
for i := range curDat.data {
p[n+i] = curDat.data[i]
for i := range dat.data {
p[n+i] = dat.data[i]
}
n += len(curDat.data)
n += len(dat.data)
cur++
delete(cache, int(cur))
}
@@ -170,57 +164,58 @@ func (r FullReader) ReadAt(p []byte, off int64) (n int, err error) {
}
func (r FullReader) WriteTo(w io.Writer) (n int64, err error) {
pol := &sync.Pool{
New: func() any {
return new(outDat)
},
}
out := make(chan *outDat, len(r.sizes))
out := make(chan outDat, len(r.sizes))
offset := r.start
num := len(r.sizes)
for i := 0; i < num; i++ {
od := pol.Get().(*outDat)
if i == num-1 && r.fragRdr != nil {
go func() {
defer func() {
out <- od
}()
rdr, e := r.fragRdr()
if err != nil {
od.i = num - 1
od.err = e
out <- outDat{
i: num - 1,
err: e,
}
return
}
buf := make([]byte, r.sizes[num-1])
_, e = rdr.Read(buf)
od.i = num - 1
od.err = e
od.data = buf
dat, e := io.ReadAll(rdr)
out <- outDat{
i: num - 1,
err: e,
data: dat,
}
if clr, ok := rdr.(io.Closer); ok {
clr.Close()
}
}()
continue
}
go r.process(i, int64(offset), od, out)
go r.process(i, int64(offset), out)
offset += uint64(realSize(r.sizes[i]))
}
wt, ok := w.(io.WriterAt)
if !ok {
var cur int
cache := make(map[int]outDat)
var tmpN int
var dat *outDat
for cur < len(r.sizes) {
dat = <-out
defer pol.Put(dat)
if dat.err != nil {
err = dat.err
return
}
if dat.i != cur {
cache[dat.i] = *dat
continue
cache := make(map[int]outDat)
var tmpN int
for cur := 0; cur < num; {
dat := <-out
if dat.err != nil {
err = dat.err
return
}
if dat.i != cur {
cache[dat.i] = dat
continue
}
tmpN, err = w.Write(dat.data)
n += int64(tmpN)
if err != nil {
return
}
cur++
var ok bool
for {
dat, ok = cache[cur]
if !ok {
break
}
tmpN, err = w.Write(dat.data)
n += int64(tmpN)
@@ -228,36 +223,6 @@ func (r FullReader) WriteTo(w io.Writer) (n int64, err error) {
return
}
cur++
var ok bool
var curDat outDat
for {
curDat, ok = cache[cur]
if !ok {
break
}
tmpN, err = w.Write(curDat.data)
n += int64(tmpN)
if err != nil {
return
}
cur++
}
}
} else {
var done int
var dat *outDat
for done < len(r.sizes) {
dat = <-out
defer pol.Put(dat)
if dat.err != nil {
err = dat.err
return
}
_, err = wt.WriteAt(dat.data, int64(dat.i*int(r.blockSize)))
if err != nil {
return
}
done++
}
}
return
+1 -1
View File
@@ -18,5 +18,5 @@ type Resetable interface {
type Decoder interface {
//Decodes a chunk of data all at once.
Decode(in []byte, outSize int) ([]byte, error)
Decode(in []byte) ([]byte, error)
}
-9
View File
@@ -16,12 +16,3 @@ func (l Lz4) Reset(old, src io.Reader) error {
old.(*lz4.Reader).Reset(src)
return nil
}
func (l Lz4) Decode(in []byte, outSize int) (out []byte, err error) {
out = make([]byte, outSize)
outLen, err := lz4.UncompressBlock(in, out)
if outLen < outSize {
out = out[:outLen]
}
return
}
+2 -2
View File
@@ -19,9 +19,9 @@ func (z Zstd) Reset(old, src io.Reader) error {
return old.(*zstd.Decoder).Reset(src)
}
func (z Zstd) Decode(in []byte, outSize int) ([]byte, error) {
func (z Zstd) Decode(in []byte) ([]byte, error) {
if z.writeToReader == nil {
z.writeToReader, _ = zstd.NewReader(nil)
}
return z.writeToReader.DecodeAll(in, make([]byte, outSize))
return z.writeToReader.DecodeAll(in, nil)
}
+3
View File
@@ -7,6 +7,7 @@ import (
"math"
"time"
"github.com/CalebQ42/fuse"
"github.com/CalebQ42/squashfs/internal/decompress"
"github.com/CalebQ42/squashfs/internal/directory"
"github.com/CalebQ42/squashfs/internal/inode"
@@ -16,6 +17,8 @@ import (
type Reader struct {
*FS
con *fuse.Conn
mountDone chan struct{}
d decompress.Decompressor
r io.ReaderAt
fragEntries []fragEntry
+1 -1
View File
@@ -70,7 +70,7 @@ func (r Reader) getReaders(i inode.Inode) (full *data.FullReader, rdr *data.Read
}
fragRdr = io.LimitReader(fragRdr, int64(fragSize))
return fragRdr, nil
}, fragSize)
})
var fragRdr io.Reader
fragRdr, err = r.fragReader(fragInd)
if err != nil {
+3 -3
View File
@@ -196,11 +196,11 @@ func TestFuse(t *testing.T) {
if err != nil {
t.Fatal(err)
}
con, err := rdr.Mount("testing/fuseTest")
err = rdr.Mount("testing/fuseTest")
if err != nil {
t.Fatal(err)
}
defer con.Close()
<-con.Ready
defer rdr.Unmount()
rdr.MountWait()
t.Fatal("testing")
}