feat(traffic): update progress when caching file (#646)

* feat(traffic): update progress when caching file

* 调整参数位置和命名

---------

Co-authored-by: j2rong4cn <j2rong@qq.com>
This commit is contained in:
Seven
2025-07-08 21:41:45 +08:00
committed by GitHub
parent 9e610af114
commit 3838ef0663
16 changed files with 96 additions and 45 deletions

View File

@ -22,9 +22,9 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/OpenListTeam/tache"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/OpenListTeam/tache"
)
type ArchiveDownloadTask struct {
@ -93,9 +93,9 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
t.status = "getting src object"
for _, s := range ss {
if s.GetFile() == nil {
_, err = stream.CacheFullInTempFileAndUpdateProgress(s, func(p float64) {
_, err = stream.CacheFullInTempFileAndWriter(s, func(p float64) {
t.SetProgress((float64(cur) + float64(s.GetSize())*p/100.0) / float64(total))
})
}, nil)
}
cur += s.GetSize()
if err != nil {

View File

@ -55,6 +55,19 @@ type FileStreamer interface {
type UpdateProgress func(percentage float64)
func UpdateProgressWithRange(inner UpdateProgress, start, end float64) UpdateProgress {
return func(p float64) {
if p < 0 {
p = 0
}
if p > 100 {
p = 100
}
scaled := start + (end-start)*(p/100.0)
inner(scaled)
}
}
type URL interface {
URL() string
}

View File

@ -98,42 +98,52 @@ func (r *ReaderWithCtx) Close() error {
return nil
}
func CacheFullInTempFileAndUpdateProgress(stream model.FileStreamer, up model.UpdateProgress) (model.File, error) {
func CacheFullInTempFileAndWriter(stream model.FileStreamer, up model.UpdateProgress, w io.Writer) (model.File, error) {
if cache := stream.GetFile(); cache != nil {
up(100)
if w != nil {
_, err := cache.Seek(0, io.SeekStart)
if err == nil {
var reader io.Reader = stream
if up != nil {
reader = &ReaderUpdatingProgress{
Reader: stream,
UpdateProgress: up,
}
}
_, err = utils.CopyWithBuffer(w, reader)
if err == nil {
_, err = cache.Seek(0, io.SeekStart)
}
}
return cache, err
}
if up != nil {
up(100)
}
return cache, nil
}
tmpF, err := utils.CreateTempFile(&ReaderUpdatingProgress{
Reader: stream,
UpdateProgress: up,
}, stream.GetSize())
if err == nil {
stream.SetTmpFile(tmpF)
}
return tmpF, err
}
func CacheFullInTempFileAndWriter(stream model.FileStreamer, w io.Writer) (model.File, error) {
if cache := stream.GetFile(); cache != nil {
_, err := cache.Seek(0, io.SeekStart)
if err == nil {
_, err = utils.CopyWithBuffer(w, cache)
if err == nil {
_, err = cache.Seek(0, io.SeekStart)
}
var reader io.Reader = stream
if up != nil {
reader = &ReaderUpdatingProgress{
Reader: stream,
UpdateProgress: up,
}
return cache, err
}
tmpF, err := utils.CreateTempFile(io.TeeReader(stream, w), stream.GetSize())
if w != nil {
reader = io.TeeReader(reader, w)
}
tmpF, err := utils.CreateTempFile(reader, stream.GetSize())
if err == nil {
stream.SetTmpFile(tmpF)
}
return tmpF, err
}
func CacheFullInTempFileAndHash(stream model.FileStreamer, hashType *utils.HashType, params ...any) (model.File, string, error) {
h := hashType.NewFunc(params...)
tmpF, err := CacheFullInTempFileAndWriter(stream, h)
func CacheFullInTempFileAndHash(stream model.FileStreamer, up model.UpdateProgress, hashType *utils.HashType, hashParams ...any) (model.File, string, error) {
h := hashType.NewFunc(hashParams...)
tmpF, err := CacheFullInTempFileAndWriter(stream, up, h)
if err != nil {
return nil, "", err
}