From 3838ef06637f502f299b2bcd90752d7788164db5 Mon Sep 17 00:00:00 2001 From: Seven <53081179+Seven66677731@users.noreply.github.com> Date: Tue, 8 Jul 2025 21:41:45 +0800 Subject: [PATCH] feat(traffic): update progress when caching file (#646) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(traffic): update progress when caching file * 调整参数位置和命名 --------- Co-authored-by: j2rong4cn --- drivers/115/driver.go | 4 ++- drivers/115_open/driver.go | 8 +++-- drivers/123/driver.go | 4 ++- drivers/123_open/driver.go | 5 +-- drivers/139/driver.go | 4 ++- drivers/189pc/utils.go | 4 ++- drivers/aliyundrive_open/upload.go | 4 ++- drivers/ilanzou/driver.go | 4 ++- drivers/quark_open/driver.go | 11 +++--- drivers/quark_uc/driver.go | 4 ++- drivers/thunder/driver.go | 4 ++- drivers/thunder_browser/driver.go | 4 ++- drivers/thunderx/driver.go | 4 ++- internal/fs/archive.go | 6 ++-- internal/model/obj.go | 13 +++++++ internal/stream/util.go | 58 +++++++++++++++++------------- 16 files changed, 96 insertions(+), 45 deletions(-) diff --git a/drivers/115/driver.go b/drivers/115/driver.go index dc8e2726..49570984 100644 --- a/drivers/115/driver.go +++ b/drivers/115/driver.go @@ -186,7 +186,9 @@ func (d *Pan115) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr preHash = strings.ToUpper(preHash) fullHash := stream.GetHash().GetHash(utils.SHA1) if len(fullHash) != utils.SHA1.Width { - _, fullHash, err = streamPkg.CacheFullInTempFileAndHash(stream, utils.SHA1) + cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50) + up = model.UpdateProgressWithRange(up, 50, 100) + _, fullHash, err = streamPkg.CacheFullInTempFileAndHash(stream, cacheFileProgress, utils.SHA1) if err != nil { return nil, err } diff --git a/drivers/115_open/driver.go b/drivers/115_open/driver.go index ef57ae63..2d184e4d 100644 --- a/drivers/115_open/driver.go +++ b/drivers/115_open/driver.go @@ -8,6 +8,7 @@ import ( "strings" "time" + sdk "github.com/OpenListTeam/115-sdk-go" "github.com/OpenListTeam/OpenList/v4/cmd/flags" "github.com/OpenListTeam/OpenList/v4/drivers/base" "github.com/OpenListTeam/OpenList/v4/internal/driver" @@ -16,7 +17,6 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/pkg/http_range" "github.com/OpenListTeam/OpenList/v4/pkg/utils" - sdk "github.com/OpenListTeam/115-sdk-go" "golang.org/x/time/rate" ) @@ -222,7 +222,9 @@ func (d *Open115) Put(ctx context.Context, dstDir model.Obj, file model.FileStre } sha1 := file.GetHash().GetHash(utils.SHA1) if len(sha1) != utils.SHA1.Width { - _, sha1, err = stream.CacheFullInTempFileAndHash(file, utils.SHA1) + cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50) + up = model.UpdateProgressWithRange(up, 50, 100) + _, sha1, err = stream.CacheFullInTempFileAndHash(file, cacheFileProgress, utils.SHA1) if err != nil { return err } @@ -252,6 +254,7 @@ func (d *Open115) Put(ctx context.Context, dstDir model.Obj, file model.FileStre return err } if resp.Status == 2 { + up(100) return nil } // 2. two way verify @@ -286,6 +289,7 @@ func (d *Open115) Put(ctx context.Context, dstDir model.Obj, file model.FileStre return err } if resp.Status == 2 { + up(100) return nil } } diff --git a/drivers/123/driver.go b/drivers/123/driver.go index 6d473d17..0ecaf45c 100644 --- a/drivers/123/driver.go +++ b/drivers/123/driver.go @@ -188,7 +188,9 @@ func (d *Pan123) Put(ctx context.Context, dstDir model.Obj, file model.FileStrea etag := file.GetHash().GetHash(utils.MD5) var err error if len(etag) < utils.MD5.Width { - _, etag, err = stream.CacheFullInTempFileAndHash(file, utils.MD5) + cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50) + up = model.UpdateProgressWithRange(up, 50, 100) + _, etag, err = stream.CacheFullInTempFileAndHash(file, cacheFileProgress, utils.MD5) if err != nil { return err } diff --git a/drivers/123_open/driver.go b/drivers/123_open/driver.go index e7f4602b..8d1be567 100644 --- a/drivers/123_open/driver.go +++ b/drivers/123_open/driver.go @@ -109,7 +109,9 @@ func (d *Open123) Put(ctx context.Context, dstDir model.Obj, file model.FileStre etag := file.GetHash().GetHash(utils.MD5) if len(etag) < utils.MD5.Width { - _, etag, err = stream.CacheFullInTempFileAndHash(file, utils.MD5) + cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50) + up = model.UpdateProgressWithRange(up, 50, 100) + _, etag, err = stream.CacheFullInTempFileAndHash(file, cacheFileProgress, utils.MD5) if err != nil { return err } @@ -121,7 +123,6 @@ func (d *Open123) Put(ctx context.Context, dstDir model.Obj, file model.FileStre if createResp.Data.Reuse { return nil } - up(10) return d.Upload(ctx, file, createResp, up) } diff --git a/drivers/139/driver.go b/drivers/139/driver.go index f5ad71e9..b3e5e2a6 100644 --- a/drivers/139/driver.go +++ b/drivers/139/driver.go @@ -522,7 +522,9 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr var err error fullHash := stream.GetHash().GetHash(utils.SHA256) if len(fullHash) != utils.SHA256.Width { - _, fullHash, err = streamPkg.CacheFullInTempFileAndHash(stream, utils.SHA256) + cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50) + up = model.UpdateProgressWithRange(up, 50, 100) + _, fullHash, err = streamPkg.CacheFullInTempFileAndHash(stream, cacheFileProgress, utils.SHA256) if err != nil { return err } diff --git a/drivers/189pc/utils.go b/drivers/189pc/utils.go index 56f44a5f..fc7cb98a 100644 --- a/drivers/189pc/utils.go +++ b/drivers/189pc/utils.go @@ -820,7 +820,9 @@ func (y *Cloud189PC) GetMultiUploadUrls(ctx context.Context, isFamily bool, uplo // 旧版本上传,家庭云不支持覆盖 func (y *Cloud189PC) OldUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress, isFamily bool, overwrite bool) (model.Obj, error) { - tempFile, fileMd5, err := stream.CacheFullInTempFileAndHash(file, utils.MD5) + cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50) + up = model.UpdateProgressWithRange(up, 50, 100) + tempFile, fileMd5, err := stream.CacheFullInTempFileAndHash(file, cacheFileProgress, utils.MD5) if err != nil { return nil, err } diff --git a/drivers/aliyundrive_open/upload.go b/drivers/aliyundrive_open/upload.go index 10ae4bfc..51a63017 100644 --- a/drivers/aliyundrive_open/upload.go +++ b/drivers/aliyundrive_open/upload.go @@ -194,7 +194,9 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m hash := stream.GetHash().GetHash(utils.SHA1) if len(hash) != utils.SHA1.Width { - _, hash, err = streamPkg.CacheFullInTempFileAndHash(stream, utils.SHA1) + cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50) + up = model.UpdateProgressWithRange(up, 50, 100) + _, hash, err = streamPkg.CacheFullInTempFileAndHash(stream, cacheFileProgress, utils.SHA1) if err != nil { return nil, err } diff --git a/drivers/ilanzou/driver.go b/drivers/ilanzou/driver.go index 0b447818..59b24b53 100644 --- a/drivers/ilanzou/driver.go +++ b/drivers/ilanzou/driver.go @@ -276,7 +276,9 @@ func (d *ILanZou) Put(ctx context.Context, dstDir model.Obj, s model.FileStreame etag := s.GetHash().GetHash(utils.MD5) var err error if len(etag) != utils.MD5.Width { - _, etag, err = stream.CacheFullInTempFileAndHash(s, utils.MD5) + cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50) + up = model.UpdateProgressWithRange(up, 50, 100) + _, etag, err = stream.CacheFullInTempFileAndHash(s, cacheFileProgress, utils.MD5) if err != nil { return nil, err } diff --git a/drivers/quark_open/driver.go b/drivers/quark_open/driver.go index 5e8e7a53..cc757580 100644 --- a/drivers/quark_open/driver.go +++ b/drivers/quark_open/driver.go @@ -6,6 +6,10 @@ import ( "encoding/hex" "errors" "fmt" + "hash" + "io" + "net/http" + "github.com/OpenListTeam/OpenList/v4/drivers/base" "github.com/OpenListTeam/OpenList/v4/internal/driver" "github.com/OpenListTeam/OpenList/v4/internal/errs" @@ -13,9 +17,6 @@ import ( streamPkg "github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/go-resty/resty/v2" - "hash" - "io" - "net/http" ) type QuarkOpen struct { @@ -157,7 +158,9 @@ func (d *QuarkOpen) Put(ctx context.Context, dstDir model.Obj, stream model.File } if len(writers) > 0 { - _, err := streamPkg.CacheFullInTempFileAndWriter(stream, io.MultiWriter(writers...)) + cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50) + up = model.UpdateProgressWithRange(up, 50, 100) + _, err := streamPkg.CacheFullInTempFileAndWriter(stream, cacheFileProgress, io.MultiWriter(writers...)) if err != nil { return err } diff --git a/drivers/quark_uc/driver.go b/drivers/quark_uc/driver.go index 4235acc6..5d658fb8 100644 --- a/drivers/quark_uc/driver.go +++ b/drivers/quark_uc/driver.go @@ -144,7 +144,9 @@ func (d *QuarkOrUC) Put(ctx context.Context, dstDir model.Obj, stream model.File } if len(writers) > 0 { - _, err := streamPkg.CacheFullInTempFileAndWriter(stream, io.MultiWriter(writers...)) + cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50) + up = model.UpdateProgressWithRange(up, 50, 100) + _, err := streamPkg.CacheFullInTempFileAndWriter(stream, cacheFileProgress, io.MultiWriter(writers...)) if err != nil { return err } diff --git a/drivers/thunder/driver.go b/drivers/thunder/driver.go index 78dd2273..83cb4b3f 100644 --- a/drivers/thunder/driver.go +++ b/drivers/thunder/driver.go @@ -371,7 +371,9 @@ func (xc *XunLeiCommon) Put(ctx context.Context, dstDir model.Obj, file model.Fi gcid := file.GetHash().GetHash(hash_extend.GCID) var err error if len(gcid) < hash_extend.GCID.Width { - _, gcid, err = stream.CacheFullInTempFileAndHash(file, hash_extend.GCID, file.GetSize()) + cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50) + up = model.UpdateProgressWithRange(up, 50, 100) + _, gcid, err = stream.CacheFullInTempFileAndHash(file, cacheFileProgress, hash_extend.GCID, file.GetSize()) if err != nil { return err } diff --git a/drivers/thunder_browser/driver.go b/drivers/thunder_browser/driver.go index 6dfc0c8f..89d68ea1 100644 --- a/drivers/thunder_browser/driver.go +++ b/drivers/thunder_browser/driver.go @@ -491,7 +491,9 @@ func (xc *XunLeiBrowserCommon) Put(ctx context.Context, dstDir model.Obj, stream gcid := stream.GetHash().GetHash(hash_extend.GCID) var err error if len(gcid) < hash_extend.GCID.Width { - _, gcid, err = streamPkg.CacheFullInTempFileAndHash(stream, hash_extend.GCID, stream.GetSize()) + cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50) + up = model.UpdateProgressWithRange(up, 50, 100) + _, gcid, err = streamPkg.CacheFullInTempFileAndHash(stream, cacheFileProgress, hash_extend.GCID, stream.GetSize()) if err != nil { return err } diff --git a/drivers/thunderx/driver.go b/drivers/thunderx/driver.go index 9ad1cd7c..81e9adf0 100644 --- a/drivers/thunderx/driver.go +++ b/drivers/thunderx/driver.go @@ -369,7 +369,9 @@ func (xc *XunLeiXCommon) Put(ctx context.Context, dstDir model.Obj, file model.F gcid := file.GetHash().GetHash(hash_extend.GCID) var err error if len(gcid) < hash_extend.GCID.Width { - _, gcid, err = stream.CacheFullInTempFileAndHash(file, hash_extend.GCID, file.GetSize()) + cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50) + up = model.UpdateProgressWithRange(up, 50, 100) + _, gcid, err = stream.CacheFullInTempFileAndHash(file, cacheFileProgress, hash_extend.GCID, file.GetSize()) if err != nil { return err } diff --git a/internal/fs/archive.go b/internal/fs/archive.go index 4a6c5d99..ea8c5e84 100644 --- a/internal/fs/archive.go +++ b/internal/fs/archive.go @@ -22,9 +22,9 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/op" "github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/internal/task" + "github.com/OpenListTeam/tache" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/OpenListTeam/tache" ) type ArchiveDownloadTask struct { @@ -93,9 +93,9 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT t.status = "getting src object" for _, s := range ss { if s.GetFile() == nil { - _, err = stream.CacheFullInTempFileAndUpdateProgress(s, func(p float64) { + _, err = stream.CacheFullInTempFileAndWriter(s, func(p float64) { t.SetProgress((float64(cur) + float64(s.GetSize())*p/100.0) / float64(total)) - }) + }, nil) } cur += s.GetSize() if err != nil { diff --git a/internal/model/obj.go b/internal/model/obj.go index 75509b8f..5bfcca47 100644 --- a/internal/model/obj.go +++ b/internal/model/obj.go @@ -55,6 +55,19 @@ type FileStreamer interface { type UpdateProgress func(percentage float64) +func UpdateProgressWithRange(inner UpdateProgress, start, end float64) UpdateProgress { + return func(p float64) { + if p < 0 { + p = 0 + } + if p > 100 { + p = 100 + } + scaled := start + (end-start)*(p/100.0) + inner(scaled) + } +} + type URL interface { URL() string } diff --git a/internal/stream/util.go b/internal/stream/util.go index 94bf750a..8bbc57a5 100644 --- a/internal/stream/util.go +++ b/internal/stream/util.go @@ -98,42 +98,52 @@ func (r *ReaderWithCtx) Close() error { return nil } -func CacheFullInTempFileAndUpdateProgress(stream model.FileStreamer, up model.UpdateProgress) (model.File, error) { +func CacheFullInTempFileAndWriter(stream model.FileStreamer, up model.UpdateProgress, w io.Writer) (model.File, error) { if cache := stream.GetFile(); cache != nil { - up(100) + if w != nil { + _, err := cache.Seek(0, io.SeekStart) + if err == nil { + var reader io.Reader = stream + if up != nil { + reader = &ReaderUpdatingProgress{ + Reader: stream, + UpdateProgress: up, + } + } + _, err = utils.CopyWithBuffer(w, reader) + if err == nil { + _, err = cache.Seek(0, io.SeekStart) + } + } + return cache, err + } + if up != nil { + up(100) + } return cache, nil } - tmpF, err := utils.CreateTempFile(&ReaderUpdatingProgress{ - Reader: stream, - UpdateProgress: up, - }, stream.GetSize()) - if err == nil { - stream.SetTmpFile(tmpF) - } - return tmpF, err -} -func CacheFullInTempFileAndWriter(stream model.FileStreamer, w io.Writer) (model.File, error) { - if cache := stream.GetFile(); cache != nil { - _, err := cache.Seek(0, io.SeekStart) - if err == nil { - _, err = utils.CopyWithBuffer(w, cache) - if err == nil { - _, err = cache.Seek(0, io.SeekStart) - } + var reader io.Reader = stream + if up != nil { + reader = &ReaderUpdatingProgress{ + Reader: stream, + UpdateProgress: up, } - return cache, err } - tmpF, err := utils.CreateTempFile(io.TeeReader(stream, w), stream.GetSize()) + + if w != nil { + reader = io.TeeReader(reader, w) + } + tmpF, err := utils.CreateTempFile(reader, stream.GetSize()) if err == nil { stream.SetTmpFile(tmpF) } return tmpF, err } -func CacheFullInTempFileAndHash(stream model.FileStreamer, hashType *utils.HashType, params ...any) (model.File, string, error) { - h := hashType.NewFunc(params...) - tmpF, err := CacheFullInTempFileAndWriter(stream, h) +func CacheFullInTempFileAndHash(stream model.FileStreamer, up model.UpdateProgress, hashType *utils.HashType, hashParams ...any) (model.File, string, error) { + h := hashType.NewFunc(hashParams...) + tmpF, err := CacheFullInTempFileAndWriter(stream, up, h) if err != nil { return nil, "", err }