mirror of
https://github.com/OpenListTeam/OpenList.git
synced 2025-09-19 20:26:26 +08:00

* feat(task): add task hook,batch task
refactor(move): move use CopyTask
* Update internal/task/batch_task/refresh.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Seven <53081179+Seven66677731@users.noreply.github.com>
* fix: upload task allFinish judge
* Update internal/task/batch_task/refresh.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Seven <53081179+Seven66677731@users.noreply.github.com>
* feat: enhance concurrency safety
* 优化代码
* 解压缩
* 修复死锁
* refactor(move): move as task
* 重构,优化
* .
* 优化,修复bug
* .
* 修复bug
* feat: add task retry judge
* 代理Task.SetState函数来判断Task的生命周期
* chore: use OnSucceeded、OnFailed、OnBeforeRetry functions
* 优化
* 优化,去除重复代码
* .
* 优化
* .
* webdav
* Revert "fix(fs):After the file is copied or moved, flush the cache of the directory that was copied or moved to."
This reverts commit 5f03edd683
.
---------
Signed-off-by: Seven <53081179+Seven66677731@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: j2rong4cn <j2rong@qq.com>
53 lines
4.4 KiB
Go
53 lines
4.4 KiB
Go
package bootstrap
|
|
|
|
import (
|
|
"github.com/OpenListTeam/OpenList/v4/internal/conf"
|
|
"github.com/OpenListTeam/OpenList/v4/internal/db"
|
|
"github.com/OpenListTeam/OpenList/v4/internal/fs"
|
|
"github.com/OpenListTeam/OpenList/v4/internal/offline_download/tool"
|
|
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
|
"github.com/OpenListTeam/OpenList/v4/internal/setting"
|
|
"github.com/OpenListTeam/tache"
|
|
)
|
|
|
|
func taskFilterNegative(num int) int64 {
|
|
if num < 0 {
|
|
num = 0
|
|
}
|
|
return int64(num)
|
|
}
|
|
|
|
func InitTaskManager() {
|
|
fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(setting.GetInt(conf.TaskUploadThreadsNum, conf.Conf.Tasks.Upload.Workers)), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry)) //upload will not support persist
|
|
op.RegisterSettingChangingCallback(func() {
|
|
fs.UploadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskUploadThreadsNum, conf.Conf.Tasks.Upload.Workers)))
|
|
})
|
|
fs.CopyTaskManager = tache.NewManager[*fs.FileTransferTask](tache.WithWorks(setting.GetInt(conf.TaskCopyThreadsNum, conf.Conf.Tasks.Copy.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant), db.UpdateTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry))
|
|
op.RegisterSettingChangingCallback(func() {
|
|
fs.CopyTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskCopyThreadsNum, conf.Conf.Tasks.Copy.Workers)))
|
|
})
|
|
fs.MoveTaskManager = tache.NewManager[*fs.FileTransferTask](tache.WithWorks(setting.GetInt(conf.TaskMoveThreadsNum, conf.Conf.Tasks.Move.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant), db.UpdateTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Move.MaxRetry))
|
|
op.RegisterSettingChangingCallback(func() {
|
|
fs.MoveTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskMoveThreadsNum, conf.Conf.Tasks.Move.Workers)))
|
|
})
|
|
tool.DownloadTaskManager = tache.NewManager[*tool.DownloadTask](tache.WithWorks(setting.GetInt(conf.TaskOfflineDownloadThreadsNum, conf.Conf.Tasks.Download.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("download", conf.Conf.Tasks.Download.TaskPersistant), db.UpdateTaskDataFunc("download", conf.Conf.Tasks.Download.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Download.MaxRetry))
|
|
op.RegisterSettingChangingCallback(func() {
|
|
tool.DownloadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskOfflineDownloadThreadsNum, conf.Conf.Tasks.Download.Workers)))
|
|
})
|
|
tool.TransferTaskManager = tache.NewManager[*tool.TransferTask](tache.WithWorks(setting.GetInt(conf.TaskOfflineDownloadTransferThreadsNum, conf.Conf.Tasks.Transfer.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("transfer", conf.Conf.Tasks.Transfer.TaskPersistant), db.UpdateTaskDataFunc("transfer", conf.Conf.Tasks.Transfer.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Transfer.MaxRetry))
|
|
op.RegisterSettingChangingCallback(func() {
|
|
tool.TransferTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskOfflineDownloadTransferThreadsNum, conf.Conf.Tasks.Transfer.Workers)))
|
|
})
|
|
if len(tool.TransferTaskManager.GetAll()) == 0 { //prevent offline downloaded files from being deleted
|
|
CleanTempDir()
|
|
}
|
|
fs.ArchiveDownloadTaskManager = tache.NewManager[*fs.ArchiveDownloadTask](tache.WithWorks(setting.GetInt(conf.TaskDecompressDownloadThreadsNum, conf.Conf.Tasks.Decompress.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("decompress", conf.Conf.Tasks.Decompress.TaskPersistant), db.UpdateTaskDataFunc("decompress", conf.Conf.Tasks.Decompress.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Decompress.MaxRetry))
|
|
op.RegisterSettingChangingCallback(func() {
|
|
fs.ArchiveDownloadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskDecompressDownloadThreadsNum, conf.Conf.Tasks.Decompress.Workers)))
|
|
})
|
|
fs.ArchiveContentUploadTaskManager.Manager = tache.NewManager[*fs.ArchiveContentUploadTask](tache.WithWorks(setting.GetInt(conf.TaskDecompressUploadThreadsNum, conf.Conf.Tasks.DecompressUpload.Workers)), tache.WithMaxRetry(conf.Conf.Tasks.DecompressUpload.MaxRetry)) //decompress upload will not support persist
|
|
op.RegisterSettingChangingCallback(func() {
|
|
fs.ArchiveContentUploadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskDecompressUploadThreadsNum, conf.Conf.Tasks.DecompressUpload.Workers)))
|
|
})
|
|
}
|