From e93ab76036b64cddbb6a2f3c2d86846dee44340a Mon Sep 17 00:00:00 2001 From: Seven <53081179+Seven66677731@users.noreply.github.com> Date: Thu, 24 Jul 2025 16:15:24 +0800 Subject: [PATCH] feat(task-group): introduce TaskGroupCoordinator for coordinated task execution (#721) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(task): add task hook,batch task refactor(move): move use CopyTask * Update internal/task/batch_task/refresh.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Seven <53081179+Seven66677731@users.noreply.github.com> * fix: upload task allFinish judge * Update internal/task/batch_task/refresh.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Seven <53081179+Seven66677731@users.noreply.github.com> * feat: enhance concurrency safety * 优化代码 * 解压缩 * 修复死锁 * refactor(move): move as task * 重构,优化 * . * 优化,修复bug * . * 修复bug * feat: add task retry judge * 代理Task.SetState函数来判断Task的生命周期 * chore: use OnSucceeded、OnFailed、OnBeforeRetry functions * 优化 * 优化,去除重复代码 * . * 优化 * . * webdav * Revert "fix(fs):After the file is copied or moved, flush the cache of the directory that was copied or moved to." This reverts commit 5f03edd683ee4e90bf650e500d52de73db26aaa8. --------- Signed-off-by: Seven <53081179+Seven66677731@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: j2rong4cn --- drivers/alias/driver.go | 3 +- drivers/doubao/util.go | 4 +- internal/bootstrap/task.go | 4 +- internal/fs/archive.go | 144 ++-- internal/fs/copy.go | 206 ------ internal/fs/copy_move.go | 252 +++++++ internal/fs/fs.go | 24 +- internal/fs/move.go | 638 ------------------ internal/fs/other.go | 33 +- internal/fs/put.go | 22 + internal/fs/task.go | 153 ----- internal/net/serve.go | 8 +- internal/offline_download/115_open/client.go | 1 + .../thunder_browser/thunder_browser.go | 3 +- internal/offline_download/tool/add.go | 3 + internal/offline_download/tool/download.go | 25 +- internal/offline_download/tool/transfer.go | 186 ++--- internal/op/archive.go | 6 +- internal/op/fs.go | 28 +- internal/task_group/group.go | 81 +++ internal/task_group/transfer.go | 103 +++ server/ftp/fsmanage.go | 2 +- server/handles/fsbatch.go | 2 +- server/handles/fsmanage.go | 2 +- server/webdav/file.go | 2 +- server/webdav/prop.go | 5 +- 26 files changed, 726 insertions(+), 1214 deletions(-) delete mode 100644 internal/fs/copy.go create mode 100644 internal/fs/copy_move.go delete mode 100644 internal/fs/move.go delete mode 100644 internal/fs/task.go create mode 100644 internal/task_group/group.go create mode 100644 internal/task_group/transfer.go diff --git a/drivers/alias/driver.go b/drivers/alias/driver.go index c516d868..5a1b6930 100644 --- a/drivers/alias/driver.go +++ b/drivers/alias/driver.go @@ -193,7 +193,8 @@ func (d *Alias) Move(ctx context.Context, srcObj, dstDir model.Obj) error { } if len(srcPath) == len(dstPath) { for i := range srcPath { - err = errors.Join(err, fs.Move(ctx, *srcPath[i], *dstPath[i])) + _, e := fs.Move(ctx, *srcPath[i], *dstPath[i]) + err = errors.Join(err, e) } return err } else { diff --git a/drivers/doubao/util.go b/drivers/doubao/util.go index 70b4231c..bc633baf 100644 --- a/drivers/doubao/util.go +++ b/drivers/doubao/util.go @@ -14,7 +14,7 @@ import ( "math/rand" "net/http" "net/url" - "path/filepath" + stdpath "path" "sort" "strconv" "strings" @@ -353,7 +353,7 @@ func (d *Doubao) getUploadConfig(upConfig *UploadConfig, dataType string, file m "ServiceId": d.UploadToken.Alice[dataType].ServiceID, "NeedFallback": "true", "FileSize": strconv.FormatInt(file.GetSize(), 10), - "FileExtension": filepath.Ext(file.GetName()), + "FileExtension": stdpath.Ext(file.GetName()), "s": randomString(), } } diff --git a/internal/bootstrap/task.go b/internal/bootstrap/task.go index 0ace27cf..47e0b59e 100644 --- a/internal/bootstrap/task.go +++ b/internal/bootstrap/task.go @@ -22,11 +22,11 @@ func InitTaskManager() { op.RegisterSettingChangingCallback(func() { fs.UploadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskUploadThreadsNum, conf.Conf.Tasks.Upload.Workers))) }) - fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(setting.GetInt(conf.TaskCopyThreadsNum, conf.Conf.Tasks.Copy.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant), db.UpdateTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry)) + fs.CopyTaskManager = tache.NewManager[*fs.FileTransferTask](tache.WithWorks(setting.GetInt(conf.TaskCopyThreadsNum, conf.Conf.Tasks.Copy.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant), db.UpdateTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry)) op.RegisterSettingChangingCallback(func() { fs.CopyTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskCopyThreadsNum, conf.Conf.Tasks.Copy.Workers))) }) - fs.MoveTaskManager = tache.NewManager[*fs.MoveTask](tache.WithWorks(setting.GetInt(conf.TaskMoveThreadsNum, conf.Conf.Tasks.Move.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant), db.UpdateTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Move.MaxRetry)) + fs.MoveTaskManager = tache.NewManager[*fs.FileTransferTask](tache.WithWorks(setting.GetInt(conf.TaskMoveThreadsNum, conf.Conf.Tasks.Move.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant), db.UpdateTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Move.MaxRetry)) op.RegisterSettingChangingCallback(func() { fs.MoveTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskMoveThreadsNum, conf.Conf.Tasks.Move.Workers))) }) diff --git a/internal/fs/archive.go b/internal/fs/archive.go index fb047b82..56b38e8c 100644 --- a/internal/fs/archive.go +++ b/internal/fs/archive.go @@ -6,11 +6,9 @@ import ( "fmt" "io" "math/rand" - "mime" "os" stdpath "path" "path/filepath" - "strconv" "strings" "time" @@ -21,30 +19,22 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/op" "github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/internal/task" + "github.com/OpenListTeam/OpenList/v4/internal/task_group" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/OpenListTeam/OpenList/v4/server/common" "github.com/OpenListTeam/tache" "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) type ArchiveDownloadTask struct { - task.TaskExtension + TaskData model.ArchiveDecompressArgs - status string - SrcObjPath string - DstDirPath string - srcStorage driver.Driver - dstStorage driver.Driver - SrcStorageMp string - DstStorageMp string } func (t *ArchiveDownloadTask) GetName() string { - return fmt.Sprintf("decompress [%s](%s)[%s] to [%s](%s) with password <%s>", t.SrcStorageMp, t.SrcObjPath, - t.InnerPath, t.DstStorageMp, t.DstDirPath, t.Password) -} - -func (t *ArchiveDownloadTask) GetStatus() string { - return t.status + return fmt.Sprintf("decompress [%s](%s)[%s] to [%s](%s) with password <%s>", t.SrcStorageMp, t.SrcActualPath, + t.InnerPath, t.DstStorageMp, t.DstActualPath, t.Password) } func (t *ArchiveDownloadTask) Run() error { @@ -58,16 +48,21 @@ func (t *ArchiveDownloadTask) Run() error { if err != nil { return err } + uploadTask.groupID = stdpath.Join(uploadTask.DstStorageMp, uploadTask.DstActualPath) + task_group.TransferCoordinator.AddTask(uploadTask.groupID, nil) ArchiveContentUploadTaskManager.Add(uploadTask) return nil } func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadTask, error) { var err error - if t.srcStorage == nil { - t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp) + if t.SrcStorage == nil { + t.SrcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp) + if err != nil { + return nil, err + } } - srcObj, tool, ss, err := op.GetArchiveToolAndStream(t.Ctx(), t.srcStorage, t.SrcObjPath, model.LinkArgs{}) + srcObj, tool, ss, err := op.GetArchiveToolAndStream(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.LinkArgs{}) if err != nil { return nil, err } @@ -87,7 +82,7 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT total += s.GetSize() } t.SetTotalBytes(total) - t.status = "getting src object" + t.Status = "getting src object" for _, s := range ss { if s.GetFile() == nil { _, err = stream.CacheFullInTempFileAndWriter(s, func(p float64) { @@ -104,7 +99,7 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT } else { decompressUp = t.SetProgress } - t.status = "walking and decompressing" + t.Status = "walking and decompressing" dir, err := os.MkdirTemp(conf.Conf.TempDir, "dir-*") if err != nil { return nil, err @@ -117,13 +112,14 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT uploadTask := &ArchiveContentUploadTask{ TaskExtension: task.TaskExtension{ Creator: t.GetCreator(), + ApiUrl: t.ApiUrl, }, - ObjName: baseName, - InPlace: !t.PutIntoNewDir, - FilePath: dir, - DstDirPath: t.DstDirPath, - dstStorage: t.dstStorage, - DstStorageMp: t.DstStorageMp, + ObjName: baseName, + InPlace: !t.PutIntoNewDir, + FilePath: dir, + DstActualPath: t.DstActualPath, + dstStorage: t.DstStorage, + DstStorageMp: t.DstStorageMp, } return uploadTask, nil } @@ -132,18 +128,19 @@ var ArchiveDownloadTaskManager *tache.Manager[*ArchiveDownloadTask] type ArchiveContentUploadTask struct { task.TaskExtension - status string - ObjName string - InPlace bool - FilePath string - DstDirPath string - dstStorage driver.Driver - DstStorageMp string - finalized bool + status string + ObjName string + InPlace bool + FilePath string + DstActualPath string + dstStorage driver.Driver + DstStorageMp string + finalized bool + groupID string } func (t *ArchiveContentUploadTask) GetName() string { - return fmt.Sprintf("upload %s to [%s](%s)", t.ObjName, t.DstStorageMp, t.DstDirPath) + return fmt.Sprintf("upload %s to [%s](%s)", t.ObjName, t.DstStorageMp, t.DstActualPath) } func (t *ArchiveContentUploadTask) GetStatus() string { @@ -163,10 +160,31 @@ func (t *ArchiveContentUploadTask) Run() error { }) } -func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *ArchiveContentUploadTask) error) error { +func (t *ArchiveContentUploadTask) OnSucceeded() { + task_group.TransferCoordinator.Done(t.groupID, true) +} + +func (t *ArchiveContentUploadTask) OnFailed() { + task_group.TransferCoordinator.Done(t.groupID, false) +} + +func (t *ArchiveContentUploadTask) SetRetry(retry int, maxRetry int) { + t.TaskExtension.SetRetry(retry, maxRetry) + if retry == 0 && + (len(t.groupID) == 0 || // 重启恢复 + (t.GetErr() == nil && t.GetState() != tache.StatePending)) { // 手动重试 + t.groupID = stdpath.Join(t.DstStorageMp, t.DstActualPath) + task_group.TransferCoordinator.AddTask(t.groupID, nil) + } +} + +func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTask *ArchiveContentUploadTask) error) error { var err error if t.dstStorage == nil { t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp) + if err != nil { + return err + } } info, err := os.Stat(t.FilePath) if err != nil { @@ -174,10 +192,10 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi } if info.IsDir() { t.status = "src object is dir, listing objs" - nextDstPath := t.DstDirPath + nextDstActualPath := t.DstActualPath if !t.InPlace { - nextDstPath = stdpath.Join(nextDstPath, t.ObjName) - err = op.MakeDir(t.Ctx(), t.dstStorage, nextDstPath) + nextDstActualPath = stdpath.Join(nextDstActualPath, t.ObjName) + err = op.MakeDir(t.Ctx(), t.dstStorage, nextDstActualPath) if err != nil { return err } @@ -186,6 +204,9 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi if err != nil { return err } + if !t.InPlace && len(t.groupID) > 0 { + task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(nextDstActualPath)) + } var es error for _, entry := range entries { var nextFilePath string @@ -198,16 +219,21 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi es = stderrors.Join(es, err) continue } + if len(t.groupID) > 0 { + task_group.TransferCoordinator.AddTask(t.groupID, nil) + } err = f(&ArchiveContentUploadTask{ TaskExtension: task.TaskExtension{ Creator: t.GetCreator(), + ApiUrl: t.ApiUrl, }, - ObjName: entry.Name(), - InPlace: false, - FilePath: nextFilePath, - DstDirPath: nextDstPath, - dstStorage: t.dstStorage, - DstStorageMp: t.DstStorageMp, + ObjName: entry.Name(), + InPlace: false, + FilePath: nextFilePath, + DstActualPath: nextDstActualPath, + dstStorage: t.dstStorage, + DstStorageMp: t.DstStorageMp, + groupID: t.groupID, }) if err != nil { es = stderrors.Join(es, err) @@ -228,13 +254,13 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi Size: info.Size(), Modified: time.Now(), }, - Mimetype: mime.TypeByExtension(filepath.Ext(t.ObjName)), + Mimetype: utils.GetMimeType(stdpath.Ext(t.ObjName)), WebPutAsTask: true, Reader: file, } fs.Closers.Add(file) t.status = "uploading" - err = op.Put(t.Ctx(), t.dstStorage, t.DstDirPath, fs, t.SetProgress, true) + err = op.Put(t.Ctx(), t.dstStorage, t.DstActualPath, fs, t.SetProgress, true) if err != nil { return err } @@ -271,8 +297,9 @@ func moveToTempPath(path, prefix string) (string, error) { func genTempFileName(prefix string) (string, error) { retry := 0 + t := time.Now().UnixMilli() for retry < 10000 { - newPath := stdpath.Join(conf.Conf.TempDir, prefix+strconv.FormatUint(uint64(rand.Uint32()), 10)) + newPath := filepath.Join(conf.Conf.TempDir, prefix+fmt.Sprintf("%x-%x", t, rand.Uint32())) if _, err := os.Stat(newPath); err != nil { if os.IsNotExist(err) { return newPath, nil @@ -354,16 +381,19 @@ func archiveDecompress(ctx context.Context, srcObjPath, dstDirPath string, args } taskCreator, _ := ctx.Value(conf.UserKey).(*model.User) tsk := &ArchiveDownloadTask{ - TaskExtension: task.TaskExtension{ - Creator: taskCreator, + TaskData: TaskData{ + TaskExtension: task.TaskExtension{ + Creator: taskCreator, + ApiUrl: common.GetApiUrl(ctx), + }, + SrcStorage: srcStorage, + DstStorage: dstStorage, + SrcActualPath: srcObjActualPath, + DstActualPath: dstDirActualPath, + SrcStorageMp: srcStorage.GetStorage().MountPath, + DstStorageMp: dstStorage.GetStorage().MountPath, }, ArchiveDecompressArgs: args, - srcStorage: srcStorage, - dstStorage: dstStorage, - SrcObjPath: srcObjActualPath, - DstDirPath: dstDirActualPath, - SrcStorageMp: srcStorage.GetStorage().MountPath, - DstStorageMp: dstStorage.GetStorage().MountPath, } if ctx.Value(conf.NoTaskKey) != nil { uploadTask, err := tsk.RunWithoutPushUploadTask() diff --git a/internal/fs/copy.go b/internal/fs/copy.go deleted file mode 100644 index bb8e98bc..00000000 --- a/internal/fs/copy.go +++ /dev/null @@ -1,206 +0,0 @@ -package fs - -import ( - "context" - "fmt" - stdpath "path" - "time" - - "github.com/OpenListTeam/OpenList/v4/internal/conf" - "github.com/OpenListTeam/OpenList/v4/internal/driver" - "github.com/OpenListTeam/OpenList/v4/internal/errs" - "github.com/OpenListTeam/OpenList/v4/internal/model" - "github.com/OpenListTeam/OpenList/v4/internal/op" - "github.com/OpenListTeam/OpenList/v4/internal/stream" - "github.com/OpenListTeam/OpenList/v4/internal/task" - "github.com/OpenListTeam/OpenList/v4/pkg/utils" - "github.com/OpenListTeam/OpenList/v4/server/common" - "github.com/OpenListTeam/tache" - "github.com/pkg/errors" -) - -type CopyTask struct { - task.TaskExtension - Status string `json:"-"` //don't save status to save space - SrcObjPath string `json:"src_path"` - DstDirPath string `json:"dst_path"` - srcStorage driver.Driver `json:"-"` - dstStorage driver.Driver `json:"-"` - SrcStorageMp string `json:"src_storage_mp"` - DstStorageMp string `json:"dst_storage_mp"` -} - -func (t *CopyTask) GetName() string { - return fmt.Sprintf("copy [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath) -} - -func (t *CopyTask) GetStatus() string { - return t.Status -} - -func (t *CopyTask) Run() error { - if err := t.ReinitCtx(); err != nil { - return err - } - t.ClearEndTime() - t.SetStartTime(time.Now()) - defer func() { t.SetEndTime(time.Now()) }() - - var err error - if t.srcStorage == nil { - t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp) - } - if t.dstStorage == nil { - t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp) - } - if err != nil { - return errors.WithMessage(err, "failed get storage") - } - - // Use the task object's memory address as a unique identifier - taskID := fmt.Sprintf("%p", t) - - // Register task to batch tracker - copyBatchTracker.RegisterTask(taskID, t.dstStorage, t.DstDirPath) - - // Execute copy operation - err = copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath) - - // Mark task completed and automatically refresh cache if needed - copyBatchTracker.MarkTaskCompletedWithRefresh(taskID) - - return err -} - -var CopyTaskManager *tache.Manager[*CopyTask] - -// Batch tracker for copy operations -var copyBatchTracker = NewBatchTracker("copy") - -// Copy if in the same storage, call move method -// if not, add copy task -func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) { - srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(srcObjPath) - if err != nil { - return nil, errors.WithMessage(err, "failed get src storage") - } - dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath) - if err != nil { - return nil, errors.WithMessage(err, "failed get dst storage") - } - // copy if in the same storage, just call driver.Copy - if srcStorage.GetStorage() == dstStorage.GetStorage() { - err = op.Copy(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...) - if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) { - if err == nil { - // Refresh target directory cache after successful same-storage copy - op.ClearCache(dstStorage, dstDirActualPath) - } - return nil, err - } - } - if ctx.Value(conf.NoTaskKey) != nil { - srcObj, err := op.Get(ctx, srcStorage, srcObjActualPath) - if err != nil { - return nil, errors.WithMessagef(err, "failed get src [%s] file", srcObjPath) - } - if !srcObj.IsDir() { - // copy file directly - link, _, err := op.Link(ctx, srcStorage, srcObjActualPath, model.LinkArgs{}) - if err != nil { - return nil, errors.WithMessagef(err, "failed get [%s] link", srcObjPath) - } - // any link provided is seekable - ss, err := stream.NewSeekableStream(&stream.FileStream{ - Obj: srcObj, - Ctx: ctx, - }, link) - if err != nil { - _ = link.Close() - return nil, errors.WithMessagef(err, "failed get [%s] stream", srcObjPath) - } - err = op.Put(ctx, dstStorage, dstDirActualPath, ss, nil, false) - if err == nil { - // Refresh target directory cache after successful direct file copy - op.ClearCache(dstStorage, dstDirActualPath) - } - return nil, err - } - } - // not in the same storage - taskCreator, _ := ctx.Value(conf.UserKey).(*model.User) - t := &CopyTask{ - TaskExtension: task.TaskExtension{ - Creator: taskCreator, - ApiUrl: common.GetApiUrl(ctx), - }, - srcStorage: srcStorage, - dstStorage: dstStorage, - SrcObjPath: srcObjActualPath, - DstDirPath: dstDirActualPath, - SrcStorageMp: srcStorage.GetStorage().MountPath, - DstStorageMp: dstStorage.GetStorage().MountPath, - } - CopyTaskManager.Add(t) - return t, nil -} - -func copyBetween2Storages(t *CopyTask, srcStorage, dstStorage driver.Driver, srcObjPath, dstDirPath string) error { - t.Status = "getting src object" - srcObj, err := op.Get(t.Ctx(), srcStorage, srcObjPath) - if err != nil { - return errors.WithMessagef(err, "failed get src [%s] file", srcObjPath) - } - if srcObj.IsDir() { - t.Status = "src object is dir, listing objs" - objs, err := op.List(t.Ctx(), srcStorage, srcObjPath, model.ListArgs{}) - if err != nil { - return errors.WithMessagef(err, "failed list src [%s] objs", srcObjPath) - } - - for _, obj := range objs { - if utils.IsCanceled(t.Ctx()) { - return nil - } - srcObjPath := stdpath.Join(srcObjPath, obj.GetName()) - dstObjPath := stdpath.Join(dstDirPath, srcObj.GetName()) - CopyTaskManager.Add(&CopyTask{ - TaskExtension: task.TaskExtension{ - Creator: t.GetCreator(), - ApiUrl: t.ApiUrl, - }, - srcStorage: srcStorage, - dstStorage: dstStorage, - SrcObjPath: srcObjPath, - DstDirPath: dstObjPath, - SrcStorageMp: srcStorage.GetStorage().MountPath, - DstStorageMp: dstStorage.GetStorage().MountPath, - }) - } - t.Status = "src object is dir, added all copy tasks of objs" - return nil - } - return copyFileBetween2Storages(t, srcStorage, dstStorage, srcObjPath, dstDirPath) -} - -func copyFileBetween2Storages(tsk *CopyTask, srcStorage, dstStorage driver.Driver, srcFilePath, dstDirPath string) error { - srcFile, err := op.Get(tsk.Ctx(), srcStorage, srcFilePath) - if err != nil { - return errors.WithMessagef(err, "failed get src [%s] file", srcFilePath) - } - link, _, err := op.Link(tsk.Ctx(), srcStorage, srcFilePath, model.LinkArgs{}) - if err != nil { - return errors.WithMessagef(err, "failed get [%s] link", srcFilePath) - } - // any link provided is seekable - ss, err := stream.NewSeekableStream(&stream.FileStream{ - Obj: srcFile, - Ctx: tsk.Ctx(), - }, link) - if err != nil { - _ = link.Close() - return errors.WithMessagef(err, "failed get [%s] stream", srcFilePath) - } - tsk.SetTotalBytes(ss.GetSize()) - return op.Put(tsk.Ctx(), dstStorage, dstDirPath, ss, tsk.SetProgress, true) -} diff --git a/internal/fs/copy_move.go b/internal/fs/copy_move.go new file mode 100644 index 00000000..645fb6b9 --- /dev/null +++ b/internal/fs/copy_move.go @@ -0,0 +1,252 @@ +package fs + +import ( + "context" + "fmt" + stdpath "path" + "time" + + "github.com/OpenListTeam/OpenList/v4/internal/conf" + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/errs" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/op" + "github.com/OpenListTeam/OpenList/v4/internal/stream" + "github.com/OpenListTeam/OpenList/v4/internal/task" + "github.com/OpenListTeam/OpenList/v4/internal/task_group" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/OpenListTeam/OpenList/v4/server/common" + "github.com/OpenListTeam/tache" + "github.com/pkg/errors" +) + +type taskType uint8 + +func (t taskType) String() string { + if t == 0 { + return "copy" + } else { + return "move" + } +} + +const ( + copy taskType = iota + move +) + +type FileTransferTask struct { + TaskData + TaskType taskType + groupID string +} + +func (t *FileTransferTask) GetName() string { + return fmt.Sprintf("%s [%s](%s) to [%s](%s)", t.TaskType, t.SrcStorageMp, t.SrcActualPath, t.DstStorageMp, t.DstActualPath) +} + +func (t *FileTransferTask) Run() error { + if err := t.ReinitCtx(); err != nil { + return err + } + t.ClearEndTime() + t.SetStartTime(time.Now()) + defer func() { t.SetEndTime(time.Now()) }() + var err error + if t.SrcStorage == nil { + t.SrcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp) + } + if t.DstStorage == nil { + t.DstStorage, err = op.GetStorageByMountPath(t.DstStorageMp) + } + if err != nil { + return errors.WithMessage(err, "failed get storage") + } + return putBetween2Storages(t, t.SrcStorage, t.DstStorage, t.SrcActualPath, t.DstActualPath) +} + +func (t *FileTransferTask) OnSucceeded() { + task_group.TransferCoordinator.Done(t.groupID, true) +} + +func (t *FileTransferTask) OnFailed() { + task_group.TransferCoordinator.Done(t.groupID, false) +} + +func (t *FileTransferTask) SetRetry(retry int, maxRetry int) { + t.TaskExtension.SetRetry(retry, maxRetry) + if retry == 0 && + (len(t.groupID) == 0 || // 重启恢复 + (t.GetErr() == nil && t.GetState() != tache.StatePending)) { // 手动重试 + t.groupID = stdpath.Join(t.DstStorageMp, t.DstActualPath) + var payload any + if t.TaskType == move { + payload = task_group.SrcPathToRemove(stdpath.Join(t.SrcStorageMp, t.SrcActualPath)) + } + task_group.TransferCoordinator.AddTask(t.groupID, payload) + } +} + +func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) { + srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(srcObjPath) + if err != nil { + return nil, errors.WithMessage(err, "failed get src storage") + } + dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath) + if err != nil { + return nil, errors.WithMessage(err, "failed get dst storage") + } + + if srcStorage.GetStorage() == dstStorage.GetStorage() { + if taskType == copy { + err = op.Copy(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...) + if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) { + return nil, err + } + } else { + err = op.Move(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...) + if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) { + return nil, err + } + } + } else if ctx.Value(conf.NoTaskKey) != nil { + return nil, fmt.Errorf("can't %s files between two storages, please use the front-end ", taskType) + } + + // if ctx.Value(conf.NoTaskKey) != nil { // webdav + // srcObj, err := op.Get(ctx, srcStorage, srcObjActualPath) + // if err != nil { + // return nil, errors.WithMessagef(err, "failed get src [%s] file", srcObjPath) + // } + // if !srcObj.IsDir() { + // // copy file directly + // link, _, err := op.Link(ctx, srcStorage, srcObjActualPath, model.LinkArgs{}) + // if err != nil { + // return nil, errors.WithMessagef(err, "failed get [%s] link", srcObjPath) + // } + // // any link provided is seekable + // ss, err := stream.NewSeekableStream(&stream.FileStream{ + // Obj: srcObj, + // Ctx: ctx, + // }, link) + // if err != nil { + // _ = link.Close() + // return nil, errors.WithMessagef(err, "failed get [%s] stream", srcObjPath) + // } + // if taskType == move { + // defer func() { + // task_group.TransferCoordinator.Done(dstDirPath, err == nil) + // }() + // task_group.TransferCoordinator.AddTask(dstDirPath, task_group.SrcPathToRemove(srcObjPath)) + // } + // err = op.Put(ctx, dstStorage, dstDirActualPath, ss, nil, taskType == move) + // return nil, err + // } else { + // return nil, fmt.Errorf("can't %s dir two storages, please use the front-end ", taskType) + // } + // } + + // not in the same storage + taskCreator, _ := ctx.Value(conf.UserKey).(*model.User) + t := &FileTransferTask{ + TaskData: TaskData{ + TaskExtension: task.TaskExtension{ + Creator: taskCreator, + ApiUrl: common.GetApiUrl(ctx), + }, + SrcStorage: srcStorage, + DstStorage: dstStorage, + SrcActualPath: srcObjActualPath, + DstActualPath: dstDirActualPath, + SrcStorageMp: srcStorage.GetStorage().MountPath, + DstStorageMp: dstStorage.GetStorage().MountPath, + }, + TaskType: taskType, + groupID: dstDirPath, + } + if taskType == copy { + task_group.TransferCoordinator.AddTask(dstDirPath, nil) + CopyTaskManager.Add(t) + } else { + task_group.TransferCoordinator.AddTask(dstDirPath, task_group.SrcPathToRemove(srcObjPath)) + MoveTaskManager.Add(t) + } + return t, nil +} + +func putBetween2Storages(t *FileTransferTask, srcStorage, dstStorage driver.Driver, srcActualPath, dstDirActualPath string) error { + t.Status = "getting src object" + srcObj, err := op.Get(t.Ctx(), srcStorage, srcActualPath) + if err != nil { + return errors.WithMessagef(err, "failed get src [%s] file", srcActualPath) + } + if srcObj.IsDir() { + t.Status = "src object is dir, listing objs" + objs, err := op.List(t.Ctx(), srcStorage, srcActualPath, model.ListArgs{}) + if err != nil { + return errors.WithMessagef(err, "failed list src [%s] objs", srcActualPath) + } + dstActualPath := stdpath.Join(dstDirActualPath, srcObj.GetName()) + if t.TaskType == copy { + task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstActualPath)) + } + for _, obj := range objs { + if utils.IsCanceled(t.Ctx()) { + return nil + } + task := &FileTransferTask{ + TaskType: t.TaskType, + TaskData: TaskData{ + TaskExtension: task.TaskExtension{ + Creator: t.GetCreator(), + ApiUrl: t.ApiUrl, + }, + SrcStorage: srcStorage, + DstStorage: dstStorage, + SrcActualPath: stdpath.Join(srcActualPath, obj.GetName()), + DstActualPath: dstActualPath, + SrcStorageMp: srcStorage.GetStorage().MountPath, + DstStorageMp: dstStorage.GetStorage().MountPath, + }, + groupID: t.groupID, + } + task_group.TransferCoordinator.AddTask(t.groupID, nil) + if t.TaskType == copy { + CopyTaskManager.Add(task) + } else { + MoveTaskManager.Add(task) + } + } + t.Status = fmt.Sprintf("src object is dir, added all %s tasks of objs", t.TaskType) + return nil + } + return putFileBetween2Storages(t, srcStorage, dstStorage, srcActualPath, dstDirActualPath) +} + +func putFileBetween2Storages(tsk *FileTransferTask, srcStorage, dstStorage driver.Driver, srcActualPath, dstDirActualPath string) error { + srcFile, err := op.Get(tsk.Ctx(), srcStorage, srcActualPath) + if err != nil { + return errors.WithMessagef(err, "failed get src [%s] file", srcActualPath) + } + tsk.SetTotalBytes(srcFile.GetSize()) + link, _, err := op.Link(tsk.Ctx(), srcStorage, srcActualPath, model.LinkArgs{}) + if err != nil { + return errors.WithMessagef(err, "failed get [%s] link", srcActualPath) + } + // any link provided is seekable + ss, err := stream.NewSeekableStream(&stream.FileStream{ + Obj: srcFile, + Ctx: tsk.Ctx(), + }, link) + if err != nil { + _ = link.Close() + return errors.WithMessagef(err, "failed get [%s] stream", srcActualPath) + } + tsk.SetTotalBytes(ss.GetSize()) + return op.Put(tsk.Ctx(), dstStorage, dstDirActualPath, ss, tsk.SetProgress, true) +} + +var ( + CopyTaskManager *tache.Manager[*FileTransferTask] + MoveTaskManager *tache.Manager[*FileTransferTask] +) diff --git a/internal/fs/fs.go b/internal/fs/fs.go index 297e7351..2f23ba71 100644 --- a/internal/fs/fs.go +++ b/internal/fs/fs.go @@ -66,32 +66,16 @@ func MakeDir(ctx context.Context, path string, lazyCache ...bool) error { return err } -func Move(ctx context.Context, srcPath, dstDirPath string, lazyCache ...bool) error { - err := move(ctx, srcPath, dstDirPath, lazyCache...) +func Move(ctx context.Context, srcPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) { + req, err := transfer(ctx, move, srcPath, dstDirPath, lazyCache...) if err != nil { log.Errorf("failed move %s to %s: %+v", srcPath, dstDirPath, err) } - return err -} - -func MoveWithTask(ctx context.Context, srcPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) { - res, err := _move(ctx, srcPath, dstDirPath, lazyCache...) - if err != nil { - log.Errorf("failed move %s to %s: %+v", srcPath, dstDirPath, err) - } - return res, err -} - -func MoveWithTaskAndValidation(ctx context.Context, srcPath, dstDirPath string, validateExistence bool, lazyCache ...bool) (task.TaskExtensionInfo, error) { - res, err := _moveWithValidation(ctx, srcPath, dstDirPath, validateExistence, lazyCache...) - if err != nil { - log.Errorf("failed move %s to %s: %+v", srcPath, dstDirPath, err) - } - return res, err + return req, err } func Copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) { - res, err := _copy(ctx, srcObjPath, dstDirPath, lazyCache...) + res, err := transfer(ctx, copy, srcObjPath, dstDirPath, lazyCache...) if err != nil { log.Errorf("failed copy %s to %s: %+v", srcObjPath, dstDirPath, err) } diff --git a/internal/fs/move.go b/internal/fs/move.go deleted file mode 100644 index 7ffbdc21..00000000 --- a/internal/fs/move.go +++ /dev/null @@ -1,638 +0,0 @@ -package fs - -import ( - "context" - "fmt" - stdpath "path" - "sync" - "time" - - "github.com/OpenListTeam/OpenList/v4/internal/conf" - "github.com/OpenListTeam/OpenList/v4/internal/driver" - "github.com/OpenListTeam/OpenList/v4/internal/errs" - "github.com/OpenListTeam/OpenList/v4/internal/model" - "github.com/OpenListTeam/OpenList/v4/internal/op" - "github.com/OpenListTeam/OpenList/v4/internal/stream" - "github.com/OpenListTeam/OpenList/v4/internal/task" - "github.com/OpenListTeam/OpenList/v4/pkg/utils" - "github.com/OpenListTeam/OpenList/v4/server/common" - "github.com/OpenListTeam/tache" - "github.com/pkg/errors" -) - -type MoveTask struct { - task.TaskExtension - Status string `json:"-"` - SrcObjPath string `json:"src_path"` - DstDirPath string `json:"dst_path"` - srcStorage driver.Driver `json:"-"` - dstStorage driver.Driver `json:"-"` - SrcStorageMp string `json:"src_storage_mp"` - DstStorageMp string `json:"dst_storage_mp"` - IsRootTask bool `json:"is_root_task"` - RootTaskID string `json:"root_task_id"` - TotalFiles int `json:"total_files"` - CompletedFiles int `json:"completed_files"` - Phase string `json:"phase"` // "copying", "verifying", "deleting", "completed" - ValidateExistence bool `json:"validate_existence"` - mu sync.RWMutex `json:"-"` -} - -type MoveProgress struct { - TaskID string `json:"task_id"` - Phase string `json:"phase"` - TotalFiles int `json:"total_files"` - CompletedFiles int `json:"completed_files"` - CurrentFile string `json:"current_file"` - Status string `json:"status"` - Progress int `json:"progress"` -} - -var moveProgressMap = sync.Map{} - -func (t *MoveTask) GetName() string { - return fmt.Sprintf("move [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath) -} - -func (t *MoveTask) GetStatus() string { - t.mu.RLock() - defer t.mu.RUnlock() - return t.Status -} - -func (t *MoveTask) GetProgress() float64 { - t.mu.RLock() - defer t.mu.RUnlock() - - if t.TotalFiles == 0 { - return 0 - } - - switch t.Phase { - case "copying": - return float64(t.CompletedFiles*60) / float64(t.TotalFiles) - case "verifying": - return 60 + float64(t.CompletedFiles*20)/float64(t.TotalFiles) - case "deleting": - return 80 + float64(t.CompletedFiles*20)/float64(t.TotalFiles) - case "completed": - return 100 - default: - return 0 - } -} - -func (t *MoveTask) GetMoveProgress() *MoveProgress { - t.mu.RLock() - defer t.mu.RUnlock() - - progress := int(t.GetProgress()) - - return &MoveProgress{ - TaskID: t.GetID(), - Phase: t.Phase, - TotalFiles: t.TotalFiles, - CompletedFiles: t.CompletedFiles, - CurrentFile: t.SrcObjPath, - Status: t.Status, - Progress: progress, - } -} - -func (t *MoveTask) updateProgress() { - if t.IsRootTask { - progress := t.GetMoveProgress() - moveProgressMap.Store(t.GetID(), progress) - } -} - -func (t *MoveTask) Run() error { - if err := t.ReinitCtx(); err != nil { - return err - } - t.ClearEndTime() - t.SetStartTime(time.Now()) - defer func() { - t.SetEndTime(time.Now()) - if t.IsRootTask { - moveProgressMap.Delete(t.GetID()) - } - }() - - var err error - if t.srcStorage == nil { - t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp) - } - if t.dstStorage == nil { - t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp) - } - if err != nil { - return errors.WithMessage(err, "failed get storage") - } - - // Use the task object's memory address as a unique identifier - taskID := fmt.Sprintf("%p", t) - - // Register task to batch tracker - moveBatchTracker.RegisterTask(taskID, t.dstStorage, t.DstDirPath) - - // Phase 1: Async validation (all validation happens in background) - t.mu.Lock() - t.Status = "validating source and destination" - t.mu.Unlock() - - // Check if source exists - srcObj, err := op.Get(t.Ctx(), t.srcStorage, t.SrcObjPath) - if err != nil { - // Clean up tracker records if task failed - moveBatchTracker.MarkTaskCompletedWithRefresh(taskID) - return errors.WithMessagef(err, "source file [%s] not found", stdpath.Base(t.SrcObjPath)) - } - - // Check if destination already exists (if validation is required) - if t.ValidateExistence { - dstFilePath := stdpath.Join(t.DstDirPath, srcObj.GetName()) - if res, _ := op.Get(t.Ctx(), t.dstStorage, dstFilePath); res != nil { - // Clean up tracker records if task failed - moveBatchTracker.MarkTaskCompletedWithRefresh(taskID) - return errors.Errorf("destination file [%s] already exists", srcObj.GetName()) - } - } - - // Phase 2: Execute move operation with proper sequencing - // Determine if we should use batch optimization for directories - if srcObj.IsDir() { - t.mu.Lock() - t.IsRootTask = true - t.RootTaskID = t.GetID() - t.mu.Unlock() - err = t.runRootMoveTask() - } else { - // Use safe move logic for files - err = t.safeMoveOperation(srcObj) - } - - // Mark task completed and automatically refresh cache if needed - moveBatchTracker.MarkTaskCompletedWithRefresh(taskID) - - return err -} - -func (t *MoveTask) runRootMoveTask() error { - // First check if source is actually a directory - // If not, fall back to regular move logic - srcObj, err := op.Get(t.Ctx(), t.srcStorage, t.SrcObjPath) - if err != nil { - return errors.WithMessagef(err, "failed get src [%s] object", t.SrcObjPath) - } - - if !srcObj.IsDir() { - // Source is not a directory, use regular move logic - t.mu.Lock() - t.IsRootTask = false - t.mu.Unlock() - return t.safeMoveOperation(srcObj) - } - - // Phase 1: Count total files and create directory structure - t.mu.Lock() - t.Phase = "preparing" - t.Status = "counting files and preparing directory structure" - t.mu.Unlock() - t.updateProgress() - - totalFiles, err := t.countFilesAndCreateDirs(t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath) - if err != nil { - return errors.WithMessage(err, "failed to prepare directory structure") - } - - t.mu.Lock() - t.TotalFiles = totalFiles - t.Phase = "copying" - t.Status = "copying files" - t.mu.Unlock() - t.updateProgress() - - // Phase 2: Copy all files - err = t.copyAllFiles(t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath) - if err != nil { - return errors.WithMessage(err, "failed to copy files") - } - - // Phase 3: Verify directory structure - t.mu.Lock() - t.Phase = "verifying" - t.Status = "verifying copied files" - t.CompletedFiles = 0 - t.mu.Unlock() - t.updateProgress() - - err = t.verifyDirectoryStructure(t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath) - if err != nil { - return errors.WithMessage(err, "verification failed") - } - - // Phase 4: Delete source files and directories - t.mu.Lock() - t.Phase = "deleting" - t.Status = "deleting source files" - t.CompletedFiles = 0 - t.mu.Unlock() - t.updateProgress() - - err = t.deleteSourceRecursively(t.srcStorage, t.SrcObjPath) - if err != nil { - return errors.WithMessage(err, "failed to delete source files") - } - - t.mu.Lock() - t.Phase = "completed" - t.Status = "completed" - t.mu.Unlock() - t.updateProgress() - - - return nil -} - -var MoveTaskManager *tache.Manager[*MoveTask] - -// Batch tracker for move operations -var moveBatchTracker = NewBatchTracker("move") - -// GetMoveProgress returns the progress of a move task by task ID -func GetMoveProgress(taskID string) (*MoveProgress, bool) { - if progress, ok := moveProgressMap.Load(taskID); ok { - return progress.(*MoveProgress), true - } - return nil, false -} - -// GetMoveTaskProgress returns the progress of a specific move task -func GetMoveTaskProgress(task *MoveTask) *MoveProgress { - return task.GetMoveProgress() -} - -// countFilesAndCreateDirs recursively counts files and creates directory structure -func (t *MoveTask) countFilesAndCreateDirs(srcStorage, dstStorage driver.Driver, srcPath, dstPath string) (int, error) { - srcObj, err := op.Get(t.Ctx(), srcStorage, srcPath) - if err != nil { - return 0, errors.WithMessagef(err, "failed get src [%s] object", srcPath) - } - - if !srcObj.IsDir() { - return 1, nil - } - - // Create destination directory - dstObjPath := stdpath.Join(dstPath, srcObj.GetName()) - err = op.MakeDir(t.Ctx(), dstStorage, dstObjPath) - if err != nil { - if errors.Is(err, errs.UploadNotSupported) { - return 0, errors.WithMessagef(err, "destination storage [%s] does not support creating directories", dstStorage.GetStorage().MountPath) - } - return 0, errors.WithMessagef(err, "failed to create destination directory [%s] in storage [%s]", dstObjPath, dstStorage.GetStorage().MountPath) - } - - // List and count files recursively - objs, err := op.List(t.Ctx(), srcStorage, srcPath, model.ListArgs{}) - if err != nil { - return 0, errors.WithMessagef(err, "failed list src [%s] objs", srcPath) - } - - totalFiles := 0 - for _, obj := range objs { - if utils.IsCanceled(t.Ctx()) { - return 0, nil - } - srcSubPath := stdpath.Join(srcPath, obj.GetName()) - subCount, err := t.countFilesAndCreateDirs(srcStorage, dstStorage, srcSubPath, dstObjPath) - if err != nil { - return 0, err - } - totalFiles += subCount - } - - return totalFiles, nil -} - -// copyAllFiles recursively copies all files -func (t *MoveTask) copyAllFiles(srcStorage, dstStorage driver.Driver, srcPath, dstPath string) error { - srcObj, err := op.Get(t.Ctx(), srcStorage, srcPath) - if err != nil { - return errors.WithMessagef(err, "failed get src [%s] object", srcPath) - } - - if !srcObj.IsDir() { - // Copy single file - err := t.copyFile(srcStorage, dstStorage, srcPath, dstPath) - if err != nil { - return err - } - - t.mu.Lock() - t.CompletedFiles++ - t.mu.Unlock() - t.updateProgress() - return nil - } - - // Copy directory contents - objs, err := op.List(t.Ctx(), srcStorage, srcPath, model.ListArgs{}) - if err != nil { - return errors.WithMessagef(err, "failed list src [%s] objs", srcPath) - } - - dstObjPath := stdpath.Join(dstPath, srcObj.GetName()) - for _, obj := range objs { - if utils.IsCanceled(t.Ctx()) { - return nil - } - srcSubPath := stdpath.Join(srcPath, obj.GetName()) - err := t.copyAllFiles(srcStorage, dstStorage, srcSubPath, dstObjPath) - if err != nil { - return err - } - } - - return nil -} - -// copyFile copies a single file between storages -func (t *MoveTask) copyFile(srcStorage, dstStorage driver.Driver, srcFilePath, dstDirPath string) error { - srcFile, err := op.Get(t.Ctx(), srcStorage, srcFilePath) - if err != nil { - return errors.WithMessagef(err, "failed get src [%s] file", srcFilePath) - } - - link, _, err := op.Link(t.Ctx(), srcStorage, srcFilePath, model.LinkArgs{}) - if err != nil { - return errors.WithMessagef(err, "failed get [%s] link", srcFilePath) - } - ss, err := stream.NewSeekableStream(&stream.FileStream{ - Obj: srcFile, - Ctx: t.Ctx(), - }, link) - if err != nil { - _ = link.Close() - return errors.WithMessagef(err, "failed get [%s] stream", srcFilePath) - } - return op.Put(t.Ctx(), dstStorage, dstDirPath, ss, nil, true) -} - -// verifyDirectoryStructure compares source and destination directory structures -func (t *MoveTask) verifyDirectoryStructure(srcStorage, dstStorage driver.Driver, srcPath, dstPath string) error { - srcObj, err := op.Get(t.Ctx(), srcStorage, srcPath) - if err != nil { - return errors.WithMessagef(err, "failed get src [%s] object", srcPath) - } - - if !srcObj.IsDir() { - // Verify single file - dstFilePath := stdpath.Join(dstPath, srcObj.GetName()) - _, err := op.Get(t.Ctx(), dstStorage, dstFilePath) - if err != nil { - return errors.WithMessagef(err, "verification failed: destination file [%s] not found", dstFilePath) - } - - t.mu.Lock() - t.CompletedFiles++ - t.mu.Unlock() - t.updateProgress() - return nil - } - - // Verify directory - dstObjPath := stdpath.Join(dstPath, srcObj.GetName()) - _, err = op.Get(t.Ctx(), dstStorage, dstObjPath) - if err != nil { - return errors.WithMessagef(err, "verification failed: destination directory [%s] not found", dstObjPath) - } - - // Verify directory contents - srcObjs, err := op.List(t.Ctx(), srcStorage, srcPath, model.ListArgs{}) - if err != nil { - return errors.WithMessagef(err, "failed list src [%s] objs for verification", srcPath) - } - - for _, obj := range srcObjs { - if utils.IsCanceled(t.Ctx()) { - return nil - } - srcSubPath := stdpath.Join(srcPath, obj.GetName()) - err := t.verifyDirectoryStructure(srcStorage, dstStorage, srcSubPath, dstObjPath) - if err != nil { - return err - } - } - - return nil -} - -// deleteSourceRecursively deletes source files and directories recursively -func (t *MoveTask) deleteSourceRecursively(srcStorage driver.Driver, srcPath string) error { - srcObj, err := op.Get(t.Ctx(), srcStorage, srcPath) - if err != nil { - return errors.WithMessagef(err, "failed get src [%s] object for deletion", srcPath) - } - - if !srcObj.IsDir() { - // Delete single file - err := op.Remove(t.Ctx(), srcStorage, srcPath) - if err != nil { - return errors.WithMessagef(err, "failed to delete src [%s] file", srcPath) - } - - t.mu.Lock() - t.CompletedFiles++ - t.mu.Unlock() - t.updateProgress() - return nil - } - - // Delete directory contents first - objs, err := op.List(t.Ctx(), srcStorage, srcPath, model.ListArgs{}) - if err != nil { - return errors.WithMessagef(err, "failed list src [%s] objs for deletion", srcPath) - } - - for _, obj := range objs { - if utils.IsCanceled(t.Ctx()) { - return nil - } - srcSubPath := stdpath.Join(srcPath, obj.GetName()) - err := t.deleteSourceRecursively(srcStorage, srcSubPath) - if err != nil { - return err - } - } - - // Delete the directory itself - err = op.Remove(t.Ctx(), srcStorage, srcPath) - if err != nil { - return errors.WithMessagef(err, "failed to delete src [%s] directory", srcPath) - } - - return nil -} - -func moveBetween2Storages(t *MoveTask, srcStorage, dstStorage driver.Driver, srcObjPath, dstDirPath string) error { - t.Status = "getting src object" - srcObj, err := op.Get(t.Ctx(), srcStorage, srcObjPath) - if err != nil { - return errors.WithMessagef(err, "failed get src [%s] file", srcObjPath) - } - - if srcObj.IsDir() { - t.Status = "src object is dir, listing objs" - objs, err := op.List(t.Ctx(), srcStorage, srcObjPath, model.ListArgs{}) - if err != nil { - return errors.WithMessagef(err, "failed list src [%s] objs", srcObjPath) - } - - dstObjPath := stdpath.Join(dstDirPath, srcObj.GetName()) - t.Status = "creating destination directory" - err = op.MakeDir(t.Ctx(), dstStorage, dstObjPath) - if err != nil { - // Check if this is an upload-related error and provide a clearer message - if errors.Is(err, errs.UploadNotSupported) { - return errors.WithMessagef(err, "destination storage [%s] does not support creating directories", dstStorage.GetStorage().MountPath) - } - return errors.WithMessagef(err, "failed to create destination directory [%s] in storage [%s]", dstObjPath, dstStorage.GetStorage().MountPath) - } - - for _, obj := range objs { - if utils.IsCanceled(t.Ctx()) { - return nil - } - srcSubObjPath := stdpath.Join(srcObjPath, obj.GetName()) - subTask := &MoveTask{ - TaskExtension: task.TaskExtension{ - Creator: t.GetCreator(), - ApiUrl: t.ApiUrl, - }, - srcStorage: srcStorage, - dstStorage: dstStorage, - SrcObjPath: srcSubObjPath, - DstDirPath: dstObjPath, - SrcStorageMp: srcStorage.GetStorage().MountPath, - DstStorageMp: dstStorage.GetStorage().MountPath, - } - MoveTaskManager.Add(subTask) - } - - t.Status = "cleaning up source directory" - err = op.Remove(t.Ctx(), srcStorage, srcObjPath) - if err != nil { - t.Status = "completed (source directory cleanup pending)" - } else { - t.Status = "completed" - } - - - return nil - } else { - return moveFileBetween2Storages(t, srcStorage, dstStorage, srcObjPath, dstDirPath) - } -} - -func moveFileBetween2Storages(tsk *MoveTask, srcStorage, dstStorage driver.Driver, srcFilePath, dstDirPath string) error { - tsk.Status = "copying file to destination" - - copyTask := &CopyTask{ - TaskExtension: task.TaskExtension{ - Creator: tsk.GetCreator(), - ApiUrl: tsk.ApiUrl, - }, - srcStorage: srcStorage, - dstStorage: dstStorage, - SrcObjPath: srcFilePath, - DstDirPath: dstDirPath, - SrcStorageMp: srcStorage.GetStorage().MountPath, - DstStorageMp: dstStorage.GetStorage().MountPath, - } - - copyTask.SetCtx(tsk.Ctx()) - - err := copyBetween2Storages(copyTask, srcStorage, dstStorage, srcFilePath, dstDirPath) - if err != nil { - // Check if this is an upload-related error and provide a clearer message - if errors.Is(err, errs.UploadNotSupported) { - return errors.WithMessagef(err, "destination storage [%s] does not support file uploads", dstStorage.GetStorage().MountPath) - } - return errors.WithMessagef(err, "failed to copy [%s] to destination storage [%s]", srcFilePath, dstStorage.GetStorage().MountPath) - } - - tsk.SetProgress(50) - - tsk.Status = "deleting source file" - err = op.Remove(tsk.Ctx(), srcStorage, srcFilePath) - if err != nil { - return errors.WithMessagef(err, "failed to delete src [%s] file from storage [%s] after successful copy", srcFilePath, srcStorage.GetStorage().MountPath) - } - - - tsk.SetProgress(100) - tsk.Status = "completed" - return nil -} - -// safeMoveOperation ensures copy-then-delete sequence for safe move operations -func (t *MoveTask) safeMoveOperation(srcObj model.Obj) error { - if srcObj.IsDir() { - // For directories, use the original logic but ensure proper sequencing - return moveBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath) - } else { - // For files, use the safe file move logic - return moveFileBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath) - } -} - -func _move(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) { - return _moveWithValidation(ctx, srcObjPath, dstDirPath, false, lazyCache...) -} - -func _moveWithValidation(ctx context.Context, srcObjPath, dstDirPath string, validateExistence bool, lazyCache ...bool) (task.TaskExtensionInfo, error) { - srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(srcObjPath) - if err != nil { - return nil, errors.WithMessage(err, "failed get src storage") - } - dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath) - if err != nil { - return nil, errors.WithMessage(err, "failed get dst storage") - } - - // Try native move first if in the same storage - if srcStorage.GetStorage() == dstStorage.GetStorage() { - err = op.Move(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...) - if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) { - if err == nil { - // For same-storage moves, refresh cache immediately since no batch tracking is used - op.ClearCache(dstStorage, dstDirActualPath) - } - return nil, err - } - } - - taskCreator, _ := ctx.Value(conf.UserKey).(*model.User) - - // Create task immediately without any synchronous checks to avoid blocking frontend - // All validation and type checking will be done asynchronously in the Run method - t := &MoveTask{ - TaskExtension: task.TaskExtension{ - Creator: taskCreator, - ApiUrl: common.GetApiUrl(ctx), - }, - srcStorage: srcStorage, - dstStorage: dstStorage, - SrcObjPath: srcObjActualPath, - DstDirPath: dstDirActualPath, - SrcStorageMp: srcStorage.GetStorage().MountPath, - DstStorageMp: dstStorage.GetStorage().MountPath, - ValidateExistence: validateExistence, - Phase: "initializing", - } - - MoveTaskManager.Add(t) - return t, nil -} diff --git a/internal/fs/other.go b/internal/fs/other.go index f34093ce..8d16b000 100644 --- a/internal/fs/other.go +++ b/internal/fs/other.go @@ -3,9 +3,10 @@ package fs import ( "context" - "github.com/OpenListTeam/OpenList/v4/internal/errs" + "github.com/OpenListTeam/OpenList/v4/internal/driver" "github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/op" + "github.com/OpenListTeam/OpenList/v4/internal/task" "github.com/pkg/errors" ) @@ -17,21 +18,6 @@ func makeDir(ctx context.Context, path string, lazyCache ...bool) error { return op.MakeDir(ctx, storage, actualPath, lazyCache...) } -func move(ctx context.Context, srcPath, dstDirPath string, lazyCache ...bool) error { - srcStorage, srcActualPath, err := op.GetStorageAndActualPath(srcPath) - if err != nil { - return errors.WithMessage(err, "failed get src storage") - } - dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath) - if err != nil { - return errors.WithMessage(err, "failed get dst storage") - } - if srcStorage.GetStorage() != dstStorage.GetStorage() { - return errors.WithStack(errs.MoveBetweenTwoStorages) - } - return op.Move(ctx, srcStorage, srcActualPath, dstDirActualPath, lazyCache...) -} - func rename(ctx context.Context, srcPath, dstName string, lazyCache ...bool) error { storage, srcActualPath, err := op.GetStorageAndActualPath(srcPath) if err != nil { @@ -56,3 +42,18 @@ func other(ctx context.Context, args model.FsOtherArgs) (interface{}, error) { args.Path = actualPath return op.Other(ctx, storage, args) } + +type TaskData struct { + task.TaskExtension + Status string `json:"-"` //don't save status to save space + SrcActualPath string `json:"src_path"` + DstActualPath string `json:"dst_path"` + SrcStorage driver.Driver `json:"-"` + DstStorage driver.Driver `json:"-"` + SrcStorageMp string `json:"src_storage_mp"` + DstStorageMp string `json:"dst_storage_mp"` +} + +func (t *TaskData) GetStatus() string { + return t.Status +} diff --git a/internal/fs/put.go b/internal/fs/put.go index c5872de0..887c8d63 100644 --- a/internal/fs/put.go +++ b/internal/fs/put.go @@ -3,14 +3,18 @@ package fs import ( "context" "fmt" + stdpath "path" "time" + "github.com/OpenListTeam/OpenList/v4/server/common" + "github.com/OpenListTeam/OpenList/v4/internal/conf" "github.com/OpenListTeam/OpenList/v4/internal/driver" "github.com/OpenListTeam/OpenList/v4/internal/errs" "github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/op" "github.com/OpenListTeam/OpenList/v4/internal/task" + "github.com/OpenListTeam/OpenList/v4/internal/task_group" "github.com/OpenListTeam/tache" "github.com/pkg/errors" ) @@ -37,6 +41,22 @@ func (t *UploadTask) Run() error { return op.Put(t.Ctx(), t.storage, t.dstDirActualPath, t.file, t.SetProgress, true) } +func (t *UploadTask) OnSucceeded() { + task_group.TransferCoordinator.Done(stdpath.Join(t.storage.GetStorage().MountPath, t.dstDirActualPath), true) +} + +func (t *UploadTask) OnFailed() { + task_group.TransferCoordinator.Done(stdpath.Join(t.storage.GetStorage().MountPath, t.dstDirActualPath), false) +} + +func (t *UploadTask) SetRetry(retry int, maxRetry int) { + t.TaskExtension.SetRetry(retry, maxRetry) + if retry == 0 && + (t.GetErr() == nil && t.GetState() != tache.StatePending) { // 手动重试 + task_group.TransferCoordinator.AddTask(stdpath.Join(t.storage.GetStorage().MountPath, t.dstDirActualPath), nil) + } +} + var UploadTaskManager *tache.Manager[*UploadTask] // putAsTask add as a put task and return immediately @@ -60,12 +80,14 @@ func putAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer) t := &UploadTask{ TaskExtension: task.TaskExtension{ Creator: taskCreator, + ApiUrl: common.GetApiUrl(ctx), }, storage: storage, dstDirActualPath: dstDirActualPath, file: file, } t.SetTotalBytes(file.GetSize()) + task_group.TransferCoordinator.AddTask(dstDirPath, nil) UploadTaskManager.Add(t) return t, nil } diff --git a/internal/fs/task.go b/internal/fs/task.go deleted file mode 100644 index 94f84c77..00000000 --- a/internal/fs/task.go +++ /dev/null @@ -1,153 +0,0 @@ -package fs - -import ( - "sync" - "time" - - "github.com/OpenListTeam/OpenList/v4/internal/driver" - "github.com/OpenListTeam/OpenList/v4/internal/op" -) - -// BatchTracker manages batch operations for cache refresh optimization -// It aggregates multiple file operations by target directory and only refreshes -// the cache once when all operations in a directory are completed -type BatchTracker struct { - mu sync.Mutex - dirTasks map[string]*dirTaskInfo // dstStoragePath+dstDirPath -> dirTaskInfo - pendingTasks map[string]string // taskID -> dstStoragePath+dstDirPath - lastCleanup time.Time // last cleanup time - name string // tracker name for debugging -} - -type dirTaskInfo struct { - dstStorage driver.Driver - dstDirPath string - pendingTasks map[string]bool // taskID -> true - lastActivity time.Time // last activity time (used for detecting abnormal situations) -} - -// NewBatchTracker creates a new batch tracker instance -func NewBatchTracker(name string) *BatchTracker { - return &BatchTracker{ - dirTasks: make(map[string]*dirTaskInfo), - pendingTasks: make(map[string]string), - lastCleanup: time.Now(), - name: name, - } -} - -// getDirKey generates unique key for target directory -func (bt *BatchTracker) getDirKey(dstStorage driver.Driver, dstDirPath string) string { - return dstStorage.GetStorage().MountPath + ":" + dstDirPath -} - -// RegisterTask registers a task to target directory for batch tracking -func (bt *BatchTracker) RegisterTask(taskID string, dstStorage driver.Driver, dstDirPath string) { - bt.mu.Lock() - defer bt.mu.Unlock() - - // Periodically clean up expired entries - bt.cleanupIfNeeded() - - dirKey := bt.getDirKey(dstStorage, dstDirPath) - - // Record task to directory mapping - bt.pendingTasks[taskID] = dirKey - - // Initialize or update directory task information - if info, exists := bt.dirTasks[dirKey]; exists { - info.pendingTasks[taskID] = true - info.lastActivity = time.Now() - } else { - bt.dirTasks[dirKey] = &dirTaskInfo{ - dstStorage: dstStorage, - dstDirPath: dstDirPath, - pendingTasks: map[string]bool{taskID: true}, - lastActivity: time.Now(), - } - } -} - -// MarkTaskCompleted marks a task as completed and returns whether cache refresh is needed -// Returns (shouldRefresh, dstStorage, dstDirPath) -func (bt *BatchTracker) MarkTaskCompleted(taskID string) (bool, driver.Driver, string) { - bt.mu.Lock() - defer bt.mu.Unlock() - - dirKey, exists := bt.pendingTasks[taskID] - if !exists { - return false, nil, "" - } - - // Remove from pending tasks - delete(bt.pendingTasks, taskID) - - info, exists := bt.dirTasks[dirKey] - if !exists { - return false, nil, "" - } - - // Remove from directory tasks - delete(info.pendingTasks, taskID) - - // If no pending tasks left in this directory, trigger cache refresh - if len(info.pendingTasks) == 0 { - dstStorage := info.dstStorage - dstDirPath := info.dstDirPath - delete(bt.dirTasks, dirKey) // Delete directly, no need to update lastActivity - return true, dstStorage, dstDirPath - } - - // Only update lastActivity when there are other tasks (indicating the directory still has active tasks) - info.lastActivity = time.Now() - return false, nil, "" -} - -// MarkTaskCompletedWithRefresh marks a task as completed and automatically refreshes cache if needed -func (bt *BatchTracker) MarkTaskCompletedWithRefresh(taskID string) { - shouldRefresh, dstStorage, dstDirPath := bt.MarkTaskCompleted(taskID) - if shouldRefresh { - op.ClearCache(dstStorage, dstDirPath) - } -} - -// cleanupIfNeeded checks if cleanup is needed and executes cleanup if necessary -func (bt *BatchTracker) cleanupIfNeeded() { - now := time.Now() - // Clean up every 10 minutes - if now.Sub(bt.lastCleanup) > 10*time.Minute { - bt.cleanupStaleEntries() - bt.lastCleanup = now - } -} - -// cleanupStaleEntries cleans up timed-out tasks to prevent memory leaks -// Mainly used to clean up residual entries caused by abnormal situations (such as task crashes, process restarts, etc.) -func (bt *BatchTracker) cleanupStaleEntries() { - now := time.Now() - for dirKey, info := range bt.dirTasks { - // If no activity for more than 1 hour, it may indicate an abnormal situation, clean up this entry - // Under normal circumstances, MarkTaskCompleted will be called when the task is completed and the entire entry will be deleted - if now.Sub(info.lastActivity) > time.Hour { - // Clean up related pending tasks - for taskID := range info.pendingTasks { - delete(bt.pendingTasks, taskID) - } - delete(bt.dirTasks, dirKey) - } - } -} - -// GetPendingTaskCount returns the number of pending tasks for debugging -func (bt *BatchTracker) GetPendingTaskCount() int { - bt.mu.Lock() - defer bt.mu.Unlock() - return len(bt.pendingTasks) -} - -// GetDirTaskCount returns the number of directories being tracked for debugging -func (bt *BatchTracker) GetDirTaskCount() int { - bt.mu.Lock() - defer bt.mu.Unlock() - return len(bt.dirTasks) -} diff --git a/internal/net/serve.go b/internal/net/serve.go index 4177bf88..1fd40b1c 100644 --- a/internal/net/serve.go +++ b/internal/net/serve.go @@ -6,10 +6,8 @@ import ( "crypto/tls" "fmt" "io" - "mime" "mime/multipart" "net/http" - "path/filepath" "strconv" "strings" "sync" @@ -74,11 +72,7 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time contentTypes, haveType := w.Header()["Content-Type"] var contentType string if !haveType { - contentType = mime.TypeByExtension(filepath.Ext(name)) - if contentType == "" { - // most modern application can handle the default contentType - contentType = "application/octet-stream" - } + contentType = utils.GetMimeType(name) w.Header().Set("Content-Type", contentType) } else if len(contentTypes) > 0 { contentType = contentTypes[0] diff --git a/internal/offline_download/115_open/client.go b/internal/offline_download/115_open/client.go index 4b9773cc..d12e02ec 100644 --- a/internal/offline_download/115_open/client.go +++ b/internal/offline_download/115_open/client.go @@ -3,6 +3,7 @@ package _115_open import ( "context" "fmt" + _115_open "github.com/OpenListTeam/OpenList/v4/drivers/115_open" "github.com/OpenListTeam/OpenList/v4/internal/conf" "github.com/OpenListTeam/OpenList/v4/internal/setting" diff --git a/internal/offline_download/thunder_browser/thunder_browser.go b/internal/offline_download/thunder_browser/thunder_browser.go index a2e06008..9324d7a7 100644 --- a/internal/offline_download/thunder_browser/thunder_browser.go +++ b/internal/offline_download/thunder_browser/thunder_browser.go @@ -4,10 +4,11 @@ import ( "context" "errors" "fmt" + "strconv" + "github.com/OpenListTeam/OpenList/v4/drivers/thunder_browser" "github.com/OpenListTeam/OpenList/v4/internal/conf" "github.com/OpenListTeam/OpenList/v4/internal/setting" - "strconv" "github.com/OpenListTeam/OpenList/v4/internal/errs" "github.com/OpenListTeam/OpenList/v4/internal/model" diff --git a/internal/offline_download/tool/add.go b/internal/offline_download/tool/add.go index 153d376d..7ab72810 100644 --- a/internal/offline_download/tool/add.go +++ b/internal/offline_download/tool/add.go @@ -2,7 +2,9 @@ package tool import ( "context" + _115_open "github.com/OpenListTeam/OpenList/v4/drivers/115_open" + "github.com/OpenListTeam/OpenList/v4/server/common" "net/url" stdpath "path" @@ -126,6 +128,7 @@ func AddURL(ctx context.Context, args *AddURLArgs) (task.TaskExtensionInfo, erro t := &DownloadTask{ TaskExtension: task.TaskExtension{ Creator: taskCreator, + ApiUrl: common.GetApiUrl(ctx), }, Url: args.URL, DstDirPath: args.DstDirPath, diff --git a/internal/offline_download/tool/download.go b/internal/offline_download/tool/download.go index ce36d189..ee85538b 100644 --- a/internal/offline_download/tool/download.go +++ b/internal/offline_download/tool/download.go @@ -6,10 +6,12 @@ import ( "github.com/OpenListTeam/OpenList/v4/internal/conf" "github.com/OpenListTeam/OpenList/v4/internal/errs" + "github.com/OpenListTeam/OpenList/v4/internal/fs" "github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/op" "github.com/OpenListTeam/OpenList/v4/internal/setting" "github.com/OpenListTeam/OpenList/v4/internal/task" + "github.com/OpenListTeam/OpenList/v4/internal/task_group" "github.com/OpenListTeam/tache" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -182,19 +184,24 @@ func (t *DownloadTask) Transfer() error { return errors.WithMessage(err, "failed get dst storage") } taskCreator, _ := t.Ctx().Value(conf.UserKey).(*model.User) - task := &TransferTask{ - TaskExtension: task.TaskExtension{ - Creator: taskCreator, + tsk := &TransferTask{ + TaskData: fs.TaskData{ + TaskExtension: task.TaskExtension{ + Creator: taskCreator, + ApiUrl: t.ApiUrl, + }, + SrcActualPath: t.TempDir, + DstActualPath: dstDirActualPath, + DstStorage: dstStorage, + DstStorageMp: dstStorage.GetStorage().MountPath, }, - SrcObjPath: t.TempDir, - DstDirPath: dstDirActualPath, - DstStorage: dstStorage, - DstStorageMp: dstStorage.GetStorage().MountPath, + groupID: t.DstDirPath, DeletePolicy: t.DeletePolicy, Url: t.Url, } - task.SetTotalBytes(t.GetTotalBytes()) - TransferTaskManager.Add(task) + tsk.SetTotalBytes(t.GetTotalBytes()) + task_group.TransferCoordinator.AddTask(tsk.groupID, nil) + TransferTaskManager.Add(tsk) return nil } return transferStd(t.Ctx(), t.TempDir, t.DstDirPath, t.DeletePolicy) diff --git a/internal/offline_download/tool/transfer.go b/internal/offline_download/tool/transfer.go index 3b135a9e..1c1284a0 100644 --- a/internal/offline_download/tool/transfer.go +++ b/internal/offline_download/tool/transfer.go @@ -9,29 +9,25 @@ import ( "time" "github.com/OpenListTeam/OpenList/v4/internal/conf" - "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/fs" "github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/op" "github.com/OpenListTeam/OpenList/v4/internal/stream" "github.com/OpenListTeam/OpenList/v4/internal/task" + "github.com/OpenListTeam/OpenList/v4/internal/task_group" "github.com/OpenListTeam/OpenList/v4/pkg/http_range" "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/OpenListTeam/OpenList/v4/server/common" "github.com/OpenListTeam/tache" "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) type TransferTask struct { - task.TaskExtension - Status string `json:"-"` //don't save status to save space - SrcObjPath string `json:"src_obj_path"` - DstDirPath string `json:"dst_dir_path"` - SrcStorage driver.Driver `json:"-"` - DstStorage driver.Driver `json:"-"` - SrcStorageMp string `json:"src_storage_mp"` - DstStorageMp string `json:"dst_storage_mp"` - DeletePolicy DeletePolicy `json:"delete_policy"` - Url string `json:"-"` + fs.TaskData + DeletePolicy DeletePolicy `json:"delete_policy"` + Url string `json:"url"` + groupID string `json:"-"` } func (t *TransferTask) Run() error { @@ -51,10 +47,10 @@ func (t *TransferTask) Run() error { if err != nil { return err } - name := t.SrcObjPath + name := t.SrcActualPath mimetype := utils.GetMimeType(name) s := &stream.FileStream{ - Ctx: nil, + Ctx: t.Ctx(), Obj: &model.Object{ Name: name, Size: t.GetTotalBytes(), @@ -65,7 +61,7 @@ func (t *TransferTask) Run() error { Mimetype: mimetype, Closers: utils.NewClosers(r), } - return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, s, t.SetProgress) + return op.Put(t.Ctx(), t.DstStorage, t.DstActualPath, s, t.SetProgress) } return transferStdPath(t) } else { @@ -75,13 +71,9 @@ func (t *TransferTask) Run() error { func (t *TransferTask) GetName() string { if t.DeletePolicy == UploadDownloadStream { - return fmt.Sprintf("upload [%s](%s) to [%s](%s)", t.SrcObjPath, t.Url, t.DstStorageMp, t.DstDirPath) + return fmt.Sprintf("upload [%s](%s) to [%s](%s)", t.SrcActualPath, t.Url, t.DstStorageMp, t.DstActualPath) } - return fmt.Sprintf("transfer [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath) -} - -func (t *TransferTask) GetStatus() string { - return t.Status + return fmt.Sprintf("transfer [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcActualPath, t.DstStorageMp, t.DstActualPath) } func (t *TransferTask) OnSucceeded() { @@ -92,6 +84,7 @@ func (t *TransferTask) OnSucceeded() { removeObjTemp(t) } } + task_group.TransferCoordinator.Done(t.groupID, true) } func (t *TransferTask) OnFailed() { @@ -102,6 +95,17 @@ func (t *TransferTask) OnFailed() { removeObjTemp(t) } } + task_group.TransferCoordinator.Done(t.groupID, false) +} + +func (t *TransferTask) SetRetry(retry int, maxRetry int) { + if retry == 0 && + (len(t.groupID) == 0 || // 重启恢复 + (t.GetErr() == nil && t.GetState() != tache.StatePending)) { // 手动重试 + t.groupID = stdpath.Join(t.DstStorageMp, t.DstActualPath) + task_group.TransferCoordinator.AddTask(t.groupID, nil) + } + t.TaskExtension.SetRetry(retry, maxRetry) } var ( @@ -120,15 +124,20 @@ func transferStd(ctx context.Context, tempDir, dstDirPath string, deletePolicy D taskCreator, _ := ctx.Value(conf.UserKey).(*model.User) for _, entry := range entries { t := &TransferTask{ - TaskExtension: task.TaskExtension{ - Creator: taskCreator, + TaskData: fs.TaskData{ + TaskExtension: task.TaskExtension{ + Creator: taskCreator, + ApiUrl: common.GetApiUrl(ctx), + }, + SrcActualPath: stdpath.Join(tempDir, entry.Name()), + DstActualPath: dstDirActualPath, + DstStorage: dstStorage, + DstStorageMp: dstStorage.GetStorage().MountPath, }, - SrcObjPath: stdpath.Join(tempDir, entry.Name()), - DstDirPath: dstDirActualPath, - DstStorage: dstStorage, - DstStorageMp: dstStorage.GetStorage().MountPath, + groupID: dstDirPath, DeletePolicy: deletePolicy, } + task_group.TransferCoordinator.AddTask(dstDirPath, nil) TransferTaskManager.Add(t) } return nil @@ -136,31 +145,37 @@ func transferStd(ctx context.Context, tempDir, dstDirPath string, deletePolicy D func transferStdPath(t *TransferTask) error { t.Status = "getting src object" - info, err := os.Stat(t.SrcObjPath) + info, err := os.Stat(t.SrcActualPath) if err != nil { return err } if info.IsDir() { t.Status = "src object is dir, listing objs" - entries, err := os.ReadDir(t.SrcObjPath) + entries, err := os.ReadDir(t.SrcActualPath) if err != nil { return err } + dstDirActualPath := stdpath.Join(t.DstActualPath, info.Name()) + task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstDirActualPath)) for _, entry := range entries { - srcRawPath := stdpath.Join(t.SrcObjPath, entry.Name()) - dstObjPath := stdpath.Join(t.DstDirPath, info.Name()) - t := &TransferTask{ - TaskExtension: task.TaskExtension{ - Creator: t.Creator, + srcRawPath := stdpath.Join(t.SrcActualPath, entry.Name()) + task := &TransferTask{ + TaskData: fs.TaskData{ + TaskExtension: task.TaskExtension{ + Creator: t.Creator, + ApiUrl: t.ApiUrl, + }, + SrcActualPath: srcRawPath, + DstActualPath: dstDirActualPath, + DstStorage: t.DstStorage, + SrcStorageMp: t.SrcStorageMp, + DstStorageMp: t.DstStorageMp, }, - SrcObjPath: srcRawPath, - DstDirPath: dstObjPath, - DstStorage: t.DstStorage, - SrcStorageMp: t.SrcStorageMp, - DstStorageMp: t.DstStorageMp, + groupID: t.groupID, DeletePolicy: t.DeletePolicy, } - TransferTaskManager.Add(t) + task_group.TransferCoordinator.AddTask(t.groupID, nil) + TransferTaskManager.Add(task) } t.Status = "src object is dir, added all transfer tasks of files" return nil @@ -169,19 +184,19 @@ func transferStdPath(t *TransferTask) error { } func transferStdFile(t *TransferTask) error { - rc, err := os.Open(t.SrcObjPath) + rc, err := os.Open(t.SrcActualPath) if err != nil { - return errors.Wrapf(err, "failed to open file %s", t.SrcObjPath) + return errors.Wrapf(err, "failed to open file %s", t.SrcActualPath) } info, err := rc.Stat() if err != nil { - return errors.Wrapf(err, "failed to get file %s", t.SrcObjPath) + return errors.Wrapf(err, "failed to get file %s", t.SrcActualPath) } - mimetype := utils.GetMimeType(t.SrcObjPath) + mimetype := utils.GetMimeType(t.SrcActualPath) s := &stream.FileStream{ - Ctx: nil, + Ctx: t.Ctx(), Obj: &model.Object{ - Name: filepath.Base(t.SrcObjPath), + Name: filepath.Base(t.SrcActualPath), Size: info.Size(), Modified: info.ModTime(), IsFolder: false, @@ -191,16 +206,16 @@ func transferStdFile(t *TransferTask) error { Closers: utils.NewClosers(rc), } t.SetTotalBytes(info.Size()) - return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, s, t.SetProgress) + return op.Put(t.Ctx(), t.DstStorage, t.DstActualPath, s, t.SetProgress) } func removeStdTemp(t *TransferTask) { - info, err := os.Stat(t.SrcObjPath) + info, err := os.Stat(t.SrcActualPath) if err != nil || info.IsDir() { return } - if err := os.Remove(t.SrcObjPath); err != nil { - log.Errorf("failed to delete temp file %s, error: %s", t.SrcObjPath, err.Error()) + if err := os.Remove(t.SrcActualPath); err != nil { + log.Errorf("failed to delete temp file %s, error: %s", t.SrcActualPath, err.Error()) } } @@ -220,17 +235,22 @@ func transferObj(ctx context.Context, tempDir, dstDirPath string, deletePolicy D taskCreator, _ := ctx.Value(conf.UserKey).(*model.User) // taskCreator is nil when convert failed for _, obj := range objs { t := &TransferTask{ - TaskExtension: task.TaskExtension{ - Creator: taskCreator, + TaskData: fs.TaskData{ + TaskExtension: task.TaskExtension{ + Creator: taskCreator, + ApiUrl: common.GetApiUrl(ctx), + }, + SrcActualPath: stdpath.Join(srcObjActualPath, obj.GetName()), + DstActualPath: dstDirActualPath, + SrcStorage: srcStorage, + DstStorage: dstStorage, + SrcStorageMp: srcStorage.GetStorage().MountPath, + DstStorageMp: dstStorage.GetStorage().MountPath, }, - SrcObjPath: stdpath.Join(srcObjActualPath, obj.GetName()), - DstDirPath: dstDirActualPath, - SrcStorage: srcStorage, - DstStorage: dstStorage, - SrcStorageMp: srcStorage.GetStorage().MountPath, - DstStorageMp: dstStorage.GetStorage().MountPath, + groupID: dstDirPath, DeletePolicy: deletePolicy, } + task_group.TransferCoordinator.AddTask(dstDirPath, nil) TransferTaskManager.Add(t) } return nil @@ -238,32 +258,38 @@ func transferObj(ctx context.Context, tempDir, dstDirPath string, deletePolicy D func transferObjPath(t *TransferTask) error { t.Status = "getting src object" - srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath) + srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcActualPath) if err != nil { - return errors.WithMessagef(err, "failed get src [%s] file", t.SrcObjPath) + return errors.WithMessagef(err, "failed get src [%s] file", t.SrcActualPath) } if srcObj.IsDir() { t.Status = "src object is dir, listing objs" - objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcObjPath, model.ListArgs{}) + objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.ListArgs{}) if err != nil { - return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcObjPath) + return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcActualPath) } + dstDirActualPath := stdpath.Join(t.DstActualPath, srcObj.GetName()) + task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstDirActualPath)) for _, obj := range objs { if utils.IsCanceled(t.Ctx()) { return nil } - srcObjPath := stdpath.Join(t.SrcObjPath, obj.GetName()) - dstObjPath := stdpath.Join(t.DstDirPath, srcObj.GetName()) + srcObjPath := stdpath.Join(t.SrcActualPath, obj.GetName()) + task_group.TransferCoordinator.AddTask(t.groupID, nil) TransferTaskManager.Add(&TransferTask{ - TaskExtension: task.TaskExtension{ - Creator: t.Creator, + TaskData: fs.TaskData{ + TaskExtension: task.TaskExtension{ + Creator: t.Creator, + ApiUrl: t.ApiUrl, + }, + SrcActualPath: srcObjPath, + DstActualPath: dstDirActualPath, + SrcStorage: t.SrcStorage, + DstStorage: t.DstStorage, + SrcStorageMp: t.SrcStorageMp, + DstStorageMp: t.DstStorageMp, }, - SrcObjPath: srcObjPath, - DstDirPath: dstObjPath, - SrcStorage: t.SrcStorage, - DstStorage: t.DstStorage, - SrcStorageMp: t.SrcStorageMp, - DstStorageMp: t.DstStorageMp, + groupID: t.groupID, DeletePolicy: t.DeletePolicy, }) } @@ -274,13 +300,13 @@ func transferObjPath(t *TransferTask) error { } func transferObjFile(t *TransferTask) error { - srcFile, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath) + srcFile, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcActualPath) if err != nil { - return errors.WithMessagef(err, "failed get src [%s] file", t.SrcObjPath) + return errors.WithMessagef(err, "failed get src [%s] file", t.SrcActualPath) } - link, _, err := op.Link(t.Ctx(), t.SrcStorage, t.SrcObjPath, model.LinkArgs{}) + link, _, err := op.Link(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.LinkArgs{}) if err != nil { - return errors.WithMessagef(err, "failed get [%s] link", t.SrcObjPath) + return errors.WithMessagef(err, "failed get [%s] link", t.SrcActualPath) } // any link provided is seekable ss, err := stream.NewSeekableStream(&stream.FileStream{ @@ -289,18 +315,18 @@ func transferObjFile(t *TransferTask) error { }, link) if err != nil { _ = link.Close() - return errors.WithMessagef(err, "failed get [%s] stream", t.SrcObjPath) + return errors.WithMessagef(err, "failed get [%s] stream", t.SrcActualPath) } t.SetTotalBytes(ss.GetSize()) - return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, ss, t.SetProgress) + return op.Put(t.Ctx(), t.DstStorage, t.DstActualPath, ss, t.SetProgress) } func removeObjTemp(t *TransferTask) { - srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath) + srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcActualPath) if err != nil || srcObj.IsDir() { return } - if err := op.Remove(t.Ctx(), t.SrcStorage, t.SrcObjPath); err != nil { - log.Errorf("failed to delete temp obj %s, error: %s", t.SrcObjPath, err.Error()) + if err := op.Remove(t.Ctx(), t.SrcStorage, t.SrcActualPath); err != nil { + log.Errorf("failed to delete temp obj %s, error: %s", t.SrcActualPath, err.Error()) } } diff --git a/internal/op/archive.go b/internal/op/archive.go index 0ae34fd2..c1f7546b 100644 --- a/internal/op/archive.go +++ b/internal/op/archive.go @@ -489,18 +489,18 @@ func ArchiveDecompress(ctx context.Context, storage driver.Driver, srcPath, dstD var newObjs []model.Obj newObjs, err = s.ArchiveDecompress(ctx, srcObj, dstDir, args) if err == nil { - if newObjs != nil && len(newObjs) > 0 { + if len(newObjs) > 0 { for _, newObj := range newObjs { addCacheObj(storage, dstDirPath, model.WrapObjName(newObj)) } } else if !utils.IsBool(lazyCache...) { - ClearCache(storage, dstDirPath) + DeleteCache(storage, dstDirPath) } } case driver.ArchiveDecompress: err = s.ArchiveDecompress(ctx, srcObj, dstDir, args) if err == nil && !utils.IsBool(lazyCache...) { - ClearCache(storage, dstDirPath) + DeleteCache(storage, dstDirPath) } default: return errs.NotImplement diff --git a/internal/op/fs.go b/internal/op/fs.go index 6031f4e9..f3a9dcc0 100644 --- a/internal/op/fs.go +++ b/internal/op/fs.go @@ -103,6 +103,10 @@ func ClearCache(storage driver.Driver, path string) { listCache.Del(Key(storage, path)) } +func DeleteCache(storage driver.Driver, path string) { + listCache.Del(Key(storage, path)) +} + func Key(storage driver.Driver, path string) string { return stdpath.Join(storage.GetStorage().MountPath, utils.FixAndCleanPath(path)) } @@ -355,13 +359,13 @@ func MakeDir(ctx context.Context, storage driver.Driver, path string, lazyCache if newObj != nil { addCacheObj(storage, parentPath, model.WrapObjName(newObj)) } else if !utils.IsBool(lazyCache...) { - ClearCache(storage, parentPath) + DeleteCache(storage, parentPath) } } case driver.Mkdir: err = s.MakeDir(ctx, parentDir, dirName) if err == nil && !utils.IsBool(lazyCache...) { - ClearCache(storage, parentPath) + DeleteCache(storage, parentPath) } default: return nil, errs.NotImplement @@ -406,7 +410,7 @@ func Move(ctx context.Context, storage driver.Driver, srcPath, dstDirPath string if newObj != nil { addCacheObj(storage, dstDirPath, model.WrapObjName(newObj)) } else if !utils.IsBool(lazyCache...) { - ClearCache(storage, dstDirPath) + DeleteCache(storage, dstDirPath) } } case driver.Move: @@ -414,7 +418,7 @@ func Move(ctx context.Context, storage driver.Driver, srcPath, dstDirPath string if err == nil { delCacheObj(storage, srcDirPath, srcRawObj) if !utils.IsBool(lazyCache...) { - ClearCache(storage, dstDirPath) + DeleteCache(storage, dstDirPath) } } default: @@ -443,13 +447,13 @@ func Rename(ctx context.Context, storage driver.Driver, srcPath, dstName string, if newObj != nil { updateCacheObj(storage, srcDirPath, srcRawObj, model.WrapObjName(newObj)) } else if !utils.IsBool(lazyCache...) { - ClearCache(storage, srcDirPath) + DeleteCache(storage, srcDirPath) } } case driver.Rename: err = s.Rename(ctx, srcObj, dstName) if err == nil && !utils.IsBool(lazyCache...) { - ClearCache(storage, srcDirPath) + DeleteCache(storage, srcDirPath) } default: return errs.NotImplement @@ -481,13 +485,13 @@ func Copy(ctx context.Context, storage driver.Driver, srcPath, dstDirPath string if newObj != nil { addCacheObj(storage, dstDirPath, model.WrapObjName(newObj)) } else if !utils.IsBool(lazyCache...) { - ClearCache(storage, dstDirPath) + DeleteCache(storage, dstDirPath) } } case driver.Copy: err = s.Copy(ctx, srcObj, dstDir) if err == nil && !utils.IsBool(lazyCache...) { - ClearCache(storage, dstDirPath) + DeleteCache(storage, dstDirPath) } default: return errs.NotImplement @@ -590,13 +594,13 @@ func Put(ctx context.Context, storage driver.Driver, dstDirPath string, file mod if newObj != nil { addCacheObj(storage, dstDirPath, model.WrapObjName(newObj)) } else if !utils.IsBool(lazyCache...) { - ClearCache(storage, dstDirPath) + DeleteCache(storage, dstDirPath) } } case driver.Put: err = s.Put(ctx, parentDir, file, up) if err == nil && !utils.IsBool(lazyCache...) { - ClearCache(storage, dstDirPath) + DeleteCache(storage, dstDirPath) } default: return errs.NotImplement @@ -648,13 +652,13 @@ func PutURL(ctx context.Context, storage driver.Driver, dstDirPath, dstName, url if newObj != nil { addCacheObj(storage, dstDirPath, model.WrapObjName(newObj)) } else if !utils.IsBool(lazyCache...) { - ClearCache(storage, dstDirPath) + DeleteCache(storage, dstDirPath) } } case driver.PutURL: err = s.PutURL(ctx, dstDir, dstName, url) if err == nil && !utils.IsBool(lazyCache...) { - ClearCache(storage, dstDirPath) + DeleteCache(storage, dstDirPath) } default: return errs.NotImplement diff --git a/internal/task_group/group.go b/internal/task_group/group.go new file mode 100644 index 00000000..90cb9588 --- /dev/null +++ b/internal/task_group/group.go @@ -0,0 +1,81 @@ +package task_group + +import ( + "sync" + + "github.com/sirupsen/logrus" +) + +type OnCompletionFunc func(groupID string, payloads []any) +type TaskGroupCoordinator struct { + name string + mu sync.Mutex + + groupPayloads map[string][]any + groupStates map[string]groupState + onCompletion OnCompletionFunc +} + +type groupState struct { + pending int + hasSuccess bool +} + +func NewTaskGroupCoordinator(name string, f OnCompletionFunc) *TaskGroupCoordinator { + return &TaskGroupCoordinator{ + name: name, + groupPayloads: map[string][]any{}, + groupStates: map[string]groupState{}, + onCompletion: f, + } +} + +// payload可为nil +func (tgc *TaskGroupCoordinator) AddTask(groupID string, payload any) { + tgc.mu.Lock() + defer tgc.mu.Unlock() + state := tgc.groupStates[groupID] + state.pending++ + tgc.groupStates[groupID] = state + logrus.Debugf("AddTask:%s ,count=%+v", groupID, state) + if payload == nil { + return + } + tgc.groupPayloads[groupID] = append(tgc.groupPayloads[groupID], payload) +} + +func (tgc *TaskGroupCoordinator) AppendPayload(groupID string, payload any) { + if payload == nil { + return + } + tgc.mu.Lock() + defer tgc.mu.Unlock() + tgc.groupPayloads[groupID] = append(tgc.groupPayloads[groupID], payload) +} + +func (tgc *TaskGroupCoordinator) Done(groupID string, success bool) { + tgc.mu.Lock() + defer tgc.mu.Unlock() + state, ok := tgc.groupStates[groupID] + if !ok || state.pending == 0 { + return + } + if success { + state.hasSuccess = true + } + logrus.Debugf("Done:%s ,state=%+v", groupID, state) + if state.pending == 1 { + payloads := tgc.groupPayloads[groupID] + delete(tgc.groupStates, groupID) + delete(tgc.groupPayloads, groupID) + if tgc.onCompletion != nil && state.hasSuccess { + logrus.Debugf("OnCompletion:%s", groupID) + tgc.mu.Unlock() + tgc.onCompletion(groupID, payloads) + tgc.mu.Lock() + } + return + } + state.pending-- + tgc.groupStates[groupID] = state +} diff --git a/internal/task_group/transfer.go b/internal/task_group/transfer.go new file mode 100644 index 00000000..b2eca7f2 --- /dev/null +++ b/internal/task_group/transfer.go @@ -0,0 +1,103 @@ +package task_group + +import ( + "context" + "fmt" + "path" + + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/op" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +type SrcPathToRemove string + +// ActualPath +type DstPathToRefresh string + +func refreshAndRemove(dstPath string, payloads []any) { + dstStorage, dstActualPath, err := op.GetStorageAndActualPath(dstPath) + if err != nil { + log.Error(errors.WithMessage(err, "failed get dst storage")) + return + } + _, dstNeedRefresh := dstStorage.(driver.Put) + dstNeedRefresh = dstNeedRefresh && !dstStorage.Config().NoCache + if dstNeedRefresh { + op.DeleteCache(dstStorage, dstActualPath) + } + var ctx context.Context + for _, payload := range payloads { + switch p := payload.(type) { + case DstPathToRefresh: + if dstNeedRefresh { + op.DeleteCache(dstStorage, string(p)) + } + case SrcPathToRemove: + if ctx == nil { + ctx = context.Background() + } + srcStorage, srcActualPath, err := op.GetStorageAndActualPath(string(p)) + if err != nil { + log.Error(errors.WithMessage(err, "failed get src storage")) + continue + } + err = verifyAndRemove(ctx, srcStorage, dstStorage, srcActualPath, dstActualPath, dstNeedRefresh) + if err != nil { + log.Error(err) + } + } + } +} + +func verifyAndRemove(ctx context.Context, srcStorage, dstStorage driver.Driver, srcPath, dstPath string, refresh bool) error { + srcObj, err := op.Get(ctx, srcStorage, srcPath) + if err != nil { + return errors.WithMessagef(err, "failed get src [%s] file", path.Join(srcStorage.GetStorage().MountPath, srcPath)) + } + + dstObjPath := path.Join(dstPath, srcObj.GetName()) + dstObj, err := op.Get(ctx, dstStorage, dstObjPath) + if err != nil { + return errors.WithMessagef(err, "failed get dst [%s] file", path.Join(dstStorage.GetStorage().MountPath, dstObjPath)) + } + + if !dstObj.IsDir() { + err = op.Remove(ctx, srcStorage, srcPath) + if err != nil { + return fmt.Errorf("failed remove %s: %+v", path.Join(srcStorage.GetStorage().MountPath, srcPath), err) + } + return nil + } + + // Verify directory + srcObjs, err := op.List(ctx, srcStorage, srcPath, model.ListArgs{}) + if err != nil { + return errors.WithMessagef(err, "failed list src [%s] objs", path.Join(srcStorage.GetStorage().MountPath, srcPath)) + } + + if refresh { + op.DeleteCache(dstStorage, dstObjPath) + } + hasErr := false + for _, obj := range srcObjs { + srcSubPath := path.Join(srcPath, obj.GetName()) + err := verifyAndRemove(ctx, srcStorage, dstStorage, srcSubPath, dstObjPath, refresh) + if err != nil { + log.Error(err) + hasErr = true + } + } + if hasErr { + return errors.Errorf("some subitems of [%s] failed to verify and remove", path.Join(srcStorage.GetStorage().MountPath, srcPath)) + } + err = op.Remove(ctx, srcStorage, srcPath) + if err != nil { + return fmt.Errorf("failed remove %s: %+v", path.Join(srcStorage.GetStorage().MountPath, srcPath), err) + } + return nil +} + +var TransferCoordinator *TaskGroupCoordinator = NewTaskGroupCoordinator("RefreshAndRemove", refreshAndRemove) diff --git a/server/ftp/fsmanage.go b/server/ftp/fsmanage.go index b00e779f..045ce76c 100644 --- a/server/ftp/fsmanage.go +++ b/server/ftp/fsmanage.go @@ -67,7 +67,7 @@ func Rename(ctx context.Context, oldPath, newPath string) error { if !user.CanFTPManage() || !user.CanMove() || (srcBase != dstBase && !user.CanRename()) { return errs.PermissionDenied } - if err = fs.Move(ctx, srcPath, dstDir); err != nil { + if _, err = fs.Move(ctx, srcPath, dstDir); err != nil { if srcBase != dstBase { return err } diff --git a/server/handles/fsbatch.go b/server/handles/fsbatch.go index 47e9bf1b..74339871 100644 --- a/server/handles/fsbatch.go +++ b/server/handles/fsbatch.go @@ -125,7 +125,7 @@ func FsRecursiveMove(c *gin.Context) { var count = 0 for i, fileName := range movingFileNames { // move - err := fs.Move(c.Request.Context(), fileName, dstDir, len(movingFileNames) > i+1) + _, err := fs.Move(c.Request.Context(), fileName, dstDir, len(movingFileNames) > i+1) if err != nil { common.ErrorResp(c, err, 500) return diff --git a/server/handles/fsmanage.go b/server/handles/fsmanage.go index 38152494..6313b78c 100644 --- a/server/handles/fsmanage.go +++ b/server/handles/fsmanage.go @@ -101,7 +101,7 @@ func FsMove(c *gin.Context) { // All validation will be done asynchronously in the background var addedTasks []task.TaskExtensionInfo for i, name := range req.Names { - t, err := fs.MoveWithTaskAndValidation(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, !req.Overwrite, len(req.Names) > i+1) + t, err := fs.Move(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(req.Names) > i+1) if t != nil { addedTasks = append(addedTasks, t) } diff --git a/server/webdav/file.go b/server/webdav/file.go index ab341152..debfcfe9 100644 --- a/server/webdav/file.go +++ b/server/webdav/file.go @@ -43,7 +43,7 @@ func moveFiles(ctx context.Context, src, dst string, overwrite bool) (status int if srcDir == dstDir { err = fs.Rename(ctx, src, dstName) } else { - err = fs.Move(ctx, src, dstDir) + _, err = fs.Move(context.WithValue(ctx, conf.NoTaskKey, struct{}{}), src, dstDir) if err != nil { return http.StatusInternalServerError, err } diff --git a/server/webdav/prop.go b/server/webdav/prop.go index 7abc2712..5c888934 100644 --- a/server/webdav/prop.go +++ b/server/webdav/prop.go @@ -10,15 +10,14 @@ import ( "encoding/xml" "errors" "fmt" - "mime" "net/http" - "path" "strconv" "strings" "time" "github.com/OpenListTeam/OpenList/v4/internal/conf" "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/OpenListTeam/OpenList/v4/server/common" ) @@ -433,7 +432,7 @@ func findContentType(ctx context.Context, ls LockSystem, name string, fi model.O //} //defer f.Close() // This implementation is based on serveContent's code in the standard net/http package. - ctype := mime.TypeByExtension(path.Ext(name)) + ctype := utils.GetMimeType(name) return ctype, nil //if ctype != "" { // return ctype, nil