diff --git a/drivers/all.go b/drivers/all.go index d135aae6..2932ee22 100644 --- a/drivers/all.go +++ b/drivers/all.go @@ -61,6 +61,7 @@ import ( _ "github.com/OpenListTeam/OpenList/v4/drivers/smb" _ "github.com/OpenListTeam/OpenList/v4/drivers/strm" _ "github.com/OpenListTeam/OpenList/v4/drivers/teambition" + _ "github.com/OpenListTeam/OpenList/v4/drivers/teldrive" _ "github.com/OpenListTeam/OpenList/v4/drivers/terabox" _ "github.com/OpenListTeam/OpenList/v4/drivers/thunder" _ "github.com/OpenListTeam/OpenList/v4/drivers/thunder_browser" diff --git a/drivers/teldrive/copy.go b/drivers/teldrive/copy.go new file mode 100644 index 00000000..1118f63a --- /dev/null +++ b/drivers/teldrive/copy.go @@ -0,0 +1,137 @@ +package teldrive + +import ( + "fmt" + "net/http" + + "github.com/OpenListTeam/OpenList/v4/drivers/base" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/go-resty/resty/v2" + "golang.org/x/net/context" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" +) + +func NewCopyManager(ctx context.Context, concurrent int, d *Teldrive) *CopyManager { + g, ctx := errgroup.WithContext(ctx) + + return &CopyManager{ + TaskChan: make(chan CopyTask, concurrent*2), + Sem: semaphore.NewWeighted(int64(concurrent)), + G: g, + Ctx: ctx, + d: d, + } +} + +func (cm *CopyManager) startWorkers() { + workerCount := cap(cm.TaskChan) / 2 + for i := 0; i < workerCount; i++ { + cm.G.Go(func() error { + return cm.worker() + }) + } +} + +func (cm *CopyManager) worker() error { + for { + select { + case task, ok := <-cm.TaskChan: + if !ok { + return nil + } + + if err := cm.Sem.Acquire(cm.Ctx, 1); err != nil { + return err + } + + var err error + + err = cm.processFile(task) + + cm.Sem.Release(1) + + if err != nil { + return fmt.Errorf("task processing failed: %w", err) + } + + case <-cm.Ctx.Done(): + return cm.Ctx.Err() + } + } +} + +func (cm *CopyManager) generateTasks(ctx context.Context, srcObj, dstDir model.Obj) error { + if srcObj.IsDir() { + return cm.generateFolderTasks(ctx, srcObj, dstDir) + } else { + // add single file task directly + select { + case cm.TaskChan <- CopyTask{SrcObj: srcObj, DstDir: dstDir}: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (cm *CopyManager) generateFolderTasks(ctx context.Context, srcDir, dstDir model.Obj) error { + objs, err := cm.d.List(ctx, srcDir, model.ListArgs{}) + if err != nil { + return fmt.Errorf("failed to list directory %s: %w", srcDir.GetPath(), err) + } + + err = cm.d.MakeDir(cm.Ctx, dstDir, srcDir.GetName()) + if err != nil || len(objs) == 0 { + return err + } + newDstDir := &model.Object{ + ID: dstDir.GetID(), + Path: dstDir.GetPath() + "/" + srcDir.GetName(), + Name: srcDir.GetName(), + IsFolder: true, + } + + for _, file := range objs { + if utils.IsCanceled(ctx) { + return ctx.Err() + } + + srcFile := &model.Object{ + ID: file.GetID(), + Path: srcDir.GetPath() + "/" + file.GetName(), + Name: file.GetName(), + IsFolder: file.IsDir(), + } + + // 递归生成任务 + if err := cm.generateTasks(ctx, srcFile, newDstDir); err != nil { + return err + } + } + + return nil +} + +func (cm *CopyManager) processFile(task CopyTask) error { + return cm.copySingleFile(cm.Ctx, task.SrcObj, task.DstDir) +} + +func (cm *CopyManager) copySingleFile(ctx context.Context, srcObj, dstDir model.Obj) error { + // `override copy mode` should delete the existing file + if obj, err := cm.d.getFile(dstDir.GetPath(), srcObj.GetName(), srcObj.IsDir()); err == nil { + if err := cm.d.Remove(ctx, obj); err != nil { + return fmt.Errorf("failed to remove existing file: %w", err) + } + } + + // Do copy + return cm.d.request(http.MethodPost, "/api/files/{id}/copy", func(req *resty.Request) { + req.SetPathParam("id", srcObj.GetID()) + req.SetBody(base.Json{ + "newName": srcObj.GetName(), + "destination": dstDir.GetPath(), + }) + }, nil) +} diff --git a/drivers/teldrive/driver.go b/drivers/teldrive/driver.go new file mode 100644 index 00000000..541d2e3b --- /dev/null +++ b/drivers/teldrive/driver.go @@ -0,0 +1,217 @@ +package teldrive + +import ( + "context" + "fmt" + "math" + "net/http" + "net/url" + "strings" + + "github.com/OpenListTeam/OpenList/v4/drivers/base" + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/errs" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/op" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/go-resty/resty/v2" + "github.com/google/uuid" +) + +type Teldrive struct { + model.Storage + Addition +} + +func (d *Teldrive) Config() driver.Config { + return config +} + +func (d *Teldrive) GetAddition() driver.Additional { + return &d.Addition +} + +func (d *Teldrive) Init(ctx context.Context) error { + d.Address = strings.TrimSuffix(d.Address, "/") + if d.Cookie == "" || !strings.HasPrefix(d.Cookie, "access_token=") { + return fmt.Errorf("cookie must start with 'access_token='") + } + if d.UploadConcurrency == 0 { + d.UploadConcurrency = 4 + } + if d.ChunkSize == 0 { + d.ChunkSize = 10 + } + + op.MustSaveDriverStorage(d) + return nil +} + +func (d *Teldrive) Drop(ctx context.Context) error { + return nil +} + +func (d *Teldrive) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) { + var listResp ListResp + err := d.request(http.MethodGet, "/api/files", func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "path": dir.GetPath(), + "limit": "1000", // overide default 500, TODO pagination + }) + }, &listResp) + if err != nil { + return nil, err + } + + return utils.SliceConvert(listResp.Items, func(src Object) (model.Obj, error) { + return &model.Object{ + ID: src.ID, + Name: src.Name, + Size: func() int64 { + if src.Type == "folder" { + return 0 + } + return src.Size + }(), + IsFolder: src.Type == "folder", + Modified: src.UpdatedAt, + }, nil + }) +} + +func (d *Teldrive) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) { + if d.UseShareLink { + shareObj, err := d.getShareFileById(file.GetID()) + if err != nil || shareObj == nil { + if err := d.createShareFile(file.GetID()); err != nil { + return nil, err + } + shareObj, err = d.getShareFileById(file.GetID()) + if err != nil { + return nil, err + } + } + return &model.Link{ + URL: d.Address + "/api/shares/" + url.PathEscape(shareObj.Id) + "/files/" + url.PathEscape(file.GetID()) + "/" + url.PathEscape(file.GetName()), + }, nil + } + return &model.Link{ + URL: d.Address + "/api/files/" + url.PathEscape(file.GetID()) + "/" + url.PathEscape(file.GetName()), + Header: http.Header{ + "Cookie": {d.Cookie}, + }, + }, nil +} + +func (d *Teldrive) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error { + return d.request(http.MethodPost, "/api/files/mkdir", func(req *resty.Request) { + req.SetBody(map[string]interface{}{ + "path": parentDir.GetPath() + "/" + dirName, + }) + }, nil) +} + +func (d *Teldrive) Move(ctx context.Context, srcObj, dstDir model.Obj) error { + body := base.Json{ + "ids": []string{srcObj.GetID()}, + "destinationParent": dstDir.GetID(), + } + return d.request(http.MethodPost, "/api/files/move", func(req *resty.Request) { + req.SetBody(body) + }, nil) +} + +func (d *Teldrive) Rename(ctx context.Context, srcObj model.Obj, newName string) error { + body := base.Json{ + "name": newName, + } + return d.request(http.MethodPatch, "/api/files/{id}", func(req *resty.Request) { + req.SetPathParam("id", srcObj.GetID()) + req.SetBody(body) + }, nil) +} + +func (d *Teldrive) Copy(ctx context.Context, srcObj, dstDir model.Obj) error { + copyConcurrentLimit := 4 + copyManager := NewCopyManager(ctx, copyConcurrentLimit, d) + copyManager.startWorkers() + copyManager.G.Go(func() error { + defer close(copyManager.TaskChan) + return copyManager.generateTasks(ctx, srcObj, dstDir) + }) + return copyManager.G.Wait() +} + +func (d *Teldrive) Remove(ctx context.Context, obj model.Obj) error { + body := base.Json{ + "ids": []string{obj.GetID()}, + } + return d.request(http.MethodPost, "/api/files/delete", func(req *resty.Request) { + req.SetBody(body) + }, nil) +} + +func (d *Teldrive) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) error { + fileId := uuid.New().String() + chunkSizeInMB := d.ChunkSize + chunkSize := chunkSizeInMB * 1024 * 1024 // Convert MB to bytes + totalSize := file.GetSize() + totalParts := int(math.Ceil(float64(totalSize) / float64(chunkSize))) + maxRetried := 3 + + // delete the upload task when finished or failed + defer func() { + _ = d.request(http.MethodDelete, "/api/uploads/{id}", func(req *resty.Request) { + req.SetPathParam("id", fileId) + }, nil) + }() + + if obj, err := d.getFile(dstDir.GetPath(), file.GetName(), file.IsDir()); err == nil { + if err = d.Remove(ctx, obj); err != nil { + return err + } + } + // start the upload process + if err := d.request(http.MethodGet, "/api/uploads/fileId", func(req *resty.Request) { + req.SetPathParam("id", fileId) + }, nil); err != nil { + return err + } + if totalSize == 0 { + return d.touch(file.GetName(), dstDir.GetPath()) + } + + if totalParts <= 1 { + return d.doSingleUpload(ctx, dstDir, file, up, totalParts, chunkSize, fileId) + } + + return d.doMultiUpload(ctx, dstDir, file, up, maxRetried, totalParts, chunkSize, fileId) +} + +func (d *Teldrive) GetArchiveMeta(ctx context.Context, obj model.Obj, args model.ArchiveArgs) (model.ArchiveMeta, error) { + // TODO get archive file meta-info, return errs.NotImplement to use an internal archive tool, optional + return nil, errs.NotImplement +} + +func (d *Teldrive) ListArchive(ctx context.Context, obj model.Obj, args model.ArchiveInnerArgs) ([]model.Obj, error) { + // TODO list args.InnerPath in the archive obj, return errs.NotImplement to use an internal archive tool, optional + return nil, errs.NotImplement +} + +func (d *Teldrive) Extract(ctx context.Context, obj model.Obj, args model.ArchiveInnerArgs) (*model.Link, error) { + // TODO return link of file args.InnerPath in the archive obj, return errs.NotImplement to use an internal archive tool, optional + return nil, errs.NotImplement +} + +func (d *Teldrive) ArchiveDecompress(ctx context.Context, srcObj, dstDir model.Obj, args model.ArchiveDecompressArgs) ([]model.Obj, error) { + // TODO extract args.InnerPath path in the archive srcObj to the dstDir location, optional + // a folder with the same name as the archive file needs to be created to store the extracted results if args.PutIntoNewDir + // return errs.NotImplement to use an internal archive tool + return nil, errs.NotImplement +} + +//func (d *Teldrive) Other(ctx context.Context, args model.OtherArgs) (interface{}, error) { +// return nil, errs.NotSupport +//} + +var _ driver.Driver = (*Teldrive)(nil) diff --git a/drivers/teldrive/meta.go b/drivers/teldrive/meta.go new file mode 100644 index 00000000..23bae5f9 --- /dev/null +++ b/drivers/teldrive/meta.go @@ -0,0 +1,26 @@ +package teldrive + +import ( + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/op" +) + +type Addition struct { + driver.RootPath + Address string `json:"url" required:"true"` + Cookie string `json:"cookie" type:"string" required:"true" help:"access_token=xxx"` + UseShareLink bool `json:"use_share_link" type:"bool" default:"false" help:"Create share link when getting link to support 302. If disabled, you need to enable web proxy."` + ChunkSize int64 `json:"chunk_size" type:"number" default:"10" help:"Chunk size in MiB"` + UploadConcurrency int64 `json:"upload_concurrency" type:"number" default:"4" help:"Concurrency upload requests"` +} + +var config = driver.Config{ + Name: "Teldrive", + DefaultRoot: "/", +} + +func init() { + op.RegisterDriver(func() driver.Driver { + return &Teldrive{} + }) +} diff --git a/drivers/teldrive/types.go b/drivers/teldrive/types.go new file mode 100644 index 00000000..f6399e06 --- /dev/null +++ b/drivers/teldrive/types.go @@ -0,0 +1,77 @@ +package teldrive + +import ( + "context" + "time" + + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/stream" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" +) + +type ErrResp struct { + Code int `json:"code"` + Message string `json:"message"` +} + +type Object struct { + ID string `json:"id"` + Name string `json:"name"` + Type string `json:"type"` + MimeType string `json:"mimeType"` + Category string `json:"category,omitempty"` + ParentId string `json:"parentId"` + Size int64 `json:"size"` + Encrypted bool `json:"encrypted"` + UpdatedAt time.Time `json:"updatedAt"` +} + +type ListResp struct { + Items []Object `json:"items"` + Meta struct { + Count int `json:"count"` + TotalPages int `json:"totalPages"` + CurrentPage int `json:"currentPage"` + } `json:"meta"` +} + +type FilePart struct { + Name string `json:"name"` + PartId int `json:"partId"` + PartNo int `json:"partNo"` + ChannelId int `json:"channelId"` + Size int `json:"size"` + Encrypted bool `json:"encrypted"` + Salt string `json:"salt"` +} + +type chunkTask struct { + chunkIdx int + fileName string + chunkSize int64 + reader *stream.SectionReader + ss *stream.StreamSectionReader +} + +type CopyManager struct { + TaskChan chan CopyTask + Sem *semaphore.Weighted + G *errgroup.Group + Ctx context.Context + d *Teldrive +} + +type CopyTask struct { + SrcObj model.Obj + DstDir model.Obj +} + +type ShareObj struct { + Id string `json:"id"` + Protected bool `json:"protected"` + UserId int `json:"userId"` + Type string `json:"type"` + Name string `json:"name"` + ExpiresAt time.Time `json:"expiresAt"` +} diff --git a/drivers/teldrive/upload.go b/drivers/teldrive/upload.go new file mode 100644 index 00000000..168d9bef --- /dev/null +++ b/drivers/teldrive/upload.go @@ -0,0 +1,373 @@ +package teldrive + +import ( + "fmt" + "io" + "net/http" + "sort" + "strconv" + "sync" + "time" + + "github.com/OpenListTeam/OpenList/v4/drivers/base" + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/stream" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/avast/retry-go" + "github.com/go-resty/resty/v2" + "github.com/pkg/errors" + "golang.org/x/net/context" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" +) + +// create empty file +func (d *Teldrive) touch(name, path string) error { + uploadBody := base.Json{ + "name": name, + "type": "file", + "path": path, + } + if err := d.request(http.MethodPost, "/api/files", func(req *resty.Request) { + req.SetBody(uploadBody) + }, nil); err != nil { + return err + } + + return nil +} + +func (d *Teldrive) createFileOnUploadSuccess(name, id, path string, uploadedFileParts []FilePart, totalSize int64) error { + remoteFileParts, err := d.getFilePart(id) + if err != nil { + return err + } + // check if the uploaded file parts match the remote file parts + if len(remoteFileParts) != len(uploadedFileParts) { + return fmt.Errorf("[Teldrive] file parts count mismatch: expected %d, got %d", len(uploadedFileParts), len(remoteFileParts)) + } + formatParts := make([]base.Json, 0) + for _, p := range remoteFileParts { + formatParts = append(formatParts, base.Json{ + "id": p.PartId, + "salt": p.Salt, + }) + } + uploadBody := base.Json{ + "name": name, + "type": "file", + "path": path, + "parts": formatParts, + "size": totalSize, + } + // create file here + if err := d.request(http.MethodPost, "/api/files", func(req *resty.Request) { + req.SetBody(uploadBody) + }, nil); err != nil { + return err + } + + return nil +} + +func (d *Teldrive) checkFilePartExist(fileId string, partId int) (FilePart, error) { + var uploadedParts []FilePart + var filePart FilePart + + if err := d.request(http.MethodGet, "/api/uploads/{id}", func(req *resty.Request) { + req.SetPathParam("id", fileId) + }, &uploadedParts); err != nil { + return filePart, err + } + + for _, part := range uploadedParts { + if part.PartId == partId { + return part, nil + } + } + + return filePart, nil +} + +func (d *Teldrive) getFilePart(fileId string) ([]FilePart, error) { + var uploadedParts []FilePart + if err := d.request(http.MethodGet, "/api/uploads/{id}", func(req *resty.Request) { + req.SetPathParam("id", fileId) + }, &uploadedParts); err != nil { + return nil, err + } + + return uploadedParts, nil +} + +func (d *Teldrive) singleUploadRequest(fileId string, callback base.ReqCallback, resp interface{}) error { + url := d.Address + "/api/uploads/" + fileId + client := resty.New().SetTimeout(0) + + ctx := context.Background() + + req := client.R(). + SetContext(ctx) + req.SetHeader("Cookie", d.Cookie) + req.SetHeader("Content-Type", "application/octet-stream") + req.SetContentLength(true) + req.AddRetryCondition(func(r *resty.Response, err error) bool { + return false + }) + if callback != nil { + callback(req) + } + if resp != nil { + req.SetResult(resp) + } + var e ErrResp + req.SetError(&e) + _req, err := req.Execute(http.MethodPost, url) + if err != nil { + return err + } + + if _req.IsError() { + return &e + } + return nil +} + +func (d *Teldrive) doSingleUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up model.UpdateProgress, + totalParts int, chunkSize int64, fileId string) error { + + totalSize := file.GetSize() + var fileParts []FilePart + var uploaded int64 = 0 + ss, err := stream.NewStreamSectionReader(file, int(totalSize), &up) + if err != nil { + return err + } + + for uploaded < totalSize { + if utils.IsCanceled(ctx) { + return ctx.Err() + } + curChunkSize := min(totalSize-uploaded, chunkSize) + rd, err := ss.GetSectionReader(uploaded, curChunkSize) + if err != nil { + return err + } + filePart := &FilePart{} + if err := retry.Do(func() error { + + if _, err := rd.Seek(0, io.SeekStart); err != nil { + return err + } + + if err := d.singleUploadRequest(fileId, func(req *resty.Request) { + uploadParams := map[string]string{ + "partName": func() string { + digits := len(fmt.Sprintf("%d", totalParts)) + return file.GetName() + fmt.Sprintf(".%0*d", digits, 1) + }(), + "partNo": strconv.Itoa(1), + "fileName": file.GetName(), + } + req.SetQueryParams(uploadParams) + req.SetBody(driver.NewLimitedUploadStream(ctx, rd)) + req.SetHeader("Content-Length", strconv.FormatInt(curChunkSize, 10)) + }, filePart); err != nil { + return err + } + + return nil + }, + retry.Attempts(3), + retry.DelayType(retry.BackOffDelay), + retry.Delay(time.Second)); err != nil { + return err + } + + if filePart.Name != "" { + fileParts = append(fileParts, *filePart) + uploaded += curChunkSize + up(float64(uploaded) / float64(totalSize)) + ss.FreeSectionReader(rd) + } + + } + + return d.createFileOnUploadSuccess(file.GetName(), fileId, dstDir.GetPath(), fileParts, totalSize) +} + +func (d *Teldrive) doMultiUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up model.UpdateProgress, + maxRetried, totalParts int, chunkSize int64, fileId string) error { + + concurrent := d.UploadConcurrency + g, ctx := errgroup.WithContext(ctx) + sem := semaphore.NewWeighted(int64(concurrent)) + chunkChan := make(chan chunkTask, concurrent*2) + resultChan := make(chan FilePart, concurrent) + totalSize := file.GetSize() + + ss, err := stream.NewStreamSectionReader(file, int(totalSize), &up) + if err != nil { + return err + } + ssLock := sync.Mutex{} + g.Go(func() error { + defer close(chunkChan) + + chunkIdx := 0 + for chunkIdx < totalParts { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + offset := int64(chunkIdx) * chunkSize + curChunkSize := min(totalSize-offset, chunkSize) + + ssLock.Lock() + reader, err := ss.GetSectionReader(offset, curChunkSize) + ssLock.Unlock() + + if err != nil { + return err + } + task := chunkTask{ + chunkIdx: chunkIdx + 1, + chunkSize: curChunkSize, + fileName: file.GetName(), + reader: reader, + ss: ss, + } + // freeSectionReader will be called in d.uploadSingleChunk + select { + case chunkChan <- task: + chunkIdx++ + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + for i := 0; i < int(concurrent); i++ { + g.Go(func() error { + for task := range chunkChan { + if err := sem.Acquire(ctx, 1); err != nil { + return err + } + + filePart, err := d.uploadSingleChunk(ctx, fileId, task, totalParts, maxRetried) + sem.Release(1) + + if err != nil { + return fmt.Errorf("upload chunk %d failed: %w", task.chunkIdx, err) + } + + select { + case resultChan <- *filePart: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + } + var fileParts []FilePart + var collectErr error + collectDone := make(chan struct{}) + + go func() { + defer close(collectDone) + fileParts = make([]FilePart, 0, totalParts) + + done := make(chan error, 1) + go func() { + done <- g.Wait() + close(resultChan) + }() + + for { + select { + case filePart, ok := <-resultChan: + if !ok { + collectErr = <-done + return + } + fileParts = append(fileParts, filePart) + case err := <-done: + collectErr = err + return + } + } + }() + + <-collectDone + + if collectErr != nil { + return fmt.Errorf("multi-upload failed: %w", collectErr) + } + sort.Slice(fileParts, func(i, j int) bool { + return fileParts[i].PartNo < fileParts[j].PartNo + }) + + return d.createFileOnUploadSuccess(file.GetName(), fileId, dstDir.GetPath(), fileParts, totalSize) +} + +func (d *Teldrive) uploadSingleChunk(ctx context.Context, fileId string, task chunkTask, totalParts, maxRetried int) (*FilePart, error) { + filePart := &FilePart{} + retryCount := 0 + defer task.ss.FreeSectionReader(task.reader) + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + if existingPart, err := d.checkFilePartExist(fileId, task.chunkIdx); err == nil && existingPart.Name != "" { + return &existingPart, nil + } + + err := d.singleUploadRequest(fileId, func(req *resty.Request) { + uploadParams := map[string]string{ + "partName": func() string { + digits := len(fmt.Sprintf("%d", totalParts)) + return task.fileName + fmt.Sprintf(".%0*d", digits, task.chunkIdx) + }(), + "partNo": strconv.Itoa(task.chunkIdx), + "fileName": task.fileName, + } + req.SetQueryParams(uploadParams) + req.SetBody(driver.NewLimitedUploadStream(ctx, task.reader)) + req.SetHeader("Content-Length", strconv.Itoa(int(task.chunkSize))) + }, filePart) + + if err == nil { + return filePart, nil + } + + if retryCount >= maxRetried { + return nil, fmt.Errorf("upload failed after %d retries: %w", maxRetried, err) + } + + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + continue + } + + retryCount++ + utils.Log.Errorf("[Teldrive] upload error: %v, retrying %d times", err, retryCount) + + backoffDuration := time.Duration(retryCount*retryCount) * time.Second + if backoffDuration > 30*time.Second { + backoffDuration = 30 * time.Second + } + + select { + case <-time.After(backoffDuration): + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} diff --git a/drivers/teldrive/util.go b/drivers/teldrive/util.go new file mode 100644 index 00000000..ca3cccf9 --- /dev/null +++ b/drivers/teldrive/util.go @@ -0,0 +1,109 @@ +package teldrive + +import ( + "fmt" + "net/http" + "time" + + "github.com/OpenListTeam/OpenList/v4/drivers/base" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/go-resty/resty/v2" +) + +// do others that not defined in Driver interface + +func (d *Teldrive) request(method string, pathname string, callback base.ReqCallback, resp interface{}) error { + url := d.Address + pathname + req := base.RestyClient.R() + req.SetHeader("Cookie", d.Cookie) + if callback != nil { + callback(req) + } + if resp != nil { + req.SetResult(resp) + } + var e ErrResp + req.SetError(&e) + _req, err := req.Execute(method, url) + if err != nil { + return err + } + + if _req.IsError() { + return &e + } + return nil +} + +func (d *Teldrive) getFile(path, name string, isFolder bool) (model.Obj, error) { + resp := &ListResp{} + err := d.request(http.MethodGet, "/api/files", func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "path": path, + "name": name, + "type": func() string { + if isFolder { + return "folder" + } + return "file" + }(), + "operation": "find", + }) + }, resp) + if err != nil { + return nil, err + } + if len(resp.Items) == 0 { + return nil, fmt.Errorf("file not found: %s/%s", path, name) + } + obj := resp.Items[0] + return &model.Object{ + ID: obj.ID, + Name: obj.Name, + Size: obj.Size, + IsFolder: obj.Type == "folder", + }, err +} + +func (err *ErrResp) Error() string { + if err == nil { + return "" + } + + return fmt.Sprintf("[Teldrive] message:%s Error code:%d", err.Message, err.Code) +} + +func (d *Teldrive) createShareFile(fileId string) error { + var errResp ErrResp + if err := d.request(http.MethodPost, "/api/files/{id}/share", func(req *resty.Request) { + req.SetPathParam("id", fileId) + req.SetBody(base.Json{ + "expiresAt": getDateTime(), + }) + }, &errResp); err != nil { + return err + } + + if errResp.Message != "" { + return &errResp + } + + return nil +} + +func (d *Teldrive) getShareFileById(fileId string) (*ShareObj, error) { + var shareObj ShareObj + if err := d.request(http.MethodGet, "/api/files/{id}/share", func(req *resty.Request) { + req.SetPathParam("id", fileId) + }, &shareObj); err != nil { + return nil, err + } + + return &shareObj, nil +} + +func getDateTime() string { + now := time.Now().UTC() + formattedWithMs := now.Add(time.Hour * 1).Format("2006-01-02T15:04:05.000Z") + return formattedWithMs +}