mirror of
https://github.com/OpenListTeam/OpenList.git
synced 2025-09-19 20:26:26 +08:00

* feat(task): add task hook,batch task
refactor(move): move use CopyTask
* Update internal/task/batch_task/refresh.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Seven <53081179+Seven66677731@users.noreply.github.com>
* fix: upload task allFinish judge
* Update internal/task/batch_task/refresh.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Seven <53081179+Seven66677731@users.noreply.github.com>
* feat: enhance concurrency safety
* 优化代码
* 解压缩
* 修复死锁
* refactor(move): move as task
* 重构,优化
* .
* 优化,修复bug
* .
* 修复bug
* feat: add task retry judge
* 代理Task.SetState函数来判断Task的生命周期
* chore: use OnSucceeded、OnFailed、OnBeforeRetry functions
* 优化
* 优化,去除重复代码
* .
* 优化
* .
* webdav
* Revert "fix(fs):After the file is copied or moved, flush the cache of the directory that was copied or moved to."
This reverts commit 5f03edd683
.
---------
Signed-off-by: Seven <53081179+Seven66677731@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: j2rong4cn <j2rong@qq.com>
333 lines
9.8 KiB
Go
333 lines
9.8 KiB
Go
package tool
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
stdpath "path"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"github.com/OpenListTeam/OpenList/v4/internal/conf"
|
|
"github.com/OpenListTeam/OpenList/v4/internal/fs"
|
|
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
|
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
|
"github.com/OpenListTeam/OpenList/v4/internal/stream"
|
|
"github.com/OpenListTeam/OpenList/v4/internal/task"
|
|
"github.com/OpenListTeam/OpenList/v4/internal/task_group"
|
|
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
|
|
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
|
"github.com/OpenListTeam/OpenList/v4/server/common"
|
|
"github.com/OpenListTeam/tache"
|
|
"github.com/pkg/errors"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type TransferTask struct {
|
|
fs.TaskData
|
|
DeletePolicy DeletePolicy `json:"delete_policy"`
|
|
Url string `json:"url"`
|
|
groupID string `json:"-"`
|
|
}
|
|
|
|
func (t *TransferTask) Run() error {
|
|
if err := t.ReinitCtx(); err != nil {
|
|
return err
|
|
}
|
|
t.ClearEndTime()
|
|
t.SetStartTime(time.Now())
|
|
defer func() { t.SetEndTime(time.Now()) }()
|
|
if t.SrcStorage == nil {
|
|
if t.DeletePolicy == UploadDownloadStream {
|
|
rr, err := stream.GetRangeReaderFromLink(t.GetTotalBytes(), &model.Link{URL: t.Url})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r, err := rr.RangeRead(t.Ctx(), http_range.Range{Length: t.GetTotalBytes()})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
name := t.SrcActualPath
|
|
mimetype := utils.GetMimeType(name)
|
|
s := &stream.FileStream{
|
|
Ctx: t.Ctx(),
|
|
Obj: &model.Object{
|
|
Name: name,
|
|
Size: t.GetTotalBytes(),
|
|
Modified: time.Now(),
|
|
IsFolder: false,
|
|
},
|
|
Reader: r,
|
|
Mimetype: mimetype,
|
|
Closers: utils.NewClosers(r),
|
|
}
|
|
return op.Put(t.Ctx(), t.DstStorage, t.DstActualPath, s, t.SetProgress)
|
|
}
|
|
return transferStdPath(t)
|
|
} else {
|
|
return transferObjPath(t)
|
|
}
|
|
}
|
|
|
|
func (t *TransferTask) GetName() string {
|
|
if t.DeletePolicy == UploadDownloadStream {
|
|
return fmt.Sprintf("upload [%s](%s) to [%s](%s)", t.SrcActualPath, t.Url, t.DstStorageMp, t.DstActualPath)
|
|
}
|
|
return fmt.Sprintf("transfer [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcActualPath, t.DstStorageMp, t.DstActualPath)
|
|
}
|
|
|
|
func (t *TransferTask) OnSucceeded() {
|
|
if t.DeletePolicy == DeleteOnUploadSucceed || t.DeletePolicy == DeleteAlways {
|
|
if t.SrcStorage == nil {
|
|
removeStdTemp(t)
|
|
} else {
|
|
removeObjTemp(t)
|
|
}
|
|
}
|
|
task_group.TransferCoordinator.Done(t.groupID, true)
|
|
}
|
|
|
|
func (t *TransferTask) OnFailed() {
|
|
if t.DeletePolicy == DeleteOnUploadFailed || t.DeletePolicy == DeleteAlways {
|
|
if t.SrcStorage == nil {
|
|
removeStdTemp(t)
|
|
} else {
|
|
removeObjTemp(t)
|
|
}
|
|
}
|
|
task_group.TransferCoordinator.Done(t.groupID, false)
|
|
}
|
|
|
|
func (t *TransferTask) SetRetry(retry int, maxRetry int) {
|
|
if retry == 0 &&
|
|
(len(t.groupID) == 0 || // 重启恢复
|
|
(t.GetErr() == nil && t.GetState() != tache.StatePending)) { // 手动重试
|
|
t.groupID = stdpath.Join(t.DstStorageMp, t.DstActualPath)
|
|
task_group.TransferCoordinator.AddTask(t.groupID, nil)
|
|
}
|
|
t.TaskExtension.SetRetry(retry, maxRetry)
|
|
}
|
|
|
|
var (
|
|
TransferTaskManager *tache.Manager[*TransferTask]
|
|
)
|
|
|
|
func transferStd(ctx context.Context, tempDir, dstDirPath string, deletePolicy DeletePolicy) error {
|
|
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
|
|
if err != nil {
|
|
return errors.WithMessage(err, "failed get dst storage")
|
|
}
|
|
entries, err := os.ReadDir(tempDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
taskCreator, _ := ctx.Value(conf.UserKey).(*model.User)
|
|
for _, entry := range entries {
|
|
t := &TransferTask{
|
|
TaskData: fs.TaskData{
|
|
TaskExtension: task.TaskExtension{
|
|
Creator: taskCreator,
|
|
ApiUrl: common.GetApiUrl(ctx),
|
|
},
|
|
SrcActualPath: stdpath.Join(tempDir, entry.Name()),
|
|
DstActualPath: dstDirActualPath,
|
|
DstStorage: dstStorage,
|
|
DstStorageMp: dstStorage.GetStorage().MountPath,
|
|
},
|
|
groupID: dstDirPath,
|
|
DeletePolicy: deletePolicy,
|
|
}
|
|
task_group.TransferCoordinator.AddTask(dstDirPath, nil)
|
|
TransferTaskManager.Add(t)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func transferStdPath(t *TransferTask) error {
|
|
t.Status = "getting src object"
|
|
info, err := os.Stat(t.SrcActualPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if info.IsDir() {
|
|
t.Status = "src object is dir, listing objs"
|
|
entries, err := os.ReadDir(t.SrcActualPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
dstDirActualPath := stdpath.Join(t.DstActualPath, info.Name())
|
|
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstDirActualPath))
|
|
for _, entry := range entries {
|
|
srcRawPath := stdpath.Join(t.SrcActualPath, entry.Name())
|
|
task := &TransferTask{
|
|
TaskData: fs.TaskData{
|
|
TaskExtension: task.TaskExtension{
|
|
Creator: t.Creator,
|
|
ApiUrl: t.ApiUrl,
|
|
},
|
|
SrcActualPath: srcRawPath,
|
|
DstActualPath: dstDirActualPath,
|
|
DstStorage: t.DstStorage,
|
|
SrcStorageMp: t.SrcStorageMp,
|
|
DstStorageMp: t.DstStorageMp,
|
|
},
|
|
groupID: t.groupID,
|
|
DeletePolicy: t.DeletePolicy,
|
|
}
|
|
task_group.TransferCoordinator.AddTask(t.groupID, nil)
|
|
TransferTaskManager.Add(task)
|
|
}
|
|
t.Status = "src object is dir, added all transfer tasks of files"
|
|
return nil
|
|
}
|
|
return transferStdFile(t)
|
|
}
|
|
|
|
func transferStdFile(t *TransferTask) error {
|
|
rc, err := os.Open(t.SrcActualPath)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to open file %s", t.SrcActualPath)
|
|
}
|
|
info, err := rc.Stat()
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to get file %s", t.SrcActualPath)
|
|
}
|
|
mimetype := utils.GetMimeType(t.SrcActualPath)
|
|
s := &stream.FileStream{
|
|
Ctx: t.Ctx(),
|
|
Obj: &model.Object{
|
|
Name: filepath.Base(t.SrcActualPath),
|
|
Size: info.Size(),
|
|
Modified: info.ModTime(),
|
|
IsFolder: false,
|
|
},
|
|
Reader: rc,
|
|
Mimetype: mimetype,
|
|
Closers: utils.NewClosers(rc),
|
|
}
|
|
t.SetTotalBytes(info.Size())
|
|
return op.Put(t.Ctx(), t.DstStorage, t.DstActualPath, s, t.SetProgress)
|
|
}
|
|
|
|
func removeStdTemp(t *TransferTask) {
|
|
info, err := os.Stat(t.SrcActualPath)
|
|
if err != nil || info.IsDir() {
|
|
return
|
|
}
|
|
if err := os.Remove(t.SrcActualPath); err != nil {
|
|
log.Errorf("failed to delete temp file %s, error: %s", t.SrcActualPath, err.Error())
|
|
}
|
|
}
|
|
|
|
func transferObj(ctx context.Context, tempDir, dstDirPath string, deletePolicy DeletePolicy) error {
|
|
srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(tempDir)
|
|
if err != nil {
|
|
return errors.WithMessage(err, "failed get src storage")
|
|
}
|
|
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
|
|
if err != nil {
|
|
return errors.WithMessage(err, "failed get dst storage")
|
|
}
|
|
objs, err := op.List(ctx, srcStorage, srcObjActualPath, model.ListArgs{})
|
|
if err != nil {
|
|
return errors.WithMessagef(err, "failed list src [%s] objs", tempDir)
|
|
}
|
|
taskCreator, _ := ctx.Value(conf.UserKey).(*model.User) // taskCreator is nil when convert failed
|
|
for _, obj := range objs {
|
|
t := &TransferTask{
|
|
TaskData: fs.TaskData{
|
|
TaskExtension: task.TaskExtension{
|
|
Creator: taskCreator,
|
|
ApiUrl: common.GetApiUrl(ctx),
|
|
},
|
|
SrcActualPath: stdpath.Join(srcObjActualPath, obj.GetName()),
|
|
DstActualPath: dstDirActualPath,
|
|
SrcStorage: srcStorage,
|
|
DstStorage: dstStorage,
|
|
SrcStorageMp: srcStorage.GetStorage().MountPath,
|
|
DstStorageMp: dstStorage.GetStorage().MountPath,
|
|
},
|
|
groupID: dstDirPath,
|
|
DeletePolicy: deletePolicy,
|
|
}
|
|
task_group.TransferCoordinator.AddTask(dstDirPath, nil)
|
|
TransferTaskManager.Add(t)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func transferObjPath(t *TransferTask) error {
|
|
t.Status = "getting src object"
|
|
srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcActualPath)
|
|
if err != nil {
|
|
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcActualPath)
|
|
}
|
|
if srcObj.IsDir() {
|
|
t.Status = "src object is dir, listing objs"
|
|
objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.ListArgs{})
|
|
if err != nil {
|
|
return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcActualPath)
|
|
}
|
|
dstDirActualPath := stdpath.Join(t.DstActualPath, srcObj.GetName())
|
|
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstDirActualPath))
|
|
for _, obj := range objs {
|
|
if utils.IsCanceled(t.Ctx()) {
|
|
return nil
|
|
}
|
|
srcObjPath := stdpath.Join(t.SrcActualPath, obj.GetName())
|
|
task_group.TransferCoordinator.AddTask(t.groupID, nil)
|
|
TransferTaskManager.Add(&TransferTask{
|
|
TaskData: fs.TaskData{
|
|
TaskExtension: task.TaskExtension{
|
|
Creator: t.Creator,
|
|
ApiUrl: t.ApiUrl,
|
|
},
|
|
SrcActualPath: srcObjPath,
|
|
DstActualPath: dstDirActualPath,
|
|
SrcStorage: t.SrcStorage,
|
|
DstStorage: t.DstStorage,
|
|
SrcStorageMp: t.SrcStorageMp,
|
|
DstStorageMp: t.DstStorageMp,
|
|
},
|
|
groupID: t.groupID,
|
|
DeletePolicy: t.DeletePolicy,
|
|
})
|
|
}
|
|
t.Status = "src object is dir, added all transfer tasks of objs"
|
|
return nil
|
|
}
|
|
return transferObjFile(t)
|
|
}
|
|
|
|
func transferObjFile(t *TransferTask) error {
|
|
srcFile, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcActualPath)
|
|
if err != nil {
|
|
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcActualPath)
|
|
}
|
|
link, _, err := op.Link(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.LinkArgs{})
|
|
if err != nil {
|
|
return errors.WithMessagef(err, "failed get [%s] link", t.SrcActualPath)
|
|
}
|
|
// any link provided is seekable
|
|
ss, err := stream.NewSeekableStream(&stream.FileStream{
|
|
Obj: srcFile,
|
|
Ctx: t.Ctx(),
|
|
}, link)
|
|
if err != nil {
|
|
_ = link.Close()
|
|
return errors.WithMessagef(err, "failed get [%s] stream", t.SrcActualPath)
|
|
}
|
|
t.SetTotalBytes(ss.GetSize())
|
|
return op.Put(t.Ctx(), t.DstStorage, t.DstActualPath, ss, t.SetProgress)
|
|
}
|
|
|
|
func removeObjTemp(t *TransferTask) {
|
|
srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcActualPath)
|
|
if err != nil || srcObj.IsDir() {
|
|
return
|
|
}
|
|
if err := op.Remove(t.Ctx(), t.SrcStorage, t.SrcActualPath); err != nil {
|
|
log.Errorf("failed to delete temp obj %s, error: %s", t.SrcActualPath, err.Error())
|
|
}
|
|
}
|