mirror of
https://github.com/OpenListTeam/OpenList.git
synced 2025-09-19 04:06:18 +08:00
refactor(cloudreve): use retry-go
for net/http
uploads (#773)
* refactor(cloudreve): use retry-go for uploads * refactor(cloudreve_v4): use retry-go for uploads * refactor(onedrive): use retry-go for uploads * refactor(onedrive_app): use retry-go for uploads * chore(onedrive_app): remove unnecessary error handling for host retrieval * feat(cloudreve): move read logic inside retry block * feat(cloudreve_v4): move read logic inside retry block * feat(onedrive): move read logic inside retry block * feat(onedrive_app): move read logic inside retry block
This commit is contained in:
@ -20,6 +20,7 @@ import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/setting"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/cookie"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/avast/retry-go"
|
||||
"github.com/go-resty/resty/v2"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
)
|
||||
@ -240,14 +241,14 @@ func (d *Cloudreve) upRemote(ctx context.Context, stream model.FileStreamer, u U
|
||||
var finish int64 = 0
|
||||
var chunk int = 0
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
retryCount := 0
|
||||
maxRetries := 3
|
||||
for finish < stream.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
left := stream.GetSize() - finish
|
||||
byteSize := min(left, DEFAULT)
|
||||
err := retry.Do(
|
||||
func() error {
|
||||
utils.Log.Debugf("[Cloudreve-Remote] upload range: %d-%d/%d", finish, finish+byteSize-1, stream.GetSize())
|
||||
byteData := make([]byte, byteSize)
|
||||
n, err := io.ReadFull(stream, byteData)
|
||||
@ -255,24 +256,21 @@ func (d *Cloudreve) upRemote(ctx context.Context, stream model.FileStreamer, u U
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequest("POST", uploadUrl+"?chunk="+strconv.Itoa(chunk),
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadUrl+"?chunk="+strconv.Itoa(chunk),
|
||||
driver.NewLimitedUploadStream(ctx, bytes.NewReader(byteData)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req = req.WithContext(ctx)
|
||||
req.ContentLength = byteSize
|
||||
// req.Header.Set("Content-Length", strconv.Itoa(int(byteSize)))
|
||||
req.Header.Set("Authorization", fmt.Sprint(credential))
|
||||
req.Header.Set("User-Agent", d.getUA())
|
||||
err = func() error {
|
||||
res, err := base.HttpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != 200 {
|
||||
return errors.New(res.Status)
|
||||
return fmt.Errorf("server error: %d", res.StatusCode)
|
||||
}
|
||||
body, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
@ -287,21 +285,17 @@ func (d *Cloudreve) upRemote(ctx context.Context, stream model.FileStreamer, u U
|
||||
return errors.New(up.Msg)
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
if err == nil {
|
||||
retryCount = 0
|
||||
},
|
||||
retry.Attempts(3),
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
finish += byteSize
|
||||
up(float64(finish) * 100 / float64(stream.GetSize()))
|
||||
chunk++
|
||||
} else {
|
||||
retryCount++
|
||||
if retryCount > maxRetries {
|
||||
return fmt.Errorf("upload failed after %d retries due to server errors, error: %s", maxRetries, err)
|
||||
}
|
||||
backoff := time.Duration(1<<retryCount) * time.Second
|
||||
utils.Log.Warnf("[Cloudreve-Remote] server errors while uploading, retrying after %v...", backoff)
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -310,14 +304,14 @@ func (d *Cloudreve) upOneDrive(ctx context.Context, stream model.FileStreamer, u
|
||||
uploadUrl := u.UploadURLs[0]
|
||||
var finish int64 = 0
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
retryCount := 0
|
||||
maxRetries := 3
|
||||
for finish < stream.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
left := stream.GetSize() - finish
|
||||
byteSize := min(left, DEFAULT)
|
||||
err := retry.Do(
|
||||
func() error {
|
||||
utils.Log.Debugf("[Cloudreve-OneDrive] upload range: %d-%d/%d", finish, finish+byteSize-1, stream.GetSize())
|
||||
byteData := make([]byte, byteSize)
|
||||
n, err := io.ReadFull(stream, byteData)
|
||||
@ -325,41 +319,39 @@ func (d *Cloudreve) upOneDrive(ctx context.Context, stream model.FileStreamer, u
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequest("PUT", uploadUrl, driver.NewLimitedUploadStream(ctx, bytes.NewReader(byteData)))
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadUrl,
|
||||
driver.NewLimitedUploadStream(ctx, bytes.NewReader(byteData)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req = req.WithContext(ctx)
|
||||
req.ContentLength = byteSize
|
||||
// req.Header.Set("Content-Length", strconv.Itoa(int(byteSize)))
|
||||
req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", finish, finish+byteSize-1, stream.GetSize()))
|
||||
req.Header.Set("User-Agent", d.getUA())
|
||||
res, err := base.HttpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
// https://learn.microsoft.com/zh-cn/onedrive/developer/rest-api/api/driveitem_createuploadsession
|
||||
switch {
|
||||
case res.StatusCode >= 500 && res.StatusCode <= 504:
|
||||
retryCount++
|
||||
if retryCount > maxRetries {
|
||||
res.Body.Close()
|
||||
return fmt.Errorf("upload failed after %d retries due to server errors, error %d", maxRetries, res.StatusCode)
|
||||
}
|
||||
backoff := time.Duration(1<<retryCount) * time.Second
|
||||
utils.Log.Warnf("[Cloudreve-OneDrive] server errors %d while uploading, retrying after %v...", res.StatusCode, backoff)
|
||||
time.Sleep(backoff)
|
||||
return fmt.Errorf("server error: %d", res.StatusCode)
|
||||
case res.StatusCode != 201 && res.StatusCode != 202 && res.StatusCode != 200:
|
||||
data, _ := io.ReadAll(res.Body)
|
||||
res.Body.Close()
|
||||
return errors.New(string(data))
|
||||
default:
|
||||
res.Body.Close()
|
||||
retryCount = 0
|
||||
return nil
|
||||
}
|
||||
}, retry.Attempts(3),
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
finish += byteSize
|
||||
up(float64(finish) * 100 / float64(stream.GetSize()))
|
||||
}
|
||||
}
|
||||
// 上传成功发送回调请求
|
||||
return d.request(http.MethodPost, "/callback/onedrive/finish/"+u.SessionID, func(req *resty.Request) {
|
||||
req.SetBody("{}")
|
||||
@ -371,14 +363,14 @@ func (d *Cloudreve) upS3(ctx context.Context, stream model.FileStreamer, u Uploa
|
||||
var chunk int = 0
|
||||
var etags []string
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
retryCount := 0
|
||||
maxRetries := 3
|
||||
for finish < stream.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
left := stream.GetSize() - finish
|
||||
byteSize := min(left, DEFAULT)
|
||||
err := retry.Do(
|
||||
func() error {
|
||||
utils.Log.Debugf("[Cloudreve-S3] upload range: %d-%d/%d", finish, finish+byteSize-1, stream.GetSize())
|
||||
byteData := make([]byte, byteSize)
|
||||
n, err := io.ReadFull(stream, byteData)
|
||||
@ -386,13 +378,13 @@ func (d *Cloudreve) upS3(ctx context.Context, stream model.FileStreamer, u Uploa
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequest("PUT", u.UploadURLs[chunk],
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.UploadURLs[chunk],
|
||||
driver.NewLimitedUploadStream(ctx, bytes.NewBuffer(byteData)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req = req.WithContext(ctx)
|
||||
req.ContentLength = byteSize
|
||||
req.Header.Set("User-Agent", d.getUA())
|
||||
res, err := base.HttpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -401,24 +393,24 @@ func (d *Cloudreve) upS3(ctx context.Context, stream model.FileStreamer, u Uploa
|
||||
res.Body.Close()
|
||||
switch {
|
||||
case res.StatusCode != 200:
|
||||
retryCount++
|
||||
if retryCount > maxRetries {
|
||||
return fmt.Errorf("upload failed after %d retries due to server errors, error %d", maxRetries, res.StatusCode)
|
||||
}
|
||||
backoff := time.Duration(1<<retryCount) * time.Second
|
||||
utils.Log.Warnf("[Cloudreve-S3] server errors %d while uploading, retrying after %v...", res.StatusCode, backoff)
|
||||
time.Sleep(backoff)
|
||||
return fmt.Errorf("server error: %d", res.StatusCode)
|
||||
case etag == "":
|
||||
return errors.New("failed to get ETag from header")
|
||||
default:
|
||||
retryCount = 0
|
||||
etags = append(etags, etag)
|
||||
return nil
|
||||
}
|
||||
}, retry.Attempts(3),
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
finish += byteSize
|
||||
up(float64(finish) * 100 / float64(stream.GetSize()))
|
||||
chunk++
|
||||
}
|
||||
}
|
||||
|
||||
// s3LikeFinishUpload
|
||||
// https://github.com/cloudreve/frontend/blob/b485bf297974cbe4834d2e8e744ae7b7e5b2ad39/src/component/Uploader/core/api/index.ts#L204-L252
|
||||
bodyBuilder := &strings.Builder{}
|
||||
@ -431,8 +423,8 @@ func (d *Cloudreve) upS3(ctx context.Context, stream model.FileStreamer, u Uploa
|
||||
))
|
||||
}
|
||||
bodyBuilder.WriteString("</CompleteMultipartUpload>")
|
||||
req, err := http.NewRequest(
|
||||
"POST",
|
||||
req, err := http.NewRequestWithContext(ctx,
|
||||
http.MethodPost,
|
||||
u.CompleteURL,
|
||||
strings.NewReader(bodyBuilder.String()),
|
||||
)
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/setting"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/avast/retry-go"
|
||||
"github.com/go-resty/resty/v2"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
)
|
||||
@ -255,14 +256,14 @@ func (d *CloudreveV4) upRemote(ctx context.Context, file model.FileStreamer, u F
|
||||
var finish int64 = 0
|
||||
var chunk int = 0
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
retryCount := 0
|
||||
maxRetries := 3
|
||||
for finish < file.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
left := file.GetSize() - finish
|
||||
byteSize := min(left, DEFAULT)
|
||||
err := retry.Do(
|
||||
func() error {
|
||||
utils.Log.Debugf("[CloudreveV4-Remote] upload range: %d-%d/%d", finish, finish+byteSize-1, file.GetSize())
|
||||
byteData := make([]byte, byteSize)
|
||||
n, err := io.ReadFull(file, byteData)
|
||||
@ -270,24 +271,22 @@ func (d *CloudreveV4) upRemote(ctx context.Context, file model.FileStreamer, u F
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequest("POST", uploadUrl+"?chunk="+strconv.Itoa(chunk),
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadUrl+"?chunk="+strconv.Itoa(chunk),
|
||||
driver.NewLimitedUploadStream(ctx, bytes.NewReader(byteData)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
req.ContentLength = byteSize
|
||||
// req.Header.Set("Content-Length", strconv.Itoa(int(byteSize)))
|
||||
req.Header.Set("Authorization", fmt.Sprint(credential))
|
||||
req.Header.Set("User-Agent", d.getUA())
|
||||
err = func() error {
|
||||
res, err := base.HttpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != 200 {
|
||||
return errors.New(res.Status)
|
||||
return fmt.Errorf("server error: %d", res.StatusCode)
|
||||
}
|
||||
body, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
@ -302,21 +301,16 @@ func (d *CloudreveV4) upRemote(ctx context.Context, file model.FileStreamer, u F
|
||||
return errors.New(up.Msg)
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
if err == nil {
|
||||
retryCount = 0
|
||||
}, retry.Attempts(3),
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
finish += byteSize
|
||||
up(float64(finish) * 100 / float64(file.GetSize()))
|
||||
chunk++
|
||||
} else {
|
||||
retryCount++
|
||||
if retryCount > maxRetries {
|
||||
return fmt.Errorf("upload failed after %d retries due to server errors, error: %s", maxRetries, err)
|
||||
}
|
||||
backoff := time.Duration(1<<retryCount) * time.Second
|
||||
utils.Log.Warnf("[Cloudreve-Remote] server errors while uploading, retrying after %v...", backoff)
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -325,14 +319,14 @@ func (d *CloudreveV4) upOneDrive(ctx context.Context, file model.FileStreamer, u
|
||||
uploadUrl := u.UploadUrls[0]
|
||||
var finish int64 = 0
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
retryCount := 0
|
||||
maxRetries := 3
|
||||
for finish < file.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
left := file.GetSize() - finish
|
||||
byteSize := min(left, DEFAULT)
|
||||
err := retry.Do(
|
||||
func() error {
|
||||
utils.Log.Debugf("[CloudreveV4-OneDrive] upload range: %d-%d/%d", finish, finish+byteSize-1, file.GetSize())
|
||||
byteData := make([]byte, byteSize)
|
||||
n, err := io.ReadFull(file, byteData)
|
||||
@ -340,41 +334,40 @@ func (d *CloudreveV4) upOneDrive(ctx context.Context, file model.FileStreamer, u
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequest(http.MethodPut, uploadUrl, driver.NewLimitedUploadStream(ctx, bytes.NewReader(byteData)))
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadUrl,
|
||||
driver.NewLimitedUploadStream(ctx, bytes.NewReader(byteData)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
req.ContentLength = byteSize
|
||||
// req.Header.Set("Content-Length", strconv.Itoa(int(byteSize)))
|
||||
req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", finish, finish+byteSize-1, file.GetSize()))
|
||||
req.Header.Set("User-Agent", d.getUA())
|
||||
res, err := base.HttpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
// https://learn.microsoft.com/zh-cn/onedrive/developer/rest-api/api/driveitem_createuploadsession
|
||||
switch {
|
||||
case res.StatusCode >= 500 && res.StatusCode <= 504:
|
||||
retryCount++
|
||||
if retryCount > maxRetries {
|
||||
res.Body.Close()
|
||||
return fmt.Errorf("upload failed after %d retries due to server errors, error %d", maxRetries, res.StatusCode)
|
||||
}
|
||||
backoff := time.Duration(1<<retryCount) * time.Second
|
||||
utils.Log.Warnf("[CloudreveV4-OneDrive] server errors %d while uploading, retrying after %v...", res.StatusCode, backoff)
|
||||
time.Sleep(backoff)
|
||||
return fmt.Errorf("server error: %d", res.StatusCode)
|
||||
case res.StatusCode != 201 && res.StatusCode != 202 && res.StatusCode != 200:
|
||||
data, _ := io.ReadAll(res.Body)
|
||||
res.Body.Close()
|
||||
return errors.New(string(data))
|
||||
default:
|
||||
res.Body.Close()
|
||||
retryCount = 0
|
||||
return nil
|
||||
}
|
||||
}, retry.Attempts(3),
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
finish += byteSize
|
||||
up(float64(finish) * 100 / float64(file.GetSize()))
|
||||
}
|
||||
}
|
||||
// 上传成功发送回调请求
|
||||
return d.request(http.MethodPost, "/callback/onedrive/"+u.SessionID+"/"+u.CallbackSecret, func(req *resty.Request) {
|
||||
req.SetBody("{}")
|
||||
@ -386,14 +379,14 @@ func (d *CloudreveV4) upS3(ctx context.Context, file model.FileStreamer, u FileU
|
||||
var chunk int = 0
|
||||
var etags []string
|
||||
DEFAULT := int64(u.ChunkSize)
|
||||
retryCount := 0
|
||||
maxRetries := 3
|
||||
for finish < file.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
left := file.GetSize() - finish
|
||||
byteSize := min(left, DEFAULT)
|
||||
err := retry.Do(
|
||||
func() error {
|
||||
utils.Log.Debugf("[CloudreveV4-S3] upload range: %d-%d/%d", finish, finish+byteSize-1, file.GetSize())
|
||||
byteData := make([]byte, byteSize)
|
||||
n, err := io.ReadFull(file, byteData)
|
||||
@ -401,13 +394,13 @@ func (d *CloudreveV4) upS3(ctx context.Context, file model.FileStreamer, u FileU
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequest(http.MethodPut, u.UploadUrls[chunk],
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.UploadUrls[chunk],
|
||||
driver.NewLimitedUploadStream(ctx, bytes.NewBuffer(byteData)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req = req.WithContext(ctx)
|
||||
req.ContentLength = byteSize
|
||||
req.Header.Set("User-Agent", d.getUA())
|
||||
res, err := base.HttpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -416,23 +409,25 @@ func (d *CloudreveV4) upS3(ctx context.Context, file model.FileStreamer, u FileU
|
||||
res.Body.Close()
|
||||
switch {
|
||||
case res.StatusCode != 200:
|
||||
retryCount++
|
||||
if retryCount > maxRetries {
|
||||
return fmt.Errorf("upload failed after %d retries due to server errors", maxRetries)
|
||||
}
|
||||
backoff := time.Duration(1<<retryCount) * time.Second
|
||||
utils.Log.Warnf("server error %d, retrying after %v...", res.StatusCode, backoff)
|
||||
time.Sleep(backoff)
|
||||
return fmt.Errorf("server error: %d", res.StatusCode)
|
||||
case etag == "":
|
||||
return errors.New("failed to get ETag from header")
|
||||
default:
|
||||
retryCount = 0
|
||||
etags = append(etags, etag)
|
||||
return nil
|
||||
}
|
||||
},
|
||||
retry.Attempts(3),
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
finish += byteSize
|
||||
up(float64(finish) * 100 / float64(file.GetSize()))
|
||||
chunk++
|
||||
}
|
||||
}
|
||||
|
||||
// s3LikeFinishUpload
|
||||
bodyBuilder := &strings.Builder{}
|
||||
@ -445,8 +440,8 @@ func (d *CloudreveV4) upS3(ctx context.Context, file model.FileStreamer, u FileU
|
||||
))
|
||||
}
|
||||
bodyBuilder.WriteString("</CompleteMultipartUpload>")
|
||||
req, err := http.NewRequest(
|
||||
"POST",
|
||||
req, err := http.NewRequestWithContext(ctx,
|
||||
http.MethodPost,
|
||||
u.CompleteURL,
|
||||
strings.NewReader(bodyBuilder.String()),
|
||||
)
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/avast/retry-go"
|
||||
"github.com/go-resty/resty/v2"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
)
|
||||
@ -230,7 +231,7 @@ func toAPIMetadata(stream model.FileStreamer) Metadata {
|
||||
|
||||
func (d *Onedrive) upBig(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
|
||||
url := d.GetMetaUrl(false, stdpath.Join(dstDir.GetPath(), stream.GetName())) + "/createUploadSession"
|
||||
metadata := map[string]interface{}{"item": toAPIMetadata(stream)}
|
||||
metadata := map[string]any{"item": toAPIMetadata(stream)}
|
||||
res, err := d.Request(url, http.MethodPost, func(req *resty.Request) {
|
||||
req.SetBody(metadata).SetContext(ctx)
|
||||
}, nil)
|
||||
@ -240,14 +241,14 @@ func (d *Onedrive) upBig(ctx context.Context, dstDir model.Obj, stream model.Fil
|
||||
uploadUrl := jsoniter.Get(res, "uploadUrl").ToString()
|
||||
var finish int64 = 0
|
||||
DEFAULT := d.ChunkSize * 1024 * 1024
|
||||
retryCount := 0
|
||||
maxRetries := 3
|
||||
for finish < stream.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
left := stream.GetSize() - finish
|
||||
byteSize := min(left, DEFAULT)
|
||||
err = retry.Do(
|
||||
func() error {
|
||||
utils.Log.Debugf("[Onedrive] upload range: %d-%d/%d", finish, finish+byteSize-1, stream.GetSize())
|
||||
byteData := make([]byte, byteSize)
|
||||
n, err := io.ReadFull(stream, byteData)
|
||||
@ -255,39 +256,38 @@ func (d *Onedrive) upBig(ctx context.Context, dstDir model.Obj, stream model.Fil
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequest("PUT", uploadUrl, driver.NewLimitedUploadStream(ctx, bytes.NewReader(byteData)))
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadUrl,
|
||||
driver.NewLimitedUploadStream(ctx, bytes.NewReader(byteData)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req = req.WithContext(ctx)
|
||||
req.ContentLength = byteSize
|
||||
// req.Header.Set("Content-Length", strconv.Itoa(int(byteSize)))
|
||||
req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", finish, finish+byteSize-1, stream.GetSize()))
|
||||
res, err := base.HttpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
// https://learn.microsoft.com/zh-cn/onedrive/developer/rest-api/api/driveitem_createuploadsession
|
||||
switch {
|
||||
case res.StatusCode >= 500 && res.StatusCode <= 504:
|
||||
retryCount++
|
||||
if retryCount > maxRetries {
|
||||
res.Body.Close()
|
||||
return fmt.Errorf("upload failed after %d retries due to server errors, error %d", maxRetries, res.StatusCode)
|
||||
}
|
||||
backoff := time.Duration(1<<retryCount) * time.Second
|
||||
utils.Log.Warnf("[Onedrive] server errors %d while uploading, retrying after %v...", res.StatusCode, backoff)
|
||||
time.Sleep(backoff)
|
||||
return fmt.Errorf("server error: %d", res.StatusCode)
|
||||
case res.StatusCode != 201 && res.StatusCode != 202 && res.StatusCode != 200:
|
||||
data, _ := io.ReadAll(res.Body)
|
||||
res.Body.Close()
|
||||
return errors.New(string(data))
|
||||
default:
|
||||
res.Body.Close()
|
||||
retryCount = 0
|
||||
return nil
|
||||
}
|
||||
},
|
||||
retry.Attempts(3),
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
finish += byteSize
|
||||
up(float64(finish) * 100 / float64(stream.GetSize()))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/model"
|
||||
"github.com/OpenListTeam/OpenList/v4/internal/op"
|
||||
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
|
||||
"github.com/avast/retry-go"
|
||||
"github.com/go-resty/resty/v2"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
)
|
||||
@ -40,7 +41,7 @@ var onedriveHostMap = map[string]Host{
|
||||
}
|
||||
|
||||
func (d *OnedriveAPP) GetMetaUrl(auth bool, path string) string {
|
||||
host, _ := onedriveHostMap[d.Region]
|
||||
host := onedriveHostMap[d.Region]
|
||||
path = utils.EncodePath(path, true)
|
||||
if auth {
|
||||
return host.Oauth
|
||||
@ -154,14 +155,14 @@ func (d *OnedriveAPP) upBig(ctx context.Context, dstDir model.Obj, stream model.
|
||||
uploadUrl := jsoniter.Get(res, "uploadUrl").ToString()
|
||||
var finish int64 = 0
|
||||
DEFAULT := d.ChunkSize * 1024 * 1024
|
||||
retryCount := 0
|
||||
maxRetries := 3
|
||||
for finish < stream.GetSize() {
|
||||
if utils.IsCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
left := stream.GetSize() - finish
|
||||
byteSize := min(left, DEFAULT)
|
||||
err = retry.Do(
|
||||
func() error {
|
||||
utils.Log.Debugf("[OnedriveAPP] upload range: %d-%d/%d", finish, finish+byteSize-1, stream.GetSize())
|
||||
byteData := make([]byte, byteSize)
|
||||
n, err := io.ReadFull(stream, byteData)
|
||||
@ -169,39 +170,38 @@ func (d *OnedriveAPP) upBig(ctx context.Context, dstDir model.Obj, stream model.
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequest("PUT", uploadUrl, driver.NewLimitedUploadStream(ctx, bytes.NewReader(byteData)))
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadUrl,
|
||||
driver.NewLimitedUploadStream(ctx, bytes.NewReader(byteData)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req = req.WithContext(ctx)
|
||||
req.ContentLength = byteSize
|
||||
// req.Header.Set("Content-Length", strconv.Itoa(int(byteSize)))
|
||||
req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", finish, finish+byteSize-1, stream.GetSize()))
|
||||
res, err := base.HttpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
// https://learn.microsoft.com/zh-cn/onedrive/developer/rest-api/api/driveitem_createuploadsession
|
||||
switch {
|
||||
case res.StatusCode >= 500 && res.StatusCode <= 504:
|
||||
retryCount++
|
||||
if retryCount > maxRetries {
|
||||
res.Body.Close()
|
||||
return fmt.Errorf("upload failed after %d retries due to server errors, error %d", maxRetries, res.StatusCode)
|
||||
}
|
||||
backoff := time.Duration(1<<retryCount) * time.Second
|
||||
utils.Log.Warnf("[OnedriveAPP] server errors %d while uploading, retrying after %v...", res.StatusCode, backoff)
|
||||
time.Sleep(backoff)
|
||||
return fmt.Errorf("server error: %d", res.StatusCode)
|
||||
case res.StatusCode != 201 && res.StatusCode != 202 && res.StatusCode != 200:
|
||||
data, _ := io.ReadAll(res.Body)
|
||||
res.Body.Close()
|
||||
return errors.New(string(data))
|
||||
default:
|
||||
res.Body.Close()
|
||||
retryCount = 0
|
||||
return nil
|
||||
}
|
||||
},
|
||||
retry.Attempts(3),
|
||||
retry.DelayType(retry.BackOffDelay),
|
||||
retry.Delay(time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
finish += byteSize
|
||||
up(float64(finish) * 100 / float64(stream.GetSize()))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user