diff --git a/drivers/189pc/utils.go b/drivers/189pc/utils.go index b3552162..2251dbc5 100644 --- a/drivers/189pc/utils.go +++ b/drivers/189pc/utils.go @@ -18,8 +18,6 @@ import ( "strings" "time" - "golang.org/x/sync/semaphore" - "github.com/OpenListTeam/OpenList/drivers/base" "github.com/OpenListTeam/OpenList/internal/conf" "github.com/OpenListTeam/OpenList/internal/driver" @@ -506,7 +504,7 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo retry.Attempts(3), retry.Delay(time.Second), retry.DelayType(retry.BackOffDelay)) - sem := semaphore.NewWeighted(3) + threadG.SetLimit(3) count := int(size / sliceSize) lastPartSize := size % sliceSize @@ -531,7 +529,6 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo // 读取块 silceMd5.Reset() if _, err := io.ReadFull(teeReader, byteData); err != io.EOF && err != nil { - sem.Release(1) return nil, err } @@ -541,10 +538,6 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo partInfo := fmt.Sprintf("%d-%s", i, base64.StdEncoding.EncodeToString(md5Bytes)) threadG.Go(func(ctx context.Context) error { - if err = sem.Acquire(ctx, 1); err != nil { - return err - } - defer sem.Release(1) uploadUrls, err := y.GetMultiUploadUrls(ctx, isFamily, initMultiUpload.Data.UploadFileID, partInfo) if err != nil { return err diff --git a/drivers/baidu_netdisk/driver.go b/drivers/baidu_netdisk/driver.go index e005a8d1..98973391 100644 --- a/drivers/baidu_netdisk/driver.go +++ b/drivers/baidu_netdisk/driver.go @@ -12,8 +12,6 @@ import ( "strconv" "time" - "golang.org/x/sync/semaphore" - "github.com/OpenListTeam/OpenList/drivers/base" "github.com/OpenListTeam/OpenList/internal/conf" "github.com/OpenListTeam/OpenList/internal/driver" @@ -297,7 +295,8 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F retry.Attempts(1), retry.Delay(time.Second), retry.DelayType(retry.BackOffDelay)) - sem := semaphore.NewWeighted(3) + threadG.SetLimit(3) + for i, partseq := range precreateResp.BlockList { if utils.IsCanceled(upCtx) { break @@ -308,10 +307,6 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F byteSize = lastBlockSize } threadG.Go(func(ctx context.Context) error { - if err = sem.Acquire(ctx, 1); err != nil { - return err - } - defer sem.Release(1) params := map[string]string{ "method": "upload", "access_token": d.AccessToken, diff --git a/drivers/baidu_photo/driver.go b/drivers/baidu_photo/driver.go index cac16eaa..1dd911b5 100644 --- a/drivers/baidu_photo/driver.go +++ b/drivers/baidu_photo/driver.go @@ -13,8 +13,6 @@ import ( "strings" "time" - "golang.org/x/sync/semaphore" - "github.com/OpenListTeam/OpenList/drivers/base" "github.com/OpenListTeam/OpenList/internal/conf" "github.com/OpenListTeam/OpenList/internal/driver" @@ -344,7 +342,8 @@ func (d *BaiduPhoto) Put(ctx context.Context, dstDir model.Obj, stream model.Fil retry.Attempts(3), retry.Delay(time.Second), retry.DelayType(retry.BackOffDelay)) - sem := semaphore.NewWeighted(3) + threadG.SetLimit(3) + for i, partseq := range precreateResp.BlockList { if utils.IsCanceled(upCtx) { break @@ -356,10 +355,6 @@ func (d *BaiduPhoto) Put(ctx context.Context, dstDir model.Obj, stream model.Fil } threadG.Go(func(ctx context.Context) error { - if err = sem.Acquire(ctx, 1); err != nil { - return err - } - defer sem.Release(1) uploadParams := map[string]string{ "method": "upload", "path": params["path"], diff --git a/drivers/mopan/driver.go b/drivers/mopan/driver.go index adf1f8ab..12208cb1 100644 --- a/drivers/mopan/driver.go +++ b/drivers/mopan/driver.go @@ -10,8 +10,6 @@ import ( "strings" "time" - "golang.org/x/sync/semaphore" - "github.com/OpenListTeam/OpenList/drivers/base" "github.com/OpenListTeam/OpenList/internal/driver" "github.com/OpenListTeam/OpenList/internal/model" @@ -300,7 +298,7 @@ func (d *MoPan) Put(ctx context.Context, dstDir model.Obj, stream model.FileStre retry.Attempts(3), retry.Delay(time.Second), retry.DelayType(retry.BackOffDelay)) - sem := semaphore.NewWeighted(3) + threadG.SetLimit(3) // step.3 parts, err := d.client.GetAllMultiUploadUrls(initUpdload.UploadFileID, initUpdload.PartInfos) @@ -319,10 +317,6 @@ func (d *MoPan) Put(ctx context.Context, dstDir model.Obj, stream model.FileStre // step.4 threadG.Go(func(ctx context.Context) error { - if err = sem.Acquire(ctx, 1); err != nil { - return err - } - defer sem.Release(1) reader := io.NewSectionReader(file, int64(part.PartNumber-1)*initUpdload.PartSize, byteSize) req, err := part.NewRequest(ctx, driver.NewLimitedUploadStream(ctx, reader)) if err != nil { diff --git a/internal/driver/driver.go b/internal/driver/driver.go index 88e00062..4851971c 100644 --- a/internal/driver/driver.go +++ b/internal/driver/driver.go @@ -97,9 +97,9 @@ type Put interface { // before uploading the file or file chunks. Or you can directly call `driver.ServerUploadLimitWaitN` // if your file chunks are sufficiently small (less than about 50KB). // NOTE that the network speed may be significantly slower than the stream's read speed. Therefore, if - // you use a `errgroup.Group` to upload each chunk in parallel, you should consider using a recursive - // mutex like `semaphore.Weighted` to limit the maximum number of upload threads, preventing excessive - // memory usage caused by buffering too many file chunks awaiting upload. + // you use a `errgroup.Group` to upload each chunk in parallel, you should use `Group.SetLimit` to + // limit the maximum number of upload threads, preventing excessive memory usage caused by buffering + // too many file chunks awaiting upload. Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up UpdateProgress) error } @@ -156,9 +156,9 @@ type PutResult interface { // before uploading the file or file chunks. Or you can directly call `driver.ServerUploadLimitWaitN` // if your file chunks are sufficiently small (less than about 50KB). // NOTE that the network speed may be significantly slower than the stream's read speed. Therefore, if - // you use a `errgroup.Group` to upload each chunk in parallel, you should consider using a recursive - // mutex like `semaphore.Weighted` to limit the maximum number of upload threads, preventing excessive - // memory usage caused by buffering too many file chunks awaiting upload. + // you use a `errgroup.Group` to upload each chunk in parallel, you should use `Group.SetLimit` to + // limit the maximum number of upload threads, preventing excessive memory usage caused by buffering + // too many file chunks awaiting upload. Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up UpdateProgress) (model.Obj, error) }