feat(offline-download): SimpleHttp: download stream direct upload (#523)

* feat(offline-download): stream download to upload

* 重命名stream_put为upload_download_stream

* chore
This commit is contained in:
j2rong4cn
2025-07-02 18:47:16 +08:00
committed by GitHub
parent d707f002eb
commit e83f8e197a
6 changed files with 80 additions and 7 deletions

View File

@ -16,8 +16,8 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/task" "github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/OpenListTeam/OpenList/v4/server/common" "github.com/OpenListTeam/OpenList/v4/server/common"
"github.com/pkg/errors"
"github.com/OpenListTeam/tache" "github.com/OpenListTeam/tache"
"github.com/pkg/errors"
) )
type CopyTask struct { type CopyTask struct {

View File

@ -16,8 +16,8 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/task" "github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/OpenListTeam/OpenList/v4/server/common" "github.com/OpenListTeam/OpenList/v4/server/common"
"github.com/pkg/errors"
"github.com/OpenListTeam/tache" "github.com/OpenListTeam/tache"
"github.com/pkg/errors"
) )
type MoveTask struct { type MoveTask struct {

View File

@ -11,6 +11,7 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/offline_download/tool" "github.com/OpenListTeam/OpenList/v4/internal/offline_download/tool"
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
"github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/OpenListTeam/OpenList/v4/pkg/utils"
) )
@ -53,10 +54,18 @@ func (s SimpleHttp) Run(task *tool.DownloadTask) error {
if err != nil { if err != nil {
return err return err
} }
req, err := http.NewRequestWithContext(task.Ctx(), http.MethodGet, u, nil) streamPut := task.DeletePolicy == tool.UploadDownloadStream
method := http.MethodGet
if streamPut {
method = http.MethodHead
}
req, err := http.NewRequestWithContext(task.Ctx(), method, u, nil)
if err != nil { if err != nil {
return err return err
} }
if streamPut {
req.Header.Set("Range", "bytes=0-")
}
resp, err := s.client.Do(req) resp, err := s.client.Do(req)
if err != nil { if err != nil {
return err return err
@ -74,6 +83,17 @@ func (s SimpleHttp) Run(task *tool.DownloadTask) error {
if n, err := parseFilenameFromContentDisposition(resp.Header.Get("Content-Disposition")); err == nil { if n, err := parseFilenameFromContentDisposition(resp.Header.Get("Content-Disposition")); err == nil {
filename = n filename = n
} }
fileSize := resp.ContentLength
if streamPut {
if fileSize == 0 {
start, end, _ := http_range.ParseContentRange(resp.Header.Get("Content-Range"))
fileSize = start + end
}
task.SetTotalBytes(fileSize)
task.TempDir = filename
return nil
}
task.SetTotalBytes(fileSize)
// save to temp dir // save to temp dir
_ = os.MkdirAll(task.TempDir, os.ModePerm) _ = os.MkdirAll(task.TempDir, os.ModePerm)
filePath := filepath.Join(task.TempDir, filename) filePath := filepath.Join(task.TempDir, filename)
@ -82,8 +102,6 @@ func (s SimpleHttp) Run(task *tool.DownloadTask) error {
return err return err
} }
defer file.Close() defer file.Close()
fileSize := resp.ContentLength
task.SetTotalBytes(fileSize)
err = utils.CopyWithCtx(task.Ctx(), file, resp.Body, fileSize, task.SetProgress) err = utils.CopyWithCtx(task.Ctx(), file, resp.Body, fileSize, task.SetProgress)
return err return err
} }

View File

@ -29,6 +29,7 @@ const (
DeleteOnUploadFailed DeletePolicy = "delete_on_upload_failed" DeleteOnUploadFailed DeletePolicy = "delete_on_upload_failed"
DeleteNever DeletePolicy = "delete_never" DeleteNever DeletePolicy = "delete_never"
DeleteAlways DeletePolicy = "delete_always" DeleteAlways DeletePolicy = "delete_always"
UploadDownloadStream DeletePolicy = "upload_download_stream"
) )
type AddURLArgs struct { type AddURLArgs struct {

View File

@ -6,11 +6,13 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/conf" "github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/errs" "github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/setting" "github.com/OpenListTeam/OpenList/v4/internal/setting"
"github.com/OpenListTeam/OpenList/v4/internal/task" "github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/OpenListTeam/tache"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/OpenListTeam/tache"
) )
type DownloadTask struct { type DownloadTask struct {
@ -171,6 +173,27 @@ func (t *DownloadTask) Transfer() error {
} }
return nil return nil
} }
if t.DeletePolicy == UploadDownloadStream {
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(t.DstDirPath)
if err != nil {
return errors.WithMessage(err, "failed get dst storage")
}
taskCreator, _ := t.Ctx().Value("user").(*model.User)
task := &TransferTask{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
},
SrcObjPath: t.TempDir,
DstDirPath: dstDirActualPath,
DstStorage: dstStorage,
DstStorageMp: dstStorage.GetStorage().MountPath,
DeletePolicy: t.DeletePolicy,
Url: t.Url,
}
task.SetTotalBytes(t.GetTotalBytes())
TransferTaskManager.Add(task)
return nil
}
return transferStd(t.Ctx(), t.TempDir, t.DstDirPath, t.DeletePolicy) return transferStd(t.Ctx(), t.TempDir, t.DstDirPath, t.DeletePolicy)
} }

