feat(fs): full support webdav cross-driver copy and move (#823)

* fix(fs): restore webdav cross-driver copy and move

* fix bug

* webdav支持复制、移动 文件夹

* 优化

* 。
This commit is contained in:
j2rong4cn
2025-07-26 00:27:46 +08:00
committed by GitHub
parent 0ee31a3f36
commit 4e9c30f49d
4 changed files with 78 additions and 115 deletions

View File

@ -55,13 +55,6 @@ func (t *ArchiveDownloadTask) Run() error {
} }
func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadTask, error) { func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadTask, error) {
var err error
if t.SrcStorage == nil {
t.SrcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
if err != nil {
return nil, err
}
}
srcObj, tool, ss, err := op.GetArchiveToolAndStream(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.LinkArgs{}) srcObj, tool, ss, err := op.GetArchiveToolAndStream(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.LinkArgs{})
if err != nil { if err != nil {
return nil, err return nil, err
@ -111,7 +104,7 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
baseName := strings.TrimSuffix(srcObj.GetName(), stdpath.Ext(srcObj.GetName())) baseName := strings.TrimSuffix(srcObj.GetName(), stdpath.Ext(srcObj.GetName()))
uploadTask := &ArchiveContentUploadTask{ uploadTask := &ArchiveContentUploadTask{
TaskExtension: task.TaskExtension{ TaskExtension: task.TaskExtension{
Creator: t.GetCreator(), Creator: t.Creator,
ApiUrl: t.ApiUrl, ApiUrl: t.ApiUrl,
}, },
ObjName: baseName, ObjName: baseName,
@ -179,13 +172,6 @@ func (t *ArchiveContentUploadTask) SetRetry(retry int, maxRetry int) {
} }
func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTask *ArchiveContentUploadTask) error) error { func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTask *ArchiveContentUploadTask) error) error {
var err error
if t.dstStorage == nil {
t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
if err != nil {
return err
}
}
info, err := os.Stat(t.FilePath) info, err := os.Stat(t.FilePath)
if err != nil { if err != nil {
return err return err
@ -224,7 +210,7 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTask *Arch
} }
err = f(&ArchiveContentUploadTask{ err = f(&ArchiveContentUploadTask{
TaskExtension: task.TaskExtension{ TaskExtension: task.TaskExtension{
Creator: t.GetCreator(), Creator: t.Creator,
ApiUrl: t.ApiUrl, ApiUrl: t.ApiUrl,
}, },
ObjName: entry.Name(), ObjName: entry.Name(),
@ -243,11 +229,11 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTask *Arch
return es return es
} }
} else { } else {
t.SetTotalBytes(info.Size())
file, err := os.Open(t.FilePath) file, err := os.Open(t.FilePath)
if err != nil { if err != nil {
return err return err
} }
t.SetTotalBytes(info.Size())
fs := &stream.FileStream{ fs := &stream.FileStream{
Obj: &model.Object{ Obj: &model.Object{
Name: t.ObjName, Name: t.ObjName,
@ -379,13 +365,8 @@ func archiveDecompress(ctx context.Context, srcObjPath, dstDirPath string, args
return nil, err return nil, err
} }
} }
taskCreator, _ := ctx.Value(conf.UserKey).(*model.User)
tsk := &ArchiveDownloadTask{ tsk := &ArchiveDownloadTask{
TaskData: TaskData{ TaskData: TaskData{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
ApiUrl: common.GetApiUrl(ctx),
},
SrcStorage: srcStorage, SrcStorage: srcStorage,
DstStorage: dstStorage, DstStorage: dstStorage,
SrcActualPath: srcObjActualPath, SrcActualPath: srcObjActualPath,
@ -396,6 +377,7 @@ func archiveDecompress(ctx context.Context, srcObjPath, dstDirPath string, args
ArchiveDecompressArgs: args, ArchiveDecompressArgs: args,
} }
if ctx.Value(conf.NoTaskKey) != nil { if ctx.Value(conf.NoTaskKey) != nil {
tsk.Base.SetCtx(ctx)
uploadTask, err := tsk.RunWithoutPushUploadTask() uploadTask, err := tsk.RunWithoutPushUploadTask()
if err != nil { if err != nil {
return nil, errors.WithMessagef(err, "failed download [%s]", srcObjPath) return nil, errors.WithMessagef(err, "failed download [%s]", srcObjPath)
@ -403,12 +385,16 @@ func archiveDecompress(ctx context.Context, srcObjPath, dstDirPath string, args
defer uploadTask.deleteSrcFile() defer uploadTask.deleteSrcFile()
var callback func(t *ArchiveContentUploadTask) error var callback func(t *ArchiveContentUploadTask) error
callback = func(t *ArchiveContentUploadTask) error { callback = func(t *ArchiveContentUploadTask) error {
t.Base.SetCtx(ctx)
e := t.RunWithNextTaskCallback(callback) e := t.RunWithNextTaskCallback(callback)
t.deleteSrcFile() t.deleteSrcFile()
return e return e
} }
uploadTask.Base.SetCtx(ctx)
return nil, uploadTask.RunWithNextTaskCallback(callback) return nil, uploadTask.RunWithNextTaskCallback(callback)
} else { } else {
tsk.Creator, _ = ctx.Value(conf.UserKey).(*model.User)
tsk.ApiUrl = common.GetApiUrl(ctx)
ArchiveDownloadTaskManager.Add(tsk) ArchiveDownloadTaskManager.Add(tsk)
return tsk, nil return tsk, nil
} }

View File

@ -7,7 +7,6 @@ import (
"time" "time"
"github.com/OpenListTeam/OpenList/v4/internal/conf" "github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/errs" "github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/op" "github.com/OpenListTeam/OpenList/v4/internal/op"
@ -52,17 +51,16 @@ func (t *FileTransferTask) Run() error {
t.ClearEndTime() t.ClearEndTime()
t.SetStartTime(time.Now()) t.SetStartTime(time.Now())
defer func() { t.SetEndTime(time.Now()) }() defer func() { t.SetEndTime(time.Now()) }()
var err error return t.RunWithNextTaskCallback(func(nextTask *FileTransferTask) error {
if t.SrcStorage == nil { nextTask.groupID = t.groupID
t.SrcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp) task_group.TransferCoordinator.AddTask(t.groupID, nil)
} if t.TaskType == copy {
if t.DstStorage == nil { CopyTaskManager.Add(nextTask)
t.DstStorage, err = op.GetStorageByMountPath(t.DstStorageMp) } else {
} MoveTaskManager.Add(nextTask)
if err != nil { }
return errors.WithMessage(err, "failed get storage") return nil
} })
return putBetween2Storages(t, t.SrcStorage, t.DstStorage, t.SrcActualPath, t.DstActualPath)
} }
func (t *FileTransferTask) OnSucceeded() { func (t *FileTransferTask) OnSucceeded() {
@ -109,51 +107,11 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
return nil, err return nil, err
} }
} }
} else if ctx.Value(conf.NoTaskKey) != nil {
return nil, fmt.Errorf("can't %s files between two storages, please use the front-end ", taskType)
} }
// if ctx.Value(conf.NoTaskKey) != nil { // webdav
// srcObj, err := op.Get(ctx, srcStorage, srcObjActualPath)
// if err != nil {
// return nil, errors.WithMessagef(err, "failed get src [%s] file", srcObjPath)
// }
// if !srcObj.IsDir() {
// // copy file directly
// link, _, err := op.Link(ctx, srcStorage, srcObjActualPath, model.LinkArgs{})
// if err != nil {
// return nil, errors.WithMessagef(err, "failed get [%s] link", srcObjPath)
// }
// // any link provided is seekable
// ss, err := stream.NewSeekableStream(&stream.FileStream{
// Obj: srcObj,
// Ctx: ctx,
// }, link)
// if err != nil {
// _ = link.Close()
// return nil, errors.WithMessagef(err, "failed get [%s] stream", srcObjPath)
// }
// if taskType == move {
// defer func() {
// task_group.TransferCoordinator.Done(dstDirPath, err == nil)
// }()
// task_group.TransferCoordinator.AddTask(dstDirPath, task_group.SrcPathToRemove(srcObjPath))
// }
// err = op.Put(ctx, dstStorage, dstDirActualPath, ss, nil, taskType == move)
// return nil, err
// } else {
// return nil, fmt.Errorf("can't %s dir two storages, please use the front-end ", taskType)
// }
// }
// not in the same storage // not in the same storage
taskCreator, _ := ctx.Value(conf.UserKey).(*model.User)
t := &FileTransferTask{ t := &FileTransferTask{
TaskData: TaskData{ TaskData: TaskData{
TaskExtension: task.TaskExtension{
Creator: taskCreator,
ApiUrl: common.GetApiUrl(ctx),
},
SrcStorage: srcStorage, SrcStorage: srcStorage,
DstStorage: dstStorage, DstStorage: dstStorage,
SrcActualPath: srcObjActualPath, SrcActualPath: srcObjActualPath,
@ -162,8 +120,34 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
DstStorageMp: dstStorage.GetStorage().MountPath, DstStorageMp: dstStorage.GetStorage().MountPath,
}, },
TaskType: taskType, TaskType: taskType,
groupID: dstDirPath,
} }
if ctx.Value(conf.NoTaskKey) != nil {
var callback func(nextTask *FileTransferTask) error
hasSuccess := false
callback = func(nextTask *FileTransferTask) error {
nextTask.Base.SetCtx(ctx)
err := nextTask.RunWithNextTaskCallback(callback)
if err == nil {
hasSuccess = true
}
return err
}
t.Base.SetCtx(ctx)
err = t.RunWithNextTaskCallback(callback)
if hasSuccess || err == nil {
if taskType == move {
task_group.RefreshAndRemove(dstDirPath, task_group.SrcPathToRemove(srcObjPath))
} else {
op.DeleteCache(t.DstStorage, dstDirActualPath)
}
}
return nil, err
}
t.Creator, _ = ctx.Value(conf.UserKey).(*model.User)
t.ApiUrl = common.GetApiUrl(ctx)
t.groupID = dstDirPath
if taskType == copy { if taskType == copy {
task_group.TransferCoordinator.AddTask(dstDirPath, nil) task_group.TransferCoordinator.AddTask(dstDirPath, nil)
CopyTaskManager.Add(t) CopyTaskManager.Add(t)
@ -174,76 +158,69 @@ func transfer(ctx context.Context, taskType taskType, srcObjPath, dstDirPath str
return t, nil return t, nil
} }
func putBetween2Storages(t *FileTransferTask, srcStorage, dstStorage driver.Driver, srcActualPath, dstDirActualPath string) error { func (t *FileTransferTask) RunWithNextTaskCallback(f func(nextTask *FileTransferTask) error) error {
t.Status = "getting src object" t.Status = "getting src object"
srcObj, err := op.Get(t.Ctx(), srcStorage, srcActualPath) srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcActualPath)
if err != nil { if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", srcActualPath) return errors.WithMessagef(err, "failed get src [%s] file", t.SrcActualPath)
} }
if srcObj.IsDir() { if srcObj.IsDir() {
t.Status = "src object is dir, listing objs" t.Status = "src object is dir, listing objs"
objs, err := op.List(t.Ctx(), srcStorage, srcActualPath, model.ListArgs{}) objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.ListArgs{})
if err != nil { if err != nil {
return errors.WithMessagef(err, "failed list src [%s] objs", srcActualPath) return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcActualPath)
} }
dstActualPath := stdpath.Join(dstDirActualPath, srcObj.GetName()) dstActualPath := stdpath.Join(t.DstActualPath, srcObj.GetName())
if t.TaskType == copy { if t.TaskType == copy {
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstActualPath)) if t.Ctx().Value(conf.NoTaskKey) != nil {
defer op.DeleteCache(t.DstStorage, dstActualPath)
} else {
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(dstActualPath))
}
} }
for _, obj := range objs { for _, obj := range objs {
if utils.IsCanceled(t.Ctx()) { if utils.IsCanceled(t.Ctx()) {
return nil return nil
} }
task := &FileTransferTask{ err = f(&FileTransferTask{
TaskType: t.TaskType, TaskType: t.TaskType,
TaskData: TaskData{ TaskData: TaskData{
TaskExtension: task.TaskExtension{ TaskExtension: task.TaskExtension{
Creator: t.GetCreator(), Creator: t.Creator,
ApiUrl: t.ApiUrl, ApiUrl: t.ApiUrl,
}, },
SrcStorage: srcStorage, SrcStorage: t.SrcStorage,
DstStorage: dstStorage, DstStorage: t.DstStorage,
SrcActualPath: stdpath.Join(srcActualPath, obj.GetName()), SrcActualPath: stdpath.Join(t.SrcActualPath, obj.GetName()),
DstActualPath: dstActualPath, DstActualPath: dstActualPath,
SrcStorageMp: srcStorage.GetStorage().MountPath, SrcStorageMp: t.SrcStorageMp,
DstStorageMp: dstStorage.GetStorage().MountPath, DstStorageMp: t.DstStorageMp,
}, },
groupID: t.groupID, })
} if err != nil {
task_group.TransferCoordinator.AddTask(t.groupID, nil) return err
if t.TaskType == copy {
CopyTaskManager.Add(task)
} else {
MoveTaskManager.Add(task)
} }
} }
t.Status = fmt.Sprintf("src object is dir, added all %s tasks of objs", t.TaskType) t.Status = fmt.Sprintf("src object is dir, added all %s tasks of objs", t.TaskType)
return nil return nil
} }
return putFileBetween2Storages(t, srcStorage, dstStorage, srcActualPath, dstDirActualPath)
}
func putFileBetween2Storages(tsk *FileTransferTask, srcStorage, dstStorage driver.Driver, srcActualPath, dstDirActualPath string) error { link, _, err := op.Link(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.LinkArgs{})
srcFile, err := op.Get(tsk.Ctx(), srcStorage, srcActualPath)
if err != nil { if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", srcActualPath) return errors.WithMessagef(err, "failed get [%s] link", t.SrcActualPath)
}
tsk.SetTotalBytes(srcFile.GetSize())
link, _, err := op.Link(tsk.Ctx(), srcStorage, srcActualPath, model.LinkArgs{})
if err != nil {
return errors.WithMessagef(err, "failed get [%s] link", srcActualPath)
} }
// any link provided is seekable // any link provided is seekable
ss, err := stream.NewSeekableStream(&stream.FileStream{ ss, err := stream.NewSeekableStream(&stream.FileStream{
Obj: srcFile, Obj: srcObj,
Ctx: tsk.Ctx(), Ctx: t.Ctx(),
}, link) }, link)
if err != nil { if err != nil {
_ = link.Close() _ = link.Close()
return errors.WithMessagef(err, "failed get [%s] stream", srcActualPath) return errors.WithMessagef(err, "failed get [%s] stream", t.SrcActualPath)
} }
tsk.SetTotalBytes(ss.GetSize()) t.SetTotalBytes(ss.GetSize())
return op.Put(tsk.Ctx(), dstStorage, dstDirActualPath, ss, tsk.SetProgress, true) t.Status = "uploading"
return op.Put(t.Ctx(), t.DstStorage, t.DstActualPath, ss, t.SetProgress, true)
} }
var ( var (

View File

@ -6,7 +6,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
type OnCompletionFunc func(groupID string, payloads []any) type OnCompletionFunc func(groupID string, payloads ...any)
type TaskGroupCoordinator struct { type TaskGroupCoordinator struct {
name string name string
mu sync.Mutex mu sync.Mutex
@ -71,7 +71,7 @@ func (tgc *TaskGroupCoordinator) Done(groupID string, success bool) {
if tgc.onCompletion != nil && state.hasSuccess { if tgc.onCompletion != nil && state.hasSuccess {
logrus.Debugf("OnCompletion:%s", groupID) logrus.Debugf("OnCompletion:%s", groupID)
tgc.mu.Unlock() tgc.mu.Unlock()
tgc.onCompletion(groupID, payloads) tgc.onCompletion(groupID, payloads...)
tgc.mu.Lock() tgc.mu.Lock()
} }
return return

View File

@ -17,7 +17,7 @@ type SrcPathToRemove string
// ActualPath // ActualPath
type DstPathToRefresh string type DstPathToRefresh string
func refreshAndRemove(dstPath string, payloads []any) { func RefreshAndRemove(dstPath string, payloads ...any) {
dstStorage, dstActualPath, err := op.GetStorageAndActualPath(dstPath) dstStorage, dstActualPath, err := op.GetStorageAndActualPath(dstPath)
if err != nil { if err != nil {
log.Error(errors.WithMessage(err, "failed get dst storage")) log.Error(errors.WithMessage(err, "failed get dst storage"))
@ -100,4 +100,4 @@ func verifyAndRemove(ctx context.Context, srcStorage, dstStorage driver.Driver,
return nil return nil
} }
var TransferCoordinator *TaskGroupCoordinator = NewTaskGroupCoordinator("RefreshAndRemove", refreshAndRemove) var TransferCoordinator *TaskGroupCoordinator = NewTaskGroupCoordinator("RefreshAndRemove", RefreshAndRemove)