fix(189pc): crashes when upload cancelled (#79)

* fix(189pc): crashes when upload cancelled

Signed-off-by: XZB-1248 <28593573+XZB-1248@users.noreply.github.com>

* fix(189pc): replace semaphore with errgroup.Group.SetLimit

---------

Signed-off-by: XZB-1248 <28593573+XZB-1248@users.noreply.github.com>
Co-authored-by: KirCute <951206789@qq.com>
This commit is contained in:
XZB-1248
2025-06-17 00:13:31 +08:00
committed by GitHub
parent 5a4649c929
commit 87ca1b96ae
5 changed files with 12 additions and 35 deletions

View File

@ -18,8 +18,6 @@ import (
"strings" "strings"
"time" "time"
"golang.org/x/sync/semaphore"
"github.com/OpenListTeam/OpenList/drivers/base" "github.com/OpenListTeam/OpenList/drivers/base"
"github.com/OpenListTeam/OpenList/internal/conf" "github.com/OpenListTeam/OpenList/internal/conf"
"github.com/OpenListTeam/OpenList/internal/driver" "github.com/OpenListTeam/OpenList/internal/driver"
@ -506,7 +504,7 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
retry.Attempts(3), retry.Attempts(3),
retry.Delay(time.Second), retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay)) retry.DelayType(retry.BackOffDelay))
sem := semaphore.NewWeighted(3) threadG.SetLimit(3)
count := int(size / sliceSize) count := int(size / sliceSize)
lastPartSize := size % sliceSize lastPartSize := size % sliceSize
@ -531,7 +529,6 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
// 读取块 // 读取块
silceMd5.Reset() silceMd5.Reset()
if _, err := io.ReadFull(teeReader, byteData); err != io.EOF && err != nil { if _, err := io.ReadFull(teeReader, byteData); err != io.EOF && err != nil {
sem.Release(1)
return nil, err return nil, err
} }
@ -541,10 +538,6 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
partInfo := fmt.Sprintf("%d-%s", i, base64.StdEncoding.EncodeToString(md5Bytes)) partInfo := fmt.Sprintf("%d-%s", i, base64.StdEncoding.EncodeToString(md5Bytes))
threadG.Go(func(ctx context.Context) error { threadG.Go(func(ctx context.Context) error {
if err = sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
uploadUrls, err := y.GetMultiUploadUrls(ctx, isFamily, initMultiUpload.Data.UploadFileID, partInfo) uploadUrls, err := y.GetMultiUploadUrls(ctx, isFamily, initMultiUpload.Data.UploadFileID, partInfo)
if err != nil { if err != nil {
return err return err

View File

@ -12,8 +12,6 @@ import (
"strconv" "strconv"
"time" "time"
"golang.org/x/sync/semaphore"
"github.com/OpenListTeam/OpenList/drivers/base" "github.com/OpenListTeam/OpenList/drivers/base"
"github.com/OpenListTeam/OpenList/internal/conf" "github.com/OpenListTeam/OpenList/internal/conf"
"github.com/OpenListTeam/OpenList/internal/driver" "github.com/OpenListTeam/OpenList/internal/driver"
@ -297,7 +295,8 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F
retry.Attempts(1), retry.Attempts(1),
retry.Delay(time.Second), retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay)) retry.DelayType(retry.BackOffDelay))
sem := semaphore.NewWeighted(3) threadG.SetLimit(3)
for i, partseq := range precreateResp.BlockList { for i, partseq := range precreateResp.BlockList {
if utils.IsCanceled(upCtx) { if utils.IsCanceled(upCtx) {
break break
@ -308,10 +307,6 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F
byteSize = lastBlockSize byteSize = lastBlockSize
} }
threadG.Go(func(ctx context.Context) error { threadG.Go(func(ctx context.Context) error {
if err = sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
params := map[string]string{ params := map[string]string{
"method": "upload", "method": "upload",
"access_token": d.AccessToken, "access_token": d.AccessToken,

View File

@ -13,8 +13,6 @@ import (
"strings" "strings"
"time" "time"
"golang.org/x/sync/semaphore"
"github.com/OpenListTeam/OpenList/drivers/base" "github.com/OpenListTeam/OpenList/drivers/base"
"github.com/OpenListTeam/OpenList/internal/conf" "github.com/OpenListTeam/OpenList/internal/conf"
"github.com/OpenListTeam/OpenList/internal/driver" "github.com/OpenListTeam/OpenList/internal/driver"
@ -344,7 +342,8 @@ func (d *BaiduPhoto) Put(ctx context.Context, dstDir model.Obj, stream model.Fil
retry.Attempts(3), retry.Attempts(3),
retry.Delay(time.Second), retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay)) retry.DelayType(retry.BackOffDelay))
sem := semaphore.NewWeighted(3) threadG.SetLimit(3)
for i, partseq := range precreateResp.BlockList { for i, partseq := range precreateResp.BlockList {
if utils.IsCanceled(upCtx) { if utils.IsCanceled(upCtx) {
break break
@ -356,10 +355,6 @@ func (d *BaiduPhoto) Put(ctx context.Context, dstDir model.Obj, stream model.Fil
} }
threadG.Go(func(ctx context.Context) error { threadG.Go(func(ctx context.Context) error {
if err = sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
uploadParams := map[string]string{ uploadParams := map[string]string{
"method": "upload", "method": "upload",
"path": params["path"], "path": params["path"],

View File

@ -10,8 +10,6 @@ import (
"strings" "strings"
"time" "time"
"golang.org/x/sync/semaphore"
"github.com/OpenListTeam/OpenList/drivers/base" "github.com/OpenListTeam/OpenList/drivers/base"
"github.com/OpenListTeam/OpenList/internal/driver" "github.com/OpenListTeam/OpenList/internal/driver"
"github.com/OpenListTeam/OpenList/internal/model" "github.com/OpenListTeam/OpenList/internal/model"
@ -300,7 +298,7 @@ func (d *MoPan) Put(ctx context.Context, dstDir model.Obj, stream model.FileStre
retry.Attempts(3), retry.Attempts(3),
retry.Delay(time.Second), retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay)) retry.DelayType(retry.BackOffDelay))
sem := semaphore.NewWeighted(3) threadG.SetLimit(3)
// step.3 // step.3
parts, err := d.client.GetAllMultiUploadUrls(initUpdload.UploadFileID, initUpdload.PartInfos) parts, err := d.client.GetAllMultiUploadUrls(initUpdload.UploadFileID, initUpdload.PartInfos)
@ -319,10 +317,6 @@ func (d *MoPan) Put(ctx context.Context, dstDir model.Obj, stream model.FileStre
// step.4 // step.4
threadG.Go(func(ctx context.Context) error { threadG.Go(func(ctx context.Context) error {
if err = sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
reader := io.NewSectionReader(file, int64(part.PartNumber-1)*initUpdload.PartSize, byteSize) reader := io.NewSectionReader(file, int64(part.PartNumber-1)*initUpdload.PartSize, byteSize)
req, err := part.NewRequest(ctx, driver.NewLimitedUploadStream(ctx, reader)) req, err := part.NewRequest(ctx, driver.NewLimitedUploadStream(ctx, reader))
if err != nil { if err != nil {

View File

@ -97,9 +97,9 @@ type Put interface {
// before uploading the file or file chunks. Or you can directly call `driver.ServerUploadLimitWaitN` // before uploading the file or file chunks. Or you can directly call `driver.ServerUploadLimitWaitN`
// if your file chunks are sufficiently small (less than about 50KB). // if your file chunks are sufficiently small (less than about 50KB).
// NOTE that the network speed may be significantly slower than the stream's read speed. Therefore, if // NOTE that the network speed may be significantly slower than the stream's read speed. Therefore, if
// you use a `errgroup.Group` to upload each chunk in parallel, you should consider using a recursive // you use a `errgroup.Group` to upload each chunk in parallel, you should use `Group.SetLimit` to
// mutex like `semaphore.Weighted` to limit the maximum number of upload threads, preventing excessive // limit the maximum number of upload threads, preventing excessive memory usage caused by buffering
// memory usage caused by buffering too many file chunks awaiting upload. // too many file chunks awaiting upload.
Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up UpdateProgress) error Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up UpdateProgress) error
} }
@ -156,9 +156,9 @@ type PutResult interface {
// before uploading the file or file chunks. Or you can directly call `driver.ServerUploadLimitWaitN` // before uploading the file or file chunks. Or you can directly call `driver.ServerUploadLimitWaitN`
// if your file chunks are sufficiently small (less than about 50KB). // if your file chunks are sufficiently small (less than about 50KB).
// NOTE that the network speed may be significantly slower than the stream's read speed. Therefore, if // NOTE that the network speed may be significantly slower than the stream's read speed. Therefore, if
// you use a `errgroup.Group` to upload each chunk in parallel, you should consider using a recursive // you use a `errgroup.Group` to upload each chunk in parallel, you should use `Group.SetLimit` to
// mutex like `semaphore.Weighted` to limit the maximum number of upload threads, preventing excessive // limit the maximum number of upload threads, preventing excessive memory usage caused by buffering
// memory usage caused by buffering too many file chunks awaiting upload. // too many file chunks awaiting upload.
Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up UpdateProgress) (model.Obj, error) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up UpdateProgress) (model.Obj, error)
} }