perf(stream): improve file stream range reading and caching mechanism (#1001)

* perf(stream): improve file stream range reading and caching mechanism

* 。

* add bytes_test.go

* fix(stream): handle EOF and buffer reading more gracefully

* 注释

* refactor: update CacheFullAndWriter to accept pointer for UpdateProgress

* update tests

* Update drivers/google_drive/util.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: j2rong4cn <36783515+j2rong4cn@users.noreply.github.com>

* 更优雅的克隆Link

* 修复stream已缓存但无法重复读取

* 将Bytes类型重命名为Reader

* 修复栈溢出

* update tests

---------

Signed-off-by: j2rong4cn <36783515+j2rong4cn@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
j2rong4cn
2025-08-11 23:41:22 +08:00
committed by GitHub
parent 8c244a984d
commit 57fceabcf4
48 changed files with 657 additions and 380 deletions

View File

@ -1,7 +1,6 @@
package stream
import (
"bytes"
"context"
"errors"
"fmt"
@ -13,6 +12,7 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/pkg/buffer"
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"go4.org/readerutil"
@ -27,13 +27,19 @@ type FileStream struct {
ForceStreamUpload bool
Exist model.Obj //the file existed in the destination, we can reuse some info since we wil overwrite it
utils.Closers
tmpFile *os.File //if present, tmpFile has full content, it will be deleted at last
peekBuff *bytes.Reader
tmpFile model.File //if present, tmpFile has full content, it will be deleted at last
peekBuff *buffer.Reader
size int64
oriReader io.Reader // the original reader, used for caching
}
func (f *FileStream) GetSize() int64 {
if f.tmpFile != nil {
info, err := f.tmpFile.Stat()
if f.size > 0 {
return f.size
}
if file, ok := f.tmpFile.(*os.File); ok {
info, err := file.Stat()
if err == nil {
return info.Size()
}
@ -60,14 +66,18 @@ func (f *FileStream) Close() error {
if errors.Is(err1, os.ErrClosed) {
err1 = nil
}
if f.tmpFile != nil {
err2 = os.RemoveAll(f.tmpFile.Name())
if file, ok := f.tmpFile.(*os.File); ok {
err2 = os.RemoveAll(file.Name())
if err2 != nil {
err2 = errs.NewErr(err2, "failed to remove tmpFile [%s]", f.tmpFile.Name())
err2 = errs.NewErr(err2, "failed to remove tmpFile [%s]", file.Name())
} else {
f.tmpFile = nil
}
}
if f.peekBuff != nil {
f.peekBuff.Reset()
f.peekBuff = nil
}
return errors.Join(err1, err2)
}
@ -79,20 +89,55 @@ func (f *FileStream) SetExist(obj model.Obj) {
f.Exist = obj
}
// CacheFullInTempFile save all data into tmpFile. Not recommended since it wears disk,
// and can't start upload until the file is written. It's not thread-safe!
func (f *FileStream) CacheFullInTempFile() (model.File, error) {
if file := f.GetFile(); file != nil {
return file, nil
// CacheFullAndWriter save all data into tmpFile or memory.
// It's not thread-safe!
func (f *FileStream) CacheFullAndWriter(up *model.UpdateProgress, writer io.Writer) (model.File, error) {
if cache := f.GetFile(); cache != nil {
if writer == nil {
return cache, nil
}
_, err := cache.Seek(0, io.SeekStart)
if err == nil {
reader := f.Reader
if up != nil {
cacheProgress := model.UpdateProgressWithRange(*up, 0, 50)
*up = model.UpdateProgressWithRange(*up, 50, 100)
reader = &ReaderUpdatingProgress{
Reader: &SimpleReaderWithSize{
Reader: reader,
Size: f.GetSize(),
},
UpdateProgress: cacheProgress,
}
}
_, err = utils.CopyWithBuffer(writer, reader)
if err == nil {
_, err = cache.Seek(0, io.SeekStart)
}
}
if err != nil {
return nil, err
}
return cache, nil
}
tmpF, err := utils.CreateTempFile(f.Reader, f.GetSize())
if err != nil {
return nil, err
reader := f.Reader
if up != nil {
cacheProgress := model.UpdateProgressWithRange(*up, 0, 50)
*up = model.UpdateProgressWithRange(*up, 50, 100)
reader = &ReaderUpdatingProgress{
Reader: &SimpleReaderWithSize{
Reader: reader,
Size: f.GetSize(),
},
UpdateProgress: cacheProgress,
}
}
f.Add(tmpF)
f.tmpFile = tmpF
f.Reader = tmpF
return tmpF, nil
if writer != nil {
reader = io.TeeReader(reader, writer)
}
f.Reader = reader
return f.cache(f.GetSize())
}
func (f *FileStream) GetFile() model.File {
@ -106,40 +151,68 @@ func (f *FileStream) GetFile() model.File {
}
// RangeRead have to cache all data first since only Reader is provided.
// also support a peeking RangeRead at very start, but won't buffer more than conf.MaxBufferLimit data in memory
// It's not thread-safe!
func (f *FileStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
if httpRange.Length < 0 || httpRange.Start+httpRange.Length > f.GetSize() {
httpRange.Length = f.GetSize() - httpRange.Start
}
var cache io.ReaderAt = f.GetFile()
if cache != nil {
return io.NewSectionReader(cache, httpRange.Start, httpRange.Length), nil
if f.GetFile() != nil {
return io.NewSectionReader(f.GetFile(), httpRange.Start, httpRange.Length), nil
}
size := httpRange.Start + httpRange.Length
if f.peekBuff != nil && size <= int64(f.peekBuff.Len()) {
return io.NewSectionReader(f.peekBuff, httpRange.Start, httpRange.Length), nil
}
if size <= int64(conf.MaxBufferLimit) {
bufSize := min(size, f.GetSize())
// 使用bytes.Buffer作为io.CopyBuffer的写入对象CopyBuffer会调用Buffer.ReadFrom
// 即使被写入的数据量与Buffer.Cap一致Buffer也会扩大
buf := make([]byte, bufSize)
n, err := io.ReadFull(f.Reader, buf)
if err != nil {
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", bufSize, n, err)
}
f.peekBuff = bytes.NewReader(buf)
f.Reader = io.MultiReader(f.peekBuff, f.Reader)
cache = f.peekBuff
} else {
var err error
cache, err = f.CacheFullInTempFile()
cache, err := f.cache(size)
if err != nil {
return nil, err
}
return io.NewSectionReader(cache, httpRange.Start, httpRange.Length), nil
}
// *旧笔记
// 使用bytes.Buffer作为io.CopyBuffer的写入对象CopyBuffer会调用Buffer.ReadFrom
// 即使被写入的数据量与Buffer.Cap一致Buffer也会扩大
func (f *FileStream) cache(maxCacheSize int64) (model.File, error) {
if maxCacheSize > int64(conf.MaxBufferLimit) {
tmpF, err := utils.CreateTempFile(f.Reader, f.GetSize())
if err != nil {
return nil, err
}
f.Add(tmpF)
f.tmpFile = tmpF
f.Reader = tmpF
return tmpF, nil
}
return io.NewSectionReader(cache, httpRange.Start, httpRange.Length), nil
if f.peekBuff == nil {
f.peekBuff = &buffer.Reader{}
f.oriReader = f.Reader
}
bufSize := maxCacheSize - int64(f.peekBuff.Len())
buf := make([]byte, bufSize)
n, err := io.ReadFull(f.oriReader, buf)
if bufSize != int64(n) {
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", bufSize, n, err)
}
f.peekBuff.Append(buf)
if int64(f.peekBuff.Len()) >= f.GetSize() {
f.Reader = f.peekBuff
f.oriReader = nil
} else {
f.Reader = io.MultiReader(f.peekBuff, f.oriReader)
}
return f.peekBuff, nil
}
func (f *FileStream) SetTmpFile(file model.File) {
f.AddIfCloser(file)
f.tmpFile = file
f.Reader = file
}
var _ model.FileStreamer = (*SeekableStream)(nil)
@ -156,7 +229,6 @@ type SeekableStream struct {
*FileStream
// should have one of belows to support rangeRead
rangeReadCloser model.RangeReadCloserIF
size int64
}
func NewSeekableStream(fs *FileStream, link *model.Link) (*SeekableStream, error) {
@ -178,38 +250,26 @@ func NewSeekableStream(fs *FileStream, link *model.Link) (*SeekableStream, error
if err != nil {
return nil, err
}
if _, ok := rr.(*model.FileRangeReader); ok {
fs.Reader, err = rr.RangeRead(fs.Ctx, http_range.Range{Length: -1})
if err != nil {
return nil, err
}
fs.Add(link)
return &SeekableStream{FileStream: fs, size: size}, nil
}
rrc := &model.RangeReadCloser{
RangeReader: rr,
}
if _, ok := rr.(*model.FileRangeReader); ok {
fs.Reader, err = rrc.RangeRead(fs.Ctx, http_range.Range{Length: -1})
if err != nil {
return nil, err
}
}
fs.size = size
fs.Add(link)
fs.Add(rrc)
return &SeekableStream{FileStream: fs, rangeReadCloser: rrc, size: size}, nil
return &SeekableStream{FileStream: fs, rangeReadCloser: rrc}, nil
}
return nil, fmt.Errorf("illegal seekableStream")
}
func (ss *SeekableStream) GetSize() int64 {
if ss.size > 0 {
return ss.size
}
return ss.FileStream.GetSize()
}
//func (ss *SeekableStream) Peek(length int) {
//
//}
// RangeRead is not thread-safe, pls use it in single thread only.
func (ss *SeekableStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
if ss.tmpFile == nil && ss.rangeReadCloser != nil {
if ss.GetFile() == nil && ss.rangeReadCloser != nil {
rc, err := ss.rangeReadCloser.RangeRead(ss.Ctx, httpRange)
if err != nil {
return nil, err
@ -219,47 +279,37 @@ func (ss *SeekableStream) RangeRead(httpRange http_range.Range) (io.Reader, erro
return ss.FileStream.RangeRead(httpRange)
}
//func (f *FileStream) GetReader() io.Reader {
// return f.Reader
//}
// only provide Reader as full stream when it's demanded. in rapid-upload, we can skip this to save memory
func (ss *SeekableStream) Read(p []byte) (n int, err error) {
if err := ss.generateReader(); err != nil {
return 0, err
}
return ss.FileStream.Read(p)
}
func (ss *SeekableStream) generateReader() error {
if ss.Reader == nil {
if ss.rangeReadCloser == nil {
return 0, fmt.Errorf("illegal seekableStream")
return fmt.Errorf("illegal seekableStream")
}
rc, err := ss.rangeReadCloser.RangeRead(ss.Ctx, http_range.Range{Length: -1})
if err != nil {
return 0, err
return err
}
ss.Reader = rc
}
return ss.Reader.Read(p)
return nil
}
func (ss *SeekableStream) CacheFullInTempFile() (model.File, error) {
if file := ss.GetFile(); file != nil {
return file, nil
}
tmpF, err := utils.CreateTempFile(ss, ss.GetSize())
if err != nil {
func (ss *SeekableStream) CacheFullAndWriter(up *model.UpdateProgress, writer io.Writer) (model.File, error) {
if err := ss.generateReader(); err != nil {
return nil, err
}
ss.Add(tmpF)
ss.tmpFile = tmpF
ss.Reader = tmpF
return tmpF, nil
}
func (f *FileStream) SetTmpFile(r *os.File) {
f.Add(r)
f.tmpFile = r
f.Reader = r
return ss.FileStream.CacheFullAndWriter(up, writer)
}
type ReaderWithSize interface {
io.ReadCloser
io.Reader
GetSize() int64
}
@ -293,7 +343,10 @@ func (r *ReaderUpdatingProgress) Read(p []byte) (n int, err error) {
}
func (r *ReaderUpdatingProgress) Close() error {
return r.Reader.Close()
if c, ok := r.Reader.(io.Closer); ok {
return c.Close()
}
return nil
}
type RangeReadReadAtSeeker struct {
@ -311,19 +364,20 @@ type headCache struct {
func (c *headCache) head(p []byte) (int, error) {
n := 0
for _, buf := range c.bufs {
if len(buf)+n >= len(p) {
n += copy(p[n:], buf[:len(p)-n])
n += copy(p[n:], buf)
if n == len(p) {
return n, nil
} else {
n += copy(p[n:], buf)
}
}
w, err := io.ReadAtLeast(c.reader, p[n:], 1)
if w > 0 {
buf := make([]byte, w)
copy(buf, p[n:n+w])
nn, err := io.ReadFull(c.reader, p[n:])
if nn > 0 {
buf := make([]byte, nn)
copy(buf, p[n:])
c.bufs = append(c.bufs, buf)
n += w
n += nn
if err == io.ErrUnexpectedEOF {
err = io.EOF
}
}
return n, err
}
@ -422,6 +476,9 @@ func (r *RangeReadReadAtSeeker) getReaderAtOffset(off int64) (io.Reader, error)
}
func (r *RangeReadReadAtSeeker) ReadAt(p []byte, off int64) (n int, err error) {
if off < 0 || off >= r.ss.GetSize() {
return 0, io.EOF
}
if off == 0 && r.headCache != nil {
return r.headCache.head(p)
}
@ -430,12 +487,15 @@ func (r *RangeReadReadAtSeeker) ReadAt(p []byte, off int64) (n int, err error) {
if err != nil {
return 0, err
}
n, err = io.ReadAtLeast(rr, p, 1)
off += int64(n)
if err == nil {
r.readerMap.Store(int64(off), rr)
} else {
rr = nil
n, err = io.ReadFull(rr, p)
if n > 0 {
off += int64(n)
switch err {
case nil:
r.readerMap.Store(int64(off), rr)
case io.ErrUnexpectedEOF:
err = io.EOF
}
}
return n, err
}
@ -444,20 +504,14 @@ func (r *RangeReadReadAtSeeker) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
case io.SeekCurrent:
if offset == 0 {
return r.masterOff, nil
}
offset += r.masterOff
case io.SeekEnd:
offset += r.ss.GetSize()
default:
return 0, errs.NotSupport
return 0, errors.New("Seek: invalid whence")
}
if offset < 0 {
return r.masterOff, errors.New("invalid seek: negative position")
}
if offset > r.ss.GetSize() {
offset = r.ss.GetSize()
if offset < 0 || offset > r.ss.GetSize() {
return 0, errors.New("Seek: invalid offset")
}
r.masterOff = offset
return offset, nil
@ -465,6 +519,8 @@ func (r *RangeReadReadAtSeeker) Seek(offset int64, whence int) (int64, error) {
func (r *RangeReadReadAtSeeker) Read(p []byte) (n int, err error) {
n, err = r.ReadAt(p, r.masterOff)
r.masterOff += int64(n)
if n > 0 {
r.masterOff += int64(n)
}
return n, err
}