feat(task-group): introduce TaskGroupCoordinator for coordinated task execution (#721)

* feat(task): add task hooks and batch tasks
refactor(move): make move use CopyTask

* Update internal/task/batch_task/refresh.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Seven <53081179+Seven66677731@users.noreply.github.com>

* fix: correct the upload task all-finished check

* Update internal/task/batch_task/refresh.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Seven <53081179+Seven66677731@users.noreply.github.com>

* feat: enhance concurrency safety

* Optimize code

* Decompression

* Fix deadlock

* refactor(move): move as task

* Refactor and optimize

* .

* Optimize and fix bugs

* .

* Fix bugs

* feat: add task retry judge

* Proxy the Task.SetState function to determine the task's lifecycle stage

* chore: use the OnSucceeded, OnFailed, and OnBeforeRetry functions

* Optimize

* Optimize and remove duplicate code

* .

* Optimize

* .

* webdav

* Revert "fix(fs):After the file is copied or moved, flush the cache of the directory that was copied or moved to."

This reverts commit 5f03edd683.

---------

Signed-off-by: Seven <53081179+Seven66677731@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: j2rong4cn <j2rong@qq.com>
Author: Seven
Date: 2025-07-24 16:15:24 +08:00
Committed by: GitHub
Parent: a9f02ecdac
Commit: e93ab76036
26 changed files with 726 additions and 1214 deletions
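The diffs below lean on a new internal/task_group package whose TransferCoordinator is called with AddTask, AppendPayload, and Done at every task boundary. The coordinator's implementation is not part of this diff; what follows is only a minimal, self-contained Go sketch of the pattern those call sites imply (a per-group pending counter that runs a completion action once the group drains). Every name in the sketch is illustrative, not the real API.

package main

import (
	"fmt"
	"sync"
)

// groupCoordinator is an illustrative stand-in for task_group.TransferCoordinator:
// each group keeps a pending-task counter plus accumulated payloads, and a
// finish hook runs once the counter drains to zero.
type groupCoordinator struct {
	mu     sync.Mutex
	groups map[string]*groupState
	finish func(groupID string, payloads []any, allSucceeded bool)
}

type groupState struct {
	pending   int
	payloads  []any
	succeeded bool
}

func newGroupCoordinator(finish func(string, []any, bool)) *groupCoordinator {
	return &groupCoordinator{groups: map[string]*groupState{}, finish: finish}
}

// AddTask counts one more member into the group and optionally records a
// payload (e.g. a source path to remove after a move).
func (c *groupCoordinator) AddTask(groupID string, payload any) {
	c.mu.Lock()
	defer c.mu.Unlock()
	g, ok := c.groups[groupID]
	if !ok {
		g = &groupState{succeeded: true}
		c.groups[groupID] = g
	}
	g.pending++
	if payload != nil {
		g.payloads = append(g.payloads, payload)
	}
}

// AppendPayload records extra completion work for the group without changing
// its size (e.g. a destination directory whose cache should be refreshed).
func (c *groupCoordinator) AppendPayload(groupID string, payload any) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if g, ok := c.groups[groupID]; ok {
		g.payloads = append(g.payloads, payload)
	}
}

// Done reports one member as finished; when the last member reports, the
// finish hook receives the collected payloads and the overall success flag.
func (c *groupCoordinator) Done(groupID string, success bool) {
	c.mu.Lock()
	g, ok := c.groups[groupID]
	if !ok {
		c.mu.Unlock()
		return
	}
	g.succeeded = g.succeeded && success
	g.pending--
	finished := g.pending <= 0
	if finished {
		delete(c.groups, groupID)
	}
	c.mu.Unlock()
	if finished && c.finish != nil {
		c.finish(groupID, g.payloads, g.succeeded)
	}
}

func main() {
	c := newGroupCoordinator(func(groupID string, payloads []any, ok bool) {
		fmt.Printf("group %q drained: success=%v payloads=%v\n", groupID, ok, payloads)
	})
	c.AddTask("/local/dst", nil)                  // first transfer task joins the group
	c.AddTask("/local/dst", "remove:/remote/src") // a move also records its cleanup payload
	c.Done("/local/dst", true)
	c.Done("/local/dst", true) // group drains here and the finish hook fires
}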


@ -193,7 +193,8 @@ func (d *Alias) Move(ctx context.Context, srcObj, dstDir model.Obj) error {
}
if len(srcPath) == len(dstPath) {
for i := range srcPath {
err = errors.Join(err, fs.Move(ctx, *srcPath[i], *dstPath[i]))
_, e := fs.Move(ctx, *srcPath[i], *dstPath[i])
err = errors.Join(err, e)
}
return err
} else {

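The Alias hunk above is one of several call sites adapted to the new fs.Move signature, which (as the fs.go hunk further down shows) now enqueues a move task and returns task.TaskExtensionInfo alongside the error. A hedged caller-side sketch, assuming only what the diff shows; moveAndReport itself is hypothetical:

package example

import (
	"context"

	"github.com/OpenListTeam/OpenList/v4/internal/fs"
)

// moveAndReport illustrates the two outcomes of the new fs.Move: a nil task
// with a nil error means the move finished synchronously (same-storage move);
// otherwise the returned task has been queued on MoveTaskManager and will be
// picked up by a worker.
func moveAndReport(ctx context.Context, srcPath, dstDirPath string) (string, error) {
	t, err := fs.Move(ctx, srcPath, dstDirPath)
	if err != nil {
		return "", err
	}
	if t == nil {
		return "moved synchronously", nil
	}
	return "move queued as a background task", nil
}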

@ -14,7 +14,7 @@ import (
"math/rand"
"net/http"
"net/url"
"path/filepath"
stdpath "path"
"sort"
"strconv"
"strings"
@ -353,7 +353,7 @@ func (d *Doubao) getUploadConfig(upConfig *UploadConfig, dataType string, file m
"ServiceId": d.UploadToken.Alice[dataType].ServiceID,
"NeedFallback": "true",
"FileSize": strconv.FormatInt(file.GetSize(), 10),
"FileExtension": filepath.Ext(file.GetName()),
"FileExtension": stdpath.Ext(file.GetName()),
"s": randomString(),
}
}


@ -22,11 +22,11 @@ func InitTaskManager() {
op.RegisterSettingChangingCallback(func() {
fs.UploadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskUploadThreadsNum, conf.Conf.Tasks.Upload.Workers)))
})
fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(setting.GetInt(conf.TaskCopyThreadsNum, conf.Conf.Tasks.Copy.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant), db.UpdateTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry))
fs.CopyTaskManager = tache.NewManager[*fs.FileTransferTask](tache.WithWorks(setting.GetInt(conf.TaskCopyThreadsNum, conf.Conf.Tasks.Copy.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant), db.UpdateTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry))
op.RegisterSettingChangingCallback(func() {
fs.CopyTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskCopyThreadsNum, conf.Conf.Tasks.Copy.Workers)))
})
fs.MoveTaskManager = tache.NewManager[*fs.MoveTask](tache.WithWorks(setting.GetInt(conf.TaskMoveThreadsNum, conf.Conf.Tasks.Move.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant), db.UpdateTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Move.MaxRetry))
fs.MoveTaskManager = tache.NewManager[*fs.FileTransferTask](tache.WithWorks(setting.GetInt(conf.TaskMoveThreadsNum, conf.Conf.Tasks.Move.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant), db.UpdateTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Move.MaxRetry))
op.RegisterSettingChangingCallback(func() {
fs.MoveTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskMoveThreadsNum, conf.Conf.Tasks.Move.Workers)))
})


@ -6,11 +6,9 @@ import (
"fmt"
"io"
"math/rand"
"mime"
"os"
stdpath "path"
"path/filepath"
"strconv"
"strings"
"time"
@ -21,30 +19,22 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/OpenListTeam/OpenList/v4/internal/task_group"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/OpenListTeam/OpenList/v4/server/common"
"github.com/OpenListTeam/tache"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
type ArchiveDownloadTask struct {
task.TaskExtension
TaskData
model.ArchiveDecompressArgs
status string
SrcObjPath string
DstDirPath string
srcStorage driver.Driver
dstStorage driver.Driver
SrcStorageMp string
DstStorageMp string
}
func (t *ArchiveDownloadTask) GetName() string {
return fmt.Sprintf("decompress [%s](%s)[%s] to [%s](%s) with password <%s>", t.SrcStorageMp, t.SrcObjPath,
t.InnerPath, t.DstStorageMp, t.DstDirPath, t.Password)
}
func (t *ArchiveDownloadTask) GetStatus() string {
return t.status
return fmt.Sprintf("decompress [%s](%s)[%s] to [%s](%s) with password <%s>", t.SrcStorageMp, t.SrcActualPath,
t.InnerPath, t.DstStorageMp, t.DstActualPath, t.Password)
}
func (t *ArchiveDownloadTask) Run() error {
@ -58,16 +48,21 @@ func (t *ArchiveDownloadTask) Run() error {
if err != nil {
return err
}
uploadTask.groupID = stdpath.Join(uploadTask.DstStorageMp, uploadTask.DstActualPath)
task_group.TransferCoordinator.AddTask(uploadTask.groupID, nil)
ArchiveContentUploadTaskManager.Add(uploadTask)
return nil
}
func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadTask, error) {
var err error
if t.srcStorage == nil {
t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
if t.SrcStorage == nil {
t.SrcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
if err != nil {
return nil, err
}
}
srcObj, tool, ss, err := op.GetArchiveToolAndStream(t.Ctx(), t.srcStorage, t.SrcObjPath, model.LinkArgs{})
srcObj, tool, ss, err := op.GetArchiveToolAndStream(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.LinkArgs{})
if err != nil {
return nil, err
}
@ -87,7 +82,7 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
total += s.GetSize()
}
t.SetTotalBytes(total)
t.status = "getting src object"
t.Status = "getting src object"
for _, s := range ss {
if s.GetFile() == nil {
_, err = stream.CacheFullInTempFileAndWriter(s, func(p float64) {
@ -104,7 +99,7 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
} else {
decompressUp = t.SetProgress
}
t.status = "walking and decompressing"
t.Status = "walking and decompressing"
dir, err := os.MkdirTemp(conf.Conf.TempDir, "dir-*")
if err != nil {
return nil, err
@ -117,13 +112,14 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
uploadTask := &ArchiveContentUploadTask{
TaskExtension: task.TaskExtension{
Creator: t.GetCreator(),
ApiUrl: t.ApiUrl,
},
ObjName: baseName,
InPlace: !t.PutIntoNewDir,
FilePath: dir,
DstDirPath: t.DstDirPath,
dstStorage: t.dstStorage,
DstStorageMp: t.DstStorageMp,
ObjName: baseName,
InPlace: !t.PutIntoNewDir,
FilePath: dir,
DstActualPath: t.DstActualPath,
dstStorage: t.DstStorage,
DstStorageMp: t.DstStorageMp,
}
return uploadTask, nil
}
@ -132,18 +128,19 @@ var ArchiveDownloadTaskManager *tache.Manager[*ArchiveDownloadTask]
type ArchiveContentUploadTask struct {
task.TaskExtension
status string
ObjName string
InPlace bool
FilePath string
DstDirPath string
dstStorage driver.Driver
DstStorageMp string
finalized bool
status string
ObjName string
InPlace bool
FilePath string
DstActualPath string
dstStorage driver.Driver
DstStorageMp string
finalized bool
groupID string
}
func (t *ArchiveContentUploadTask) GetName() string {
return fmt.Sprintf("upload %s to [%s](%s)", t.ObjName, t.DstStorageMp, t.DstDirPath)
return fmt.Sprintf("upload %s to [%s](%s)", t.ObjName, t.DstStorageMp, t.DstActualPath)
}
func (t *ArchiveContentUploadTask) GetStatus() string {
@ -163,10 +160,31 @@ func (t *ArchiveContentUploadTask) Run() error {
})
}
func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *ArchiveContentUploadTask) error) error {
func (t *ArchiveContentUploadTask) OnSucceeded() {
task_group.TransferCoordinator.Done(t.groupID, true)
}
func (t *ArchiveContentUploadTask) OnFailed() {
task_group.TransferCoordinator.Done(t.groupID, false)
}
func (t *ArchiveContentUploadTask) SetRetry(retry int, maxRetry int) {
t.TaskExtension.SetRetry(retry, maxRetry)
if retry == 0 &&
(len(t.groupID) == 0 || // restored after a restart
(t.GetErr() == nil && t.GetState() != tache.StatePending)) { // manual retry
t.groupID = stdpath.Join(t.DstStorageMp, t.DstActualPath)
task_group.TransferCoordinator.AddTask(t.groupID, nil)
}
}
func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTask *ArchiveContentUploadTask) error) error {
var err error
if t.dstStorage == nil {
t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
if err != nil {
return err
}
}
info, err := os.Stat(t.FilePath)
if err != nil {
@ -174,10 +192,10 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi
}
if info.IsDir() {
t.status = "src object is dir, listing objs"
nextDstPath := t.DstDirPath
nextDstActualPath := t.DstActualPath
if !t.InPlace {
nextDstPath = stdpath.Join(nextDstPath, t.ObjName)
err = op.MakeDir(t.Ctx(), t.dstStorage, nextDstPath)
nextDstActualPath = stdpath.Join(nextDstActualPath, t.ObjName)
err = op.MakeDir(t.Ctx(), t.dstStorage, nextDstActualPath)
if err != nil {
return err
}
@ -186,6 +204,9 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi
if err != nil {
return err
}
if !t.InPlace && len(t.groupID) > 0 {
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(nextDstActualPath))
}
var es error
for _, entry := range entries {
var nextFilePath string
@ -198,16 +219,21 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi
es = stderrors.Join(es, err)
continue
}
if len(t.groupID) > 0 {
task_group.TransferCoordinator.AddTask(t.groupID, nil)
}
err = f(&ArchiveContentUploadTask{
TaskExtension: task.TaskExtension{
Creator: t.GetCreator(),
ApiUrl: t.ApiUrl,
},
ObjName: entry.Name(),
InPlace: false,
FilePath: nextFilePath,
DstDirPath: nextDstPath,
dstStorage: t.dstStorage,
DstStorageMp: t.DstStorageMp,
ObjName: entry.Name(),
InPlace: false,
FilePath: nextFilePath,
DstActualPath: nextDstActualPath,
dstStorage: t.dstStorage,
DstStorageMp: t.DstStorageMp,
groupID: t.groupID,
})
if err != nil {
es = stderrors.Join(es, err)
@ -228,13 +254,13 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi
Size: info.Size(),
Modified: time.Now(),
},
Mimetype: mime.TypeByExtension(filepath.Ext(t.ObjName)),
Mimetype: utils.GetMimeType(stdpath.Ext(t.ObjName)),
WebPutAsTask: true,
Reader: file,
}
fs.Closers.Add(file)
t.status = "uploading"
err = op.Put(t.Ctx(), t.dstStorage, t.DstDirPath, fs, t.SetProgress, true)
err = op.Put(t.Ctx(), t.dstStorage, t.DstActualPath, fs, t.SetProgress, true)
if err != nil {
return err
}
@ -271,8 +297,9 @@ func moveToTempPath(path, prefix string) (string, error) {
func genTempFileName(prefix string) (string, error) {
retry := 0
t := time.Now().UnixMilli()
for retry < 10000 {
newPath := stdpath.Join(conf.Conf.TempDir, prefix+strconv.FormatUint(uint64(rand.Uint32()), 10))
newPath := filepath.Join(conf.Conf.TempDir, prefix+fmt.Sprintf("%x-%x", t, rand.Uint32()))
if _, err := os.Stat(newPath); err != nil {
if os.IsNotExist(err) {
return newPath, nil
@ -354,16 +381,19 @@ func archiveDecompress(ctx context.Context, srcObjPath, dstDirPath string, args
}
taskCreator, _ := ctx.Value(conf.UserKey).(*model.User)
tsk := &ArchiveDownloadTask{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
TaskData: TaskData{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
ApiUrl: common.GetApiUrl(ctx),
},
SrcStorage: srcStorage,
DstStorage: dstStorage,
SrcActualPath: srcObjActualPath,
DstActualPath: dstDirActualPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
},
ArchiveDecompressArgs: args,
srcStorage: srcStorage,
dstStorage: dstStorage,
SrcObjPath: srcObjActualPath,
DstDirPath: dstDirActualPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
}
if ctx.Value(conf.NoTaskKey) != nil {
uploadTask, err := tsk.RunWithoutPushUploadTask()

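The ArchiveContentUploadTask above, and the transfer and upload tasks later in this diff, gate group re-registration inside SetRetry on the same condition. A pure-function restatement of that condition for readability (illustrative only; the real code reads the error and state from tache):

package example

// needsGroupReRegistration mirrors the check used in SetRetry:
// it only matters on the first attempt of a run (retry == 0).
func needsGroupReRegistration(retry int, groupID string, err error, statePending bool) bool {
	if retry != 0 {
		return false
	}
	// Empty groupID: the task was just restored from persistence after a
	// restart and has not been counted into any group of this process yet.
	if groupID == "" {
		return true
	}
	// No error and not pending: the task had already settled and a user
	// manually retried it, so it must be counted into its group again.
	return err == nil && !statePending
}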

@ -1,206 +0,0 @@
package fs
import (
"context"
"fmt"
stdpath "path"
"time"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/OpenListTeam/OpenList/v4/server/common"
"github.com/OpenListTeam/tache"
"github.com/pkg/errors"
)
type CopyTask struct {
task.TaskExtension
Status string `json:"-"` //don't save status to save space
SrcObjPath string `json:"src_path"`
DstDirPath string `json:"dst_path"`
srcStorage driver.Driver `json:"-"`
dstStorage driver.Driver `json:"-"`
SrcStorageMp string `json:"src_storage_mp"`
DstStorageMp string `json:"dst_storage_mp"`
}
func (t *CopyTask) GetName() string {
return fmt.Sprintf("copy [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath)
}
func (t *CopyTask) GetStatus() string {
return t.Status
}
func (t *CopyTask) Run() error {
if err := t.ReinitCtx(); err != nil {
return err
}
t.ClearEndTime()
t.SetStartTime(time.Now())
defer func() { t.SetEndTime(time.Now()) }()
var err error
if t.srcStorage == nil {
t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
}
if t.dstStorage == nil {
t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
}
if err != nil {
return errors.WithMessage(err, "failed get storage")
}
// Use the task object's memory address as a unique identifier
taskID := fmt.Sprintf("%p", t)
// Register task to batch tracker
copyBatchTracker.RegisterTask(taskID, t.dstStorage, t.DstDirPath)
// Execute copy operation
err = copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
// Mark task completed and automatically refresh cache if needed
copyBatchTracker.MarkTaskCompletedWithRefresh(taskID)
return err
}
var CopyTaskManager *tache.Manager[*CopyTask]
// Batch tracker for copy operations
var copyBatchTracker = NewBatchTracker("copy")
// Copy if in the same storage, call move method
// if not, add copy task
func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) {
srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(srcObjPath)
if err != nil {
return nil, errors.WithMessage(err, "failed get src storage")
}
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
if err != nil {
return nil, errors.WithMessage(err, "failed get dst storage")
}
// copy if in the same storage, just call driver.Copy
if srcStorage.GetStorage() == dstStorage.GetStorage() {
err = op.Copy(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...)
if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) {
if err == nil {
// Refresh target directory cache after successful same-storage copy
op.ClearCache(dstStorage, dstDirActualPath)
}
return nil, err
}
}
if ctx.Value(conf.NoTaskKey) != nil {
srcObj, err := op.Get(ctx, srcStorage, srcObjActualPath)
if err != nil {
return nil, errors.WithMessagef(err, "failed get src [%s] file", srcObjPath)
}
if !srcObj.IsDir() {
// copy file directly
link, _, err := op.Link(ctx, srcStorage, srcObjActualPath, model.LinkArgs{})
if err != nil {
return nil, errors.WithMessagef(err, "failed get [%s] link", srcObjPath)
}
// any link provided is seekable
ss, err := stream.NewSeekableStream(&stream.FileStream{
Obj: srcObj,
Ctx: ctx,
}, link)
if err != nil {
_ = link.Close()
return nil, errors.WithMessagef(err, "failed get [%s] stream", srcObjPath)
}
err = op.Put(ctx, dstStorage, dstDirActualPath, ss, nil, false)
if err == nil {
// Refresh target directory cache after successful direct file copy
op.ClearCache(dstStorage, dstDirActualPath)
}
return nil, err
}
}
// not in the same storage
taskCreator, _ := ctx.Value(conf.UserKey).(*model.User)
t := &CopyTask{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
ApiUrl: common.GetApiUrl(ctx),
},
srcStorage: srcStorage,
dstStorage: dstStorage,
SrcObjPath: srcObjActualPath,
DstDirPath: dstDirActualPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
}
CopyTaskManager.Add(t)
return t, nil
}
func copyBetween2Storages(t *CopyTask, srcStorage, dstStorage driver.Driver, srcObjPath, dstDirPath string) error {
t.Status = "getting src object"
srcObj, err := op.Get(t.Ctx(), srcStorage, srcObjPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", srcObjPath)
}
if srcObj.IsDir() {
t.Status = "src object is dir, listing objs"
objs, err := op.List(t.Ctx(), srcStorage, srcObjPath, model.ListArgs{})
if err != nil {
return errors.WithMessagef(err, "failed list src [%s] objs", srcObjPath)
}
for _, obj := range objs {
if utils.IsCanceled(t.Ctx()) {
return nil
}
srcObjPath := stdpath.Join(srcObjPath, obj.GetName())
dstObjPath := stdpath.Join(dstDirPath, srcObj.GetName())
CopyTaskManager.Add(&CopyTask{
TaskExtension: task.TaskExtension{
Creator: t.GetCreator(),
ApiUrl: t.ApiUrl,
},
srcStorage: srcStorage,
dstStorage: dstStorage,
SrcObjPath: srcObjPath,
DstDirPath: dstObjPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
})
}
t.Status = "src object is dir, added all copy tasks of objs"
return nil
}
return copyFileBetween2Storages(t, srcStorage, dstStorage, srcObjPath, dstDirPath)
}
func copyFileBetween2Storages(tsk *CopyTask, srcStorage, dstStorage driver.Driver, srcFilePath, dstDirPath string) error {
srcFile, err := op.Get(tsk.Ctx(), srcStorage, srcFilePath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", srcFilePath)
}
link, _, err := op.Link(tsk.Ctx(), srcStorage, srcFilePath, model.LinkArgs{})
if err != nil {
return errors.WithMessagef(err, "failed get [%s] link", srcFilePath)
}
// any link provided is seekable
ss, err := stream.NewSeekableStream(&stream.FileStream{
Obj: srcFile,
Ctx: tsk.Ctx(),
}, link)
if err != nil {
_ = link.Close()
return errors.WithMessagef(err, "failed get [%s] stream", srcFilePath)
}
tsk.SetTotalBytes(ss.GetSize())
return op.Put(tsk.Ctx(), dstStorage, dstDirPath, ss, tsk.SetProgress, true)
}

internal/fs/copy_move.go (new file, 252 lines)

@ -0,0 +1,252 @@
package fs
import (
"context"
"fmt"
stdpath "path"
"time"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/OpenListTeam/OpenList/v4/internal/task_group"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/OpenListTeam/OpenList/v4/server/common"
"github.com/OpenListTeam/tache"
"github.com/pkg/errors"
)
type taskType uint8
func (t taskType) String() string {
if t == 0 {
return "copy"
} else {
return "move"
}
}
const (
copy taskType = iota
move
)
type FileTransferTask struct {
TaskData
TaskType taskType
groupID string
}
func (t *FileTransferTask) GetName() string {
return fmt.Sprintf("%s [%s](%s) to [%s](%s)", t.TaskType, t.SrcStorageMp, t.SrcActualPath, t.DstStorageMp, t.DstActualPath)
}
func (t *FileTransferTask) Run() error {
if err := t.ReinitCtx(); err != nil {
return err
}
t.ClearEndTime()
t.SetStartTime(time.Now())
defer func() { t.SetEndTime(time.Now()) }()
var err error
if t.SrcStorage == nil {
t.SrcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
}
if t.DstStorage == nil {
t.DstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
}
if err != nil {
return errors.WithMessage(err, "failed get storage")
}
return putBetween2Storages(t, t.SrcStorage, t.DstStorage, t.SrcActualPath, t.DstActualPath)
}
func (t *FileTransferTask) OnSucceeded() {
task_group.TransferCoordinator.Done(t.groupID, true)
}
func (t *FileTransferTask) OnFailed() {
task_group.TransferCoordinator.Done(t.groupID, false)
}
func (t *FileTransferTask) SetRetry(retry int, maxRetry int) {
t.TaskExtension.SetRetry(retry, maxRetry)
if retry == 0 &&
(len(t.groupID) == 0 || // restored after a restart
(t.GetErr() == nil && t.GetState() != tache.StatePending)) { // manual retry
t.groupID = stdpath.Join(t.DstStorageMp, t.DstActualPath)
var payload any
if t.TaskType == move {
payload = task_group.SrcPathToRemove(stdpath.Join(t.SrcStorageMp, t.SrcActualPath))
}
task_group.TransferCoordinator.AddTask(t.groupID, payload)
}
}
func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) {
srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(srcObjPath)
if err != nil {
return nil, errors.WithMessage(err, "failed get src storage")
}
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
if err != nil {
return nil, errors.WithMessage(err, "failed get dst storage")
}
if srcStorage.GetStorage() == dstStorage.GetStorage() {
if taskType == copy {
err = op.Copy(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...)
if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) {
return nil, err
}
} else {
err = op.Move(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...)
if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) {
return nil, err
}
}
} else if ctx.Value(conf.NoTaskKey) != nil {
return nil, fmt.Errorf("can't %s files between two storages, please use the front-end ", taskType)
}
// if ctx.Value(conf.NoTaskKey) != nil { // webdav
// srcObj, err := op.Get(ctx, srcStorage, srcObjActualPath)
// if err != nil {
// return nil, errors.WithMessagef(err, "failed get src [%s] file", srcObjPath)
// }
// if !srcObj.IsDir() {
// // copy file directly
// link, _, err := op.Link(ctx, srcStorage, srcObjActualPath, model.LinkArgs{})
// if err != nil {
// return nil, errors.WithMessagef(err, "failed get [%s] link", srcObjPath)
// }
// // any link provided is seekable
// ss, err := stream.NewSeekableStream(&stream.FileStream{
// Obj: srcObj,
// Ctx: ctx,
// }, link)
// if err != nil {
// _ = link.Close()
// return nil, errors.WithMessagef(err, "failed get [%s] stream", srcObjPath)
// }
// if taskType == move {
// defer func() {
// task_group.TransferCoordinator.Done(dstDirPath, err == nil)
// }()
// task_group.TransferCoordinator.AddTask(dstDirPath, task_group.SrcPathToRemove(srcObjPath))
// }
// err = op.Put(ctx, dstStorage, dstDirActualPath, ss, nil, taskType == move)
// return nil, err
// } else {
// return nil, fmt.Errorf("can't %s dir two storages, please use the front-end ", taskType)
// }
// }
// not in the same storage
taskCreator, _ := ctx.Value(conf.UserKey).(*model.User)
t := &FileTransferTask{
TaskData: TaskData{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
ApiUrl: common.GetApiUrl(ctx),
},
SrcStorage: srcStorage,
DstStorage: dstStorage,
SrcActualPath: srcObjActualPath,
DstActualPath: dstDirActualPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
},
TaskType: taskType,
groupID: dstDirPath,
}
if taskType == copy {
task_group.TransferCoordinator.AddTask(dstDirPath, nil)
CopyTaskManager.Add(t)
} else {
task_group.TransferCoordinator.AddTask(dstDirPath, task_group.SrcPathToRemove(srcObjPath))
MoveTaskManager.Add(t)
}
return t, nil
}
func putBetween2Storages(t *FileTransferTask, srcStorage, dstStorage driver.Driver, srcActualPath, dstDirActualPath string) error {
t.Status = "getting src object"
srcObj, err := op.Get(t.Ctx(), srcStorage, srcActualPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", srcActualPath)
}
if srcObj.IsDir() {
t.Status = "src object is dir, listing objs"
objs, err := op.List(t.Ctx(), srcStorage, srcActualPath, model.ListArgs{})
if err != nil {
return errors.WithMessagef(err, "failed list src [%s] objs", srcActualPath)
}
dstActualPath := stdpath.Join(dstDirActualPath, srcObj.GetName())
if t.TaskType == copy {
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstActualPath))
}
for _, obj := range objs {
if utils.IsCanceled(t.Ctx()) {
return nil
}
task := &FileTransferTask{
TaskType: t.TaskType,
TaskData: TaskData{
TaskExtension: task.TaskExtension{
Creator: t.GetCreator(),
ApiUrl: t.ApiUrl,
},
SrcStorage: srcStorage,
DstStorage: dstStorage,
SrcActualPath: stdpath.Join(srcActualPath, obj.GetName()),
DstActualPath: dstActualPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
},
groupID: t.groupID,
}
task_group.TransferCoordinator.AddTask(t.groupID, nil)
if t.TaskType == copy {
CopyTaskManager.Add(task)
} else {
MoveTaskManager.Add(task)
}
}
t.Status = fmt.Sprintf("src object is dir, added all %s tasks of objs", t.TaskType)
return nil
}
return putFileBetween2Storages(t, srcStorage, dstStorage, srcActualPath, dstDirActualPath)
}
func putFileBetween2Storages(tsk *FileTransferTask, srcStorage, dstStorage driver.Driver, srcActualPath, dstDirActualPath string) error {
srcFile, err := op.Get(tsk.Ctx(), srcStorage, srcActualPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", srcActualPath)
}
tsk.SetTotalBytes(srcFile.GetSize())
link, _, err := op.Link(tsk.Ctx(), srcStorage, srcActualPath, model.LinkArgs{})
if err != nil {
return errors.WithMessagef(err, "failed get [%s] link", srcActualPath)
}
// any link provided is seekable
ss, err := stream.NewSeekableStream(&stream.FileStream{
Obj: srcFile,
Ctx: tsk.Ctx(),
}, link)
if err != nil {
_ = link.Close()
return errors.WithMessagef(err, "failed get [%s] stream", srcActualPath)
}
tsk.SetTotalBytes(ss.GetSize())
return op.Put(tsk.Ctx(), dstStorage, dstDirActualPath, ss, tsk.SetProgress, true)
}
var (
CopyTaskManager *tache.Manager[*FileTransferTask]
MoveTaskManager *tache.Manager[*FileTransferTask]
)

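In the directory branch of putBetween2Storages above, ordering is what keeps the group sound: the parent appends a DstPathToRefresh payload, then adds one group member per child before it finishes, so the counter can never drain while children are still being enqueued; only after the last child reports Done does the coordinator refresh the destination (and, for a move, remove the source). A minimal restatement of that bookkeeping with hypothetical names (child enqueueing elided):

package example

// fanOutDirectory registers per-directory work with the coordinator the way
// putBetween2Storages does. addTask and appendPayload stand in for the
// TransferCoordinator methods used in the diff.
func fanOutDirectory(groupID, dstActualPath string, childCount int,
	addTask func(groupID string, payload any),
	appendPayload func(groupID string, payload any)) {
	// the drained group will refresh this destination directory exactly once
	appendPayload(groupID, dstActualPath) // DstPathToRefresh in the real code
	for i := 0; i < childCount; i++ {
		// one pending slot per child; each child reports Done on its own
		addTask(groupID, nil)
	}
}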

@ -66,32 +66,16 @@ func MakeDir(ctx context.Context, path string, lazyCache ...bool) error {
return err
}
func Move(ctx context.Context, srcPath, dstDirPath string, lazyCache ...bool) error {
err := move(ctx, srcPath, dstDirPath, lazyCache...)
func Move(ctx context.Context, srcPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) {
req, err := transfer(ctx, move, srcPath, dstDirPath, lazyCache...)
if err != nil {
log.Errorf("failed move %s to %s: %+v", srcPath, dstDirPath, err)
}
return err
}
func MoveWithTask(ctx context.Context, srcPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) {
res, err := _move(ctx, srcPath, dstDirPath, lazyCache...)
if err != nil {
log.Errorf("failed move %s to %s: %+v", srcPath, dstDirPath, err)
}
return res, err
}
func MoveWithTaskAndValidation(ctx context.Context, srcPath, dstDirPath string, validateExistence bool, lazyCache ...bool) (task.TaskExtensionInfo, error) {
res, err := _moveWithValidation(ctx, srcPath, dstDirPath, validateExistence, lazyCache...)
if err != nil {
log.Errorf("failed move %s to %s: %+v", srcPath, dstDirPath, err)
}
return res, err
return req, err
}
func Copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) {
res, err := _copy(ctx, srcObjPath, dstDirPath, lazyCache...)
res, err := transfer(ctx, copy, srcObjPath, dstDirPath, lazyCache...)
if err != nil {
log.Errorf("failed copy %s to %s: %+v", srcObjPath, dstDirPath, err)
}


@ -1,638 +0,0 @@
package fs
import (
"context"
"fmt"
stdpath "path"
"sync"
"time"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/OpenListTeam/OpenList/v4/server/common"
"github.com/OpenListTeam/tache"
"github.com/pkg/errors"
)
type MoveTask struct {
task.TaskExtension
Status string `json:"-"`
SrcObjPath string `json:"src_path"`
DstDirPath string `json:"dst_path"`
srcStorage driver.Driver `json:"-"`
dstStorage driver.Driver `json:"-"`
SrcStorageMp string `json:"src_storage_mp"`
DstStorageMp string `json:"dst_storage_mp"`
IsRootTask bool `json:"is_root_task"`
RootTaskID string `json:"root_task_id"`
TotalFiles int `json:"total_files"`
CompletedFiles int `json:"completed_files"`
Phase string `json:"phase"` // "copying", "verifying", "deleting", "completed"
ValidateExistence bool `json:"validate_existence"`
mu sync.RWMutex `json:"-"`
}
type MoveProgress struct {
TaskID string `json:"task_id"`
Phase string `json:"phase"`
TotalFiles int `json:"total_files"`
CompletedFiles int `json:"completed_files"`
CurrentFile string `json:"current_file"`
Status string `json:"status"`
Progress int `json:"progress"`
}
var moveProgressMap = sync.Map{}
func (t *MoveTask) GetName() string {
return fmt.Sprintf("move [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath)
}
func (t *MoveTask) GetStatus() string {
t.mu.RLock()
defer t.mu.RUnlock()
return t.Status
}
func (t *MoveTask) GetProgress() float64 {
t.mu.RLock()
defer t.mu.RUnlock()
if t.TotalFiles == 0 {
return 0
}
switch t.Phase {
case "copying":
return float64(t.CompletedFiles*60) / float64(t.TotalFiles)
case "verifying":
return 60 + float64(t.CompletedFiles*20)/float64(t.TotalFiles)
case "deleting":
return 80 + float64(t.CompletedFiles*20)/float64(t.TotalFiles)
case "completed":
return 100
default:
return 0
}
}
func (t *MoveTask) GetMoveProgress() *MoveProgress {
t.mu.RLock()
defer t.mu.RUnlock()
progress := int(t.GetProgress())
return &MoveProgress{
TaskID: t.GetID(),
Phase: t.Phase,
TotalFiles: t.TotalFiles,
CompletedFiles: t.CompletedFiles,
CurrentFile: t.SrcObjPath,
Status: t.Status,
Progress: progress,
}
}
func (t *MoveTask) updateProgress() {
if t.IsRootTask {
progress := t.GetMoveProgress()
moveProgressMap.Store(t.GetID(), progress)
}
}
func (t *MoveTask) Run() error {
if err := t.ReinitCtx(); err != nil {
return err
}
t.ClearEndTime()
t.SetStartTime(time.Now())
defer func() {
t.SetEndTime(time.Now())
if t.IsRootTask {
moveProgressMap.Delete(t.GetID())
}
}()
var err error
if t.srcStorage == nil {
t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
}
if t.dstStorage == nil {
t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
}
if err != nil {
return errors.WithMessage(err, "failed get storage")
}
// Use the task object's memory address as a unique identifier
taskID := fmt.Sprintf("%p", t)
// Register task to batch tracker
moveBatchTracker.RegisterTask(taskID, t.dstStorage, t.DstDirPath)
// Phase 1: Async validation (all validation happens in background)
t.mu.Lock()
t.Status = "validating source and destination"
t.mu.Unlock()
// Check if source exists
srcObj, err := op.Get(t.Ctx(), t.srcStorage, t.SrcObjPath)
if err != nil {
// Clean up tracker records if task failed
moveBatchTracker.MarkTaskCompletedWithRefresh(taskID)
return errors.WithMessagef(err, "source file [%s] not found", stdpath.Base(t.SrcObjPath))
}
// Check if destination already exists (if validation is required)
if t.ValidateExistence {
dstFilePath := stdpath.Join(t.DstDirPath, srcObj.GetName())
if res, _ := op.Get(t.Ctx(), t.dstStorage, dstFilePath); res != nil {
// Clean up tracker records if task failed
moveBatchTracker.MarkTaskCompletedWithRefresh(taskID)
return errors.Errorf("destination file [%s] already exists", srcObj.GetName())
}
}
// Phase 2: Execute move operation with proper sequencing
// Determine if we should use batch optimization for directories
if srcObj.IsDir() {
t.mu.Lock()
t.IsRootTask = true
t.RootTaskID = t.GetID()
t.mu.Unlock()
err = t.runRootMoveTask()
} else {
// Use safe move logic for files
err = t.safeMoveOperation(srcObj)
}
// Mark task completed and automatically refresh cache if needed
moveBatchTracker.MarkTaskCompletedWithRefresh(taskID)
return err
}
func (t *MoveTask) runRootMoveTask() error {
// First check if source is actually a directory
// If not, fall back to regular move logic
srcObj, err := op.Get(t.Ctx(), t.srcStorage, t.SrcObjPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] object", t.SrcObjPath)
}
if !srcObj.IsDir() {
// Source is not a directory, use regular move logic
t.mu.Lock()
t.IsRootTask = false
t.mu.Unlock()
return t.safeMoveOperation(srcObj)
}
// Phase 1: Count total files and create directory structure
t.mu.Lock()
t.Phase = "preparing"
t.Status = "counting files and preparing directory structure"
t.mu.Unlock()
t.updateProgress()
totalFiles, err := t.countFilesAndCreateDirs(t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
if err != nil {
return errors.WithMessage(err, "failed to prepare directory structure")
}
t.mu.Lock()
t.TotalFiles = totalFiles
t.Phase = "copying"
t.Status = "copying files"
t.mu.Unlock()
t.updateProgress()
// Phase 2: Copy all files
err = t.copyAllFiles(t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
if err != nil {
return errors.WithMessage(err, "failed to copy files")
}
// Phase 3: Verify directory structure
t.mu.Lock()
t.Phase = "verifying"
t.Status = "verifying copied files"
t.CompletedFiles = 0
t.mu.Unlock()
t.updateProgress()
err = t.verifyDirectoryStructure(t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
if err != nil {
return errors.WithMessage(err, "verification failed")
}
// Phase 4: Delete source files and directories
t.mu.Lock()
t.Phase = "deleting"
t.Status = "deleting source files"
t.CompletedFiles = 0
t.mu.Unlock()
t.updateProgress()
err = t.deleteSourceRecursively(t.srcStorage, t.SrcObjPath)
if err != nil {
return errors.WithMessage(err, "failed to delete source files")
}
t.mu.Lock()
t.Phase = "completed"
t.Status = "completed"
t.mu.Unlock()
t.updateProgress()
return nil
}
var MoveTaskManager *tache.Manager[*MoveTask]
// Batch tracker for move operations
var moveBatchTracker = NewBatchTracker("move")
// GetMoveProgress returns the progress of a move task by task ID
func GetMoveProgress(taskID string) (*MoveProgress, bool) {
if progress, ok := moveProgressMap.Load(taskID); ok {
return progress.(*MoveProgress), true
}
return nil, false
}
// GetMoveTaskProgress returns the progress of a specific move task
func GetMoveTaskProgress(task *MoveTask) *MoveProgress {
return task.GetMoveProgress()
}
// countFilesAndCreateDirs recursively counts files and creates directory structure
func (t *MoveTask) countFilesAndCreateDirs(srcStorage, dstStorage driver.Driver, srcPath, dstPath string) (int, error) {
srcObj, err := op.Get(t.Ctx(), srcStorage, srcPath)
if err != nil {
return 0, errors.WithMessagef(err, "failed get src [%s] object", srcPath)
}
if !srcObj.IsDir() {
return 1, nil
}
// Create destination directory
dstObjPath := stdpath.Join(dstPath, srcObj.GetName())
err = op.MakeDir(t.Ctx(), dstStorage, dstObjPath)
if err != nil {
if errors.Is(err, errs.UploadNotSupported) {
return 0, errors.WithMessagef(err, "destination storage [%s] does not support creating directories", dstStorage.GetStorage().MountPath)
}
return 0, errors.WithMessagef(err, "failed to create destination directory [%s] in storage [%s]", dstObjPath, dstStorage.GetStorage().MountPath)
}
// List and count files recursively
objs, err := op.List(t.Ctx(), srcStorage, srcPath, model.ListArgs{})
if err != nil {
return 0, errors.WithMessagef(err, "failed list src [%s] objs", srcPath)
}
totalFiles := 0
for _, obj := range objs {
if utils.IsCanceled(t.Ctx()) {
return 0, nil
}
srcSubPath := stdpath.Join(srcPath, obj.GetName())
subCount, err := t.countFilesAndCreateDirs(srcStorage, dstStorage, srcSubPath, dstObjPath)
if err != nil {
return 0, err
}
totalFiles += subCount
}
return totalFiles, nil
}
// copyAllFiles recursively copies all files
func (t *MoveTask) copyAllFiles(srcStorage, dstStorage driver.Driver, srcPath, dstPath string) error {
srcObj, err := op.Get(t.Ctx(), srcStorage, srcPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] object", srcPath)
}
if !srcObj.IsDir() {
// Copy single file
err := t.copyFile(srcStorage, dstStorage, srcPath, dstPath)
if err != nil {
return err
}
t.mu.Lock()
t.CompletedFiles++
t.mu.Unlock()
t.updateProgress()
return nil
}
// Copy directory contents
objs, err := op.List(t.Ctx(), srcStorage, srcPath, model.ListArgs{})
if err != nil {
return errors.WithMessagef(err, "failed list src [%s] objs", srcPath)
}
dstObjPath := stdpath.Join(dstPath, srcObj.GetName())
for _, obj := range objs {
if utils.IsCanceled(t.Ctx()) {
return nil
}
srcSubPath := stdpath.Join(srcPath, obj.GetName())
err := t.copyAllFiles(srcStorage, dstStorage, srcSubPath, dstObjPath)
if err != nil {
return err
}
}
return nil
}
// copyFile copies a single file between storages
func (t *MoveTask) copyFile(srcStorage, dstStorage driver.Driver, srcFilePath, dstDirPath string) error {
srcFile, err := op.Get(t.Ctx(), srcStorage, srcFilePath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", srcFilePath)
}
link, _, err := op.Link(t.Ctx(), srcStorage, srcFilePath, model.LinkArgs{})
if err != nil {
return errors.WithMessagef(err, "failed get [%s] link", srcFilePath)
}
ss, err := stream.NewSeekableStream(&stream.FileStream{
Obj: srcFile,
Ctx: t.Ctx(),
}, link)
if err != nil {
_ = link.Close()
return errors.WithMessagef(err, "failed get [%s] stream", srcFilePath)
}
return op.Put(t.Ctx(), dstStorage, dstDirPath, ss, nil, true)
}
// verifyDirectoryStructure compares source and destination directory structures
func (t *MoveTask) verifyDirectoryStructure(srcStorage, dstStorage driver.Driver, srcPath, dstPath string) error {
srcObj, err := op.Get(t.Ctx(), srcStorage, srcPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] object", srcPath)
}
if !srcObj.IsDir() {
// Verify single file
dstFilePath := stdpath.Join(dstPath, srcObj.GetName())
_, err := op.Get(t.Ctx(), dstStorage, dstFilePath)
if err != nil {
return errors.WithMessagef(err, "verification failed: destination file [%s] not found", dstFilePath)
}
t.mu.Lock()
t.CompletedFiles++
t.mu.Unlock()
t.updateProgress()
return nil
}
// Verify directory
dstObjPath := stdpath.Join(dstPath, srcObj.GetName())
_, err = op.Get(t.Ctx(), dstStorage, dstObjPath)
if err != nil {
return errors.WithMessagef(err, "verification failed: destination directory [%s] not found", dstObjPath)
}
// Verify directory contents
srcObjs, err := op.List(t.Ctx(), srcStorage, srcPath, model.ListArgs{})
if err != nil {
return errors.WithMessagef(err, "failed list src [%s] objs for verification", srcPath)
}
for _, obj := range srcObjs {
if utils.IsCanceled(t.Ctx()) {
return nil
}
srcSubPath := stdpath.Join(srcPath, obj.GetName())
err := t.verifyDirectoryStructure(srcStorage, dstStorage, srcSubPath, dstObjPath)
if err != nil {
return err
}
}
return nil
}
// deleteSourceRecursively deletes source files and directories recursively
func (t *MoveTask) deleteSourceRecursively(srcStorage driver.Driver, srcPath string) error {
srcObj, err := op.Get(t.Ctx(), srcStorage, srcPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] object for deletion", srcPath)
}
if !srcObj.IsDir() {
// Delete single file
err := op.Remove(t.Ctx(), srcStorage, srcPath)
if err != nil {
return errors.WithMessagef(err, "failed to delete src [%s] file", srcPath)
}
t.mu.Lock()
t.CompletedFiles++
t.mu.Unlock()
t.updateProgress()
return nil
}
// Delete directory contents first
objs, err := op.List(t.Ctx(), srcStorage, srcPath, model.ListArgs{})
if err != nil {
return errors.WithMessagef(err, "failed list src [%s] objs for deletion", srcPath)
}
for _, obj := range objs {
if utils.IsCanceled(t.Ctx()) {
return nil
}
srcSubPath := stdpath.Join(srcPath, obj.GetName())
err := t.deleteSourceRecursively(srcStorage, srcSubPath)
if err != nil {
return err
}
}
// Delete the directory itself
err = op.Remove(t.Ctx(), srcStorage, srcPath)
if err != nil {
return errors.WithMessagef(err, "failed to delete src [%s] directory", srcPath)
}
return nil
}
func moveBetween2Storages(t *MoveTask, srcStorage, dstStorage driver.Driver, srcObjPath, dstDirPath string) error {
t.Status = "getting src object"
srcObj, err := op.Get(t.Ctx(), srcStorage, srcObjPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", srcObjPath)
}
if srcObj.IsDir() {
t.Status = "src object is dir, listing objs"
objs, err := op.List(t.Ctx(), srcStorage, srcObjPath, model.ListArgs{})
if err != nil {
return errors.WithMessagef(err, "failed list src [%s] objs", srcObjPath)
}
dstObjPath := stdpath.Join(dstDirPath, srcObj.GetName())
t.Status = "creating destination directory"
err = op.MakeDir(t.Ctx(), dstStorage, dstObjPath)
if err != nil {
// Check if this is an upload-related error and provide a clearer message
if errors.Is(err, errs.UploadNotSupported) {
return errors.WithMessagef(err, "destination storage [%s] does not support creating directories", dstStorage.GetStorage().MountPath)
}
return errors.WithMessagef(err, "failed to create destination directory [%s] in storage [%s]", dstObjPath, dstStorage.GetStorage().MountPath)
}
for _, obj := range objs {
if utils.IsCanceled(t.Ctx()) {
return nil
}
srcSubObjPath := stdpath.Join(srcObjPath, obj.GetName())
subTask := &MoveTask{
TaskExtension: task.TaskExtension{
Creator: t.GetCreator(),
ApiUrl: t.ApiUrl,
},
srcStorage: srcStorage,
dstStorage: dstStorage,
SrcObjPath: srcSubObjPath,
DstDirPath: dstObjPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
}
MoveTaskManager.Add(subTask)
}
t.Status = "cleaning up source directory"
err = op.Remove(t.Ctx(), srcStorage, srcObjPath)
if err != nil {
t.Status = "completed (source directory cleanup pending)"
} else {
t.Status = "completed"
}
return nil
} else {
return moveFileBetween2Storages(t, srcStorage, dstStorage, srcObjPath, dstDirPath)
}
}
func moveFileBetween2Storages(tsk *MoveTask, srcStorage, dstStorage driver.Driver, srcFilePath, dstDirPath string) error {
tsk.Status = "copying file to destination"
copyTask := &CopyTask{
TaskExtension: task.TaskExtension{
Creator: tsk.GetCreator(),
ApiUrl: tsk.ApiUrl,
},
srcStorage: srcStorage,
dstStorage: dstStorage,
SrcObjPath: srcFilePath,
DstDirPath: dstDirPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
}
copyTask.SetCtx(tsk.Ctx())
err := copyBetween2Storages(copyTask, srcStorage, dstStorage, srcFilePath, dstDirPath)
if err != nil {
// Check if this is an upload-related error and provide a clearer message
if errors.Is(err, errs.UploadNotSupported) {
return errors.WithMessagef(err, "destination storage [%s] does not support file uploads", dstStorage.GetStorage().MountPath)
}
return errors.WithMessagef(err, "failed to copy [%s] to destination storage [%s]", srcFilePath, dstStorage.GetStorage().MountPath)
}
tsk.SetProgress(50)
tsk.Status = "deleting source file"
err = op.Remove(tsk.Ctx(), srcStorage, srcFilePath)
if err != nil {
return errors.WithMessagef(err, "failed to delete src [%s] file from storage [%s] after successful copy", srcFilePath, srcStorage.GetStorage().MountPath)
}
tsk.SetProgress(100)
tsk.Status = "completed"
return nil
}
// safeMoveOperation ensures copy-then-delete sequence for safe move operations
func (t *MoveTask) safeMoveOperation(srcObj model.Obj) error {
if srcObj.IsDir() {
// For directories, use the original logic but ensure proper sequencing
return moveBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
} else {
// For files, use the safe file move logic
return moveFileBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
}
}
func _move(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) {
return _moveWithValidation(ctx, srcObjPath, dstDirPath, false, lazyCache...)
}
func _moveWithValidation(ctx context.Context, srcObjPath, dstDirPath string, validateExistence bool, lazyCache ...bool) (task.TaskExtensionInfo, error) {
srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(srcObjPath)
if err != nil {
return nil, errors.WithMessage(err, "failed get src storage")
}
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
if err != nil {
return nil, errors.WithMessage(err, "failed get dst storage")
}
// Try native move first if in the same storage
if srcStorage.GetStorage() == dstStorage.GetStorage() {
err = op.Move(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...)
if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) {
if err == nil {
// For same-storage moves, refresh cache immediately since no batch tracking is used
op.ClearCache(dstStorage, dstDirActualPath)
}
return nil, err
}
}
taskCreator, _ := ctx.Value(conf.UserKey).(*model.User)
// Create task immediately without any synchronous checks to avoid blocking frontend
// All validation and type checking will be done asynchronously in the Run method
t := &MoveTask{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
ApiUrl: common.GetApiUrl(ctx),
},
srcStorage: srcStorage,
dstStorage: dstStorage,
SrcObjPath: srcObjActualPath,
DstDirPath: dstDirActualPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
ValidateExistence: validateExistence,
Phase: "initializing",
}
MoveTaskManager.Add(t)
return t, nil
}


@ -3,9 +3,10 @@ package fs
import (
"context"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/pkg/errors"
)
@ -17,21 +18,6 @@ func makeDir(ctx context.Context, path string, lazyCache ...bool) error {
return op.MakeDir(ctx, storage, actualPath, lazyCache...)
}
func move(ctx context.Context, srcPath, dstDirPath string, lazyCache ...bool) error {
srcStorage, srcActualPath, err := op.GetStorageAndActualPath(srcPath)
if err != nil {
return errors.WithMessage(err, "failed get src storage")
}
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
if err != nil {
return errors.WithMessage(err, "failed get dst storage")
}
if srcStorage.GetStorage() != dstStorage.GetStorage() {
return errors.WithStack(errs.MoveBetweenTwoStorages)
}
return op.Move(ctx, srcStorage, srcActualPath, dstDirActualPath, lazyCache...)
}
func rename(ctx context.Context, srcPath, dstName string, lazyCache ...bool) error {
storage, srcActualPath, err := op.GetStorageAndActualPath(srcPath)
if err != nil {
@ -56,3 +42,18 @@ func other(ctx context.Context, args model.FsOtherArgs) (interface{}, error) {
args.Path = actualPath
return op.Other(ctx, storage, args)
}
type TaskData struct {
task.TaskExtension
Status string `json:"-"` //don't save status to save space
SrcActualPath string `json:"src_path"`
DstActualPath string `json:"dst_path"`
SrcStorage driver.Driver `json:"-"`
DstStorage driver.Driver `json:"-"`
SrcStorageMp string `json:"src_storage_mp"`
DstStorageMp string `json:"dst_storage_mp"`
}
func (t *TaskData) GetStatus() string {
return t.Status
}

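TaskData above replaces the field sets that CopyTask, MoveTask and TransferTask used to duplicate; only the tagged string fields are persisted, while Status and the two driver handles are rebuilt at run time from the mount paths. A self-contained illustration of what actually reaches the task database (the struct here is a trimmed stand-in, not the real TaskData):

package main

import (
	"encoding/json"
	"fmt"
)

type persistedTaskData struct {
	Status        string `json:"-"` // recomputed while running, never stored
	SrcActualPath string `json:"src_path"`
	DstActualPath string `json:"dst_path"`
	SrcStorageMp  string `json:"src_storage_mp"`
	DstStorageMp  string `json:"dst_storage_mp"`
}

func main() {
	b, _ := json.Marshal(persistedTaskData{
		Status:        "uploading",
		SrcActualPath: "/photos/a.jpg",
		DstActualPath: "/backup",
		SrcStorageMp:  "/local",
		DstStorageMp:  "/remote",
	})
	fmt.Println(string(b))
	// {"src_path":"/photos/a.jpg","dst_path":"/backup","src_storage_mp":"/local","dst_storage_mp":"/remote"}
}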

@ -3,14 +3,18 @@ package fs
import (
"context"
"fmt"
stdpath "path"
"time"
"github.com/OpenListTeam/OpenList/v4/server/common"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/OpenListTeam/OpenList/v4/internal/task_group"
"github.com/OpenListTeam/tache"
"github.com/pkg/errors"
)
@ -37,6 +41,22 @@ func (t *UploadTask) Run() error {
return op.Put(t.Ctx(), t.storage, t.dstDirActualPath, t.file, t.SetProgress, true)
}
func (t *UploadTask) OnSucceeded() {
task_group.TransferCoordinator.Done(stdpath.Join(t.storage.GetStorage().MountPath, t.dstDirActualPath), true)
}
func (t *UploadTask) OnFailed() {
task_group.TransferCoordinator.Done(stdpath.Join(t.storage.GetStorage().MountPath, t.dstDirActualPath), false)
}
func (t *UploadTask) SetRetry(retry int, maxRetry int) {
t.TaskExtension.SetRetry(retry, maxRetry)
if retry == 0 &&
(t.GetErr() == nil && t.GetState() != tache.StatePending) { // manual retry
task_group.TransferCoordinator.AddTask(stdpath.Join(t.storage.GetStorage().MountPath, t.dstDirActualPath), nil)
}
}
var UploadTaskManager *tache.Manager[*UploadTask]
// putAsTask add as a put task and return immediately
@ -60,12 +80,14 @@ func putAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer)
t := &UploadTask{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
ApiUrl: common.GetApiUrl(ctx),
},
storage: storage,
dstDirActualPath: dstDirActualPath,
file: file,
}
t.SetTotalBytes(file.GetSize())
task_group.TransferCoordinator.AddTask(dstDirPath, nil)
UploadTaskManager.Add(t)
return t, nil
}

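putAsTask above registers the group under the request path dstDirPath, while the hooks rebuild the same key from the storage mount path and the in-storage actual path; the two agree because the request path is the mount path joined with the actual path. A tiny sanity check of that key reconstruction (the paths are hypothetical examples):

package main

import (
	"fmt"
	stdpath "path"
)

func main() {
	mountPath := "/local"         // storage.GetStorage().MountPath
	dstDirActualPath := "/photos" // path inside the storage
	// Matches the dstDirPath ("/local/photos") that putAsTask registered.
	fmt.Println(stdpath.Join(mountPath, dstDirActualPath)) // /local/photos
}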

@ -1,153 +0,0 @@
package fs
import (
"sync"
"time"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/op"
)
// BatchTracker manages batch operations for cache refresh optimization
// It aggregates multiple file operations by target directory and only refreshes
// the cache once when all operations in a directory are completed
type BatchTracker struct {
mu sync.Mutex
dirTasks map[string]*dirTaskInfo // dstStoragePath+dstDirPath -> dirTaskInfo
pendingTasks map[string]string // taskID -> dstStoragePath+dstDirPath
lastCleanup time.Time // last cleanup time
name string // tracker name for debugging
}
type dirTaskInfo struct {
dstStorage driver.Driver
dstDirPath string
pendingTasks map[string]bool // taskID -> true
lastActivity time.Time // last activity time (used for detecting abnormal situations)
}
// NewBatchTracker creates a new batch tracker instance
func NewBatchTracker(name string) *BatchTracker {
return &BatchTracker{
dirTasks: make(map[string]*dirTaskInfo),
pendingTasks: make(map[string]string),
lastCleanup: time.Now(),
name: name,
}
}
// getDirKey generates unique key for target directory
func (bt *BatchTracker) getDirKey(dstStorage driver.Driver, dstDirPath string) string {
return dstStorage.GetStorage().MountPath + ":" + dstDirPath
}
// RegisterTask registers a task to target directory for batch tracking
func (bt *BatchTracker) RegisterTask(taskID string, dstStorage driver.Driver, dstDirPath string) {
bt.mu.Lock()
defer bt.mu.Unlock()
// Periodically clean up expired entries
bt.cleanupIfNeeded()
dirKey := bt.getDirKey(dstStorage, dstDirPath)
// Record task to directory mapping
bt.pendingTasks[taskID] = dirKey
// Initialize or update directory task information
if info, exists := bt.dirTasks[dirKey]; exists {
info.pendingTasks[taskID] = true
info.lastActivity = time.Now()
} else {
bt.dirTasks[dirKey] = &dirTaskInfo{
dstStorage: dstStorage,
dstDirPath: dstDirPath,
pendingTasks: map[string]bool{taskID: true},
lastActivity: time.Now(),
}
}
}
// MarkTaskCompleted marks a task as completed and returns whether cache refresh is needed
// Returns (shouldRefresh, dstStorage, dstDirPath)
func (bt *BatchTracker) MarkTaskCompleted(taskID string) (bool, driver.Driver, string) {
bt.mu.Lock()
defer bt.mu.Unlock()
dirKey, exists := bt.pendingTasks[taskID]
if !exists {
return false, nil, ""
}
// Remove from pending tasks
delete(bt.pendingTasks, taskID)
info, exists := bt.dirTasks[dirKey]
if !exists {
return false, nil, ""
}
// Remove from directory tasks
delete(info.pendingTasks, taskID)
// If no pending tasks left in this directory, trigger cache refresh
if len(info.pendingTasks) == 0 {
dstStorage := info.dstStorage
dstDirPath := info.dstDirPath
delete(bt.dirTasks, dirKey) // Delete directly, no need to update lastActivity
return true, dstStorage, dstDirPath
}
// Only update lastActivity when there are other tasks (indicating the directory still has active tasks)
info.lastActivity = time.Now()
return false, nil, ""
}
// MarkTaskCompletedWithRefresh marks a task as completed and automatically refreshes cache if needed
func (bt *BatchTracker) MarkTaskCompletedWithRefresh(taskID string) {
shouldRefresh, dstStorage, dstDirPath := bt.MarkTaskCompleted(taskID)
if shouldRefresh {
op.ClearCache(dstStorage, dstDirPath)
}
}
// cleanupIfNeeded checks if cleanup is needed and executes cleanup if necessary
func (bt *BatchTracker) cleanupIfNeeded() {
now := time.Now()
// Clean up every 10 minutes
if now.Sub(bt.lastCleanup) > 10*time.Minute {
bt.cleanupStaleEntries()
bt.lastCleanup = now
}
}
// cleanupStaleEntries cleans up timed-out tasks to prevent memory leaks
// Mainly used to clean up residual entries caused by abnormal situations (such as task crashes, process restarts, etc.)
func (bt *BatchTracker) cleanupStaleEntries() {
now := time.Now()
for dirKey, info := range bt.dirTasks {
// If no activity for more than 1 hour, it may indicate an abnormal situation, clean up this entry
// Under normal circumstances, MarkTaskCompleted will be called when the task is completed and the entire entry will be deleted
if now.Sub(info.lastActivity) > time.Hour {
// Clean up related pending tasks
for taskID := range info.pendingTasks {
delete(bt.pendingTasks, taskID)
}
delete(bt.dirTasks, dirKey)
}
}
}
// GetPendingTaskCount returns the number of pending tasks for debugging
func (bt *BatchTracker) GetPendingTaskCount() int {
bt.mu.Lock()
defer bt.mu.Unlock()
return len(bt.pendingTasks)
}
// GetDirTaskCount returns the number of directories being tracked for debugging
func (bt *BatchTracker) GetDirTaskCount() int {
bt.mu.Lock()
defer bt.mu.Unlock()
return len(bt.dirTasks)
}


@ -6,10 +6,8 @@ import (
"crypto/tls"
"fmt"
"io"
"mime"
"mime/multipart"
"net/http"
"path/filepath"
"strconv"
"strings"
"sync"
@ -74,11 +72,7 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time
contentTypes, haveType := w.Header()["Content-Type"]
var contentType string
if !haveType {
contentType = mime.TypeByExtension(filepath.Ext(name))
if contentType == "" {
// most modern application can handle the default contentType
contentType = "application/octet-stream"
}
contentType = utils.GetMimeType(name)
w.Header().Set("Content-Type", contentType)
} else if len(contentTypes) > 0 {
contentType = contentTypes[0]


@ -3,6 +3,7 @@ package _115_open
import (
"context"
"fmt"
_115_open "github.com/OpenListTeam/OpenList/v4/drivers/115_open"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/setting"

View File

@ -4,10 +4,11 @@ import (
"context"
"errors"
"fmt"
"strconv"
"github.com/OpenListTeam/OpenList/v4/drivers/thunder_browser"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/setting"
"strconv"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"

View File

@ -2,7 +2,9 @@ package tool
import (
"context"
_115_open "github.com/OpenListTeam/OpenList/v4/drivers/115_open"
"github.com/OpenListTeam/OpenList/v4/server/common"
"net/url"
stdpath "path"
@ -126,6 +128,7 @@ func AddURL(ctx context.Context, args *AddURLArgs) (task.TaskExtensionInfo, erro
t := &DownloadTask{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
ApiUrl: common.GetApiUrl(ctx),
},
Url: args.URL,
DstDirPath: args.DstDirPath,

View File

@ -6,10 +6,12 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/fs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/setting"
"github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/OpenListTeam/OpenList/v4/internal/task_group"
"github.com/OpenListTeam/tache"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@ -182,19 +184,24 @@ func (t *DownloadTask) Transfer() error {
return errors.WithMessage(err, "failed get dst storage")
}
taskCreator, _ := t.Ctx().Value(conf.UserKey).(*model.User)
task := &TransferTask{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
tsk := &TransferTask{
TaskData: fs.TaskData{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
ApiUrl: t.ApiUrl,
},
SrcActualPath: t.TempDir,
DstActualPath: dstDirActualPath,
DstStorage: dstStorage,
DstStorageMp: dstStorage.GetStorage().MountPath,
},
SrcObjPath: t.TempDir,
DstDirPath: dstDirActualPath,
DstStorage: dstStorage,
DstStorageMp: dstStorage.GetStorage().MountPath,
groupID: t.DstDirPath,
DeletePolicy: t.DeletePolicy,
Url: t.Url,
}
task.SetTotalBytes(t.GetTotalBytes())
TransferTaskManager.Add(task)
tsk.SetTotalBytes(t.GetTotalBytes())
task_group.TransferCoordinator.AddTask(tsk.groupID, nil)
TransferTaskManager.Add(tsk)
return nil
}
return transferStd(t.Ctx(), t.TempDir, t.DstDirPath, t.DeletePolicy)

View File

@@ -9,29 +9,25 @@ import (
"time"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/fs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/internal/task"
"github.com/OpenListTeam/OpenList/v4/internal/task_group"
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/OpenListTeam/OpenList/v4/server/common"
"github.com/OpenListTeam/tache"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
type TransferTask struct {
task.TaskExtension
Status string `json:"-"` //don't save status to save space
SrcObjPath string `json:"src_obj_path"`
DstDirPath string `json:"dst_dir_path"`
SrcStorage driver.Driver `json:"-"`
DstStorage driver.Driver `json:"-"`
SrcStorageMp string `json:"src_storage_mp"`
DstStorageMp string `json:"dst_storage_mp"`
DeletePolicy DeletePolicy `json:"delete_policy"`
Url string `json:"-"`
fs.TaskData
DeletePolicy DeletePolicy `json:"delete_policy"`
Url string `json:"url"`
groupID string `json:"-"`
}
func (t *TransferTask) Run() error {
@@ -51,10 +47,10 @@ func (t *TransferTask) Run() error {
if err != nil {
return err
}
name := t.SrcObjPath
name := t.SrcActualPath
mimetype := utils.GetMimeType(name)
s := &stream.FileStream{
Ctx: nil,
Ctx: t.Ctx(),
Obj: &model.Object{
Name: name,
Size: t.GetTotalBytes(),
@@ -65,7 +61,7 @@ func (t *TransferTask) Run() error {
Mimetype: mimetype,
Closers: utils.NewClosers(r),
}
return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, s, t.SetProgress)
return op.Put(t.Ctx(), t.DstStorage, t.DstActualPath, s, t.SetProgress)
}
return transferStdPath(t)
} else {
@@ -75,13 +71,9 @@ func (t *TransferTask) Run() error {
func (t *TransferTask) GetName() string {
if t.DeletePolicy == UploadDownloadStream {
return fmt.Sprintf("upload [%s](%s) to [%s](%s)", t.SrcObjPath, t.Url, t.DstStorageMp, t.DstDirPath)
return fmt.Sprintf("upload [%s](%s) to [%s](%s)", t.SrcActualPath, t.Url, t.DstStorageMp, t.DstActualPath)
}
return fmt.Sprintf("transfer [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath)
}
func (t *TransferTask) GetStatus() string {
return t.Status
return fmt.Sprintf("transfer [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcActualPath, t.DstStorageMp, t.DstActualPath)
}
func (t *TransferTask) OnSucceeded() {
@@ -92,6 +84,7 @@ func (t *TransferTask) OnSucceeded() {
removeObjTemp(t)
}
}
task_group.TransferCoordinator.Done(t.groupID, true)
}
func (t *TransferTask) OnFailed() {
@@ -102,6 +95,17 @@ func (t *TransferTask) OnFailed() {
removeObjTemp(t)
}
}
task_group.TransferCoordinator.Done(t.groupID, false)
}
func (t *TransferTask) SetRetry(retry int, maxRetry int) {
if retry == 0 &&
(len(t.groupID) == 0 || // recovered after a restart
(t.GetErr() == nil && t.GetState() != tache.StatePending)) { // manual retry of a finished task
t.groupID = stdpath.Join(t.DstStorageMp, t.DstActualPath)
task_group.TransferCoordinator.AddTask(t.groupID, nil)
}
t.TaskExtension.SetRetry(retry, maxRetry)
}
var (
@@ -120,15 +124,20 @@ func transferStd(ctx context.Context, tempDir, dstDirPath string, deletePolicy D
taskCreator, _ := ctx.Value(conf.UserKey).(*model.User)
for _, entry := range entries {
t := &TransferTask{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
TaskData: fs.TaskData{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
ApiUrl: common.GetApiUrl(ctx),
},
SrcActualPath: stdpath.Join(tempDir, entry.Name()),
DstActualPath: dstDirActualPath,
DstStorage: dstStorage,
DstStorageMp: dstStorage.GetStorage().MountPath,
},
SrcObjPath: stdpath.Join(tempDir, entry.Name()),
DstDirPath: dstDirActualPath,
DstStorage: dstStorage,
DstStorageMp: dstStorage.GetStorage().MountPath,
groupID: dstDirPath,
DeletePolicy: deletePolicy,
}
task_group.TransferCoordinator.AddTask(dstDirPath, nil)
TransferTaskManager.Add(t)
}
return nil
@@ -136,31 +145,37 @@ func transferStd(ctx context.Context, tempDir, dstDirPath string, deletePolicy D
func transferStdPath(t *TransferTask) error {
t.Status = "getting src object"
info, err := os.Stat(t.SrcObjPath)
info, err := os.Stat(t.SrcActualPath)
if err != nil {
return err
}
if info.IsDir() {
t.Status = "src object is dir, listing objs"
entries, err := os.ReadDir(t.SrcObjPath)
entries, err := os.ReadDir(t.SrcActualPath)
if err != nil {
return err
}
dstDirActualPath := stdpath.Join(t.DstActualPath, info.Name())
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstDirActualPath))
for _, entry := range entries {
srcRawPath := stdpath.Join(t.SrcObjPath, entry.Name())
dstObjPath := stdpath.Join(t.DstDirPath, info.Name())
t := &TransferTask{
TaskExtension: task.TaskExtension{
Creator: t.Creator,
srcRawPath := stdpath.Join(t.SrcActualPath, entry.Name())
task := &TransferTask{
TaskData: fs.TaskData{
TaskExtension: task.TaskExtension{
Creator: t.Creator,
ApiUrl: t.ApiUrl,
},
SrcActualPath: srcRawPath,
DstActualPath: dstDirActualPath,
DstStorage: t.DstStorage,
SrcStorageMp: t.SrcStorageMp,
DstStorageMp: t.DstStorageMp,
},
SrcObjPath: srcRawPath,
DstDirPath: dstObjPath,
DstStorage: t.DstStorage,
SrcStorageMp: t.SrcStorageMp,
DstStorageMp: t.DstStorageMp,
groupID: t.groupID,
DeletePolicy: t.DeletePolicy,
}
TransferTaskManager.Add(t)
task_group.TransferCoordinator.AddTask(t.groupID, nil)
TransferTaskManager.Add(task)
}
t.Status = "src object is dir, added all transfer tasks of files"
return nil
@@ -169,19 +184,19 @@ func transferStdPath(t *TransferTask) error {
}
func transferStdFile(t *TransferTask) error {
rc, err := os.Open(t.SrcObjPath)
rc, err := os.Open(t.SrcActualPath)
if err != nil {
return errors.Wrapf(err, "failed to open file %s", t.SrcObjPath)
return errors.Wrapf(err, "failed to open file %s", t.SrcActualPath)
}
info, err := rc.Stat()
if err != nil {
return errors.Wrapf(err, "failed to get file %s", t.SrcObjPath)
return errors.Wrapf(err, "failed to get file %s", t.SrcActualPath)
}
mimetype := utils.GetMimeType(t.SrcObjPath)
mimetype := utils.GetMimeType(t.SrcActualPath)
s := &stream.FileStream{
Ctx: nil,
Ctx: t.Ctx(),
Obj: &model.Object{
Name: filepath.Base(t.SrcObjPath),
Name: filepath.Base(t.SrcActualPath),
Size: info.Size(),
Modified: info.ModTime(),
IsFolder: false,
@@ -191,16 +206,16 @@ func transferStdFile(t *TransferTask) error {
Closers: utils.NewClosers(rc),
}
t.SetTotalBytes(info.Size())
return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, s, t.SetProgress)
return op.Put(t.Ctx(), t.DstStorage, t.DstActualPath, s, t.SetProgress)
}
func removeStdTemp(t *TransferTask) {
info, err := os.Stat(t.SrcObjPath)
info, err := os.Stat(t.SrcActualPath)
if err != nil || info.IsDir() {
return
}
if err := os.Remove(t.SrcObjPath); err != nil {
log.Errorf("failed to delete temp file %s, error: %s", t.SrcObjPath, err.Error())
if err := os.Remove(t.SrcActualPath); err != nil {
log.Errorf("failed to delete temp file %s, error: %s", t.SrcActualPath, err.Error())
}
}
@@ -220,17 +235,22 @@ func transferObj(ctx context.Context, tempDir, dstDirPath string, deletePolicy D
taskCreator, _ := ctx.Value(conf.UserKey).(*model.User) // taskCreator is nil when the type assertion fails
for _, obj := range objs {
t := &TransferTask{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
TaskData: fs.TaskData{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
ApiUrl: common.GetApiUrl(ctx),
},
SrcActualPath: stdpath.Join(srcObjActualPath, obj.GetName()),
DstActualPath: dstDirActualPath,
SrcStorage: srcStorage,
DstStorage: dstStorage,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
},
SrcObjPath: stdpath.Join(srcObjActualPath, obj.GetName()),
DstDirPath: dstDirActualPath,
SrcStorage: srcStorage,
DstStorage: dstStorage,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
groupID: dstDirPath,
DeletePolicy: deletePolicy,
}
task_group.TransferCoordinator.AddTask(dstDirPath, nil)
TransferTaskManager.Add(t)
}
return nil
@@ -238,32 +258,38 @@ func transferObj(ctx context.Context, tempDir, dstDirPath string, deletePolicy D
func transferObjPath(t *TransferTask) error {
t.Status = "getting src object"
srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath)
srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcActualPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcObjPath)
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcActualPath)
}
if srcObj.IsDir() {
t.Status = "src object is dir, listing objs"
objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcObjPath, model.ListArgs{})
objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.ListArgs{})
if err != nil {
return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcObjPath)
return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcActualPath)
}
dstDirActualPath := stdpath.Join(t.DstActualPath, srcObj.GetName())
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstDirActualPath))
for _, obj := range objs {
if utils.IsCanceled(t.Ctx()) {
return nil
}
srcObjPath := stdpath.Join(t.SrcObjPath, obj.GetName())
dstObjPath := stdpath.Join(t.DstDirPath, srcObj.GetName())
srcObjPath := stdpath.Join(t.SrcActualPath, obj.GetName())
task_group.TransferCoordinator.AddTask(t.groupID, nil)
TransferTaskManager.Add(&TransferTask{
TaskExtension: task.TaskExtension{
Creator: t.Creator,
TaskData: fs.TaskData{
TaskExtension: task.TaskExtension{
Creator: t.Creator,
ApiUrl: t.ApiUrl,
},
SrcActualPath: srcObjPath,
DstActualPath: dstDirActualPath,
SrcStorage: t.SrcStorage,
DstStorage: t.DstStorage,
SrcStorageMp: t.SrcStorageMp,
DstStorageMp: t.DstStorageMp,
},
SrcObjPath: srcObjPath,
DstDirPath: dstObjPath,
SrcStorage: t.SrcStorage,
DstStorage: t.DstStorage,
SrcStorageMp: t.SrcStorageMp,
DstStorageMp: t.DstStorageMp,
groupID: t.groupID,
DeletePolicy: t.DeletePolicy,
})
}
@@ -274,13 +300,13 @@ func transferObjPath(t *TransferTask) error {
}
func transferObjFile(t *TransferTask) error {
srcFile, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath)
srcFile, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcActualPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcObjPath)
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcActualPath)
}
link, _, err := op.Link(t.Ctx(), t.SrcStorage, t.SrcObjPath, model.LinkArgs{})
link, _, err := op.Link(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.LinkArgs{})
if err != nil {
return errors.WithMessagef(err, "failed get [%s] link", t.SrcObjPath)
return errors.WithMessagef(err, "failed get [%s] link", t.SrcActualPath)
}
// any link provided is seekable
ss, err := stream.NewSeekableStream(&stream.FileStream{
@@ -289,18 +315,18 @@ func transferObjFile(t *TransferTask) error {
}, link)
if err != nil {
_ = link.Close()
return errors.WithMessagef(err, "failed get [%s] stream", t.SrcObjPath)
return errors.WithMessagef(err, "failed get [%s] stream", t.SrcActualPath)
}
t.SetTotalBytes(ss.GetSize())
return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, ss, t.SetProgress)
return op.Put(t.Ctx(), t.DstStorage, t.DstActualPath, ss, t.SetProgress)
}
func removeObjTemp(t *TransferTask) {
srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath)
srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcActualPath)
if err != nil || srcObj.IsDir() {
return
}
if err := op.Remove(t.Ctx(), t.SrcStorage, t.SrcObjPath); err != nil {
log.Errorf("failed to delete temp obj %s, error: %s", t.SrcObjPath, err.Error())
if err := op.Remove(t.Ctx(), t.SrcStorage, t.SrcActualPath); err != nil {
log.Errorf("failed to delete temp obj %s, error: %s", t.SrcActualPath, err.Error())
}
}

View File

@@ -489,18 +489,18 @@ func ArchiveDecompress(ctx context.Context, storage driver.Driver, srcPath, dstD
var newObjs []model.Obj
newObjs, err = s.ArchiveDecompress(ctx, srcObj, dstDir, args)
if err == nil {
if newObjs != nil && len(newObjs) > 0 {
if len(newObjs) > 0 {
for _, newObj := range newObjs {
addCacheObj(storage, dstDirPath, model.WrapObjName(newObj))
}
} else if !utils.IsBool(lazyCache...) {
ClearCache(storage, dstDirPath)
DeleteCache(storage, dstDirPath)
}
}
case driver.ArchiveDecompress:
err = s.ArchiveDecompress(ctx, srcObj, dstDir, args)
if err == nil && !utils.IsBool(lazyCache...) {
ClearCache(storage, dstDirPath)
DeleteCache(storage, dstDirPath)
}
default:
return errs.NotImplement

View File

@@ -103,6 +103,10 @@ func ClearCache(storage driver.Driver, path string) {
listCache.Del(Key(storage, path))
}
func DeleteCache(storage driver.Driver, path string) {
listCache.Del(Key(storage, path))
}
func Key(storage driver.Driver, path string) string {
return stdpath.Join(storage.GetStorage().MountPath, utils.FixAndCleanPath(path))
}
@@ -355,13 +359,13 @@ func MakeDir(ctx context.Context, storage driver.Driver, path string, lazyCache
if newObj != nil {
addCacheObj(storage, parentPath, model.WrapObjName(newObj))
} else if !utils.IsBool(lazyCache...) {
ClearCache(storage, parentPath)
DeleteCache(storage, parentPath)
}
}
case driver.Mkdir:
err = s.MakeDir(ctx, parentDir, dirName)
if err == nil && !utils.IsBool(lazyCache...) {
ClearCache(storage, parentPath)
DeleteCache(storage, parentPath)
}
default:
return nil, errs.NotImplement
@@ -406,7 +410,7 @@ func Move(ctx context.Context, storage driver.Driver, srcPath, dstDirPath string
if newObj != nil {
addCacheObj(storage, dstDirPath, model.WrapObjName(newObj))
} else if !utils.IsBool(lazyCache...) {
ClearCache(storage, dstDirPath)
DeleteCache(storage, dstDirPath)
}
}
case driver.Move:
@@ -414,7 +418,7 @@ func Move(ctx context.Context, storage driver.Driver, srcPath, dstDirPath string
if err == nil {
delCacheObj(storage, srcDirPath, srcRawObj)
if !utils.IsBool(lazyCache...) {
ClearCache(storage, dstDirPath)
DeleteCache(storage, dstDirPath)
}
}
default:
@@ -443,13 +447,13 @@ func Rename(ctx context.Context, storage driver.Driver, srcPath, dstName string,
if newObj != nil {
updateCacheObj(storage, srcDirPath, srcRawObj, model.WrapObjName(newObj))
} else if !utils.IsBool(lazyCache...) {
ClearCache(storage, srcDirPath)
DeleteCache(storage, srcDirPath)
}
}
case driver.Rename:
err = s.Rename(ctx, srcObj, dstName)
if err == nil && !utils.IsBool(lazyCache...) {
ClearCache(storage, srcDirPath)
DeleteCache(storage, srcDirPath)
}
default:
return errs.NotImplement
@@ -481,13 +485,13 @@ func Copy(ctx context.Context, storage driver.Driver, srcPath, dstDirPath string
if newObj != nil {
addCacheObj(storage, dstDirPath, model.WrapObjName(newObj))
} else if !utils.IsBool(lazyCache...) {
ClearCache(storage, dstDirPath)
DeleteCache(storage, dstDirPath)
}
}
case driver.Copy:
err = s.Copy(ctx, srcObj, dstDir)
if err == nil && !utils.IsBool(lazyCache...) {
ClearCache(storage, dstDirPath)
DeleteCache(storage, dstDirPath)
}
default:
return errs.NotImplement
@@ -590,13 +594,13 @@ func Put(ctx context.Context, storage driver.Driver, dstDirPath string, file mod
if newObj != nil {
addCacheObj(storage, dstDirPath, model.WrapObjName(newObj))
} else if !utils.IsBool(lazyCache...) {
ClearCache(storage, dstDirPath)
DeleteCache(storage, dstDirPath)
}
}
case driver.Put:
err = s.Put(ctx, parentDir, file, up)
if err == nil && !utils.IsBool(lazyCache...) {
ClearCache(storage, dstDirPath)
DeleteCache(storage, dstDirPath)
}
default:
return errs.NotImplement
@@ -648,13 +652,13 @@ func PutURL(ctx context.Context, storage driver.Driver, dstDirPath, dstName, url
if newObj != nil {
addCacheObj(storage, dstDirPath, model.WrapObjName(newObj))
} else if !utils.IsBool(lazyCache...) {
ClearCache(storage, dstDirPath)
DeleteCache(storage, dstDirPath)
}
}
case driver.PutURL:
err = s.PutURL(ctx, dstDir, dstName, url)
if err == nil && !utils.IsBool(lazyCache...) {
ClearCache(storage, dstDirPath)
DeleteCache(storage, dstDirPath)
}
default:
return errs.NotImplement

View File

@@ -0,0 +1,81 @@
package task_group
import (
"sync"
"github.com/sirupsen/logrus"
)
type OnCompletionFunc func(groupID string, payloads []any)
type TaskGroupCoordinator struct {
name string
mu sync.Mutex
groupPayloads map[string][]any
groupStates map[string]groupState
onCompletion OnCompletionFunc
}
type groupState struct {
pending int
hasSuccess bool
}
func NewTaskGroupCoordinator(name string, f OnCompletionFunc) *TaskGroupCoordinator {
return &TaskGroupCoordinator{
name: name,
groupPayloads: map[string][]any{},
groupStates: map[string]groupState{},
onCompletion: f,
}
}
// payload may be nil
func (tgc *TaskGroupCoordinator) AddTask(groupID string, payload any) {
tgc.mu.Lock()
defer tgc.mu.Unlock()
state := tgc.groupStates[groupID]
state.pending++
tgc.groupStates[groupID] = state
logrus.Debugf("AddTask:%s ,count=%+v", groupID, state)
if payload == nil {
return
}
tgc.groupPayloads[groupID] = append(tgc.groupPayloads[groupID], payload)
}
func (tgc *TaskGroupCoordinator) AppendPayload(groupID string, payload any) {
if payload == nil {
return
}
tgc.mu.Lock()
defer tgc.mu.Unlock()
tgc.groupPayloads[groupID] = append(tgc.groupPayloads[groupID], payload)
}
func (tgc *TaskGroupCoordinator) Done(groupID string, success bool) {
tgc.mu.Lock()
defer tgc.mu.Unlock()
state, ok := tgc.groupStates[groupID]
if !ok || state.pending == 0 {
return
}
if success {
state.hasSuccess = true
}
logrus.Debugf("Done:%s ,state=%+v", groupID, state)
if state.pending == 1 {
payloads := tgc.groupPayloads[groupID]
delete(tgc.groupStates, groupID)
delete(tgc.groupPayloads, groupID)
if tgc.onCompletion != nil && state.hasSuccess {
logrus.Debugf("OnCompletion:%s", groupID)
tgc.mu.Unlock()
tgc.onCompletion(groupID, payloads)
tgc.mu.Lock()
}
return
}
state.pending--
tgc.groupStates[groupID] = state
}
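
A group therefore lives exactly as long as it has pending members: AddTask increments the pending count (optionally attaching a payload), Done decrements it, and when the last member reports in, the accumulated payloads are handed to the completion callback, provided at least one member succeeded. A minimal usage sketch (illustrative only; the group ID and paths are invented, and the callback here is not the real refreshAndRemove):

package task_group_test

import (
	"fmt"

	"github.com/OpenListTeam/OpenList/v4/internal/task_group"
)

func ExampleTaskGroupCoordinator() {
	// One group per destination directory; the IDs and paths are made up.
	tgc := task_group.NewTaskGroupCoordinator("Example", func(groupID string, payloads []any) {
		fmt.Printf("group %s done with %d payload(s)\n", groupID, len(payloads))
	})

	const group = "/dst/dir"
	tgc.AddTask(group, nil)                                         // first task, no payload
	tgc.AddTask(group, task_group.DstPathToRefresh("/dst/dir/sub")) // second task attaches a payload

	tgc.Done(group, true)  // one member succeeded
	tgc.Done(group, false) // last member failed; callback still runs because hasSuccess is set
	// Output: group /dst/dir done with 1 payload(s)
}

Note that the callback runs synchronously inside the final Done call, with the coordinator's mutex released for its duration.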

View File

@@ -0,0 +1,103 @@
package task_group
import (
"context"
"fmt"
"path"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
type SrcPathToRemove string
// DstPathToRefresh is an actual path within the destination storage, unlike SrcPathToRemove, which is a full virtual path
type DstPathToRefresh string
func refreshAndRemove(dstPath string, payloads []any) {
dstStorage, dstActualPath, err := op.GetStorageAndActualPath(dstPath)
if err != nil {
log.Error(errors.WithMessage(err, "failed get dst storage"))
return
}
_, dstNeedRefresh := dstStorage.(driver.Put)
dstNeedRefresh = dstNeedRefresh && !dstStorage.Config().NoCache
if dstNeedRefresh {
op.DeleteCache(dstStorage, dstActualPath)
}
var ctx context.Context
for _, payload := range payloads {
switch p := payload.(type) {
case DstPathToRefresh:
if dstNeedRefresh {
op.DeleteCache(dstStorage, string(p))
}
case SrcPathToRemove:
if ctx == nil {
ctx = context.Background()
}
srcStorage, srcActualPath, err := op.GetStorageAndActualPath(string(p))
if err != nil {
log.Error(errors.WithMessage(err, "failed get src storage"))
continue
}
err = verifyAndRemove(ctx, srcStorage, dstStorage, srcActualPath, dstActualPath, dstNeedRefresh)
if err != nil {
log.Error(err)
}
}
}
}
func verifyAndRemove(ctx context.Context, srcStorage, dstStorage driver.Driver, srcPath, dstPath string, refresh bool) error {
srcObj, err := op.Get(ctx, srcStorage, srcPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", path.Join(srcStorage.GetStorage().MountPath, srcPath))
}
dstObjPath := path.Join(dstPath, srcObj.GetName())
dstObj, err := op.Get(ctx, dstStorage, dstObjPath)
if err != nil {
return errors.WithMessagef(err, "failed get dst [%s] file", path.Join(dstStorage.GetStorage().MountPath, dstObjPath))
}
if !dstObj.IsDir() {
err = op.Remove(ctx, srcStorage, srcPath)
if err != nil {
return fmt.Errorf("failed remove %s: %+v", path.Join(srcStorage.GetStorage().MountPath, srcPath), err)
}
return nil
}
// Verify directory
srcObjs, err := op.List(ctx, srcStorage, srcPath, model.ListArgs{})
if err != nil {
return errors.WithMessagef(err, "failed list src [%s] objs", path.Join(srcStorage.GetStorage().MountPath, srcPath))
}
if refresh {
op.DeleteCache(dstStorage, dstObjPath)
}
hasErr := false
for _, obj := range srcObjs {
srcSubPath := path.Join(srcPath, obj.GetName())
err := verifyAndRemove(ctx, srcStorage, dstStorage, srcSubPath, dstObjPath, refresh)
if err != nil {
log.Error(err)
hasErr = true
}
}
if hasErr {
return errors.Errorf("some subitems of [%s] failed to verify and remove", path.Join(srcStorage.GetStorage().MountPath, srcPath))
}
err = op.Remove(ctx, srcStorage, srcPath)
if err != nil {
return fmt.Errorf("failed remove %s: %+v", path.Join(srcStorage.GetStorage().MountPath, srcPath), err)
}
return nil
}
var TransferCoordinator *TaskGroupCoordinator = NewTaskGroupCoordinator("RefreshAndRemove", refreshAndRemove)
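
TransferCoordinator ties the pieces together: tasks join a group keyed by the destination's full virtual path, and once the group drains with at least one success, refreshAndRemove refreshes the destination cache and processes the accumulated payloads. A hedged sketch of how a move-style task is expected to enroll; the function and variable names are illustrative, only the coordinator calls come from this commit:

package task_group_sketch

import (
	"path"

	"github.com/OpenListTeam/OpenList/v4/internal/task_group"
)

// enrollMove shows the assumed call pattern for a move-style task: join the
// group keyed by the destination's full virtual path, attach the source path
// so it is verified and removed once the group completes, and report the
// outcome from the task's OnSucceeded/OnFailed hooks.
func enrollMove(dstStorageMp, dstActualPath, srcFullPath string) (groupID string) {
	groupID = path.Join(dstStorageMp, dstActualPath) // same key SetRetry rebuilds above
	task_group.TransferCoordinator.AddTask(groupID, task_group.SrcPathToRemove(srcFullPath))
	return groupID
}

// reportDone is what OnSucceeded (ok=true) and OnFailed (ok=false) would call.
func reportDone(groupID string, ok bool) {
	task_group.TransferCoordinator.Done(groupID, ok)
}

Because Done only fires the callback when hasSuccess is set, a group whose members all failed is discarded without refreshing the destination or removing any source files.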

View File

@@ -67,7 +67,7 @@ func Rename(ctx context.Context, oldPath, newPath string) error {
if !user.CanFTPManage() || !user.CanMove() || (srcBase != dstBase && !user.CanRename()) {
return errs.PermissionDenied
}
if err = fs.Move(ctx, srcPath, dstDir); err != nil {
if _, err = fs.Move(ctx, srcPath, dstDir); err != nil {
if srcBase != dstBase {
return err
}

View File

@@ -125,7 +125,7 @@ func FsRecursiveMove(c *gin.Context) {
var count = 0
for i, fileName := range movingFileNames {
// move
err := fs.Move(c.Request.Context(), fileName, dstDir, len(movingFileNames) > i+1)
_, err := fs.Move(c.Request.Context(), fileName, dstDir, len(movingFileNames) > i+1)
if err != nil {
common.ErrorResp(c, err, 500)
return

View File

@@ -101,7 +101,7 @@ func FsMove(c *gin.Context) {
// All validation will be done asynchronously in the background
var addedTasks []task.TaskExtensionInfo
for i, name := range req.Names {
t, err := fs.MoveWithTaskAndValidation(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, !req.Overwrite, len(req.Names) > i+1)
t, err := fs.Move(c.Request.Context(), stdpath.Join(srcDir, name), dstDir, len(req.Names) > i+1)
if t != nil {
addedTasks = append(addedTasks, t)
}

View File

@@ -43,7 +43,7 @@ func moveFiles(ctx context.Context, src, dst string, overwrite bool) (status int
if srcDir == dstDir {
err = fs.Rename(ctx, src, dstName)
} else {
err = fs.Move(ctx, src, dstDir)
_, err = fs.Move(context.WithValue(ctx, conf.NoTaskKey, struct{}{}), src, dstDir)
if err != nil {
return http.StatusInternalServerError, err
}
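
The last few hunks all track the same signature change: fs.Move now returns the queued task (if any) alongside the error, and callers that must stay synchronous, such as this WebDAV handler, opt out of task creation by putting conf.NoTaskKey on the context. A hedged sketch of the two calling patterns; the helper names are invented and the exact fs.Move signature is inferred from the call sites above:

package move_sketch

import (
	"context"

	"github.com/OpenListTeam/OpenList/v4/internal/conf"
	"github.com/OpenListTeam/OpenList/v4/internal/fs"
	"github.com/OpenListTeam/OpenList/v4/internal/task"
)

// queueMove mirrors the FsMove handler: queue a background move and collect
// the task handle when one is created.
func queueMove(ctx context.Context, src, dstDir string) ([]task.TaskExtensionInfo, error) {
	var added []task.TaskExtensionInfo
	t, err := fs.Move(ctx, src, dstDir)
	if t != nil {
		added = append(added, t)
	}
	return added, err
}

// syncMove mirrors the WebDAV handler: setting conf.NoTaskKey on the context
// forces the move to run inline instead of being queued as a task.
func syncMove(ctx context.Context, src, dstDir string) error {
	_, err := fs.Move(context.WithValue(ctx, conf.NoTaskKey, struct{}{}), src, dstDir)
	return err
}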

View File

@@ -10,15 +10,14 @@ import (
"encoding/xml"
"errors"
"fmt"
"mime"
"net/http"
"path"
"strconv"
"strings"
"time"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/OpenListTeam/OpenList/v4/server/common"
)
@@ -433,7 +432,7 @@ func findContentType(ctx context.Context, ls LockSystem, name string, fi model.O
//}
//defer f.Close()
// This implementation is based on serveContent's code in the standard net/http package.
ctype := mime.TypeByExtension(path.Ext(name))
ctype := utils.GetMimeType(name)
return ctype, nil
//if ctype != "" {
// return ctype, nil