fix(ftp): create a new connection for each download (#989)

This commit is contained in:
j2rong4cn
2025-08-06 20:35:01 +08:00
committed by GitHub
parent 52d7d819ad
commit 317d190b77
2 changed files with 24 additions and 55 deletions

View File

@ -2,10 +2,9 @@ package ftp
import ( import (
"context" "context"
"errors"
"io" "io"
stdpath "path" stdpath "path"
"sync"
"time"
"github.com/OpenListTeam/OpenList/v4/internal/driver" "github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/errs" "github.com/OpenListTeam/OpenList/v4/internal/errs"
@ -72,45 +71,44 @@ func (d *FTP) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]m
return res, nil return res, nil
} }
func (d *FTP) Link(_ context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) { func (d *FTP) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
ctx, cancel := context.WithCancel(context.Background())
conn, err := d._login(ctx) conn, err := d._login(ctx)
if err != nil { if err != nil {
cancel()
return nil, err return nil, err
} }
close := func() error {
_ = conn.Quit()
cancel()
return nil
}
path := encode(file.GetPath(), d.Encoding) path := encode(file.GetPath(), d.Encoding)
size := file.GetSize() size := file.GetSize()
mu := &sync.Mutex{}
resultRangeReader := func(context context.Context, httpRange http_range.Range) (io.ReadCloser, error) { resultRangeReader := func(context context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
length := httpRange.Length length := httpRange.Length
if length < 0 || httpRange.Start+length > size { if length < 0 || httpRange.Start+length > size {
length = size - httpRange.Start length = size - httpRange.Start
} }
mu.Lock() var c *ftp.ServerConn
defer mu.Unlock() if ctx == context {
r, err := conn.RetrFrom(path, uint64(httpRange.Start)) c = conn
if err != nil { } else {
_ = conn.Quit() var err error
conn, err = d._login(ctx) c, err = d._login(context)
if err == nil {
r, err = conn.RetrFrom(path, uint64(httpRange.Start))
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
r.SetDeadline(time.Now().Add(time.Second)) resp, err := c.RetrFrom(path, uint64(httpRange.Start))
return &FileReader{ if err != nil {
Response: r, return nil, err
Reader: io.LimitReader(r, length), }
ctx: context, var close utils.CloseFunc
if context == ctx {
close = resp.Close
} else {
close = func() error {
return errors.Join(resp.Close(), c.Quit())
}
}
return utils.ReadCloser{
Reader: io.LimitReader(resp, length),
Closer: close,
}, nil }, nil
} }
@ -118,7 +116,7 @@ func (d *FTP) Link(_ context.Context, file model.Obj, args model.LinkArgs) (*mod
RangeReader: &model.FileRangeReader{ RangeReader: &model.FileRangeReader{
RangeReaderIF: stream.RateLimitRangeReaderFunc(resultRangeReader), RangeReaderIF: stream.RateLimitRangeReaderFunc(resultRangeReader),
}, },
SyncClosers: utils.NewSyncClosers(utils.CloseFunc(close)), SyncClosers: utils.NewSyncClosers(utils.CloseFunc(conn.Quit)),
}, nil }, nil
} }

View File

@ -2,14 +2,10 @@ package ftp
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io"
"os"
"time" "time"
"github.com/OpenListTeam/OpenList/v4/pkg/singleflight" "github.com/OpenListTeam/OpenList/v4/pkg/singleflight"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/jlaffaye/ftp" "github.com/jlaffaye/ftp"
) )
@ -45,28 +41,3 @@ func (d *FTP) _login(ctx context.Context) (*ftp.ServerConn, error) {
} }
return conn, nil return conn, nil
} }
type FileReader struct {
*ftp.Response
io.Reader
ctx context.Context
}
func (r *FileReader) Read(buf []byte) (int, error) {
n := 0
for n < len(buf) {
w, err := r.Reader.Read(buf[n:])
if utils.IsCanceled(r.ctx) {
return n, r.ctx.Err()
}
n += w
if errors.Is(err, os.ErrDeadlineExceeded) {
r.Response.SetDeadline(time.Now().Add(time.Second))
continue
}
if err != nil || w == 0 {
return n, err
}
}
return n, nil
}