2023-08-27 21:14:23 +08:00
|
|
|
|
package stream
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
2023-08-28 18:18:02 +08:00
|
|
|
|
"io"
|
2025-01-18 23:28:12 +08:00
|
|
|
|
"math"
|
2023-08-28 18:18:02 +08:00
|
|
|
|
"os"
|
2025-08-06 13:32:37 +08:00
|
|
|
|
"sync"
|
2023-08-28 18:18:02 +08:00
|
|
|
|
|
2025-08-05 21:42:54 +08:00
|
|
|
|
"github.com/OpenListTeam/OpenList/v4/internal/conf"
|
2025-07-01 09:54:50 +08:00
|
|
|
|
"github.com/OpenListTeam/OpenList/v4/internal/errs"
|
|
|
|
|
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
2025-08-11 23:41:22 +08:00
|
|
|
|
"github.com/OpenListTeam/OpenList/v4/pkg/buffer"
|
2025-07-01 09:54:50 +08:00
|
|
|
|
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
|
|
|
|
|
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
2025-03-27 23:20:44 +08:00
|
|
|
|
"go4.org/readerutil"
|
2023-08-27 21:14:23 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// FileStream wraps an incoming data stream together with the object metadata
// it belongs to. Data can be lazily cached (in memory via peekBuff, or on
// disk via tmpFile) so the stream becomes re-readable / range-readable.
type FileStream struct {
	Ctx context.Context
	model.Obj
	io.Reader
	Mimetype          string
	WebPutAsTask      bool
	ForceStreamUpload bool
	Exist             model.Obj //the file existed in the destination, we can reuse some info since we wil overwrite it
	utils.Closers

	tmpFile  model.File //if present, tmpFile has full content, it will be deleted at last
	peekBuff *buffer.Reader
	// size is an explicitly recorded stream length; when > 0 it takes
	// precedence over the metadata size (see GetSize).
	size      int64
	oriReader io.Reader // the original reader, used for caching
}
|
|
|
|
|
|
2023-09-05 13:03:29 +08:00
|
|
|
|
func (f *FileStream) GetSize() int64 {
|
2025-08-11 23:41:22 +08:00
|
|
|
|
if f.size > 0 {
|
|
|
|
|
return f.size
|
|
|
|
|
}
|
|
|
|
|
if file, ok := f.tmpFile.(*os.File); ok {
|
|
|
|
|
info, err := file.Stat()
|
2023-09-05 13:03:29 +08:00
|
|
|
|
if err == nil {
|
|
|
|
|
return info.Size()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return f.Obj.GetSize()
|
|
|
|
|
}
|
|
|
|
|
|
2023-08-27 21:14:23 +08:00
|
|
|
|
// GetMimetype returns the MIME type recorded for this stream.
func (f *FileStream) GetMimetype() string {
	return f.Mimetype
}
|
|
|
|
|
|
|
|
|
|
// NeedStore reports whether the web upload should be handled as a stored task.
func (f *FileStream) NeedStore() bool {
	return f.WebPutAsTask
}
|
2024-03-29 14:42:01 +08:00
|
|
|
|
|
|
|
|
|
// IsForceStreamUpload reports whether streaming upload is forced for this stream.
func (f *FileStream) IsForceStreamUpload() bool {
	return f.ForceStreamUpload
}
|
|
|
|
|
|
2023-08-27 21:14:23 +08:00
|
|
|
|
// Close releases all attached closers, removes the temp cache file (if any)
// and frees the in-memory peek buffer. The closer error and the file-removal
// error are joined into a single returned error.
func (f *FileStream) Close() error {
	var err1, err2 error

	err1 = f.Closers.Close()
	if errors.Is(err1, os.ErrClosed) {
		// already closed elsewhere; not treated as a failure
		err1 = nil
	}
	if file, ok := f.tmpFile.(*os.File); ok {
		err2 = os.RemoveAll(file.Name())
		if err2 != nil {
			err2 = errs.NewErr(err2, "failed to remove tmpFile [%s]", file.Name())
		} else {
			f.tmpFile = nil
		}
	}
	if f.peekBuff != nil {
		// release buffered memory eagerly
		f.peekBuff.Reset()
		f.peekBuff = nil
	}

	return errors.Join(err1, err2)
}
|
|
|
|
|
|
|
|
|
|
// GetExist returns the object that already exists at the destination, if any.
func (f *FileStream) GetExist() model.Obj {
	return f.Exist
}
|
|
|
|
|
// SetExist records the object that already exists at the destination.
func (f *FileStream) SetExist(obj model.Obj) {
	f.Exist = obj
}
|
|
|
|
|
|
2025-08-11 23:41:22 +08:00
|
|
|
|
// CacheFullAndWriter save all data into tmpFile or memory.
|
|
|
|
|
// It's not thread-safe!
|
|
|
|
|
func (f *FileStream) CacheFullAndWriter(up *model.UpdateProgress, writer io.Writer) (model.File, error) {
|
|
|
|
|
if cache := f.GetFile(); cache != nil {
|
|
|
|
|
if writer == nil {
|
|
|
|
|
return cache, nil
|
|
|
|
|
}
|
|
|
|
|
_, err := cache.Seek(0, io.SeekStart)
|
|
|
|
|
if err == nil {
|
|
|
|
|
reader := f.Reader
|
|
|
|
|
if up != nil {
|
|
|
|
|
cacheProgress := model.UpdateProgressWithRange(*up, 0, 50)
|
|
|
|
|
*up = model.UpdateProgressWithRange(*up, 50, 100)
|
|
|
|
|
reader = &ReaderUpdatingProgress{
|
|
|
|
|
Reader: &SimpleReaderWithSize{
|
|
|
|
|
Reader: reader,
|
|
|
|
|
Size: f.GetSize(),
|
|
|
|
|
},
|
|
|
|
|
UpdateProgress: cacheProgress,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_, err = utils.CopyWithBuffer(writer, reader)
|
|
|
|
|
if err == nil {
|
|
|
|
|
_, err = cache.Seek(0, io.SeekStart)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return cache, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
reader := f.Reader
|
|
|
|
|
if up != nil {
|
|
|
|
|
cacheProgress := model.UpdateProgressWithRange(*up, 0, 50)
|
|
|
|
|
*up = model.UpdateProgressWithRange(*up, 50, 100)
|
|
|
|
|
reader = &ReaderUpdatingProgress{
|
|
|
|
|
Reader: &SimpleReaderWithSize{
|
|
|
|
|
Reader: reader,
|
|
|
|
|
Size: f.GetSize(),
|
|
|
|
|
},
|
|
|
|
|
UpdateProgress: cacheProgress,
|
|
|
|
|
}
|
2023-08-27 21:14:23 +08:00
|
|
|
|
}
|
2025-08-11 23:41:22 +08:00
|
|
|
|
if writer != nil {
|
|
|
|
|
reader = io.TeeReader(reader, writer)
|
2023-08-27 21:14:23 +08:00
|
|
|
|
}
|
2025-08-11 23:41:22 +08:00
|
|
|
|
f.Reader = reader
|
|
|
|
|
return f.cache(f.GetSize())
|
2023-08-27 21:14:23 +08:00
|
|
|
|
}
|
|
|
|
|
|
2025-04-12 16:55:31 +08:00
|
|
|
|
func (f *FileStream) GetFile() model.File {
|
2025-01-18 23:28:12 +08:00
|
|
|
|
if f.tmpFile != nil {
|
2025-04-12 16:55:31 +08:00
|
|
|
|
return f.tmpFile
|
2025-01-18 23:28:12 +08:00
|
|
|
|
}
|
|
|
|
|
if file, ok := f.Reader.(model.File); ok {
|
2025-04-12 16:55:31 +08:00
|
|
|
|
return file
|
2025-01-18 23:28:12 +08:00
|
|
|
|
}
|
2025-04-12 16:55:31 +08:00
|
|
|
|
return nil
|
2025-01-18 23:28:12 +08:00
|
|
|
|
}
|
|
|
|
|
|
2023-08-27 21:14:23 +08:00
|
|
|
|
// RangeRead have to cache all data first since only Reader is provided.
// It's not thread-safe!
//
// Returns a reader for the requested window [Start, Start+Length) of the
// stream, caching just enough data to serve it.
func (f *FileStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
	// Normalize an open-ended (Length < 0) or oversized range to the stream end.
	if httpRange.Length < 0 || httpRange.Start+httpRange.Length > f.GetSize() {
		httpRange.Length = f.GetSize() - httpRange.Start
	}
	// Fully cached on disk (or Reader is already a file): serve directly.
	if f.GetFile() != nil {
		return io.NewSectionReader(f.GetFile(), httpRange.Start, httpRange.Length), nil
	}

	size := httpRange.Start + httpRange.Length
	// The requested span is already inside the in-memory peek buffer.
	if f.peekBuff != nil && size <= int64(f.peekBuff.Len()) {
		return io.NewSectionReader(f.peekBuff, httpRange.Start, httpRange.Length), nil
	}

	// Cache at least the first `size` bytes, then serve the window from it.
	cache, err := f.cache(size)
	if err != nil {
		return nil, err
	}

	return io.NewSectionReader(cache, httpRange.Start, httpRange.Length), nil
}
|
|
|
|
|
|
|
|
|
|
// *Old notes
// Using a bytes.Buffer as the destination of io.CopyBuffer makes CopyBuffer call Buffer.ReadFrom,
// and the Buffer grows even when the amount of data written exactly equals Buffer.Cap.
|
|
|
|
|
|
|
|
|
|
// cache ensures the first maxCacheSize bytes of the stream are available in a
// random-access form and returns that cache.
//
// When maxCacheSize exceeds conf.MaxBufferLimit the WHOLE stream is spilled
// to a temp file, which then replaces the Reader. Otherwise only the needed
// prefix is pulled into the in-memory peek buffer and the Reader is rebuilt
// so already-buffered bytes are served before the remaining source data.
// Callers are expected to pass maxCacheSize <= f.GetSize().
func (f *FileStream) cache(maxCacheSize int64) (model.File, error) {
	if maxCacheSize > int64(conf.MaxBufferLimit) {
		tmpF, err := utils.CreateTempFile(f.Reader, f.GetSize())
		if err != nil {
			return nil, err
		}
		// close (and, via Close, delete) the temp file with the stream
		f.Add(tmpF)
		f.tmpFile = tmpF
		f.Reader = tmpF
		return tmpF, nil
	}

	// Lazily set up the peek buffer, remembering the untouched source reader.
	if f.peekBuff == nil {
		f.peekBuff = &buffer.Reader{}
		f.oriReader = f.Reader
	}
	// Top up the peek buffer so it holds exactly maxCacheSize bytes.
	bufSize := maxCacheSize - int64(f.peekBuff.Len())
	buf := make([]byte, bufSize)
	n, err := io.ReadFull(f.oriReader, buf)
	if bufSize != int64(n) {
		return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", bufSize, n, err)
	}
	f.peekBuff.Append(buf)
	if int64(f.peekBuff.Len()) >= f.GetSize() {
		// Everything is buffered; the source reader is no longer needed.
		f.Reader = f.peekBuff
		f.oriReader = nil
	} else {
		// Serve buffered bytes first, then continue from the source.
		f.Reader = io.MultiReader(f.peekBuff, f.oriReader)
	}
	return f.peekBuff, nil
}
|
|
|
|
|
|
|
|
|
|
func (f *FileStream) SetTmpFile(file model.File) {
|
|
|
|
|
f.AddIfCloser(file)
|
|
|
|
|
f.tmpFile = file
|
|
|
|
|
f.Reader = file
|
2023-08-27 21:14:23 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Compile-time checks: both stream types must satisfy model.FileStreamer.
var _ model.FileStreamer = (*SeekableStream)(nil)
var _ model.FileStreamer = (*FileStream)(nil)

//var _ seekableStream = (*FileStream)(nil)
|
|
|
|
|
|
|
|
|
|
// for most internal stream, which is either RangeReadCloser or MFile
// Any functionality implemented based on SeekableStream should implement a Close method,
// whose only purpose is to close the SeekableStream object. If such functionality has
// additional resources that need to be closed, they should be added to the Closer property of
// the SeekableStream object and be closed together when the SeekableStream object is closed.
type SeekableStream struct {
	*FileStream
	// should have one of belows to support rangeRead
	rangeReadCloser model.RangeReadCloserIF
}
|
|
|
|
|
|
2025-07-12 17:57:54 +08:00
|
|
|
|
// NewSeekableStream builds a SeekableStream from fs. If fs already carries a
// Reader it is used as-is; otherwise a range reader is derived from link.
// It returns an error when neither a Reader nor a link is available.
func NewSeekableStream(fs *FileStream, link *model.Link) (*SeekableStream, error) {
	if len(fs.Mimetype) == 0 {
		// fall back to a MIME type guessed from the object name
		fs.Mimetype = utils.GetMimeType(fs.Obj.GetName())
	}

	if fs.Reader != nil {
		fs.Add(link)
		return &SeekableStream{FileStream: fs}, nil
	}

	if link != nil {
		// Prefer the link's content length; fall back to the metadata size.
		size := link.ContentLength
		if size <= 0 {
			size = fs.GetSize()
		}
		rr, err := GetRangeReaderFromLink(size, link)
		if err != nil {
			return nil, err
		}
		rrc := &model.RangeReadCloser{
			RangeReader: rr,
		}
		if _, ok := rr.(*model.FileRangeReader); ok {
			// A file-backed range reader can hand out a full-stream Reader
			// immediately, so materialize it up front.
			fs.Reader, err = rrc.RangeRead(fs.Ctx, http_range.Range{Length: -1})
			if err != nil {
				return nil, err
			}
		}
		fs.size = size
		fs.Add(link)
		fs.Add(rrc)
		return &SeekableStream{FileStream: fs, rangeReadCloser: rrc}, nil
	}
	return nil, fmt.Errorf("illegal seekableStream")
}
|
|
|
|
|
|
|
|
|
|
// RangeRead is not thread-safe, pls use it in single thread only.
|
|
|
|
|
func (ss *SeekableStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
|
2025-08-11 23:41:22 +08:00
|
|
|
|
if ss.GetFile() == nil && ss.rangeReadCloser != nil {
|
2023-08-27 21:14:23 +08:00
|
|
|
|
rc, err := ss.rangeReadCloser.RangeRead(ss.Ctx, httpRange)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return rc, nil
|
|
|
|
|
}
|
2025-04-12 16:55:31 +08:00
|
|
|
|
return ss.FileStream.RangeRead(httpRange)
|
2023-08-27 21:14:23 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// only provide Reader as full stream when it's demanded. in rapid-upload, we can skip this to save memory
|
|
|
|
|
func (ss *SeekableStream) Read(p []byte) (n int, err error) {
|
2025-08-11 23:41:22 +08:00
|
|
|
|
if err := ss.generateReader(); err != nil {
|
|
|
|
|
return 0, err
|
|
|
|
|
}
|
|
|
|
|
return ss.FileStream.Read(p)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ss *SeekableStream) generateReader() error {
|
2023-08-27 21:14:23 +08:00
|
|
|
|
if ss.Reader == nil {
|
|
|
|
|
if ss.rangeReadCloser == nil {
|
2025-08-11 23:41:22 +08:00
|
|
|
|
return fmt.Errorf("illegal seekableStream")
|
2023-08-27 21:14:23 +08:00
|
|
|
|
}
|
|
|
|
|
rc, err := ss.rangeReadCloser.RangeRead(ss.Ctx, http_range.Range{Length: -1})
|
|
|
|
|
if err != nil {
|
2025-08-11 23:41:22 +08:00
|
|
|
|
return err
|
2023-08-27 21:14:23 +08:00
|
|
|
|
}
|
2025-07-12 17:57:54 +08:00
|
|
|
|
ss.Reader = rc
|
2023-08-27 21:14:23 +08:00
|
|
|
|
}
|
2025-08-11 23:41:22 +08:00
|
|
|
|
return nil
|
2023-08-27 21:14:23 +08:00
|
|
|
|
}
|
|
|
|
|
|
2025-08-11 23:41:22 +08:00
|
|
|
|
func (ss *SeekableStream) CacheFullAndWriter(up *model.UpdateProgress, writer io.Writer) (model.File, error) {
|
|
|
|
|
if err := ss.generateReader(); err != nil {
|
2023-08-27 21:14:23 +08:00
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2025-08-11 23:41:22 +08:00
|
|
|
|
return ss.FileStream.CacheFullAndWriter(up, writer)
|
2023-09-03 15:40:40 +08:00
|
|
|
|
}
|
2025-01-18 23:28:12 +08:00
|
|
|
|
|
|
|
|
|
// ReaderWithSize is an io.Reader that also knows its total size in bytes.
type ReaderWithSize interface {
	io.Reader
	GetSize() int64
}
|
|
|
|
|
|
|
|
|
|
// SimpleReaderWithSize pairs an io.Reader with an explicitly supplied size.
type SimpleReaderWithSize struct {
	io.Reader
	Size int64
}
|
|
|
|
|
|
|
|
|
|
// GetSize returns the explicitly supplied size.
func (r *SimpleReaderWithSize) GetSize() int64 {
	return r.Size
}
|
|
|
|
|
|
2025-02-16 12:22:11 +08:00
|
|
|
|
func (r *SimpleReaderWithSize) Close() error {
|
|
|
|
|
if c, ok := r.Reader.(io.Closer); ok {
|
|
|
|
|
return c.Close()
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2025-01-18 23:28:12 +08:00
|
|
|
|
// ReaderUpdatingProgress reports read progress (as a percentage of the
// reader's declared size) through UpdateProgress on every Read.
type ReaderUpdatingProgress struct {
	Reader ReaderWithSize
	model.UpdateProgress
	offset int // total bytes read so far
}
|
|
|
|
|
|
|
|
|
|
func (r *ReaderUpdatingProgress) Read(p []byte) (n int, err error) {
|
|
|
|
|
n, err = r.Reader.Read(p)
|
|
|
|
|
r.offset += n
|
|
|
|
|
r.UpdateProgress(math.Min(100.0, float64(r.offset)/float64(r.Reader.GetSize())*100.0))
|
|
|
|
|
return n, err
|
|
|
|
|
}
|
|
|
|
|
|
2025-02-16 12:22:11 +08:00
|
|
|
|
func (r *ReaderUpdatingProgress) Close() error {
|
2025-08-11 23:41:22 +08:00
|
|
|
|
if c, ok := r.Reader.(io.Closer); ok {
|
|
|
|
|
return c.Close()
|
|
|
|
|
}
|
|
|
|
|
return nil
|
2025-02-16 12:22:11 +08:00
|
|
|
|
}
|
|
|
|
|
|
2025-01-18 23:28:12 +08:00
|
|
|
|
// RangeReadReadAtSeeker adapts a SeekableStream into a random-access reader
// (ReadAt/Seek/Read) by keeping a pool of live sequential readers keyed by
// the stream offset each one currently stands at.
type RangeReadReadAtSeeker struct {
	ss        *SeekableStream
	masterOff int64 // current position for Seek/Read
	// readerMap holds map[int64]io.Reader entries: offset -> reader
	// positioned at that offset, reusable by nearby ReadAt calls.
	readerMap sync.Map
	headCache *headCache
}
|
|
|
|
|
|
|
|
|
|
// headCache caches bytes from the very start of the stream so repeated
// ReadAt(p, 0) probes do not consume a fresh range reader each time.
type headCache struct {
	reader io.Reader // reader positioned at offset 0
	bufs   [][]byte  // head chunks read so far, in order
}
|
|
|
|
|
|
|
|
|
|
// head fills p with bytes from the start of the stream, serving from the
// cached chunks first and reading (and caching) more from the underlying
// reader only when the cache is insufficient.
func (c *headCache) head(p []byte) (int, error) {
	n := 0
	// Serve as much as possible from already-cached chunks.
	for _, buf := range c.bufs {
		n += copy(p[n:], buf)
		if n == len(p) {
			return n, nil
		}
	}
	// Read the remainder from the underlying reader and remember it.
	nn, err := io.ReadFull(c.reader, p[n:])
	if nn > 0 {
		buf := make([]byte, nn)
		copy(buf, p[n:])
		c.bufs = append(c.bufs, buf)
		n += nn
		if err == io.ErrUnexpectedEOF {
			// short read at stream end is a plain EOF for callers
			err = io.EOF
		}
	}
	return n, err
}
|
2025-08-06 13:32:37 +08:00
|
|
|
|
|
2025-03-27 23:20:44 +08:00
|
|
|
|
func (r *headCache) Close() error {
|
2025-08-06 13:32:37 +08:00
|
|
|
|
clear(r.bufs)
|
2025-01-27 20:08:56 +08:00
|
|
|
|
r.bufs = nil
|
|
|
|
|
return nil
|
2025-01-18 23:28:12 +08:00
|
|
|
|
}
|
|
|
|
|
|
2025-01-27 20:08:56 +08:00
|
|
|
|
// InitHeadCache installs a headCache over the offset-0 reader when the stream
// has no random-access file and nothing has been consumed yet.
// NOTE(review): assumes NewReadAtSeeker stored a reader under key 0; if the
// entry is missing, value is nil and the type assertion panics — confirm
// callers only invoke this right after construction with offset 0.
func (r *RangeReadReadAtSeeker) InitHeadCache() {
	if r.ss.GetFile() == nil && r.masterOff == 0 {
		value, _ := r.readerMap.LoadAndDelete(int64(0))
		r.headCache = &headCache{reader: value.(io.Reader)}
		// the head cache is released together with the stream
		r.ss.Closers.Add(r.headCache)
	}
}
|
|
|
|
|
|
2025-07-03 10:39:34 +08:00
|
|
|
|
// NewReadAtSeeker returns a random-access (model.File) view of ss positioned
// at offset. If ss is already backed by a random-access file that file is
// returned directly after seeking; otherwise a RangeReadReadAtSeeker is
// built. With a non-zero offset (or forceRange) a range reader is opened
// immediately; otherwise ss itself is pooled as the offset-0 reader.
func NewReadAtSeeker(ss *SeekableStream, offset int64, forceRange ...bool) (model.File, error) {
	if ss.GetFile() != nil {
		_, err := ss.GetFile().Seek(offset, io.SeekStart)
		if err != nil {
			return nil, err
		}
		return ss.GetFile(), nil
	}
	r := &RangeReadReadAtSeeker{
		ss:        ss,
		masterOff: offset,
	}
	if offset != 0 || utils.IsBool(forceRange...) {
		if offset < 0 || offset > ss.GetSize() {
			return nil, errors.New("offset out of range")
		}
		// NOTE(review): the reader returned here is discarded and not pooled
		// in readerMap, so it appears to act only as an early validation of
		// the range read — confirm this is intentional.
		_, err := r.getReaderAtOffset(offset)
		if err != nil {
			return nil, err
		}
	} else {
		// the stream itself is the reader standing at offset 0
		r.readerMap.Store(int64(offset), ss)
	}
	return r, nil
}
|
|
|
|
|
|
2025-03-27 23:20:44 +08:00
|
|
|
|
func NewMultiReaderAt(ss []*SeekableStream) (readerutil.SizeReaderAt, error) {
|
|
|
|
|
readers := make([]readerutil.SizeReaderAt, 0, len(ss))
|
|
|
|
|
for _, s := range ss {
|
|
|
|
|
ra, err := NewReadAtSeeker(s, 0)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
readers = append(readers, io.NewSectionReader(ra, 0, s.GetSize()))
|
|
|
|
|
}
|
|
|
|
|
return readerutil.NewMultiReaderAt(readers...), nil
|
|
|
|
|
}
|
|
|
|
|
|
2025-08-06 13:32:37 +08:00
|
|
|
|
// getReaderAtOffset returns an io.Reader positioned at off. It first tries to
// reuse a pooled reader: an exact offset match, or one standing at most 4MB
// before off (whose gap is then discarded). Only when no pooled reader fits
// is a fresh range read opened.
func (r *RangeReadReadAtSeeker) getReaderAtOffset(off int64) (io.Reader, error) {
	var rr io.Reader
	var cur int64 = -1
	r.readerMap.Range(func(key, value any) bool {
		k := key.(int64)
		if off == k {
			// exact match: stop scanning
			cur = k
			rr = value.(io.Reader)
			return false
		}
		// NOTE(review): `k < cur` prefers the candidate FURTHEST below off,
		// which maximizes the bytes discarded later — confirm whether
		// `k > cur` (the closest candidate) was intended.
		if off > k && off-k <= 4*utils.MB && (rr == nil || k < cur) {
			rr = value.(io.Reader)
			cur = k
		}
		return true
	})
	if cur >= 0 {
		// the chosen reader is about to be consumed; remove it from the pool
		r.readerMap.Delete(int64(cur))
	}
	if off == int64(cur) {
		// logrus.Debugf("getReaderAtOffset match_%d", off)
		return rr, nil
	}

	if rr != nil {
		// Fast-forward the reused reader by discarding the gap bytes.
		n, _ := utils.CopyWithBufferN(io.Discard, rr, off-cur)
		cur += n
		if cur == off {
			// logrus.Debugf("getReaderAtOffset old_%d", off)
			return rr, nil
		}
	}
	// logrus.Debugf("getReaderAtOffset new_%d", off)

	reader, err := r.ss.RangeRead(http_range.Range{Start: off, Length: -1})
	if err != nil {
		return nil, err
	}
	return reader, nil
}
|
|
|
|
|
|
2025-08-06 13:32:37 +08:00
|
|
|
|
// ReadAt implements io.ReaderAt on top of pooled sequential readers. After a
// fully satisfied read the advanced reader is returned to the pool so a
// subsequent sequential ReadAt can reuse it without a new range request.
func (r *RangeReadReadAtSeeker) ReadAt(p []byte, off int64) (n int, err error) {
	if off < 0 || off >= r.ss.GetSize() {
		return 0, io.EOF
	}
	// Serve header probes from the dedicated head cache when available.
	if off == 0 && r.headCache != nil {
		return r.headCache.head(p)
	}
	var rr io.Reader
	rr, err = r.getReaderAtOffset(off)
	if err != nil {
		return 0, err
	}
	n, err = io.ReadFull(rr, p)
	if n > 0 {
		off += int64(n)
		switch err {
		case nil:
			// fully satisfied: pool the reader at its new position for reuse
			r.readerMap.Store(int64(off), rr)
		case io.ErrUnexpectedEOF:
			// a short read at end of stream is a plain EOF for ReaderAt callers
			err = io.EOF
		}
	}
	return n, err
}
|
|
|
|
|
|
|
|
|
|
func (r *RangeReadReadAtSeeker) Seek(offset int64, whence int) (int64, error) {
|
|
|
|
|
switch whence {
|
|
|
|
|
case io.SeekStart:
|
|
|
|
|
case io.SeekCurrent:
|
|
|
|
|
offset += r.masterOff
|
|
|
|
|
case io.SeekEnd:
|
|
|
|
|
offset += r.ss.GetSize()
|
|
|
|
|
default:
|
2025-08-11 23:41:22 +08:00
|
|
|
|
return 0, errors.New("Seek: invalid whence")
|
2025-01-18 23:28:12 +08:00
|
|
|
|
}
|
2025-08-11 23:41:22 +08:00
|
|
|
|
if offset < 0 || offset > r.ss.GetSize() {
|
|
|
|
|
return 0, errors.New("Seek: invalid offset")
|
2025-01-18 23:28:12 +08:00
|
|
|
|
}
|
|
|
|
|
r.masterOff = offset
|
|
|
|
|
return offset, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *RangeReadReadAtSeeker) Read(p []byte) (n int, err error) {
|
2025-08-06 13:32:37 +08:00
|
|
|
|
n, err = r.ReadAt(p, r.masterOff)
|
2025-08-11 23:41:22 +08:00
|
|
|
|
if n > 0 {
|
|
|
|
|
r.masterOff += int64(n)
|
|
|
|
|
}
|
2025-01-18 23:28:12 +08:00
|
|
|
|
return n, err
|
|
|
|
|
}
|