mirror of
https://github.com/OpenListTeam/OpenList.git
synced 2025-07-18 17:38:07 +08:00
fix(net): goroutine deadlock
This commit is contained in:
@ -3,6 +3,7 @@ package net
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@ -119,7 +120,7 @@ type ConcurrencyLimit struct {
|
||||
Limit int // 需要大于0
|
||||
}
|
||||
|
||||
var ErrExceedMaxConcurrency = fmt.Errorf("ExceedMaxConcurrency")
|
||||
var ErrExceedMaxConcurrency = errors.New("ExceedMaxConcurrency")
|
||||
|
||||
func (l *ConcurrencyLimit) sub() error {
|
||||
l._m.Lock()
|
||||
@ -279,10 +280,9 @@ func (d *downloader) interrupt() error {
|
||||
err := fmt.Errorf("interrupted")
|
||||
d.err = err
|
||||
}
|
||||
if d.chunkChannel != nil {
|
||||
close(d.chunkChannel)
|
||||
if d.bufs != nil {
|
||||
d.cancel(err)
|
||||
close(d.chunkChannel)
|
||||
d.chunkChannel = nil
|
||||
for _, buf := range d.bufs {
|
||||
buf.Close()
|
||||
}
|
||||
@ -291,8 +291,6 @@ func (d *downloader) interrupt() error {
|
||||
d.concurrency = -d.concurrency
|
||||
}
|
||||
log.Debugf("maxConcurrency:%d", d.cfg.Concurrency+d.concurrency)
|
||||
} else {
|
||||
log.Debug("close of closed channel")
|
||||
}
|
||||
return err
|
||||
}
|
||||
@ -314,31 +312,35 @@ func (d *downloader) finishBuf(id int) (isLast bool, nextBuf *Buf) {
|
||||
// downloadPart is an individual goroutine worker reading from the ch channel
|
||||
// and performing Http request on the data with a given byte range.
|
||||
func (d *downloader) downloadPart() {
|
||||
//defer d.wg.Done()
|
||||
defer d.concurrencyFinish()
|
||||
for {
|
||||
c, ok := <-d.chunkChannel
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if d.getErr() != nil {
|
||||
// Drain the channel if there is an error, to prevent deadlocking
|
||||
// of download producer.
|
||||
break
|
||||
}
|
||||
if err := d.downloadChunk(&c); err != nil {
|
||||
if err == errCancelConcurrency {
|
||||
break
|
||||
select {
|
||||
case <-d.ctx.Done():
|
||||
return
|
||||
case c, ok := <-d.chunkChannel:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if err == context.Canceled {
|
||||
if e := context.Cause(d.ctx); e != nil {
|
||||
err = e
|
||||
if d.getErr() != nil {
|
||||
// Drain the channel if there is an error, to prevent deadlocking
|
||||
// of download producer.
|
||||
return
|
||||
}
|
||||
if err := d.downloadChunk(&c); err != nil {
|
||||
if err == errCancelConcurrency {
|
||||
return
|
||||
}
|
||||
if err == context.Canceled {
|
||||
if e := context.Cause(d.ctx); e != nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
d.setErr(err)
|
||||
d.cancel(err)
|
||||
return
|
||||
}
|
||||
d.setErr(err)
|
||||
d.cancel(err)
|
||||
}
|
||||
}
|
||||
d.concurrencyFinish()
|
||||
}
|
||||
|
||||
// downloadChunk downloads the chunk
|
||||
@ -390,8 +392,8 @@ func (d *downloader) downloadChunk(ch *chunk) error {
|
||||
return err
|
||||
}
|
||||
|
||||
var errCancelConcurrency = fmt.Errorf("cancel concurrency")
|
||||
var errInfiniteRetry = fmt.Errorf("infinite retry")
|
||||
var errCancelConcurrency = errors.New("cancel concurrency")
|
||||
var errInfiniteRetry = errors.New("infinite retry")
|
||||
|
||||
func (d *downloader) tryDownloadChunk(params *HttpRequestParams, ch *chunk) (int64, error) {
|
||||
resp, err := d.cfg.HttpClient(d.ctx, params)
|
||||
|
@ -120,7 +120,7 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time
|
||||
reader, err := RangeReadCloser.RangeRead(ctx, http_range.Range{Length: -1})
|
||||
if err != nil {
|
||||
code = http.StatusRequestedRangeNotSatisfiable
|
||||
if err == ErrExceedMaxConcurrency {
|
||||
if errors.Is(err, ErrExceedMaxConcurrency) {
|
||||
code = http.StatusTooManyRequests
|
||||
}
|
||||
http.Error(w, err.Error(), code)
|
||||
@ -143,7 +143,7 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time
|
||||
sendContent, err = RangeReadCloser.RangeRead(ctx, ra)
|
||||
if err != nil {
|
||||
code = http.StatusRequestedRangeNotSatisfiable
|
||||
if err == ErrExceedMaxConcurrency {
|
||||
if errors.Is(err, ErrExceedMaxConcurrency) {
|
||||
code = http.StatusTooManyRequests
|
||||
}
|
||||
http.Error(w, err.Error(), code)
|
||||
@ -205,7 +205,7 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time
|
||||
log.Warnf("Maybe size incorrect or reader not giving correct/full data, or connection closed before finish. written bytes: %d ,sendSize:%d, ", written, sendSize)
|
||||
}
|
||||
code = http.StatusInternalServerError
|
||||
if err == ErrExceedMaxConcurrency {
|
||||
if errors.Is(err, ErrExceedMaxConcurrency) {
|
||||
code = http.StatusTooManyRequests
|
||||
}
|
||||
w.WriteHeader(code)
|
||||
|
Reference in New Issue
Block a user