View File

@ -14,10 +14,11 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/op" "github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/internal/task" "github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
"github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/OpenListTeam/tache"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/OpenListTeam/tache"
) )
type TransferTask struct { type TransferTask struct {
@ -30,6 +31,7 @@ type TransferTask struct {
SrcStorageMp string `json:"src_storage_mp"` SrcStorageMp string `json:"src_storage_mp"`
DstStorageMp string `json:"dst_storage_mp"` DstStorageMp string `json:"dst_storage_mp"`
DeletePolicy DeletePolicy `json:"delete_policy"` DeletePolicy DeletePolicy `json:"delete_policy"`
Url string `json:"-"`
} }
func (t *TransferTask) Run() error { func (t *TransferTask) Run() error {
@ -40,6 +42,32 @@ func (t *TransferTask) Run() error {
t.SetStartTime(time.Now()) t.SetStartTime(time.Now())
defer func() { t.SetEndTime(time.Now()) }() defer func() { t.SetEndTime(time.Now()) }()
if t.SrcStorage == nil { if t.SrcStorage == nil {
if t.DeletePolicy == UploadDownloadStream {
rrc, err := stream.GetRangeReadCloserFromLink(t.GetTotalBytes(), &model.Link{URL: t.Url})
if err != nil {
return err
}
r, err := rrc.RangeRead(t.Ctx(), http_range.Range{Length: t.GetTotalBytes()})
if err != nil {
return err
}
name := t.SrcObjPath
mimetype := utils.GetMimeType(name)
s := &stream.FileStream{
Ctx: nil,
Obj: &model.Object{
Name: name,
Size: t.GetTotalBytes(),
Modified: time.Now(),
IsFolder: false,
},
Reader: r,
Mimetype: mimetype,
Closers: utils.NewClosers(rrc),
}
defer s.Close()
return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, s, t.SetProgress)
}
return transferStdPath(t) return transferStdPath(t)
} else { } else {
return transferObjPath(t) return transferObjPath(t)
@ -47,6 +75,9 @@ func (t *TransferTask) Run() error {
} }
func (t *TransferTask) GetName() string { func (t *TransferTask) GetName() string {
if t.DeletePolicy == UploadDownloadStream {
return fmt.Sprintf("upload [%s](%s) to [%s](%s)", t.SrcObjPath, t.Url, t.DstStorageMp, t.DstDirPath)
}
return fmt.Sprintf("transfer [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath) return fmt.Sprintf("transfer [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath)
} }