diff --git a/drivers/115/meta.go b/drivers/115/meta.go index 87f26b59..6a32715e 100644 --- a/drivers/115/meta.go +++ b/drivers/115/meta.go @@ -18,7 +18,6 @@ var config = driver.Config{ Name: "115 Cloud", DefaultRoot: "0", // OnlyProxy: true, - // OnlyLocal: true, // NoOverwriteUpload: true, } diff --git a/drivers/115_open/meta.go b/drivers/115_open/meta.go index 28b53bae..c24b9993 100644 --- a/drivers/115_open/meta.go +++ b/drivers/115_open/meta.go @@ -11,23 +11,14 @@ type Addition struct { // define other OrderBy string `json:"order_by" type:"select" options:"file_name,file_size,user_utime,file_type"` OrderDirection string `json:"order_direction" type:"select" options:"asc,desc"` - LimitRate float64 `json:"limit_rate" type:"float" default:"1" help:"limit all api request rate ([limit]r/1s)"` + LimitRate float64 `json:"limit_rate" type:"float" default:"1" help:"limit all api request rate ([limit]r/1s)"` AccessToken string `json:"access_token" required:"true"` RefreshToken string `json:"refresh_token" required:"true"` } var config = driver.Config{ - Name: "115 Open", - LocalSort: false, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: false, - NeedMs: false, - DefaultRoot: "0", - CheckStatus: false, - Alert: "", - NoOverwriteUpload: false, + Name: "115 Open", + DefaultRoot: "0", } func init() { diff --git a/drivers/115_share/meta.go b/drivers/115_share/meta.go index 9abed56d..d3b142e8 100644 --- a/drivers/115_share/meta.go +++ b/drivers/115_share/meta.go @@ -19,12 +19,7 @@ type Addition struct { var config = driver.Config{ Name: "115 Share", DefaultRoot: "0", - // OnlyProxy: true, - // OnlyLocal: true, - CheckStatus: false, - Alert: "", - NoOverwriteUpload: true, - NoUpload: true, + NoUpload: true, } func init() { diff --git a/drivers/123_share/meta.go b/drivers/123_share/meta.go index 012e0de0..12c620d7 100644 --- a/drivers/123_share/meta.go +++ b/drivers/123_share/meta.go @@ -15,17 +15,10 @@ type Addition struct { } var config = driver.Config{ - Name: "123PanShare", - LocalSort: true, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: true, - NeedMs: false, - DefaultRoot: "0", - CheckStatus: false, - Alert: "", - NoOverwriteUpload: false, + Name: "123PanShare", + LocalSort: true, + NoUpload: true, + DefaultRoot: "0", } func init() { diff --git a/drivers/alias/driver.go b/drivers/alias/driver.go index c2dcfb95..cdfeda2a 100644 --- a/drivers/alias/driver.go +++ b/drivers/alias/driver.go @@ -3,6 +3,7 @@ package alias import ( "context" "errors" + "fmt" "io" stdpath "path" "strings" @@ -11,8 +12,10 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/errs" "github.com/OpenListTeam/OpenList/v4/internal/fs" "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/sign" "github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/OpenListTeam/OpenList/v4/server/common" ) type Alias struct { @@ -111,21 +114,43 @@ func (d *Alias) Link(ctx context.Context, file model.Obj, args model.LinkArgs) ( return nil, errs.ObjectNotFound } for _, dst := range dsts { - link, err := d.link(ctx, dst, sub, args) - if err == nil { - link.Expiration = nil // 去除非必要缓存,d.link里op.Lin有缓存 - if !args.Redirect && len(link.URL) > 0 { - // 正常情况下 多并发 仅支持返回URL的驱动 - // alias套娃alias 可以让crypt、mega等驱动(不返回URL的) 支持并发 - if d.DownloadConcurrency > 0 { - link.Concurrency = d.DownloadConcurrency - } - if d.DownloadPartSize > 0 { - link.PartSize = d.DownloadPartSize * utils.KB + reqPath := stdpath.Join(dst, sub) + link, file, err := d.link(ctx, reqPath, args) + if err != nil { + continue + } + var resultLink *model.Link + if link != nil { + resultLink = &model.Link{ + URL: link.URL, + Header: link.Header, + RangeReader: link.RangeReader, + SyncClosers: utils.NewSyncClosers(link), + } + if link.MFile != nil { + resultLink.RangeReader = &model.FileRangeReader{ + RangeReaderIF: stream.GetRangeReaderFromMFile(file.GetSize(), link.MFile), } } - return link, nil + + } else { + resultLink = &model.Link{ + URL: fmt.Sprintf("%s/p%s?sign=%s", + common.GetApiUrl(ctx), + utils.EncodePath(reqPath, true), + sign.Sign(reqPath)), + } + } + if !args.Redirect { + if d.DownloadConcurrency > 0 { + resultLink.Concurrency = d.DownloadConcurrency + } + if d.DownloadPartSize > 0 { + resultLink.PartSize = d.DownloadPartSize * utils.KB + } + } + return resultLink, nil } return nil, errs.ObjectNotFound } @@ -251,9 +276,13 @@ func (d *Alias) Put(ctx context.Context, dstDir model.Obj, s model.FileStreamer, reqPath, err := d.getReqPath(ctx, dstDir, true) if err == nil { if len(reqPath) == 1 { - return fs.PutDirectly(ctx, *reqPath[0], s) + return fs.PutDirectly(ctx, *reqPath[0], &stream.FileStream{ + Obj: s, + Mimetype: s.GetMimetype(), + WebPutAsTask: s.NeedStore(), + Reader: s, + }) } else { - defer s.Close() file, err := s.CacheFullInTempFile() if err != nil { return err @@ -338,14 +367,6 @@ func (d *Alias) Extract(ctx context.Context, obj model.Obj, args model.ArchiveIn for _, dst := range dsts { link, err := d.extract(ctx, dst, sub, args) if err == nil { - if !args.Redirect && len(link.URL) > 0 { - if d.DownloadConcurrency > 0 { - link.Concurrency = d.DownloadConcurrency - } - if d.DownloadPartSize > 0 { - link.PartSize = d.DownloadPartSize * utils.KB - } - } return link, nil } } diff --git a/drivers/alias/util.go b/drivers/alias/util.go index 7704b9c5..1ae9c798 100644 --- a/drivers/alias/util.go +++ b/drivers/alias/util.go @@ -96,37 +96,23 @@ func (d *Alias) list(ctx context.Context, dst, sub string, args *fs.ListArgs) ([ }) } -func (d *Alias) link(ctx context.Context, dst, sub string, args model.LinkArgs) (*model.Link, error) { - reqPath := stdpath.Join(dst, sub) - // 参考 crypt 驱动 +func (d *Alias) link(ctx context.Context, reqPath string, args model.LinkArgs) (*model.Link, model.Obj, error) { storage, reqActualPath, err := op.GetStorageAndActualPath(reqPath) if err != nil { - return nil, err + return nil, nil, err } - useRawLink := len(common.GetApiUrl(ctx)) == 0 // ftp、s3 - if !useRawLink { - _, ok := storage.(*Alias) - useRawLink = !ok && !args.Redirect + // proxy || ftp,s3 + if !args.Redirect || len(common.GetApiUrl(ctx)) == 0 { + return op.Link(ctx, storage, reqActualPath, args) } - if useRawLink { - link, _, err := op.Link(ctx, storage, reqActualPath, args) - return link, err - } - _, err = fs.Get(ctx, reqPath, &fs.GetArgs{NoLog: true}) + obj, err := fs.Get(ctx, reqPath, &fs.GetArgs{NoLog: true}) if err != nil { - return nil, err + return nil, nil, err } - if common.ShouldProxy(storage, stdpath.Base(sub)) { - link := &model.Link{ - URL: fmt.Sprintf("%s/p%s?sign=%s", - common.GetApiUrl(ctx), - utils.EncodePath(reqPath, true), - sign.Sign(reqPath)), - } - return link, nil + if common.ShouldProxy(storage, stdpath.Base(reqPath)) { + return nil, obj, nil } - link, _, err := op.Link(ctx, storage, reqActualPath, args) - return link, err + return op.Link(ctx, storage, reqActualPath, args) } func (d *Alias) getReqPath(ctx context.Context, obj model.Obj, isParent bool) ([]*string, error) { diff --git a/drivers/aliyundrive/driver.go b/drivers/aliyundrive/driver.go index fcceb1be..ae2abb3a 100644 --- a/drivers/aliyundrive/driver.go +++ b/drivers/aliyundrive/driver.go @@ -165,7 +165,7 @@ func (d *AliDrive) Remove(ctx context.Context, obj model.Obj) error { } func (d *AliDrive) Put(ctx context.Context, dstDir model.Obj, streamer model.FileStreamer, up driver.UpdateProgress) error { - file := stream.FileStream{ + file := &stream.FileStream{ Obj: streamer, Reader: streamer, Mimetype: streamer.GetMimetype(), @@ -209,7 +209,7 @@ func (d *AliDrive) Put(ctx context.Context, dstDir model.Obj, streamer model.Fil io.Closer }{ Reader: io.MultiReader(buf, file), - Closer: &file, + Closer: file, } } } else { diff --git a/drivers/aliyundrive_open/meta.go b/drivers/aliyundrive_open/meta.go index a1252525..d76fc2ea 100644 --- a/drivers/aliyundrive_open/meta.go +++ b/drivers/aliyundrive_open/meta.go @@ -25,12 +25,6 @@ type Addition struct { var config = driver.Config{ Name: "AliyundriveOpen", - LocalSort: false, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: false, - NeedMs: false, DefaultRoot: "root", NoOverwriteUpload: true, } diff --git a/drivers/chaoxing/meta.go b/drivers/chaoxing/meta.go index be7432da..86177daf 100644 --- a/drivers/chaoxing/meta.go +++ b/drivers/chaoxing/meta.go @@ -32,7 +32,6 @@ func init() { config: driver.Config{ Name: "ChaoXingGroupDrive", OnlyProxy: true, - OnlyLocal: false, DefaultRoot: "-1", NoOverwriteUpload: true, }, diff --git a/drivers/cloudreve_v4/meta.go b/drivers/cloudreve_v4/meta.go index de45ea37..304af598 100644 --- a/drivers/cloudreve_v4/meta.go +++ b/drivers/cloudreve_v4/meta.go @@ -26,15 +26,8 @@ type Addition struct { var config = driver.Config{ Name: "Cloudreve V4", - LocalSort: false, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: false, - NeedMs: false, DefaultRoot: "cloudreve://my", CheckStatus: true, - Alert: "", NoOverwriteUpload: true, } diff --git a/drivers/crypt/driver.go b/drivers/crypt/driver.go index d10c59cc..43e5291b 100644 --- a/drivers/crypt/driver.go +++ b/drivers/crypt/driver.go @@ -1,12 +1,14 @@ package crypt import ( + "bytes" "context" "fmt" "io" stdpath "path" "regexp" "strings" + "sync" "github.com/OpenListTeam/OpenList/v4/internal/driver" "github.com/OpenListTeam/OpenList/v4/internal/errs" @@ -241,6 +243,9 @@ func (d *Crypt) Get(ctx context.Context, path string) (model.Obj, error) { //return nil, errs.ObjectNotFound } +// https://github.com/rclone/rclone/blob/v1.67.0/backend/crypt/cipher.go#L37 +const fileHeaderSize = 32 + func (d *Crypt) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) { dstDirActualPath, err := d.getActualPathForRemote(file.GetPath(), false) if err != nil { @@ -251,58 +256,64 @@ func (d *Crypt) Link(ctx context.Context, file model.Obj, args model.LinkArgs) ( return nil, err } - if remoteLink.RangeReadCloser == nil && remoteLink.MFile == nil && len(remoteLink.URL) == 0 { + rrf, err := stream.GetRangeReaderFromLink(remoteFile.GetSize(), remoteLink) + if err != nil { + _ = remoteLink.Close() return nil, fmt.Errorf("the remote storage driver need to be enhanced to support encrytion") } - resultRangeReadCloser := &model.RangeReadCloser{} - resultRangeReadCloser.TryAdd(remoteLink.MFile) - if remoteLink.RangeReadCloser != nil { - resultRangeReadCloser.AddClosers(remoteLink.RangeReadCloser.GetClosers()) - } - remoteFileSize := remoteFile.GetSize() - rangeReaderFunc := func(ctx context.Context, underlyingOffset, underlyingLength int64) (io.ReadCloser, error) { - length := underlyingLength - if underlyingLength >= 0 && underlyingOffset+underlyingLength >= remoteFileSize { - length = -1 - } - if remoteLink.MFile != nil { - _, err := remoteLink.MFile.Seek(underlyingOffset, io.SeekStart) - if err != nil { - return nil, err - } - //keep reuse same MFile and close at last. - return io.NopCloser(remoteLink.MFile), nil - } - rrc := remoteLink.RangeReadCloser - if rrc == nil && len(remoteLink.URL) > 0 { - var err error - rrc, err = stream.GetRangeReadCloserFromLink(remoteFileSize, remoteLink) - if err != nil { - return nil, err - } - resultRangeReadCloser.AddClosers(rrc.GetClosers()) - remoteLink.RangeReadCloser = rrc - } - if rrc != nil { - remoteReader, err := rrc.RangeRead(ctx, http_range.Range{Start: underlyingOffset, Length: length}) - if err != nil { - return nil, err - } - return remoteReader, nil - } - return nil, errs.NotSupport - } - resultRangeReadCloser.RangeReader = func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { - readSeeker, err := d.cipher.DecryptDataSeek(ctx, rangeReaderFunc, httpRange.Start, httpRange.Length) + mu := &sync.Mutex{} + var fileHeader []byte + rangeReaderFunc := func(ctx context.Context, offset, limit int64) (io.ReadCloser, error) { + length := limit + if offset == 0 && limit > 0 { + mu.Lock() + if limit <= fileHeaderSize { + defer mu.Unlock() + if fileHeader != nil { + return io.NopCloser(bytes.NewReader(fileHeader[:limit])), nil + } + length = fileHeaderSize + } else if fileHeader == nil { + defer mu.Unlock() + } else { + mu.Unlock() + } + } + + remoteReader, err := rrf.RangeRead(ctx, http_range.Range{Start: offset, Length: length}) if err != nil { return nil, err } - return readSeeker, nil - } + if offset == 0 && limit > 0 { + fileHeader = make([]byte, fileHeaderSize) + n, _ := io.ReadFull(remoteReader, fileHeader) + if n != fileHeaderSize { + fileHeader = nil + return nil, fmt.Errorf("can't read data, expected=%d, got=%d", fileHeaderSize, n) + } + if limit <= fileHeaderSize { + remoteReader.Close() + return io.NopCloser(bytes.NewReader(fileHeader[:limit])), nil + } else { + remoteReader = utils.ReadCloser{ + Reader: io.MultiReader(bytes.NewReader(fileHeader), remoteReader), + Closer: remoteReader, + } + } + } + return remoteReader, nil + } return &model.Link{ - RangeReadCloser: resultRangeReadCloser, + RangeReader: stream.RangeReaderFunc(func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { + readSeeker, err := d.cipher.DecryptDataSeek(ctx, rangeReaderFunc, httpRange.Start, httpRange.Length) + if err != nil { + return nil, err + } + return readSeeker, nil + }), + SyncClosers: utils.NewSyncClosers(remoteLink), }, nil } diff --git a/drivers/crypt/meta.go b/drivers/crypt/meta.go index 45201b1e..b2d009ba 100644 --- a/drivers/crypt/meta.go +++ b/drivers/crypt/meta.go @@ -26,17 +26,12 @@ type Addition struct { } var config = driver.Config{ - Name: "Crypt", - LocalSort: true, - OnlyLocal: true, - OnlyProxy: true, - NoCache: true, - NoUpload: false, - NeedMs: false, - DefaultRoot: "/", - CheckStatus: false, - Alert: "", - NoOverwriteUpload: false, + Name: "Crypt", + LocalSort: true, + OnlyProxy: true, + NoCache: true, + DefaultRoot: "/", + NoLinkURL: true, } func init() { diff --git a/drivers/doubao/meta.go b/drivers/doubao/meta.go index db5b1586..d2e350e3 100644 --- a/drivers/doubao/meta.go +++ b/drivers/doubao/meta.go @@ -16,17 +16,9 @@ type Addition struct { } var config = driver.Config{ - Name: "Doubao", - LocalSort: true, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: false, - NeedMs: false, - DefaultRoot: "0", - CheckStatus: false, - Alert: "", - NoOverwriteUpload: false, + Name: "Doubao", + LocalSort: true, + DefaultRoot: "0", } func init() { diff --git a/drivers/doubao_share/meta.go b/drivers/doubao_share/meta.go index 0de0c062..a79c9e19 100644 --- a/drivers/doubao_share/meta.go +++ b/drivers/doubao_share/meta.go @@ -12,17 +12,10 @@ type Addition struct { } var config = driver.Config{ - Name: "DoubaoShare", - LocalSort: true, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: true, - NeedMs: false, - DefaultRoot: "/", - CheckStatus: false, - Alert: "", - NoOverwriteUpload: false, + Name: "DoubaoShare", + LocalSort: true, + NoUpload: true, + DefaultRoot: "/", } func init() { diff --git a/drivers/dropbox/meta.go b/drivers/dropbox/meta.go index 98a7806c..fee4ab29 100644 --- a/drivers/dropbox/meta.go +++ b/drivers/dropbox/meta.go @@ -18,13 +18,6 @@ type Addition struct { var config = driver.Config{ Name: "Dropbox", - LocalSort: false, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: false, - NeedMs: false, - DefaultRoot: "", NoOverwriteUpload: true, } diff --git a/drivers/febbox/meta.go b/drivers/febbox/meta.go index d0e5421e..e449ad00 100644 --- a/drivers/febbox/meta.go +++ b/drivers/febbox/meta.go @@ -16,17 +16,9 @@ type Addition struct { } var config = driver.Config{ - Name: "FebBox", - LocalSort: false, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: true, - NeedMs: false, - DefaultRoot: "0", - CheckStatus: false, - Alert: "", - NoOverwriteUpload: false, + Name: "FebBox", + NoUpload: true, + DefaultRoot: "0", } func init() { diff --git a/drivers/ftp/driver.go b/drivers/ftp/driver.go index 2ffc7153..647ee1b7 100644 --- a/drivers/ftp/driver.go +++ b/drivers/ftp/driver.go @@ -8,6 +8,7 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/errs" "github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/stream" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/jlaffaye/ftp" ) @@ -26,7 +27,7 @@ func (d *FTP) GetAddition() driver.Additional { } func (d *FTP) Init(ctx context.Context) error { - return d.login() + return d._login() } func (d *FTP) Drop(ctx context.Context) error { @@ -65,15 +66,22 @@ func (d *FTP) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*m return nil, err } - r := NewFileReader(d.conn, encode(file.GetPath(), d.Encoding), file.GetSize()) - link := &model.Link{ + remoteFile := NewFileReader(d.conn, encode(file.GetPath(), d.Encoding), file.GetSize()) + if remoteFile != nil && !d.Config().OnlyLinkMFile { + return &model.Link{ + RangeReader: &model.FileRangeReader{ + RangeReaderIF: stream.RateLimitRangeReaderFunc(stream.GetRangeReaderFromMFile(file.GetSize(), remoteFile)), + }, + SyncClosers: utils.NewSyncClosers(remoteFile), + }, nil + } + return &model.Link{ MFile: &stream.RateLimitFile{ - File: r, + File: remoteFile, Limiter: stream.ServerDownloadLimit, Ctx: ctx, }, - } - return link, nil + }, nil } func (d *FTP) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error { diff --git a/drivers/ftp/meta.go b/drivers/ftp/meta.go index 04588725..6e8cc107 100644 --- a/drivers/ftp/meta.go +++ b/drivers/ftp/meta.go @@ -31,10 +31,11 @@ type Addition struct { } var config = driver.Config{ - Name: "FTP", - LocalSort: true, - OnlyLocal: true, - DefaultRoot: "/", + Name: "FTP", + LocalSort: true, + OnlyLinkMFile: true, + DefaultRoot: "/", + NoLinkURL: true, } func init() { diff --git a/drivers/ftp/util.go b/drivers/ftp/util.go index 196d874c..9e050b4b 100644 --- a/drivers/ftp/util.go +++ b/drivers/ftp/util.go @@ -1,18 +1,28 @@ package ftp import ( + "fmt" "io" "os" "sync" "sync/atomic" "time" + "github.com/OpenListTeam/OpenList/v4/pkg/singleflight" "github.com/jlaffaye/ftp" ) // do others that not defined in Driver interface func (d *FTP) login() error { + err, _, _ := singleflight.ErrorGroup.Do(fmt.Sprintf("FTP.login:%p", d), func() (error, error) { + return d._login(), nil + }) + return err +} + +func (d *FTP) _login() error { + if d.conn != nil { _, err := d.conn.CurrentDir() if err == nil { diff --git a/drivers/github_releases/meta.go b/drivers/github_releases/meta.go index 947ec4c0..4f3aae7f 100644 --- a/drivers/github_releases/meta.go +++ b/drivers/github_releases/meta.go @@ -15,17 +15,8 @@ type Addition struct { } var config = driver.Config{ - Name: "GitHub Releases", - LocalSort: false, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: false, - NeedMs: false, - DefaultRoot: "", - CheckStatus: false, - Alert: "", - NoOverwriteUpload: false, + Name: "GitHub Releases", + NoUpload: true, } func init() { diff --git a/drivers/halalcloud/driver.go b/drivers/halalcloud/driver.go index 04f7fcdd..93783dcd 100644 --- a/drivers/halalcloud/driver.go +++ b/drivers/halalcloud/driver.go @@ -14,6 +14,7 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/driver" "github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/op" + "github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/pkg/http_range" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -253,8 +254,8 @@ func (d *HalalCloud) getLink(ctx context.Context, file model.Obj, args model.Lin chunks := getChunkSizes(result.Sizes) resultRangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { length := httpRange.Length - if httpRange.Length >= 0 && httpRange.Start+httpRange.Length >= size { - length = -1 + if httpRange.Length < 0 || httpRange.Start+httpRange.Length >= size { + length = size - httpRange.Start } oo := &openObject{ ctx: ctx, @@ -276,10 +277,9 @@ func (d *HalalCloud) getLink(ctx context.Context, file model.Obj, args model.Lin duration = time.Until(time.Now().Add(time.Hour)) } - resultRangeReadCloser := &model.RangeReadCloser{RangeReader: resultRangeReader} return &model.Link{ - RangeReadCloser: resultRangeReadCloser, - Expiration: &duration, + RangeReader: stream.RateLimitRangeReaderFunc(resultRangeReader), + Expiration: &duration, }, nil } diff --git a/drivers/halalcloud/meta.go b/drivers/halalcloud/meta.go index eefcdba1..17e4d04b 100644 --- a/drivers/halalcloud/meta.go +++ b/drivers/halalcloud/meta.go @@ -18,17 +18,10 @@ type Addition struct { } var config = driver.Config{ - Name: "HalalCloud", - LocalSort: false, - OnlyLocal: true, - OnlyProxy: true, - NoCache: false, - NoUpload: false, - NeedMs: false, - DefaultRoot: "/", - CheckStatus: false, - Alert: "", - NoOverwriteUpload: false, + Name: "HalalCloud", + OnlyProxy: true, + DefaultRoot: "/", + NoLinkURL: true, } func init() { diff --git a/drivers/ilanzou/meta.go b/drivers/ilanzou/meta.go index 65b116e4..bbed5164 100644 --- a/drivers/ilanzou/meta.go +++ b/drivers/ilanzou/meta.go @@ -29,17 +29,8 @@ func init() { op.RegisterDriver(func() driver.Driver { return &ILanZou{ config: driver.Config{ - Name: "ILanZou", - LocalSort: false, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: false, - NeedMs: false, - DefaultRoot: "0", - CheckStatus: false, - Alert: "", - NoOverwriteUpload: false, + Name: "ILanZou", + DefaultRoot: "0", }, conf: Conf{ base: "https://api.ilanzou.com", @@ -55,17 +46,8 @@ func init() { op.RegisterDriver(func() driver.Driver { return &ILanZou{ config: driver.Config{ - Name: "FeijiPan", - LocalSort: false, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: false, - NeedMs: false, - DefaultRoot: "0", - CheckStatus: false, - Alert: "", - NoOverwriteUpload: false, + Name: "FeijiPan", + DefaultRoot: "0", }, conf: Conf{ base: "https://api.feijipan.com", diff --git a/drivers/ipfs_api/meta.go b/drivers/ipfs_api/meta.go index a3974c0d..8293932b 100644 --- a/drivers/ipfs_api/meta.go +++ b/drivers/ipfs_api/meta.go @@ -17,7 +17,6 @@ var config = driver.Config{ Name: "IPFS API", DefaultRoot: "/", LocalSort: true, - OnlyProxy: false, } func init() { diff --git a/drivers/kodbox/meta.go b/drivers/kodbox/meta.go index b4d2a816..0b894806 100644 --- a/drivers/kodbox/meta.go +++ b/drivers/kodbox/meta.go @@ -14,8 +14,7 @@ type Addition struct { } var config = driver.Config{ - Name: "KodBox", - DefaultRoot: "", + Name: "KodBox", } func init() { diff --git a/drivers/lenovonas_share/meta.go b/drivers/lenovonas_share/meta.go index 57455668..3ea86de5 100644 --- a/drivers/lenovonas_share/meta.go +++ b/drivers/lenovonas_share/meta.go @@ -13,17 +13,9 @@ type Addition struct { } var config = driver.Config{ - Name: "LenovoNasShare", - LocalSort: true, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: true, - NeedMs: false, - DefaultRoot: "", - CheckStatus: false, - Alert: "", - NoOverwriteUpload: false, + Name: "LenovoNasShare", + LocalSort: true, + NoUpload: true, } func init() { diff --git a/drivers/local/driver.go b/drivers/local/driver.go index 64cdbb24..575f3603 100644 --- a/drivers/local/driver.go +++ b/drivers/local/driver.go @@ -19,6 +19,7 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/errs" "github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/sign" + "github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/OpenListTeam/OpenList/v4/server/common" "github.com/OpenListTeam/times" @@ -220,7 +221,7 @@ func (d *Local) Get(ctx context.Context, path string) (model.Obj, error) { func (d *Local) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) { fullPath := file.GetPath() - var link model.Link + link := &model.Link{} if args.Type == "thumb" && utils.Ext(file.GetName()) != "svg" { var buf *bytes.Buffer var thumbPath *string @@ -252,7 +253,14 @@ func (d *Local) Link(ctx context.Context, file model.Obj, args model.LinkArgs) ( } link.MFile = open } - return &link, nil + if link.MFile != nil && !d.Config().OnlyLinkMFile { + link.AddIfCloser(link.MFile) + link.RangeReader = &model.FileRangeReader{ + RangeReaderIF: stream.GetRangeReaderFromMFile(file.GetSize(), link.MFile), + } + link.MFile = nil + } + return link, nil } func (d *Local) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error { diff --git a/drivers/local/meta.go b/drivers/local/meta.go index 861aa614..4d531dfd 100644 --- a/drivers/local/meta.go +++ b/drivers/local/meta.go @@ -17,11 +17,12 @@ type Addition struct { } var config = driver.Config{ - Name: "Local", - OnlyLocal: true, - LocalSort: true, - NoCache: true, - DefaultRoot: "/", + Name: "Local", + OnlyLinkMFile: false, + LocalSort: true, + NoCache: true, + DefaultRoot: "/", + NoLinkURL: true, } func init() { diff --git a/drivers/mega/driver.go b/drivers/mega/driver.go index 8b71b75e..996f4dce 100644 --- a/drivers/mega/driver.go +++ b/drivers/mega/driver.go @@ -14,6 +14,7 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/driver" "github.com/OpenListTeam/OpenList/v4/internal/errs" "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/pkg/utils" log "github.com/sirupsen/logrus" "github.com/t3rm1n4l/go-mega" @@ -95,8 +96,8 @@ func (d *Mega) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (* size := file.GetSize() resultRangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { length := httpRange.Length - if httpRange.Length >= 0 && httpRange.Start+httpRange.Length >= size { - length = -1 + if httpRange.Length < 0 || httpRange.Start+httpRange.Length >= size { + length = size - httpRange.Start } var down *mega.Download err := utils.Retry(3, time.Second, func() (err error) { @@ -114,11 +115,9 @@ func (d *Mega) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (* return readers.NewLimitedReadCloser(oo, length), nil } - resultRangeReadCloser := &model.RangeReadCloser{RangeReader: resultRangeReader} - resultLink := &model.Link{ - RangeReadCloser: resultRangeReadCloser, - } - return resultLink, nil + return &model.Link{ + RangeReader: stream.RateLimitRangeReaderFunc(resultRangeReader), + }, nil } return nil, fmt.Errorf("unable to convert dir to mega n") } diff --git a/drivers/mega/meta.go b/drivers/mega/meta.go index dfbd07fa..552c7162 100644 --- a/drivers/mega/meta.go +++ b/drivers/mega/meta.go @@ -18,7 +18,8 @@ type Addition struct { var config = driver.Config{ Name: "Mega_nz", LocalSort: true, - OnlyLocal: true, + OnlyProxy: true, + NoLinkURL: true, } func init() { diff --git a/drivers/misskey/meta.go b/drivers/misskey/meta.go index b957674f..8fbb49d9 100644 --- a/drivers/misskey/meta.go +++ b/drivers/misskey/meta.go @@ -15,17 +15,8 @@ type Addition struct { } var config = driver.Config{ - Name: "Misskey", - LocalSort: false, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: false, - NeedMs: false, - DefaultRoot: "/", - CheckStatus: false, - Alert: "", - NoOverwriteUpload: false, + Name: "Misskey", + DefaultRoot: "/", } func init() { diff --git a/drivers/mopan/meta.go b/drivers/mopan/meta.go index 74200fe8..dd5c289c 100644 --- a/drivers/mopan/meta.go +++ b/drivers/mopan/meta.go @@ -27,8 +27,7 @@ func (a *Addition) GetRootId() string { } var config = driver.Config{ - Name: "MoPan", - // DefaultRoot: "root, / or other", + Name: "MoPan", CheckStatus: true, Alert: "warning|This network disk may store your password in clear text. Please set your password carefully", } diff --git a/drivers/netease_music/driver.go b/drivers/netease_music/driver.go index 83a28309..b03c3b2c 100644 --- a/drivers/netease_music/driver.go +++ b/drivers/netease_music/driver.go @@ -73,7 +73,7 @@ func (d *NeteaseMusic) List(ctx context.Context, dir model.Obj, args model.ListA func (d *NeteaseMusic) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) { if lrc, ok := file.(*LyricObj); ok { - if args.Type == "parsed" { + if args.Type == "parsed" && !args.Redirect { return lrc.getLyricLink(), nil } else { return lrc.getProxyLink(ctx), nil diff --git a/drivers/netease_music/types.go b/drivers/netease_music/types.go index c0a2d476..1175ff60 100644 --- a/drivers/netease_music/types.go +++ b/drivers/netease_music/types.go @@ -10,6 +10,7 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/driver" "github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/sign" + "github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/OpenListTeam/OpenList/v4/pkg/utils/random" "github.com/OpenListTeam/OpenList/v4/server/common" @@ -54,7 +55,9 @@ func (lrc *LyricObj) getProxyLink(ctx context.Context) *model.Link { func (lrc *LyricObj) getLyricLink() *model.Link { return &model.Link{ - MFile: strings.NewReader(lrc.lyric), + RangeReader: &model.FileRangeReader{ + RangeReaderIF: stream.GetRangeReaderFromMFile(int64(len(lrc.lyric)), strings.NewReader(lrc.lyric)), + }, } } diff --git a/drivers/onedrive_sharelink/meta.go b/drivers/onedrive_sharelink/meta.go index 1996285e..10bea87a 100644 --- a/drivers/onedrive_sharelink/meta.go +++ b/drivers/onedrive_sharelink/meta.go @@ -22,7 +22,6 @@ var config = driver.Config{ OnlyProxy: true, NoUpload: true, DefaultRoot: "/", - CheckStatus: false, } func init() { diff --git a/drivers/pikpak/meta.go b/drivers/pikpak/meta.go index c182e620..c602bd2a 100644 --- a/drivers/pikpak/meta.go +++ b/drivers/pikpak/meta.go @@ -17,9 +17,8 @@ type Addition struct { } var config = driver.Config{ - Name: "PikPak", - LocalSort: true, - DefaultRoot: "", + Name: "PikPak", + LocalSort: true, } func init() { diff --git a/drivers/pikpak_share/meta.go b/drivers/pikpak_share/meta.go index 3c3bb4fc..18842de2 100644 --- a/drivers/pikpak_share/meta.go +++ b/drivers/pikpak_share/meta.go @@ -15,10 +15,9 @@ type Addition struct { } var config = driver.Config{ - Name: "PikPakShare", - LocalSort: true, - NoUpload: true, - DefaultRoot: "", + Name: "PikPakShare", + LocalSort: true, + NoUpload: true, } func init() { diff --git a/drivers/quark_open/meta.go b/drivers/quark_open/meta.go index 787c5e8d..3527b52e 100644 --- a/drivers/quark_open/meta.go +++ b/drivers/quark_open/meta.go @@ -28,7 +28,7 @@ func init() { return &QuarkOpen{ config: driver.Config{ Name: "QuarkOpen", - OnlyLocal: true, + OnlyProxy: true, DefaultRoot: "0", NoOverwriteUpload: true, }, diff --git a/drivers/quark_uc/meta.go b/drivers/quark_uc/meta.go index 48185cee..a3694e4e 100644 --- a/drivers/quark_uc/meta.go +++ b/drivers/quark_uc/meta.go @@ -27,7 +27,6 @@ func init() { return &QuarkOrUC{ config: driver.Config{ Name: "Quark", - OnlyLocal: false, DefaultRoot: "0", NoOverwriteUpload: true, }, @@ -43,7 +42,7 @@ func init() { return &QuarkOrUC{ config: driver.Config{ Name: "UC", - OnlyLocal: true, + OnlyProxy: true, DefaultRoot: "0", NoOverwriteUpload: true, }, diff --git a/drivers/quark_uc_tv/meta.go b/drivers/quark_uc_tv/meta.go index 34c41541..f8b3d4ce 100644 --- a/drivers/quark_uc_tv/meta.go +++ b/drivers/quark_uc_tv/meta.go @@ -30,7 +30,6 @@ func init() { return &QuarkUCTV{ config: driver.Config{ Name: "QuarkTV", - OnlyLocal: false, DefaultRoot: "0", NoOverwriteUpload: true, NoUpload: true, @@ -49,7 +48,6 @@ func init() { return &QuarkUCTV{ config: driver.Config{ Name: "UCTV", - OnlyLocal: false, DefaultRoot: "0", NoOverwriteUpload: true, NoUpload: true, diff --git a/drivers/s3/driver.go b/drivers/s3/driver.go index 228eb3fd..d19ead47 100644 --- a/drivers/s3/driver.go +++ b/drivers/s3/driver.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "io" "net/url" stdpath "path" "strings" @@ -158,7 +157,7 @@ func (d *S3) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) e Name: getPlaceholderName(d.Placeholder), Modified: time.Now(), }, - Reader: io.NopCloser(bytes.NewReader([]byte{})), + Reader: bytes.NewReader([]byte{}), Mimetype: "application/octet-stream", }, func(float64) {}) } diff --git a/drivers/sftp/driver.go b/drivers/sftp/driver.go index 4e540621..e0cdda86 100644 --- a/drivers/sftp/driver.go +++ b/drivers/sftp/driver.go @@ -30,7 +30,7 @@ func (d *SFTP) GetAddition() driver.Additional { } func (d *SFTP) Init(ctx context.Context) error { - return d.initClient() + return d._initClient() } func (d *SFTP) Drop(ctx context.Context) error { @@ -63,6 +63,14 @@ func (d *SFTP) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (* if err != nil { return nil, err } + if remoteFile != nil && !d.Config().OnlyLinkMFile { + return &model.Link{ + RangeReader: &model.FileRangeReader{ + RangeReaderIF: stream.RateLimitRangeReaderFunc(stream.GetRangeReaderFromMFile(file.GetSize(), remoteFile)), + }, + SyncClosers: utils.NewSyncClosers(remoteFile), + }, nil + } return &model.Link{ MFile: &stream.RateLimitFile{ File: remoteFile, diff --git a/drivers/sftp/meta.go b/drivers/sftp/meta.go index 5adc03df..9dada9ef 100644 --- a/drivers/sftp/meta.go +++ b/drivers/sftp/meta.go @@ -16,11 +16,12 @@ type Addition struct { } var config = driver.Config{ - Name: "SFTP", - LocalSort: true, - OnlyLocal: true, - DefaultRoot: "/", - CheckStatus: true, + Name: "SFTP", + LocalSort: true, + OnlyLinkMFile: false, + DefaultRoot: "/", + CheckStatus: true, + NoLinkURL: true, } func init() { diff --git a/drivers/sftp/util.go b/drivers/sftp/util.go index 53f9c379..5c47c532 100644 --- a/drivers/sftp/util.go +++ b/drivers/sftp/util.go @@ -1,8 +1,10 @@ package sftp import ( + "fmt" "path" + "github.com/OpenListTeam/OpenList/v4/pkg/singleflight" "github.com/pkg/sftp" log "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" @@ -11,6 +13,12 @@ import ( // do others that not defined in Driver interface func (d *SFTP) initClient() error { + err, _, _ := singleflight.ErrorGroup.Do(fmt.Sprintf("SFTP.initClient:%p", d), func() (error, error) { + return d._initClient(), nil + }) + return err +} +func (d *SFTP) _initClient() error { var auth ssh.AuthMethod if len(d.PrivateKey) > 0 { var err error @@ -52,7 +60,9 @@ func (d *SFTP) clientReconnectOnConnectionError() error { return nil } log.Debugf("[sftp] discarding closed sftp connection: %v", err) - _ = d.client.Close() + if d.client != nil { + _ = d.client.Close() + } err = d.initClient() return err } diff --git a/drivers/smb/driver.go b/drivers/smb/driver.go index 3cdfbbe4..d38c9cef 100644 --- a/drivers/smb/driver.go +++ b/drivers/smb/driver.go @@ -30,10 +30,10 @@ func (d *SMB) GetAddition() driver.Additional { } func (d *SMB) Init(ctx context.Context) error { - if strings.Index(d.Addition.Address, ":") < 0 { + if !strings.Contains(d.Addition.Address, ":") { d.Addition.Address = d.Addition.Address + ":445" } - return d.initFS() + return d._initFS() } func (d *SMB) Drop(ctx context.Context) error { @@ -81,6 +81,13 @@ func (d *SMB) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*m return nil, err } d.updateLastConnTime() + if remoteFile != nil && !d.Config().OnlyLinkMFile { + return &model.Link{ + RangeReader: &model.FileRangeReader{ + RangeReaderIF: stream.RateLimitRangeReaderFunc(stream.GetRangeReaderFromMFile(file.GetSize(), remoteFile)), + }, + }, nil + } return &model.Link{ MFile: &stream.RateLimitFile{ File: remoteFile, diff --git a/drivers/smb/meta.go b/drivers/smb/meta.go index 3031c64f..87a98277 100644 --- a/drivers/smb/meta.go +++ b/drivers/smb/meta.go @@ -14,11 +14,12 @@ type Addition struct { } var config = driver.Config{ - Name: "SMB", - LocalSort: true, - OnlyLocal: true, - DefaultRoot: ".", - NoCache: true, + Name: "SMB", + LocalSort: true, + OnlyLinkMFile: false, + DefaultRoot: ".", + NoCache: true, + NoLinkURL: true, } func init() { diff --git a/drivers/smb/util.go b/drivers/smb/util.go index 662110b0..166e2ae3 100644 --- a/drivers/smb/util.go +++ b/drivers/smb/util.go @@ -1,6 +1,7 @@ package smb import ( + "fmt" "io/fs" "net" "os" @@ -8,6 +9,7 @@ import ( "sync/atomic" "time" + "github.com/OpenListTeam/OpenList/v4/pkg/singleflight" "github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/hirochachacha/go-smb2" @@ -26,6 +28,12 @@ func (d *SMB) getLastConnTime() time.Time { } func (d *SMB) initFS() error { + err, _, _ := singleflight.ErrorGroup.Do(fmt.Sprintf("SMB.initFS:%p", d), func() (error, error) { + return d._initFS(), nil + }) + return err +} +func (d *SMB) _initFS() error { conn, err := net.Dial("tcp", d.Address) if err != nil { return err diff --git a/drivers/strm/meta.go b/drivers/strm/meta.go index 95bbe068..445cf9b3 100644 --- a/drivers/strm/meta.go +++ b/drivers/strm/meta.go @@ -13,13 +13,14 @@ type Addition struct { } var config = driver.Config{ - Name: "Strm", - LocalSort: true, - NoCache: true, - NoUpload: true, - DefaultRoot: "/", - OnlyLocal: true, - OnlyProxy: true, + Name: "Strm", + LocalSort: true, + NoCache: true, + NoUpload: true, + DefaultRoot: "/", + OnlyLinkMFile: true, + OnlyProxy: true, + NoLinkURL: true, } func init() { diff --git a/drivers/template/meta.go b/drivers/template/meta.go index 6e5c74d8..a546e676 100644 --- a/drivers/template/meta.go +++ b/drivers/template/meta.go @@ -16,7 +16,7 @@ type Addition struct { var config = driver.Config{ Name: "Template", LocalSort: false, - OnlyLocal: false, + OnlyLinkMFile: false, OnlyProxy: false, NoCache: false, NoUpload: false, @@ -25,6 +25,7 @@ var config = driver.Config{ CheckStatus: false, Alert: "", NoOverwriteUpload: false, + NoLinkURL: false, } func init() { diff --git a/drivers/thunderx/meta.go b/drivers/thunderx/meta.go index a3aacf79..15f75f73 100644 --- a/drivers/thunderx/meta.go +++ b/drivers/thunderx/meta.go @@ -85,7 +85,6 @@ func (i *Addition) GetIdentity() string { var config = driver.Config{ Name: "ThunderX", LocalSort: true, - OnlyProxy: false, } var configExpert = driver.Config{ diff --git a/drivers/url_tree/meta.go b/drivers/url_tree/meta.go index c542e741..e298c7be 100644 --- a/drivers/url_tree/meta.go +++ b/drivers/url_tree/meta.go @@ -16,17 +16,10 @@ type Addition struct { } var config = driver.Config{ - Name: "UrlTree", - LocalSort: true, - OnlyLocal: false, - OnlyProxy: false, - NoCache: true, - NoUpload: false, - NeedMs: false, - DefaultRoot: "", - CheckStatus: true, - Alert: "", - NoOverwriteUpload: false, + Name: "UrlTree", + LocalSort: true, + NoCache: true, + CheckStatus: true, } func init() { diff --git a/drivers/virtual/meta.go b/drivers/virtual/meta.go index fc783f53..f567830a 100644 --- a/drivers/virtual/meta.go +++ b/drivers/virtual/meta.go @@ -14,11 +14,11 @@ type Addition struct { } var config = driver.Config{ - Name: "Virtual", - OnlyLocal: true, - LocalSort: true, - NeedMs: true, - //NoCache: true, + Name: "Virtual", + OnlyLinkMFile: true, + LocalSort: true, + NeedMs: true, + NoLinkURL: true, } func init() { diff --git a/drivers/weiyun/meta.go b/drivers/weiyun/meta.go index 0173f92a..2f51be68 100644 --- a/drivers/weiyun/meta.go +++ b/drivers/weiyun/meta.go @@ -14,12 +14,9 @@ type Addition struct { } var config = driver.Config{ - Name: "WeiYun", - LocalSort: false, - OnlyProxy: true, - CheckStatus: true, - Alert: "", - NoOverwriteUpload: false, + Name: "WeiYun", + OnlyProxy: true, + CheckStatus: true, } func init() { diff --git a/drivers/wopan/meta.go b/drivers/wopan/meta.go index b19eff69..55dade1c 100644 --- a/drivers/wopan/meta.go +++ b/drivers/wopan/meta.go @@ -18,15 +18,7 @@ type Addition struct { var config = driver.Config{ Name: "WoPan", - LocalSort: false, - OnlyLocal: false, - OnlyProxy: false, - NoCache: false, - NoUpload: false, - NeedMs: false, DefaultRoot: "0", - CheckStatus: false, - Alert: "", NoOverwriteUpload: true, } diff --git a/internal/driver/config.go b/internal/driver/config.go index 6068143c..bec6d47b 100644 --- a/internal/driver/config.go +++ b/internal/driver/config.go @@ -1,20 +1,26 @@ package driver type Config struct { - Name string `json:"name"` - LocalSort bool `json:"local_sort"` - OnlyLocal bool `json:"only_local"` - OnlyProxy bool `json:"only_proxy"` - NoCache bool `json:"no_cache"` - NoUpload bool `json:"no_upload"` - NeedMs bool `json:"need_ms"` // if need get message from user, such as validate code - DefaultRoot string `json:"default_root"` - CheckStatus bool `json:"-"` - Alert string `json:"alert"` //info,success,warning,danger - NoOverwriteUpload bool `json:"-"` // whether to support overwrite upload - ProxyRangeOption bool `json:"-"` + Name string `json:"name"` + LocalSort bool `json:"local_sort"` + // if the driver returns Link with MFile, this should be set to true + OnlyLinkMFile bool `json:"only_local"` + OnlyProxy bool `json:"only_proxy"` + NoCache bool `json:"no_cache"` + NoUpload bool `json:"no_upload"` + // if need get message from user, such as validate code + NeedMs bool `json:"need_ms"` + DefaultRoot string `json:"default_root"` + CheckStatus bool `json:"-"` + //info,success,warning,danger + Alert string `json:"alert"` + // whether to support overwrite upload + NoOverwriteUpload bool `json:"-"` + ProxyRangeOption bool `json:"-"` + // if the driver returns Link without URL, this should be set to true + NoLinkURL bool `json:"-"` } func (c Config) MustProxy() bool { - return c.OnlyProxy || c.OnlyLocal + return c.OnlyProxy || c.OnlyLinkMFile || c.NoLinkURL } diff --git a/internal/fs/copy.go b/internal/fs/copy.go index bd8be1f2..64186aa6 100644 --- a/internal/fs/copy.go +++ b/internal/fs/copy.go @@ -3,7 +3,6 @@ package fs import ( "context" "fmt" - "net/http" stdpath "path" "time" @@ -86,19 +85,17 @@ func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool } if !srcObj.IsDir() { // copy file directly - link, _, err := op.Link(ctx, srcStorage, srcObjActualPath, model.LinkArgs{ - Header: http.Header{}, - }) + link, _, err := op.Link(ctx, srcStorage, srcObjActualPath, model.LinkArgs{}) if err != nil { return nil, errors.WithMessagef(err, "failed get [%s] link", srcObjPath) } - fs := stream.FileStream{ + // any link provided is seekable + ss, err := stream.NewSeekableStream(&stream.FileStream{ Obj: srcObj, Ctx: ctx, - } - // any link provided is seekable - ss, err := stream.NewSeekableStream(fs, link) + }, link) if err != nil { + _ = link.Close() return nil, errors.WithMessagef(err, "failed get [%s] stream", srcObjPath) } return nil, op.Put(ctx, dstStorage, dstDirActualPath, ss, nil, false) @@ -165,19 +162,17 @@ func copyFileBetween2Storages(tsk *CopyTask, srcStorage, dstStorage driver.Drive return errors.WithMessagef(err, "failed get src [%s] file", srcFilePath) } tsk.SetTotalBytes(srcFile.GetSize()) - link, _, err := op.Link(tsk.Ctx(), srcStorage, srcFilePath, model.LinkArgs{ - Header: http.Header{}, - }) + link, _, err := op.Link(tsk.Ctx(), srcStorage, srcFilePath, model.LinkArgs{}) if err != nil { return errors.WithMessagef(err, "failed get [%s] link", srcFilePath) } - fs := stream.FileStream{ + // any link provided is seekable + ss, err := stream.NewSeekableStream(&stream.FileStream{ Obj: srcFile, Ctx: tsk.Ctx(), - } - // any link provided is seekable - ss, err := stream.NewSeekableStream(fs, link) + }, link) if err != nil { + _ = link.Close() return errors.WithMessagef(err, "failed get [%s] stream", srcFilePath) } return op.Put(tsk.Ctx(), dstStorage, dstDirPath, ss, tsk.SetProgress, true) diff --git a/internal/fs/move.go b/internal/fs/move.go index 0410e9ce..bc9b4ed9 100644 --- a/internal/fs/move.go +++ b/internal/fs/move.go @@ -3,7 +3,6 @@ package fs import ( "context" "fmt" - "net/http" stdpath "path" "sync" "time" @@ -346,23 +345,18 @@ func (t *MoveTask) copyFile(srcStorage, dstStorage driver.Driver, srcFilePath, d return errors.WithMessagef(err, "failed get src [%s] file", srcFilePath) } - link, _, err := op.Link(t.Ctx(), srcStorage, srcFilePath, model.LinkArgs{ - Header: http.Header{}, - }) + link, _, err := op.Link(t.Ctx(), srcStorage, srcFilePath, model.LinkArgs{}) if err != nil { return errors.WithMessagef(err, "failed get [%s] link", srcFilePath) } - - fs := stream.FileStream{ + ss, err := stream.NewSeekableStream(&stream.FileStream{ Obj: srcFile, Ctx: t.Ctx(), - } - - ss, err := stream.NewSeekableStream(fs, link) + }, link) if err != nil { + _ = link.Close() return errors.WithMessagef(err, "failed get [%s] stream", srcFilePath) } - return op.Put(t.Ctx(), dstStorage, dstDirPath, ss, nil, true) } diff --git a/internal/fs/put.go b/internal/fs/put.go index 846f6e96..bc59c244 100644 --- a/internal/fs/put.go +++ b/internal/fs/put.go @@ -10,8 +10,8 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/op" "github.com/OpenListTeam/OpenList/v4/internal/task" - "github.com/pkg/errors" "github.com/OpenListTeam/tache" + "github.com/pkg/errors" ) type UploadTask struct { @@ -73,9 +73,11 @@ func putAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer) func putDirectly(ctx context.Context, dstDirPath string, file model.FileStreamer, lazyCache ...bool) error { storage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath) if err != nil { + _ = file.Close() return errors.WithMessage(err, "failed get storage") } if storage.Config().NoUpload { + _ = file.Close() return errors.WithStack(errs.UploadNotSupported) } return op.Put(ctx, storage, dstDirActualPath, file, nil, lazyCache...) diff --git a/internal/model/args.go b/internal/model/args.go index 2477adc0..a6a09bbc 100644 --- a/internal/model/args.go +++ b/internal/model/args.go @@ -2,6 +2,7 @@ package model import ( "context" + "errors" "io" "net/http" "time" @@ -24,16 +25,25 @@ type LinkArgs struct { } type Link struct { - URL string `json:"url"` // most common way - Header http.Header `json:"header"` // needed header (for url) - RangeReadCloser RangeReadCloserIF `json:"-"` // recommended way if can't use URL - MFile io.ReadSeeker `json:"-"` // best for local,smb... file system, which exposes MFile + URL string `json:"url"` // most common way + Header http.Header `json:"header"` // needed header (for url) + RangeReader RangeReaderIF `json:"-"` // recommended way if can't use URL + MFile File `json:"-"` // best for local,smb... file system, which exposes MFile Expiration *time.Duration // local cache expire Duration //for accelerating request, use multi-thread downloading Concurrency int `json:"concurrency"` PartSize int `json:"part_size"` + + utils.SyncClosers `json:"-"` +} + +func (l *Link) Close() error { + if clr, ok := l.MFile.(io.Closer); ok { + return errors.Join(clr.Close(), l.SyncClosers.Close()) + } + return l.SyncClosers.Close() } type OtherArgs struct { @@ -74,23 +84,24 @@ type ArchiveDecompressArgs struct { PutIntoNewDir bool } -type RangeReadCloserIF interface { +type RangeReaderIF interface { RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) +} + +type RangeReadCloserIF interface { + RangeReaderIF utils.ClosersIF } var _ RangeReadCloserIF = (*RangeReadCloser)(nil) type RangeReadCloser struct { - RangeReader RangeReaderFunc + RangeReader RangeReaderIF utils.Closers } func (r *RangeReadCloser) RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { - rc, err := r.RangeReader(ctx, httpRange) - r.Closers.Add(rc) + rc, err := r.RangeReader.RangeRead(ctx, httpRange) + r.Add(rc) return rc, err } - -// type WriterFunc func(w io.Writer) error -type RangeReaderFunc func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) diff --git a/internal/model/file.go b/internal/model/file.go index d3a1fa6a..9fd20adc 100644 --- a/internal/model/file.go +++ b/internal/model/file.go @@ -1,6 +1,9 @@ package model -import "io" +import ( + "errors" + "io" +) // File is basic file level accessing interface type File interface { @@ -8,3 +11,22 @@ type File interface { io.ReaderAt io.Seeker } +type FileCloser struct { + File + io.Closer +} + +func (f *FileCloser) Close() error { + var errs []error + if clr, ok := f.File.(io.Closer); ok { + errs = append(errs, clr.Close()) + } + if f.Closer != nil { + errs = append(errs, f.Closer.Close()) + } + return errors.Join(errs...) +} + +type FileRangeReader struct { + RangeReaderIF +} diff --git a/internal/model/obj.go b/internal/model/obj.go index 5bfcca47..33ae6e3d 100644 --- a/internal/model/obj.go +++ b/internal/model/obj.go @@ -37,7 +37,7 @@ type Obj interface { // FileStreamer ->check FileStream for more comments type FileStreamer interface { io.Reader - io.Closer + utils.ClosersIF Obj GetMimetype() string //SetReader(io.Reader) diff --git a/internal/net/request.go b/internal/net/request.go index 59db7643..9b634ee5 100644 --- a/internal/net/request.go +++ b/internal/net/request.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/OpenListTeam/OpenList/v4/pkg/http_range" @@ -70,7 +71,7 @@ func (d Downloader) Download(ctx context.Context, p *HttpRequestParams) (readClo var finalP HttpRequestParams awsutil.Copy(&finalP, p) - if finalP.Range.Length == -1 { + if finalP.Range.Length < 0 || finalP.Range.Start+finalP.Range.Length > finalP.Size { finalP.Range.Length = finalP.Size - finalP.Range.Start } impl := downloader{params: &finalP, cfg: d, ctx: ctx} @@ -120,7 +121,7 @@ type ConcurrencyLimit struct { Limit int // 需要大于0 } -var ErrExceedMaxConcurrency = errors.New("ExceedMaxConcurrency") +var ErrExceedMaxConcurrency = ErrorHttpStatusCode(http.StatusTooManyRequests) func (l *ConcurrencyLimit) sub() error { l._m.Lock() @@ -181,6 +182,7 @@ func (d *downloader) download() (io.ReadCloser, error) { resp.Body = utils.NewReadCloser(resp.Body, func() error { d.m.Lock() defer d.m.Unlock() + var err error if closeFunc != nil { d.concurrencyFinish() err = closeFunc() @@ -199,7 +201,7 @@ func (d *downloader) download() (io.ReadCloser, error) { d.pos = d.params.Range.Start d.maxPos = d.params.Range.Start + d.params.Range.Length d.concurrency = d.cfg.Concurrency - d.sendChunkTask(true) + _ = d.sendChunkTask(true) var rc io.ReadCloser = NewMultiReadCloser(d.bufs[0], d.interrupt, d.finishBuf) @@ -303,7 +305,7 @@ func (d *downloader) finishBuf(id int) (isLast bool, nextBuf *Buf) { return true, nil } - d.sendChunkTask(false) + _ = d.sendChunkTask(false) d.readingID = id return false, d.getBuf(id) @@ -398,14 +400,15 @@ var errInfiniteRetry = errors.New("infinite retry") func (d *downloader) tryDownloadChunk(params *HttpRequestParams, ch *chunk) (int64, error) { resp, err := d.cfg.HttpClient(d.ctx, params) if err != nil { - if resp == nil { + statusCode, ok := errors.Unwrap(err).(ErrorHttpStatusCode) + if !ok { return 0, err } - if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable { + if statusCode == http.StatusRequestedRangeNotSatisfiable { return 0, err } if ch.id == 0 { //第1个任务 有限的重试,超过重试就会结束请求 - switch resp.StatusCode { + switch statusCode { default: return 0, err case http.StatusTooManyRequests: @@ -414,7 +417,7 @@ func (d *downloader) tryDownloadChunk(params *HttpRequestParams, ch *chunk) (int case http.StatusGatewayTimeout: } <-time.After(time.Millisecond * 200) - return 0, &errNeedRetry{err: fmt.Errorf("http request failure,status: %d", resp.StatusCode)} + return 0, &errNeedRetry{err: err} } // 来到这 说明第1个分片下载 连接成功了 @@ -450,7 +453,7 @@ func (d *downloader) tryDownloadChunk(params *HttpRequestParams, ch *chunk) (int return 0, err } } - d.sendChunkTask(true) + _ = d.sendChunkTask(true) n, err := utils.CopyWithBuffer(ch.buf, resp.Body) if err != nil { @@ -552,12 +555,26 @@ type chunk struct { func DefaultHttpRequestFunc(ctx context.Context, params *HttpRequestParams) (*http.Response, error) { header := http_range.ApplyRangeToHttpHeader(params.Range, params.HeaderRef) + return RequestHttp(ctx, "GET", header, params.URL) +} - res, err := RequestHttp(ctx, "GET", header, params.URL) - if err != nil { - return res, err +func GetRangeReaderHttpRequestFunc(rangeReader model.RangeReaderIF) HttpRequestFunc { + return func(ctx context.Context, params *HttpRequestParams) (*http.Response, error) { + rc, err := rangeReader.RangeRead(ctx, params.Range) + if err != nil { + return nil, err + } + + return &http.Response{ + StatusCode: http.StatusPartialContent, + Status: http.StatusText(http.StatusPartialContent), + Body: rc, + Header: http.Header{ + "Content-Range": {params.Range.ContentRange(params.Size)}, + }, + ContentLength: params.Range.Length, + }, nil } - return res, nil } type HttpRequestParams struct { diff --git a/internal/net/serve.go b/internal/net/serve.go index 0dac0d3d..4177bf88 100644 --- a/internal/net/serve.go +++ b/internal/net/serve.go @@ -114,14 +114,14 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time // 使用请求的Context // 不然从sendContent读不到数据,即使请求断开CopyBuffer也会一直堵塞 - ctx := context.WithValue(r.Context(), "request_header", r.Header) + ctx := r.Context() switch { case len(ranges) == 0: reader, err := RangeReadCloser.RangeRead(ctx, http_range.Range{Length: -1}) if err != nil { code = http.StatusRequestedRangeNotSatisfiable - if errors.Is(err, ErrExceedMaxConcurrency) { - code = http.StatusTooManyRequests + if statusCode, ok := errors.Unwrap(err).(ErrorHttpStatusCode); ok { + code = int(statusCode) } http.Error(w, err.Error(), code) return nil @@ -143,8 +143,8 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time sendContent, err = RangeReadCloser.RangeRead(ctx, ra) if err != nil { code = http.StatusRequestedRangeNotSatisfiable - if errors.Is(err, ErrExceedMaxConcurrency) { - code = http.StatusTooManyRequests + if statusCode, ok := errors.Unwrap(err).(ErrorHttpStatusCode); ok { + code = int(statusCode) } http.Error(w, err.Error(), code) return nil @@ -205,8 +205,8 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time log.Warnf("Maybe size incorrect or reader not giving correct/full data, or connection closed before finish. written bytes: %d ,sendSize:%d, ", written, sendSize) } code = http.StatusInternalServerError - if errors.Is(err, ErrExceedMaxConcurrency) { - code = http.StatusTooManyRequests + if statusCode, ok := errors.Unwrap(err).(ErrorHttpStatusCode); ok { + code = int(statusCode) } w.WriteHeader(code) return err @@ -259,11 +259,17 @@ func RequestHttp(ctx context.Context, httpMethod string, headerOverride http.Hea _ = res.Body.Close() msg := string(all) log.Debugln(msg) - return res, fmt.Errorf("http request [%s] failure,status: %d response:%s", URL, res.StatusCode, msg) + return nil, fmt.Errorf("http request [%s] failure,status: %w response:%s", URL, ErrorHttpStatusCode(res.StatusCode), msg) } return res, nil } +type ErrorHttpStatusCode int + +func (e ErrorHttpStatusCode) Error() string { + return fmt.Sprintf("%d|%s", e, http.StatusText(int(e))) +} + var once sync.Once var httpClient *http.Client diff --git a/internal/net/util.go b/internal/net/util.go index 40b5e145..fc5921ad 100644 --- a/internal/net/util.go +++ b/internal/net/util.go @@ -350,3 +350,5 @@ func GetRangedHttpReader(readCloser io.ReadCloser, offset, length int64) (io.Rea // return an io.ReadCloser that is limited to `length` bytes. return &LimitedReadCloser{readCloser, length_int}, nil } + +type RequestHeaderKey struct{} diff --git a/internal/offline_download/tool/transfer.go b/internal/offline_download/tool/transfer.go index c1700b07..2264e1d9 100644 --- a/internal/offline_download/tool/transfer.go +++ b/internal/offline_download/tool/transfer.go @@ -3,7 +3,6 @@ package tool import ( "context" "fmt" - "net/http" "os" stdpath "path" "path/filepath" @@ -43,11 +42,11 @@ func (t *TransferTask) Run() error { defer func() { t.SetEndTime(time.Now()) }() if t.SrcStorage == nil { if t.DeletePolicy == UploadDownloadStream { - rrc, err := stream.GetRangeReadCloserFromLink(t.GetTotalBytes(), &model.Link{URL: t.Url}) + rr, err := stream.GetRangeReaderFromLink(t.GetTotalBytes(), &model.Link{URL: t.Url}) if err != nil { return err } - r, err := rrc.RangeRead(t.Ctx(), http_range.Range{Length: t.GetTotalBytes()}) + r, err := rr.RangeRead(t.Ctx(), http_range.Range{Length: t.GetTotalBytes()}) if err != nil { return err } @@ -63,9 +62,8 @@ func (t *TransferTask) Run() error { }, Reader: r, Mimetype: mimetype, - Closers: utils.NewClosers(rrc), + Closers: utils.NewClosers(r), } - defer s.Close() return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, s, t.SetProgress) } return transferStdPath(t) @@ -279,19 +277,17 @@ func transferObjFile(t *TransferTask) error { if err != nil { return errors.WithMessagef(err, "failed get src [%s] file", t.SrcObjPath) } - link, _, err := op.Link(t.Ctx(), t.SrcStorage, t.SrcObjPath, model.LinkArgs{ - Header: http.Header{}, - }) + link, _, err := op.Link(t.Ctx(), t.SrcStorage, t.SrcObjPath, model.LinkArgs{}) if err != nil { return errors.WithMessagef(err, "failed get [%s] link", t.SrcObjPath) } - fs := stream.FileStream{ + // any link provided is seekable + ss, err := stream.NewSeekableStream(&stream.FileStream{ Obj: srcFile, Ctx: t.Ctx(), - } - // any link provided is seekable - ss, err := stream.NewSeekableStream(fs, link) + }, link) if err != nil { + _ = link.Close() return errors.WithMessagef(err, "failed get [%s] stream", t.SrcObjPath) } t.SetTotalBytes(srcFile.GetSize()) diff --git a/internal/op/archive.go b/internal/op/archive.go index 16911f6a..0ae34fd2 100644 --- a/internal/op/archive.go +++ b/internal/op/archive.go @@ -31,12 +31,6 @@ func GetArchiveMeta(ctx context.Context, storage driver.Driver, path string, arg } path = utils.FixAndCleanPath(path) key := Key(storage, path) - if !args.Refresh { - if meta, ok := archiveMetaCache.Get(key); ok { - log.Debugf("use cache when get %s archive meta", path) - return meta, nil - } - } fn := func() (*model.ArchiveMetaProvider, error) { _, m, err := getArchiveMeta(ctx, storage, path, args) if err != nil { @@ -47,10 +41,16 @@ func GetArchiveMeta(ctx context.Context, storage driver.Driver, path string, arg } return m, nil } - if storage.Config().OnlyLocal { + if storage.Config().OnlyLinkMFile { meta, err := fn() return meta, err } + if !args.Refresh { + if meta, ok := archiveMetaCache.Get(key); ok { + log.Debugf("use cache when get %s archive meta", path) + return meta, nil + } + } meta, err, _ := archiveMetaG.Do(key, fn) return meta, err } @@ -62,12 +62,7 @@ func GetArchiveToolAndStream(ctx context.Context, storage driver.Driver, path st } baseName, ext, found := strings.Cut(obj.GetName(), ".") if !found { - if clr, ok := l.MFile.(io.Closer); ok { - _ = clr.Close() - } - if l.RangeReadCloser != nil { - _ = l.RangeReadCloser.Close() - } + _ = l.Close() return nil, nil, nil, errors.Errorf("failed get archive tool: the obj does not have an extension.") } partExt, t, err := tool.GetArchiveTool("." + ext) @@ -75,23 +70,13 @@ func GetArchiveToolAndStream(ctx context.Context, storage driver.Driver, path st var e error partExt, t, e = tool.GetArchiveTool(stdpath.Ext(obj.GetName())) if e != nil { - if clr, ok := l.MFile.(io.Closer); ok { - _ = clr.Close() - } - if l.RangeReadCloser != nil { - _ = l.RangeReadCloser.Close() - } + _ = l.Close() return nil, nil, nil, errors.WithMessagef(stderrors.Join(err, e), "failed get archive tool: %s", ext) } } - ss, err := stream.NewSeekableStream(stream.FileStream{Ctx: ctx, Obj: obj}, l) + ss, err := stream.NewSeekableStream(&stream.FileStream{Ctx: ctx, Obj: obj}, l) if err != nil { - if clr, ok := l.MFile.(io.Closer); ok { - _ = clr.Close() - } - if l.RangeReadCloser != nil { - _ = l.RangeReadCloser.Close() - } + _ = l.Close() return nil, nil, nil, errors.WithMessagef(err, "failed get [%s] stream", path) } ret := []*stream.SeekableStream{ss} @@ -107,14 +92,9 @@ func GetArchiveToolAndStream(ctx context.Context, storage driver.Driver, path st if err != nil { break } - ss, err = stream.NewSeekableStream(stream.FileStream{Ctx: ctx, Obj: o}, l) + ss, err = stream.NewSeekableStream(&stream.FileStream{Ctx: ctx, Obj: o}, l) if err != nil { - if clr, ok := l.MFile.(io.Closer); ok { - _ = clr.Close() - } - if l.RangeReadCloser != nil { - _ = l.RangeReadCloser.Close() - } + _ = l.Close() for _, s := range ret { _ = s.Close() } @@ -375,12 +355,12 @@ func ArchiveGet(ctx context.Context, storage driver.Driver, path string, args mo } type extractLink struct { - Link *model.Link - Obj model.Obj + *model.Link + Obj model.Obj } var extractCache = cache.NewMemCache(cache.WithShards[*extractLink](16)) -var extractG singleflight.Group[*extractLink] +var extractG = singleflight.Group[*extractLink]{Remember: true} func DriverExtract(ctx context.Context, storage driver.Driver, path string, args model.ArchiveInnerArgs) (*model.Link, model.Obj, error) { if storage.Config().CheckStatus && storage.GetStorage().Status != WORK { @@ -389,9 +369,9 @@ func DriverExtract(ctx context.Context, storage driver.Driver, path string, args key := stdpath.Join(Key(storage, path), args.InnerPath) if link, ok := extractCache.Get(key); ok { return link.Link, link.Obj, nil - } else if link, ok := extractCache.Get(key + ":" + args.IP); ok { - return link.Link, link.Obj, nil } + + var forget utils.CloseFunc fn := func() (*extractLink, error) { link, err := driverExtract(ctx, storage, path, args) if err != nil { @@ -400,16 +380,33 @@ func DriverExtract(ctx context.Context, storage driver.Driver, path string, args if link.Link.Expiration != nil { extractCache.Set(key, link, cache.WithEx[*extractLink](*link.Link.Expiration)) } + link.Add(forget) return link, nil } - if storage.Config().OnlyLocal { + + if storage.Config().OnlyLinkMFile { link, err := fn() if err != nil { return nil, nil, err } return link.Link, link.Obj, nil } + + forget = func() error { + if forget != nil { + forget = nil + linkG.Forget(key) + } + return nil + } link, err, _ := extractG.Do(key, fn) + if err == nil && !link.AcquireReference() { + link, err, _ = extractG.Do(key, fn) + if err == nil { + link.AcquireReference() + } + } + if err != nil { return nil, nil, err } diff --git a/internal/op/driver.go b/internal/op/driver.go index bf5c717e..d3fca871 100644 --- a/internal/op/driver.go +++ b/internal/op/driver.go @@ -81,7 +81,15 @@ func getMainItems(config driver.Config) []driver.Item { Help: "The cache expiration time for this storage", }) } - if !config.OnlyProxy && !config.OnlyLocal { + if config.MustProxy() { + items = append(items, driver.Item{ + Name: "webdav_policy", + Type: conf.TypeSelect, + Default: "native_proxy", + Options: "use_proxy_url,native_proxy", + Required: true, + }) + } else { items = append(items, []driver.Item{{ Name: "web_proxy", Type: conf.TypeBool, @@ -104,14 +112,6 @@ func getMainItems(config driver.Config) []driver.Item { } items = append(items, item) } - } else { - items = append(items, driver.Item{ - Name: "webdav_policy", - Type: conf.TypeSelect, - Default: "native_proxy", - Options: "use_proxy_url,native_proxy", - Required: true, - }) } items = append(items, driver.Item{ Name: "down_proxy_url", diff --git a/internal/op/fs.go b/internal/op/fs.go index cae7d95d..04dd9c7e 100644 --- a/internal/op/fs.go +++ b/internal/op/fs.go @@ -244,7 +244,7 @@ func GetUnwrap(ctx context.Context, storage driver.Driver, path string) (model.O } var linkCache = cache.NewMemCache(cache.WithShards[*model.Link](16)) -var linkG singleflight.Group[*model.Link] +var linkG = singleflight.Group[*model.Link]{Remember: true} // Link get link, if is an url. should have an expiry time func Link(ctx context.Context, storage driver.Driver, path string, args model.LinkArgs) (*model.Link, model.Obj, error) { @@ -262,6 +262,8 @@ func Link(ctx context.Context, storage driver.Driver, path string, args model.Li if link, ok := linkCache.Get(key); ok { return link, file, nil } + + var forget utils.CloseFunc fn := func() (*model.Link, error) { link, err := storage.Link(ctx, file, args) if err != nil { @@ -270,15 +272,29 @@ func Link(ctx context.Context, storage driver.Driver, path string, args model.Li if link.Expiration != nil { linkCache.Set(key, link, cache.WithEx[*model.Link](*link.Expiration)) } + link.Add(forget) return link, nil } - if storage.Config().OnlyLocal { + if storage.Config().OnlyLinkMFile { link, err := fn() return link, file, err } + forget = func() error { + if forget != nil { + forget = nil + linkG.Forget(key) + } + return nil + } link, err, _ := linkG.Do(key, fn) + if err == nil && !link.AcquireReference() { + link, err, _ = linkG.Do(key, fn) + if err == nil { + link.AcquireReference() + } + } return link, file, err } @@ -507,14 +523,15 @@ func Remove(ctx context.Context, storage driver.Driver, path string) error { } func Put(ctx context.Context, storage driver.Driver, dstDirPath string, file model.FileStreamer, up driver.UpdateProgress, lazyCache ...bool) error { - if storage.Config().CheckStatus && storage.GetStorage().Status != WORK { - return errors.Errorf("storage not init: %s", storage.GetStorage().Status) - } + close := file.Close defer func() { - if err := file.Close(); err != nil { + if err := close(); err != nil { log.Errorf("failed to close file streamer, %v", err) } }() + if storage.Config().CheckStatus && storage.GetStorage().Status != WORK { + return errors.Errorf("storage not init: %s", storage.GetStorage().Status) + } // UrlTree PUT if storage.GetStorage().Driver == "UrlTree" { var link string diff --git a/internal/stream/limit.go b/internal/stream/limit.go index db5da20d..ee32840d 100644 --- a/internal/stream/limit.go +++ b/internal/stream/limit.go @@ -142,19 +142,19 @@ func (r *RateLimitFile) Close() error { return nil } -type RateLimitRangeReadCloser struct { - model.RangeReadCloserIF - Limiter Limiter -} +type RateLimitRangeReaderFunc RangeReaderFunc -func (rrc *RateLimitRangeReadCloser) RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { - rc, err := rrc.RangeReadCloserIF.RangeRead(ctx, httpRange) +func (f RateLimitRangeReaderFunc) RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { + rc, err := f(ctx, httpRange) if err != nil { return nil, err } - return &RateLimitReader{ - Reader: rc, - Limiter: rrc.Limiter, - Ctx: ctx, - }, nil + if ServerDownloadLimit != nil { + rc = &RateLimitReader{ + Ctx: ctx, + Reader: rc, + Limiter: ServerDownloadLimit, + } + } + return rc, nil } diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 1164f80d..f8a96650 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -110,8 +110,7 @@ const InMemoryBufMaxSizeBytes = InMemoryBufMaxSize * 1024 * 1024 // RangeRead have to cache all data first since only Reader is provided. // also support a peeking RangeRead at very start, but won't buffer more than 10MB data in memory func (f *FileStream) RangeRead(httpRange http_range.Range) (io.Reader, error) { - if httpRange.Length == -1 { - // 参考 internal/net/request.go + if httpRange.Length < 0 || httpRange.Start+httpRange.Length > f.GetSize() { httpRange.Length = f.GetSize() - httpRange.Start } var cache io.ReaderAt = f.GetFile() @@ -159,47 +158,40 @@ var _ model.FileStreamer = (*FileStream)(nil) // additional resources that need to be closed, they should be added to the Closer property of // the SeekableStream object and be closed together when the SeekableStream object is closed. type SeekableStream struct { - FileStream + *FileStream // should have one of belows to support rangeRead rangeReadCloser model.RangeReadCloserIF } -func NewSeekableStream(fs FileStream, link *model.Link) (*SeekableStream, error) { +func NewSeekableStream(fs *FileStream, link *model.Link) (*SeekableStream, error) { if len(fs.Mimetype) == 0 { fs.Mimetype = utils.GetMimeType(fs.Obj.GetName()) } - ss := &SeekableStream{FileStream: fs} - if ss.Reader != nil { - ss.TryAdd(ss.Reader) - return ss, nil + + if fs.Reader != nil { + fs.Add(link) + return &SeekableStream{FileStream: fs}, nil } + if link != nil { - if link.MFile != nil { - ss.Closers.TryAdd(link.MFile) - ss.Reader = link.MFile - return ss, nil + rr, err := GetRangeReaderFromLink(fs.GetSize(), link) + if err != nil { + return nil, err } - if link.RangeReadCloser != nil { - ss.rangeReadCloser = &RateLimitRangeReadCloser{ - RangeReadCloserIF: link.RangeReadCloser, - Limiter: ServerDownloadLimit, - } - ss.Add(ss.rangeReadCloser) - return ss, nil - } - if len(link.URL) > 0 { - rrc, err := GetRangeReadCloserFromLink(ss.GetSize(), link) + if _, ok := rr.(*model.FileRangeReader); ok { + fs.Reader, err = rr.RangeRead(fs.Ctx, http_range.Range{Length: -1}) if err != nil { return nil, err } - rrc = &RateLimitRangeReadCloser{ - RangeReadCloserIF: rrc, - Limiter: ServerDownloadLimit, - } - ss.rangeReadCloser = rrc - ss.Add(rrc) - return ss, nil + fs.Add(link) + return &SeekableStream{FileStream: fs}, nil } + rrc := &model.RangeReadCloser{ + RangeReader: rr, + } + fs.Add(link) + fs.Add(rrc) + return &SeekableStream{FileStream: fs, rangeReadCloser: rrc}, nil } return nil, fmt.Errorf("illegal seekableStream") } @@ -211,9 +203,6 @@ func NewSeekableStream(fs FileStream, link *model.Link) (*SeekableStream, error) // RangeRead is not thread-safe, pls use it in single thread only. func (ss *SeekableStream) RangeRead(httpRange http_range.Range) (io.Reader, error) { if ss.tmpFile == nil && ss.rangeReadCloser != nil { - if httpRange.Length == -1 { - httpRange.Length = ss.GetSize() - httpRange.Start - } rc, err := ss.rangeReadCloser.RangeRead(ss.Ctx, httpRange) if err != nil { return nil, err @@ -229,10 +218,6 @@ func (ss *SeekableStream) RangeRead(httpRange http_range.Range) (io.Reader, erro // only provide Reader as full stream when it's demanded. in rapid-upload, we can skip this to save memory func (ss *SeekableStream) Read(p []byte) (n int, err error) { - //f.mu.Lock() - - //f.peekedOnce = true - //defer f.mu.Unlock() if ss.Reader == nil { if ss.rangeReadCloser == nil { return 0, fmt.Errorf("illegal seekableStream") @@ -241,7 +226,7 @@ func (ss *SeekableStream) Read(p []byte) (n int, err error) { if err != nil { return 0, nil } - ss.Reader = io.NopCloser(rc) + ss.Reader = rc } return ss.Reader.Read(p) } @@ -496,7 +481,7 @@ func (r *RangeReadReadAtSeeker) Seek(offset int64, whence int) (int64, error) { return r.masterOff, errors.New("invalid seek: negative position") } if offset > r.ss.GetSize() { - return r.masterOff, io.EOF + offset = r.ss.GetSize() } r.masterOff = offset return offset, nil diff --git a/internal/stream/util.go b/internal/stream/util.go index 8bbc57a5..5971860c 100644 --- a/internal/stream/util.go +++ b/internal/stream/util.go @@ -3,6 +3,7 @@ package stream import ( "context" "encoding/hex" + "errors" "fmt" "io" "net/http" @@ -14,57 +15,93 @@ import ( log "github.com/sirupsen/logrus" ) -func GetRangeReadCloserFromLink(size int64, link *model.Link) (model.RangeReadCloserIF, error) { - if len(link.URL) == 0 { - return nil, fmt.Errorf("can't create RangeReadCloser since URL is empty in link") - } - rangeReaderFunc := func(ctx context.Context, r http_range.Range) (io.ReadCloser, error) { - if link.Concurrency > 0 || link.PartSize > 0 { - header := net.ProcessHeader(nil, link.Header) - down := net.NewDownloader(func(d *net.Downloader) { - d.Concurrency = link.Concurrency - d.PartSize = link.PartSize - }) - req := &net.HttpRequestParams{ - URL: link.URL, - Range: r, - Size: size, - HeaderRef: header, - } - rc, err := down.Download(ctx, req) - return rc, err +type RangeReaderFunc func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) - } - response, err := RequestRangedHttp(ctx, link, r.Start, r.Length) - if err != nil { - if response == nil { - return nil, fmt.Errorf("http request failure, err:%s", err) +func (f RangeReaderFunc) RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { + return f(ctx, httpRange) +} + +func GetRangeReaderFromLink(size int64, link *model.Link) (model.RangeReaderIF, error) { + if link.MFile != nil { + return &model.FileRangeReader{RangeReaderIF: GetRangeReaderFromMFile(size, link.MFile)}, nil + } + if link.Concurrency > 0 || link.PartSize > 0 { + down := net.NewDownloader(func(d *net.Downloader) { + d.Concurrency = link.Concurrency + d.PartSize = link.PartSize + }) + var rangeReader RangeReaderFunc = func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { + var req *net.HttpRequestParams + if link.RangeReader != nil { + req = &net.HttpRequestParams{ + Range: httpRange, + Size: size, + } + } else { + requestHeader, _ := ctx.Value(net.RequestHeaderKey{}).(http.Header) + header := net.ProcessHeader(requestHeader, link.Header) + req = &net.HttpRequestParams{ + Range: httpRange, + Size: size, + URL: link.URL, + HeaderRef: header, + } } - return nil, err + return down.Download(ctx, req) } - if r.Start == 0 && (r.Length == -1 || r.Length == size) || response.StatusCode == http.StatusPartialContent || - checkContentRange(&response.Header, r.Start) { + if link.RangeReader != nil { + down.HttpClient = net.GetRangeReaderHttpRequestFunc(link.RangeReader) + return rangeReader, nil + } + return RateLimitRangeReaderFunc(rangeReader), nil + } + + if link.RangeReader != nil { + return link.RangeReader, nil + } + + if len(link.URL) == 0 { + return nil, errors.New("invalid link: must have at least one of MFile, URL, or RangeReader") + } + rangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { + if httpRange.Length < 0 || httpRange.Start+httpRange.Length > size { + httpRange.Length = size - httpRange.Start + } + requestHeader, _ := ctx.Value(net.RequestHeaderKey{}).(http.Header) + header := net.ProcessHeader(requestHeader, link.Header) + header = http_range.ApplyRangeToHttpHeader(httpRange, header) + + response, err := net.RequestHttp(ctx, "GET", header, link.URL) + if err != nil { + if _, ok := errors.Unwrap(err).(net.ErrorHttpStatusCode); ok { + return nil, err + } + return nil, fmt.Errorf("http request failure, err:%w", err) + } + if httpRange.Start == 0 && (httpRange.Length == -1 || httpRange.Length == size) || response.StatusCode == http.StatusPartialContent || + checkContentRange(&response.Header, httpRange.Start) { return response.Body, nil } else if response.StatusCode == http.StatusOK { log.Warnf("remote http server not supporting range request, expect low perfromace!") - readCloser, err := net.GetRangedHttpReader(response.Body, r.Start, r.Length) + readCloser, err := net.GetRangedHttpReader(response.Body, httpRange.Start, httpRange.Length) if err != nil { return nil, err } return readCloser, nil } - return response.Body, nil } - resultRangeReadCloser := model.RangeReadCloser{RangeReader: rangeReaderFunc} - return &resultRangeReadCloser, nil + return RateLimitRangeReaderFunc(rangeReader), nil } -func RequestRangedHttp(ctx context.Context, link *model.Link, offset, length int64) (*http.Response, error) { - header := net.ProcessHeader(nil, link.Header) - header = http_range.ApplyRangeToHttpHeader(http_range.Range{Start: offset, Length: length}, header) - - return net.RequestHttp(ctx, "GET", header, link.URL) +func GetRangeReaderFromMFile(size int64, file model.File) RangeReaderFunc { + return func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { + length := httpRange.Length + if length < 0 || httpRange.Start+length > size { + length = size - httpRange.Start + } + return &model.FileCloser{File: io.NewSectionReader(file, httpRange.Start, length)}, nil + } } // 139 cloud does not properly return 206 http status code, add a hack here diff --git a/pkg/singleflight/singleflight.go b/pkg/singleflight/singleflight.go index 48383478..c686beb1 100644 --- a/pkg/singleflight/singleflight.go +++ b/pkg/singleflight/singleflight.go @@ -73,6 +73,8 @@ type call[T any] struct { type Group[T any] struct { mu sync.Mutex // protects m m map[string]*call[T] // lazily initialized + + Remember bool } // Result holds the results of Do, so they can be passed @@ -156,7 +158,7 @@ func (g *Group[T]) doCall(c *call[T], key string, fn func() (T, error)) { g.mu.Lock() defer g.mu.Unlock() c.wg.Done() - if g.m[key] == c { + if !g.Remember && g.m[key] == c { delete(g.m, key) } diff --git a/pkg/singleflight/var.go b/pkg/singleflight/var.go new file mode 100644 index 00000000..41c97a2e --- /dev/null +++ b/pkg/singleflight/var.go @@ -0,0 +1,3 @@ +package singleflight + +var ErrorGroup Group[error] diff --git a/pkg/utils/io.go b/pkg/utils/io.go index dd158adb..93b3ddbc 100644 --- a/pkg/utils/io.go +++ b/pkg/utils/io.go @@ -153,46 +153,99 @@ func Retry(attempts int, sleep time.Duration, f func() error) (err error) { type ClosersIF interface { io.Closer Add(closer io.Closer) - TryAdd(reader io.Reader) - AddClosers(closers Closers) - GetClosers() Closers + AddIfCloser(a any) } +type Closers []io.Closer -type Closers struct { - closers []io.Closer +func (c *Closers) Close() error { + var errs []error + for _, closer := range *c { + if closer != nil { + errs = append(errs, closer.Close()) + } + } + *c = (*c)[:0] + return errors.Join(errs...) } - -func (c *Closers) GetClosers() Closers { - return *c +func (c *Closers) Add(closer io.Closer) { + if closer != nil { + *c = append(*c, closer) + } +} +func (c *Closers) AddIfCloser(a any) { + if closer, ok := a.(io.Closer); ok { + *c = append(*c, closer) + } } var _ ClosersIF = (*Closers)(nil) -func (c *Closers) Close() error { +func NewClosers(c ...io.Closer) Closers { + return Closers(c) +} + +type SyncClosersIF interface { + ClosersIF + AcquireReference() bool +} + +type SyncClosers struct { + closers []io.Closer + mu sync.Mutex + ref int +} + +var _ SyncClosersIF = (*SyncClosers)(nil) + +func (c *SyncClosers) AcquireReference() bool { + c.mu.Lock() + defer c.mu.Unlock() + if len(c.closers) == 0 { + return false + } + c.ref++ + log.Debugf("SyncClosers.AcquireReference %p,ref=%d\n", c, c.ref) + return true +} + +func (c *SyncClosers) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + defer log.Debugf("SyncClosers.Close %p,ref=%d\n", c, c.ref) + if c.ref > 1 { + c.ref-- + return nil + } + c.ref = 0 + var errs []error for _, closer := range c.closers { if closer != nil { errs = append(errs, closer.Close()) } } + c.closers = c.closers[:0] return errors.Join(errs...) } -func (c *Closers) Add(closer io.Closer) { + +func (c *SyncClosers) Add(closer io.Closer) { if closer != nil { + c.mu.Lock() c.closers = append(c.closers, closer) - } -} -func (c *Closers) AddClosers(closers Closers) { - c.closers = append(c.closers, closers.closers...) -} -func (c *Closers) TryAdd(reader io.Reader) { - if closer, ok := reader.(io.Closer); ok { - c.closers = append(c.closers, closer) + c.mu.Unlock() } } -func NewClosers(c ...io.Closer) Closers { - return Closers{c} +func (c *SyncClosers) AddIfCloser(a any) { + if closer, ok := a.(io.Closer); ok { + c.mu.Lock() + c.closers = append(c.closers, closer) + c.mu.Unlock() + } +} + +func NewSyncClosers(c ...io.Closer) SyncClosers { + return SyncClosers{closers: c} } type Ordered interface { diff --git a/server/common/proxy.go b/server/common/proxy.go index 77ac0973..48b856a6 100644 --- a/server/common/proxy.go +++ b/server/common/proxy.go @@ -12,81 +12,63 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/net" "github.com/OpenListTeam/OpenList/v4/internal/stream" - "github.com/OpenListTeam/OpenList/v4/pkg/http_range" "github.com/OpenListTeam/OpenList/v4/pkg/utils" ) func Proxy(w http.ResponseWriter, r *http.Request, link *model.Link, file model.Obj) error { if link.MFile != nil { - if clr, ok := link.MFile.(io.Closer); ok { - defer clr.Close() - } - attachHeader(w, file) - contentType := link.Header.Get("Content-Type") - if contentType != "" { - w.Header().Set("Content-Type", contentType) - } + attachHeader(w, file, link.Header) http.ServeContent(w, r, file.GetName(), file.ModTime(), link.MFile) return nil - } else if link.RangeReadCloser != nil { - attachHeader(w, file) - return net.ServeHTTP(w, r, file.GetName(), file.ModTime(), file.GetSize(), &stream.RateLimitRangeReadCloser{ - RangeReadCloserIF: link.RangeReadCloser, - Limiter: stream.ServerDownloadLimit, - }) - } else if link.Concurrency > 0 || link.PartSize > 0 { - attachHeader(w, file) - size := file.GetSize() - rangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { - requestHeader := ctx.Value("request_header") - if requestHeader == nil { - requestHeader = http.Header{} - } - header := net.ProcessHeader(requestHeader.(http.Header), link.Header) - down := net.NewDownloader(func(d *net.Downloader) { - d.Concurrency = link.Concurrency - d.PartSize = link.PartSize - }) - req := &net.HttpRequestParams{ - URL: link.URL, - Range: httpRange, - Size: size, - HeaderRef: header, - } - rc, err := down.Download(ctx, req) - return rc, err - } - return net.ServeHTTP(w, r, file.GetName(), file.ModTime(), file.GetSize(), &stream.RateLimitRangeReadCloser{ - RangeReadCloserIF: &model.RangeReadCloser{RangeReader: rangeReader}, - Limiter: stream.ServerDownloadLimit, - }) - } else { - //transparent proxy - header := net.ProcessHeader(r.Header, link.Header) - res, err := net.RequestHttp(r.Context(), r.Method, header, link.URL) - if err != nil { - return err - } - defer res.Body.Close() + } - maps.Copy(w.Header(), res.Header) - w.WriteHeader(res.StatusCode) - if r.Method == http.MethodHead { - return nil + if link.Concurrency > 0 || link.PartSize > 0 { + attachHeader(w, file, link.Header) + rrf, _ := stream.GetRangeReaderFromLink(file.GetSize(), link) + if link.RangeReader == nil { + r = r.WithContext(context.WithValue(r.Context(), net.RequestHeaderKey{}, r.Header)) } - _, err = utils.CopyWithBuffer(w, &stream.RateLimitReader{ - Reader: res.Body, - Limiter: stream.ServerDownloadLimit, - Ctx: r.Context(), + return net.ServeHTTP(w, r, file.GetName(), file.ModTime(), file.GetSize(), &model.RangeReadCloser{ + RangeReader: rrf, }) + } + + if link.RangeReader != nil { + attachHeader(w, file, link.Header) + return net.ServeHTTP(w, r, file.GetName(), file.ModTime(), file.GetSize(), &model.RangeReadCloser{ + RangeReader: link.RangeReader, + }) + } + + //transparent proxy + header := net.ProcessHeader(r.Header, link.Header) + res, err := net.RequestHttp(r.Context(), r.Method, header, link.URL) + if err != nil { return err } + defer res.Body.Close() + + maps.Copy(w.Header(), res.Header) + w.WriteHeader(res.StatusCode) + if r.Method == http.MethodHead { + return nil + } + _, err = utils.CopyWithBuffer(w, &stream.RateLimitReader{ + Reader: res.Body, + Limiter: stream.ServerDownloadLimit, + Ctx: r.Context(), + }) + return err } -func attachHeader(w http.ResponseWriter, file model.Obj) { +func attachHeader(w http.ResponseWriter, file model.Obj, header http.Header) { fileName := file.GetName() w.Header().Set("Content-Disposition", utils.GenerateContentDisposition(fileName)) w.Header().Set("Content-Type", utils.GetMimeType(fileName)) w.Header().Set("Etag", GetEtag(file)) + contentType := header.Get("Content-Type") + if len(contentType) > 0 { + w.Header().Set("Content-Type", contentType) + } } func GetEtag(file model.Obj) string { hash := "" @@ -106,12 +88,12 @@ func ProxyRange(ctx context.Context, link *model.Link, size int64) { if link.MFile != nil { return } - if link.RangeReadCloser == nil && !strings.HasPrefix(link.URL, GetApiUrl(ctx)+"/") { - var rrc, err = stream.GetRangeReadCloserFromLink(size, link) + if link.RangeReader == nil && !strings.HasPrefix(link.URL, GetApiUrl(ctx)+"/") { + rrf, err := stream.GetRangeReaderFromLink(size, link) if err != nil { return } - link.RangeReadCloser = rrc + link.RangeReader = rrf } } diff --git a/server/ftp/fsread.go b/server/ftp/fsread.go index e00f3c76..c52510ec 100644 --- a/server/ftp/fsread.go +++ b/server/ftp/fsread.go @@ -45,12 +45,12 @@ func OpenDownload(ctx context.Context, reqPath string, offset int64) (*FileDownl if err != nil { return nil, err } - fileStream := stream.FileStream{ + ss, err := stream.NewSeekableStream(&stream.FileStream{ Obj: obj, Ctx: ctx, - } - ss, err := stream.NewSeekableStream(fileStream, link) + }, link) if err != nil { + _ = link.Close() return nil, err } reader, err := stream.NewReadAtSeeker(ss, offset) diff --git a/server/handles/archive.go b/server/handles/archive.go index af13e95c..0db4190e 100644 --- a/server/handles/archive.go +++ b/server/handles/archive.go @@ -321,7 +321,7 @@ func ArchiveDown(c *gin.Context) { common.ErrorResp(c, err, 500) return } - down(c, link) + redirect(c, link) } } @@ -351,7 +351,7 @@ func ArchiveProxy(c *gin.Context) { common.ErrorResp(c, err, 500) return } - localProxy(c, link, file, storage.GetStorage().ProxyRange) + proxy(c, link, file, storage.GetStorage().ProxyRange) } else { common.ErrorStrResp(c, "proxy not allowed", 403) return diff --git a/server/handles/down.go b/server/handles/down.go index e6f44974..686e2b2c 100644 --- a/server/handles/down.go +++ b/server/handles/down.go @@ -2,8 +2,8 @@ package handles import ( "bytes" + "errors" "fmt" - "io" stdpath "path" "strconv" "strings" @@ -12,6 +12,7 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/driver" "github.com/OpenListTeam/OpenList/v4/internal/fs" "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/net" "github.com/OpenListTeam/OpenList/v4/internal/setting" "github.com/OpenListTeam/OpenList/v4/internal/sign" "github.com/OpenListTeam/OpenList/v4/pkg/utils" @@ -44,7 +45,7 @@ func Down(c *gin.Context) { common.ErrorResp(c, err, 500) return } - down(c, link) + redirect(c, link) } } @@ -77,22 +78,15 @@ func Proxy(c *gin.Context) { common.ErrorResp(c, err, 500) return } - localProxy(c, link, file, storage.GetStorage().ProxyRange) + proxy(c, link, file, storage.GetStorage().ProxyRange) } else { common.ErrorStrResp(c, "proxy not allowed", 403) return } } -func down(c *gin.Context, link *model.Link) { - if clr, ok := link.MFile.(io.Closer); ok { - defer func(clr io.Closer) { - err := clr.Close() - if err != nil { - log.Errorf("close link data error: %v", err) - } - }(clr) - } +func redirect(c *gin.Context, link *model.Link) { + defer link.Close() var err error c.Header("Referrer-Policy", "no-referrer") c.Header("Cache-Control", "max-age=0, no-cache, no-store, must-revalidate") @@ -110,7 +104,8 @@ func down(c *gin.Context, link *model.Link) { c.Redirect(302, link.URL) } -func localProxy(c *gin.Context, link *model.Link, file model.Obj, proxyRange bool) { +func proxy(c *gin.Context, link *model.Link, file model.Obj, proxyRange bool) { + defer link.Close() var err error if link.URL != "" && setting.GetBool(conf.ForwardDirectLinkParams) { query := c.Request.URL.Query() @@ -161,7 +156,11 @@ func localProxy(c *gin.Context, link *model.Link, file model.Obj, proxyRange boo if Writer.IsWritten() { log.Errorf("%s %s local proxy error: %+v", c.Request.Method, c.Request.URL.Path, err) } else { - common.ErrorResp(c, err, 500, true) + if statusCode, ok := errors.Unwrap(err).(net.ErrorHttpStatusCode); ok { + common.ErrorResp(c, err, int(statusCode), true) + } else { + common.ErrorResp(c, err, 500, true) + } } } diff --git a/server/handles/fsmanage.go b/server/handles/fsmanage.go index 96ababec..865717bb 100644 --- a/server/handles/fsmanage.go +++ b/server/handles/fsmanage.go @@ -2,7 +2,6 @@ package handles import ( "fmt" - "io" stdpath "path" "github.com/OpenListTeam/OpenList/v4/internal/task" @@ -17,7 +16,6 @@ import ( "github.com/OpenListTeam/OpenList/v4/server/common" "github.com/gin-gonic/gin" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" ) type MkdirOrLinkReq struct { @@ -376,7 +374,7 @@ func Link(c *gin.Context) { common.ErrorResp(c, err, 500) return } - if storage.Config().OnlyLocal { + if storage.Config().NoLinkURL || storage.Config().OnlyLinkMFile { common.SuccessResp(c, model.Link{ URL: fmt.Sprintf("%s/p%s?d&sign=%s", common.GetApiUrl(c), @@ -385,18 +383,11 @@ func Link(c *gin.Context) { }) return } - link, _, err := fs.Link(c, rawPath, model.LinkArgs{IP: c.ClientIP(), Header: c.Request.Header}) + link, _, err := fs.Link(c, rawPath, model.LinkArgs{IP: c.ClientIP(), Header: c.Request.Header, Redirect: true}) if err != nil { common.ErrorResp(c, err, 500) return } - if clr, ok := link.MFile.(io.Closer); ok { - defer func(clr io.Closer) { - err := clr.Close() - if err != nil { - log.Errorf("close link data error: %v", err) - } - }(clr) - } + defer link.Close() common.SuccessResp(c, link) } diff --git a/server/handles/fsread.go b/server/handles/fsread.go index d0623ff7..b5608750 100644 --- a/server/handles/fsread.go +++ b/server/handles/fsread.go @@ -315,6 +315,7 @@ func FsGet(c *gin.Context) { common.ErrorResp(c, err, 500) return } + defer link.Close() rawURL = link.URL } } diff --git a/server/handles/fsup.go b/server/handles/fsup.go index b361dd8c..ec47ebdd 100644 --- a/server/handles/fsup.go +++ b/server/handles/fsup.go @@ -28,6 +28,12 @@ func getLastModified(c *gin.Context) time.Time { } func FsStream(c *gin.Context) { + defer func() { + if n, _ := io.ReadFull(c.Request.Body, []byte{0}); n == 1 { + _, _ = utils.CopyWithBuffer(io.Discard, c.Request.Body) + } + _ = c.Request.Body.Close() + }() path := c.GetHeader("File-Path") path, err := url.PathUnescape(path) if err != nil { @@ -44,7 +50,6 @@ func FsStream(c *gin.Context) { } if !overwrite { if res, _ := fs.Get(c, path, &fs.GetArgs{NoLog: true}); res != nil { - _, _ = utils.CopyWithBuffer(io.Discard, c.Request.Body) common.ErrorStrResp(c, "file exists", 403) return } @@ -90,15 +95,11 @@ func FsStream(c *gin.Context) { } else { err = fs.PutDirectly(c, dir, s, true) } - defer c.Request.Body.Close() if err != nil { common.ErrorResp(c, err, 500) return } if t == nil { - if n, _ := io.ReadFull(c.Request.Body, []byte{0}); n == 1 { - _, _ = utils.CopyWithBuffer(io.Discard, c.Request.Body) - } common.SuccessResp(c) return } @@ -108,6 +109,12 @@ func FsStream(c *gin.Context) { } func FsForm(c *gin.Context) { + defer func() { + if n, _ := io.ReadFull(c.Request.Body, []byte{0}); n == 1 { + _, _ = utils.CopyWithBuffer(io.Discard, c.Request.Body) + } + _ = c.Request.Body.Close() + }() path := c.GetHeader("File-Path") path, err := url.PathUnescape(path) if err != nil { @@ -124,7 +131,6 @@ func FsForm(c *gin.Context) { } if !overwrite { if res, _ := fs.Get(c, path, &fs.GetArgs{NoLog: true}); res != nil { - _, _ = utils.CopyWithBuffer(io.Discard, c.Request.Body) common.ErrorStrResp(c, "file exists", 403) return } @@ -164,7 +170,7 @@ func FsForm(c *gin.Context) { if len(mimetype) == 0 { mimetype = utils.GetMimeType(name) } - s := stream.FileStream{ + s := &stream.FileStream{ Obj: &model.Object{ Name: name, Size: file.Size, @@ -180,9 +186,9 @@ func FsForm(c *gin.Context) { s.Reader = struct { io.Reader }{f} - t, err = fs.PutAsTask(c, dir, &s) + t, err = fs.PutAsTask(c, dir, s) } else { - err = fs.PutDirectly(c, dir, &s, true) + err = fs.PutDirectly(c, dir, s, true) } if err != nil { common.ErrorResp(c, err, 500) diff --git a/server/s3/backend.go b/server/s3/backend.go index 8912d03f..e79d57a9 100644 --- a/server/s3/backend.go +++ b/server/s3/backend.go @@ -142,7 +142,7 @@ func (b *s3Backend) HeadObject(ctx context.Context, bucketName, objectName strin } // GetObject fetchs the object from the filesystem. -func (b *s3Backend) GetObject(ctx context.Context, bucketName, objectName string, rangeRequest *gofakes3.ObjectRangeRequest) (obj *gofakes3.Object, err error) { +func (b *s3Backend) GetObject(ctx context.Context, bucketName, objectName string, rangeRequest *gofakes3.ObjectRangeRequest) (s3Obj *gofakes3.Object, err error) { bucket, err := getBucketByName(bucketName) if err != nil { return nil, err @@ -164,6 +164,11 @@ func (b *s3Backend) GetObject(ctx context.Context, bucketName, objectName string if err != nil { return nil, err } + defer func() { + if s3Obj == nil { + _ = link.Close() + } + }() size := file.GetSize() rnge, err := rangeRequest.Range(size) @@ -171,49 +176,19 @@ func (b *s3Backend) GetObject(ctx context.Context, bucketName, objectName string return nil, err } - if link.RangeReadCloser == nil && link.MFile == nil && len(link.URL) == 0 { + rrf, err := stream.GetRangeReaderFromLink(size, link) + if err != nil { return nil, fmt.Errorf("the remote storage driver need to be enhanced to support s3") } - var rdr io.ReadCloser - length := int64(-1) - start := int64(0) + var rd io.Reader if rnge != nil { - start, length = rnge.Start, rnge.Length - } - // 参考 server/common/proxy.go - if link.MFile != nil { - _, err := link.MFile.Seek(start, io.SeekStart) - if err != nil { - return nil, err - } - if rdr2, ok := link.MFile.(io.ReadCloser); ok { - rdr = rdr2 - } else { - rdr = io.NopCloser(link.MFile) - } + rd, err = rrf.RangeRead(ctx, http_range.Range(*rnge)) } else { - remoteFileSize := file.GetSize() - if length >= 0 && start+length >= remoteFileSize { - length = -1 - } - rrc := link.RangeReadCloser - if len(link.URL) > 0 { - var converted, err = stream.GetRangeReadCloserFromLink(remoteFileSize, link) - if err != nil { - return nil, err - } - rrc = converted - } - if rrc != nil { - remoteReader, err := rrc.RangeRead(ctx, http_range.Range{Start: start, Length: length}) - if err != nil { - return nil, err - } - rdr = utils.ReadCloser{Reader: remoteReader, Closer: rrc} - } else { - return nil, errs.NotSupport - } + rd, err = rrf.RangeRead(ctx, http_range.Range{Length: -1}) + } + if err != nil { + return nil, err } meta := map[string]string{ @@ -236,7 +211,7 @@ func (b *s3Backend) GetObject(ctx context.Context, bucketName, objectName string Metadata: meta, Size: size, Range: rnge, - Contents: rdr, + Contents: utils.ReadCloser{Reader: rd, Closer: link}, }, nil } @@ -318,11 +293,11 @@ func (b *s3Backend) PutObject( return result, err } - if err := stream.Close(); err != nil { - // remove file when close error occurred (FsPutErr) - _ = fs.Remove(ctx, fp) - return result, err - } + // if err := stream.Close(); err != nil { + // // remove file when close error occurred (FsPutErr) + // _ = fs.Remove(ctx, fp) + // return result, err + // } b.meta.Store(fp, meta) diff --git a/server/webdav/webdav.go b/server/webdav/webdav.go index 42f92db8..15e1ccee 100644 --- a/server/webdav/webdav.go +++ b/server/webdav/webdav.go @@ -9,6 +9,7 @@ import ( "context" "errors" "fmt" + "io" "net/http" "net/url" "os" @@ -16,6 +17,7 @@ import ( "strings" "time" + "github.com/OpenListTeam/OpenList/v4/internal/net" "github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/internal/errs" @@ -245,11 +247,15 @@ func (h *Handler) handleGetHeadPost(w http.ResponseWriter, r *http.Request) (sta if err != nil { return http.StatusInternalServerError, err } + defer link.Close() if storage.GetStorage().ProxyRange { common.ProxyRange(ctx, link, fi.GetSize()) } err = common.Proxy(w, r, link, fi) if err != nil { + if statusCode, ok := errors.Unwrap(err).(net.ErrorHttpStatusCode); ok { + return int(statusCode), err + } return http.StatusInternalServerError, fmt.Errorf("webdav proxy error: %+v", err) } } else if storage.GetStorage().WebdavProxy() && downProxyUrl != "" { @@ -264,6 +270,7 @@ func (h *Handler) handleGetHeadPost(w http.ResponseWriter, r *http.Request) (sta if err != nil { return http.StatusInternalServerError, err } + defer link.Close() http.Redirect(w, r, link.URL, http.StatusFound) } return 0, nil @@ -305,6 +312,12 @@ func (h *Handler) handleDelete(w http.ResponseWriter, r *http.Request) (status i } func (h *Handler) handlePut(w http.ResponseWriter, r *http.Request) (status int, err error) { + defer func() { + if n, _ := io.ReadFull(r.Body, []byte{0}); n == 1 { + _, _ = utils.CopyWithBuffer(io.Discard, r.Body) + } + _ = r.Body.Close() + }() reqPath, status, err := h.stripPrefix(r.URL.Path) if err != nil { return status, err @@ -344,8 +357,6 @@ func (h *Handler) handlePut(w http.ResponseWriter, r *http.Request) (status int, return http.StatusNotFound, err } - _ = r.Body.Close() - _ = fsStream.Close() // TODO(rost): Returning 405 Method Not Allowed might not be appropriate. if err != nil { return http.StatusMethodNotAllowed, err