Compare commits

..

1 Commits

Author SHA1 Message Date
18a4aa1a29 fix(deps): update module gorm.io/driver/postgres to v1.6.0 2025-08-13 17:31:30 +00:00
17 changed files with 110 additions and 284 deletions

View File

@ -1,38 +0,0 @@
name: Sync to Gitee
on:
push:
branches:
- main
workflow_dispatch:
jobs:
sync:
runs-on: ubuntu-latest
name: Sync GitHub to Gitee
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup SSH
run: |
mkdir -p ~/.ssh
echo "${{ secrets.GITEE_SSH_PRIVATE_KEY }}" > ~/.ssh/id_rsa
chmod 600 ~/.ssh/id_rsa
ssh-keyscan gitee.com >> ~/.ssh/known_hosts
- name: Create single commit and push
run: |
git config user.name "GitHub Actions"
git config user.email "actions@github.com"
# Create a new branch
git checkout --orphan new-main
git add .
git commit -m "Sync from GitHub: $(date)"
# Add Gitee remote and force push
git remote add gitee ${{ vars.GITEE_REPO_URL }}
git push --force gitee new-main:main

View File

@ -6,9 +6,10 @@ services:
ports: ports:
- '5244:5244' - '5244:5244'
- '5245:5245' - '5245:5245'
user: '0:0'
environment: environment:
- PUID=0
- PGID=0
- UMASK=022 - UMASK=022
- TZ=Asia/Shanghai - TZ=UTC
container_name: openlist container_name: openlist
image: 'openlistteam/openlist:latest' image: 'openlistteam/openlist:latest'

View File

@ -70,8 +70,6 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
var reader *stream.SectionReader var reader *stream.SectionReader
var rateLimitedRd io.Reader var rateLimitedRd io.Reader
sliceMD5 := "" sliceMD5 := ""
// 表单
b := bytes.NewBuffer(make([]byte, 0, 2048))
threadG.GoWithLifecycle(errgroup.Lifecycle{ threadG.GoWithLifecycle(errgroup.Lifecycle{
Before: func(ctx context.Context) error { Before: func(ctx context.Context) error {
if reader == nil { if reader == nil {
@ -86,6 +84,7 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
if err != nil { if err != nil {
return err return err
} }
rateLimitedRd = driver.NewLimitedUploadStream(ctx, reader)
} }
return nil return nil
}, },
@ -93,8 +92,9 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
// 重置分片reader位置因为HashReader、上一次失败已经读取到分片EOF // 重置分片reader位置因为HashReader、上一次失败已经读取到分片EOF
reader.Seek(0, io.SeekStart) reader.Seek(0, io.SeekStart)
b.Reset() // 创建表单数据
w := multipart.NewWriter(b) var b bytes.Buffer
w := multipart.NewWriter(&b)
// 添加表单字段 // 添加表单字段
err = w.WriteField("preuploadID", createResp.Data.PreuploadID) err = w.WriteField("preuploadID", createResp.Data.PreuploadID)
if err != nil { if err != nil {
@ -109,20 +109,21 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
return err return err
} }
// 写入文件内容 // 写入文件内容
_, err = w.CreateFormFile("slice", fmt.Sprintf("%s.part%d", file.GetName(), partNumber)) fw, err := w.CreateFormFile("slice", fmt.Sprintf("%s.part%d", file.GetName(), partNumber))
if err != nil {
return err
}
_, err = utils.CopyWithBuffer(fw, rateLimitedRd)
if err != nil { if err != nil {
return err return err
} }
headSize := b.Len()
err = w.Close() err = w.Close()
if err != nil { if err != nil {
return err return err
} }
head := bytes.NewReader(b.Bytes()[:headSize])
tail := bytes.NewReader(b.Bytes()[headSize:])
rateLimitedRd = driver.NewLimitedUploadStream(ctx, io.MultiReader(head, reader, tail))
// 创建请求并设置header // 创建请求并设置header
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadDomain+"/upload/v2/file/slice", rateLimitedRd) req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadDomain+"/upload/v2/file/slice", &b)
if err != nil { if err != nil {
return err return err
} }

View File

@ -13,7 +13,7 @@ type Addition struct {
ClientSecret string `json:"client_secret" required:"false" help:"Keep it empty if you don't have one"` ClientSecret string `json:"client_secret" required:"false" help:"Keep it empty if you don't have one"`
AccessToken string AccessToken string
RefreshToken string `json:"refresh_token" required:"true"` RefreshToken string `json:"refresh_token" required:"true"`
RootNamespaceId string `json:"RootNamespaceId" required:"false"` RootNamespaceId string
} }
var config = driver.Config{ var config = driver.Config{

View File

@ -175,13 +175,6 @@ func (d *Dropbox) finishUploadSession(ctx context.Context, toPath string, offset
} }
req.Header.Set("Content-Type", "application/octet-stream") req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Authorization", "Bearer "+d.AccessToken) req.Header.Set("Authorization", "Bearer "+d.AccessToken)
if d.RootNamespaceId != "" {
apiPathRootJson, err := d.buildPathRootHeader()
if err != nil {
return err
}
req.Header.Set("Dropbox-API-Path-Root", apiPathRootJson)
}
uploadFinishArgs := UploadFinishArgs{ uploadFinishArgs := UploadFinishArgs{
Commit: struct { Commit: struct {
@ -226,13 +219,6 @@ func (d *Dropbox) startUploadSession(ctx context.Context) (string, error) {
} }
req.Header.Set("Content-Type", "application/octet-stream") req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Authorization", "Bearer "+d.AccessToken) req.Header.Set("Authorization", "Bearer "+d.AccessToken)
if d.RootNamespaceId != "" {
apiPathRootJson, err := d.buildPathRootHeader()
if err != nil {
return "", err
}
req.Header.Set("Dropbox-API-Path-Root", apiPathRootJson)
}
req.Header.Set("Dropbox-API-Arg", "{\"close\":false}") req.Header.Set("Dropbox-API-Arg", "{\"close\":false}")
res, err := base.HttpClient.Do(req) res, err := base.HttpClient.Do(req)
@ -247,11 +233,3 @@ func (d *Dropbox) startUploadSession(ctx context.Context) (string, error) {
_ = res.Body.Close() _ = res.Body.Close()
return sessionId, nil return sessionId, nil
} }
func (d *Dropbox) buildPathRootHeader() (string, error) {
return utils.Json.MarshalToString(map[string]interface{}{
".tag": "root",
"root": d.RootNamespaceId,
})
}

View File

@ -132,7 +132,7 @@ func (d *Terabox) Remove(ctx context.Context, obj model.Obj) error {
func (d *Terabox) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error { func (d *Terabox) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
resp, err := base.RestyClient.R(). resp, err := base.RestyClient.R().
SetContext(ctx). SetContext(ctx).
Get("https://" + d.url_domain_prefix + "-data.terabox.com/rest/2.0/pcs/file?method=locateupload") Get("https://d.terabox.com/rest/2.0/pcs/file?method=locateupload")
if err != nil { if err != nil {
return err return err
} }

View File

@ -5,6 +5,11 @@ umask ${UMASK}
if [ "$1" = "version" ]; then if [ "$1" = "version" ]; then
./openlist version ./openlist version
else else
# Define the target directory path for openlist service
OPENLIST_DIR="/opt/service/start/openlist"
if [ ! -d "$OPENLIST_DIR" ]; then
cp -r /opt/service/stop/openlist "$OPENLIST_DIR" 2>/dev/null
fi
# Define the target directory path for aria2 service # Define the target directory path for aria2 service
ARIA2_DIR="/opt/service/start/aria2" ARIA2_DIR="/opt/service/start/aria2"
@ -14,12 +19,12 @@ else
mkdir -p "$ARIA2_DIR" mkdir -p "$ARIA2_DIR"
cp -r /opt/service/stop/aria2/* "$ARIA2_DIR" 2>/dev/null cp -r /opt/service/stop/aria2/* "$ARIA2_DIR" 2>/dev/null
fi fi
runsvdir /opt/service/start &
else else
# If aria2 should NOT run and target directory exists, remove it # If aria2 should NOT run and target directory exists, remove it
if [ -d "$ARIA2_DIR" ]; then if [ -d "$ARIA2_DIR" ]; then
rm -rf "$ARIA2_DIR" rm -rf "$ARIA2_DIR"
fi fi
fi fi
exec ./openlist server --no-prefix
exec runsvdir /opt/service/start
fi fi

8
go.mod
View File

@ -73,7 +73,7 @@ require (
google.golang.org/appengine v1.6.8 google.golang.org/appengine v1.6.8
gopkg.in/ldap.v3 v3.1.0 gopkg.in/ldap.v3 v3.1.0
gorm.io/driver/mysql v1.5.7 gorm.io/driver/mysql v1.5.7
gorm.io/driver/postgres v1.5.9 gorm.io/driver/postgres v1.6.0
gorm.io/driver/sqlite v1.5.6 gorm.io/driver/sqlite v1.5.6
gorm.io/gorm v1.25.11 gorm.io/gorm v1.25.11
) )
@ -118,7 +118,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hekmon/cunits/v2 v2.1.0 // indirect github.com/hekmon/cunits/v2 v2.1.0 // indirect
github.com/ipfs/boxo v0.12.0 // indirect github.com/ipfs/boxo v0.12.0 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/klauspost/pgzip v1.2.6 // indirect github.com/klauspost/pgzip v1.2.6 // indirect
github.com/matoous/go-nanoid/v2 v2.1.0 // indirect github.com/matoous/go-nanoid/v2 v2.1.0 // indirect
github.com/microcosm-cc/bluemonday v1.0.27 github.com/microcosm-cc/bluemonday v1.0.27
@ -188,8 +188,8 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/ipfs/go-cid v0.5.0 github.com/ipfs/go-cid v0.5.0
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.5.5 // indirect github.com/jackc/pgx/v5 v5.6.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect

8
go.sum
View File

@ -392,10 +392,16 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
@ -948,6 +954,8 @@ gorm.io/driver/mysql v1.5.7 h1:MndhOPYOfEp2rHKgkZIhJ16eVUIRf2HmzgoPmh7FCWo=
gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM= gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM=
gorm.io/driver/postgres v1.5.9 h1:DkegyItji119OlcaLjqN11kHoUgZ/j13E0jkJZgD6A8= gorm.io/driver/postgres v1.5.9 h1:DkegyItji119OlcaLjqN11kHoUgZ/j13E0jkJZgD6A8=
gorm.io/driver/postgres v1.5.9/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI= gorm.io/driver/postgres v1.5.9/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI=
gorm.io/driver/postgres v1.6.0 h1:2dxzU8xJ+ivvqTRph34QX+WrRaJlmfyPqXmoGVjMBa4=
gorm.io/driver/postgres v1.6.0/go.mod h1:vUw0mrGgrTK+uPHEhAdV4sfFELrByKVGnaVRkXDhtWo=
gorm.io/driver/sqlite v1.5.6 h1:fO/X46qn5NUEEOZtnjJRWRzZMe8nqJiQ9E+0hi+hKQE= gorm.io/driver/sqlite v1.5.6 h1:fO/X46qn5NUEEOZtnjJRWRzZMe8nqJiQ9E+0hi+hKQE=
gorm.io/driver/sqlite v1.5.6/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4= gorm.io/driver/sqlite v1.5.6/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4=
gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=

View File

@ -77,10 +77,6 @@ func InitConfig() {
log.Fatalf("update config struct error: %+v", err) log.Fatalf("update config struct error: %+v", err)
} }
} }
if !conf.Conf.Force {
confFromEnv()
}
if conf.Conf.MaxConcurrency > 0 { if conf.Conf.MaxConcurrency > 0 {
net.DefaultConcurrencyLimit = &net.ConcurrencyLimit{Limit: conf.Conf.MaxConcurrency} net.DefaultConcurrencyLimit = &net.ConcurrencyLimit{Limit: conf.Conf.MaxConcurrency}
} }
@ -96,31 +92,25 @@ func InitConfig() {
conf.MaxBufferLimit = conf.Conf.MaxBufferLimit * utils.MB conf.MaxBufferLimit = conf.Conf.MaxBufferLimit * utils.MB
} }
log.Infof("max buffer limit: %dMB", conf.MaxBufferLimit/utils.MB) log.Infof("max buffer limit: %dMB", conf.MaxBufferLimit/utils.MB)
if conf.Conf.MmapThreshold > 0 { if !conf.Conf.Force {
conf.MmapThreshold = conf.Conf.MmapThreshold * utils.MB confFromEnv()
} else {
conf.MmapThreshold = 0
} }
log.Infof("mmap threshold: %dMB", conf.Conf.MmapThreshold)
if len(conf.Conf.Log.Filter.Filters) == 0 { if len(conf.Conf.Log.Filter.Filters) == 0 {
conf.Conf.Log.Filter.Enable = false conf.Conf.Log.Filter.Enable = false
} }
// convert abs path // convert abs path
convertAbsPath := func(path *string) { convertAbsPath := func(path *string) {
if *path != "" && !filepath.IsAbs(*path) { if !filepath.IsAbs(*path) {
*path = filepath.Join(pwd, *path) *path = filepath.Join(pwd, *path)
} }
} }
convertAbsPath(&conf.Conf.Database.DBFile)
convertAbsPath(&conf.Conf.Scheme.CertFile)
convertAbsPath(&conf.Conf.Scheme.KeyFile)
convertAbsPath(&conf.Conf.Scheme.UnixFile)
convertAbsPath(&conf.Conf.Log.Name)
convertAbsPath(&conf.Conf.TempDir) convertAbsPath(&conf.Conf.TempDir)
convertAbsPath(&conf.Conf.BleveDir) convertAbsPath(&conf.Conf.BleveDir)
convertAbsPath(&conf.Conf.DistDir) convertAbsPath(&conf.Conf.Log.Name)
convertAbsPath(&conf.Conf.Database.DBFile)
if conf.Conf.DistDir != "" {
convertAbsPath(&conf.Conf.DistDir)
}
err := os.MkdirAll(conf.Conf.TempDir, 0o777) err := os.MkdirAll(conf.Conf.TempDir, 0o777)
if err != nil { if err != nil {
log.Fatalf("create temp dir error: %+v", err) log.Fatalf("create temp dir error: %+v", err)

View File

@ -120,7 +120,6 @@ type Config struct {
Log LogConfig `json:"log" envPrefix:"LOG_"` Log LogConfig `json:"log" envPrefix:"LOG_"`
DelayedStart int `json:"delayed_start" env:"DELAYED_START"` DelayedStart int `json:"delayed_start" env:"DELAYED_START"`
MaxBufferLimit int `json:"max_buffer_limitMB" env:"MAX_BUFFER_LIMIT_MB"` MaxBufferLimit int `json:"max_buffer_limitMB" env:"MAX_BUFFER_LIMIT_MB"`
MmapThreshold int `json:"mmap_thresholdMB" env:"MMAP_THRESHOLD_MB"`
MaxConnections int `json:"max_connections" env:"MAX_CONNECTIONS"` MaxConnections int `json:"max_connections" env:"MAX_CONNECTIONS"`
MaxConcurrency int `json:"max_concurrency" env:"MAX_CONCURRENCY"` MaxConcurrency int `json:"max_concurrency" env:"MAX_CONCURRENCY"`
TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify" env:"TLS_INSECURE_SKIP_VERIFY"` TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify" env:"TLS_INSECURE_SKIP_VERIFY"`
@ -177,7 +176,6 @@ func DefaultConfig(dataDir string) *Config {
}, },
}, },
MaxBufferLimit: -1, MaxBufferLimit: -1,
MmapThreshold: 4,
MaxConnections: 0, MaxConnections: 0,
MaxConcurrency: 64, MaxConcurrency: 64,
TlsInsecureSkipVerify: true, TlsInsecureSkipVerify: true,

View File

@ -25,10 +25,7 @@ var PrivacyReg []*regexp.Regexp
var ( var (
// StoragesLoaded loaded success if empty // StoragesLoaded loaded success if empty
StoragesLoaded = false StoragesLoaded = false
// 单个Buffer最大限制
MaxBufferLimit = 16 * 1024 * 1024 MaxBufferLimit = 16 * 1024 * 1024
// 超过该阈值的Buffer将使用 mmap 分配,可主动释放内存
MmapThreshold = 4 * 1024 * 1024
) )
var ( var (
RawIndexHtml string RawIndexHtml string

View File

@ -1,6 +1,7 @@
package net package net
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@ -14,7 +15,6 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/conf" "github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/rclone/rclone/lib/mmap"
"github.com/OpenListTeam/OpenList/v4/pkg/http_range" "github.com/OpenListTeam/OpenList/v4/pkg/http_range"
"github.com/aws/aws-sdk-go/aws/awsutil" "github.com/aws/aws-sdk-go/aws/awsutil"
@ -255,10 +255,7 @@ func (d *downloader) sendChunkTask(newConcurrency bool) error {
finalSize += firstSize - minSize finalSize += firstSize - minSize
} }
} }
err := buf.Reset(int(finalSize)) buf.Reset(int(finalSize))
if err != nil {
return err
}
ch := chunk{ ch := chunk{
start: d.pos, start: d.pos,
size: finalSize, size: finalSize,
@ -648,13 +645,11 @@ func (mr MultiReadCloser) Close() error {
} }
type Buf struct { type Buf struct {
size int //expected size buffer *bytes.Buffer
ctx context.Context size int //expected size
offR int ctx context.Context
offW int off int
rw sync.Mutex rw sync.Mutex
buf []byte
mmap bool
readSignal chan struct{} readSignal chan struct{}
readPending bool readPending bool
@ -663,100 +658,76 @@ type Buf struct {
// NewBuf is a buffer that can have 1 read & 1 write at the same time. // NewBuf is a buffer that can have 1 read & 1 write at the same time.
// when read is faster write, immediately feed data to read after written // when read is faster write, immediately feed data to read after written
func NewBuf(ctx context.Context, maxSize int) *Buf { func NewBuf(ctx context.Context, maxSize int) *Buf {
br := &Buf{ return &Buf{
ctx: ctx, ctx: ctx,
size: maxSize, buffer: bytes.NewBuffer(make([]byte, 0, maxSize)),
size: maxSize,
readSignal: make(chan struct{}, 1), readSignal: make(chan struct{}, 1),
} }
if conf.MmapThreshold > 0 && maxSize >= conf.MmapThreshold {
m, err := mmap.Alloc(maxSize)
if err == nil {
br.buf = m
br.mmap = true
return br
}
}
br.buf = make([]byte, maxSize)
return br
} }
func (br *Buf) Reset(size int) {
func (br *Buf) Reset(size int) error {
br.rw.Lock() br.rw.Lock()
defer br.rw.Unlock() defer br.rw.Unlock()
if br.buf == nil { if br.buffer == nil {
return io.ErrClosedPipe return
}
if size > cap(br.buf) {
return fmt.Errorf("reset size %d exceeds max size %d", size, cap(br.buf))
} }
br.buffer.Reset()
br.size = size br.size = size
br.offR = 0 br.off = 0
br.offW = 0
return nil
} }
func (br *Buf) Read(p []byte) (int, error) { func (br *Buf) Read(p []byte) (n int, err error) {
if err := br.ctx.Err(); err != nil { if err := br.ctx.Err(); err != nil {
return 0, err return 0, err
} }
if len(p) == 0 { if len(p) == 0 {
return 0, nil return 0, nil
} }
if br.offR >= br.size { if br.off >= br.size {
return 0, io.EOF return 0, io.EOF
} }
for { for {
br.rw.Lock() br.rw.Lock()
if br.buf == nil { if br.buffer != nil {
br.rw.Unlock() n, err = br.buffer.Read(p)
return 0, io.ErrClosedPipe } else {
err = io.ErrClosedPipe
} }
if err != nil && err != io.EOF {
if br.offW < br.offR {
br.rw.Unlock() br.rw.Unlock()
return 0, io.ErrUnexpectedEOF return
} }
if br.offW == br.offR { if n > 0 {
br.readPending = true br.off += n
br.rw.Unlock() br.rw.Unlock()
select { return n, nil
case <-br.ctx.Done():
return 0, br.ctx.Err()
case _, ok := <-br.readSignal:
if !ok {
return 0, io.ErrClosedPipe
}
continue
}
} }
br.readPending = true
n := copy(p, br.buf[br.offR:br.offW])
br.offR += n
br.rw.Unlock() br.rw.Unlock()
if n < len(p) && br.offR >= br.size { // n==0, err==io.EOF
return n, io.EOF select {
case <-br.ctx.Done():
return 0, br.ctx.Err()
case _, ok := <-br.readSignal:
if !ok {
return 0, io.ErrClosedPipe
}
continue
} }
return n, nil
} }
} }
func (br *Buf) Write(p []byte) (int, error) { func (br *Buf) Write(p []byte) (n int, err error) {
if err := br.ctx.Err(); err != nil { if err := br.ctx.Err(); err != nil {
return 0, err return 0, err
} }
if len(p) == 0 {
return 0, nil
}
br.rw.Lock() br.rw.Lock()
defer br.rw.Unlock() defer br.rw.Unlock()
if br.buf == nil { if br.buffer == nil {
return 0, io.ErrClosedPipe return 0, io.ErrClosedPipe
} }
if br.offW >= br.size { n, err = br.buffer.Write(p)
return 0, io.ErrShortWrite
}
n := copy(br.buf[br.offW:], p[:min(br.size-br.offW, len(p))])
br.offW += n
if br.readPending { if br.readPending {
br.readPending = false br.readPending = false
select { select {
@ -764,21 +735,12 @@ func (br *Buf) Write(p []byte) (int, error) {
default: default:
} }
} }
if n < len(p) { return
return n, io.ErrShortWrite
}
return n, nil
} }
func (br *Buf) Close() error { func (br *Buf) Close() {
br.rw.Lock() br.rw.Lock()
defer br.rw.Unlock() defer br.rw.Unlock()
var err error br.buffer = nil
if br.mmap {
err = mmap.Free(br.buf)
br.mmap = false
}
br.buf = nil
close(br.readSignal) close(br.readSignal)
return err
} }

View File

@ -15,7 +15,6 @@ import (
"github.com/OpenListTeam/OpenList/v4/pkg/buffer" "github.com/OpenListTeam/OpenList/v4/pkg/buffer"
"github.com/OpenListTeam/OpenList/v4/pkg/http_range" "github.com/OpenListTeam/OpenList/v4/pkg/http_range"
"github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/rclone/rclone/lib/mmap"
"go4.org/readerutil" "go4.org/readerutil"
) )
@ -61,12 +60,8 @@ func (f *FileStream) IsForceStreamUpload() bool {
} }
func (f *FileStream) Close() error { func (f *FileStream) Close() error {
if f.peekBuff != nil {
f.peekBuff.Reset()
f.peekBuff = nil
}
var err1, err2 error var err1, err2 error
err1 = f.Closers.Close() err1 = f.Closers.Close()
if errors.Is(err1, os.ErrClosed) { if errors.Is(err1, os.ErrClosed) {
err1 = nil err1 = nil
@ -79,6 +74,10 @@ func (f *FileStream) Close() error {
f.tmpFile = nil f.tmpFile = nil
} }
} }
if f.peekBuff != nil {
f.peekBuff.Reset()
f.peekBuff = nil
}
return errors.Join(err1, err2) return errors.Join(err1, err2)
} }
@ -195,19 +194,7 @@ func (f *FileStream) cache(maxCacheSize int64) (model.File, error) {
f.oriReader = f.Reader f.oriReader = f.Reader
} }
bufSize := maxCacheSize - int64(f.peekBuff.Len()) bufSize := maxCacheSize - int64(f.peekBuff.Len())
var buf []byte buf := make([]byte, bufSize)
if conf.MmapThreshold > 0 && bufSize >= int64(conf.MmapThreshold) {
m, err := mmap.Alloc(int(bufSize))
if err == nil {
f.Add(utils.CloseFunc(func() error {
return mmap.Free(m)
}))
buf = m
}
}
if buf == nil {
buf = make([]byte, bufSize)
}
n, err := io.ReadFull(f.oriReader, buf) n, err := io.ReadFull(f.oriReader, buf)
if bufSize != int64(n) { if bufSize != int64(n) {
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", bufSize, n, err) return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", bufSize, n, err)

View File

@ -7,13 +7,11 @@ import (
"io" "io"
"testing" "testing"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/pkg/http_range" "github.com/OpenListTeam/OpenList/v4/pkg/http_range"
) )
func TestFileStream_RangeRead(t *testing.T) { func TestFileStream_RangeRead(t *testing.T) {
conf.MaxBufferLimit = 16 * 1024 * 1024
type args struct { type args struct {
httpRange http_range.Range httpRange http_range.Range
} }
@ -73,7 +71,7 @@ func TestFileStream_RangeRead(t *testing.T) {
} }
}) })
} }
t.Run("after", func(t *testing.T) { t.Run("after check", func(t *testing.T) {
if f.GetFile() == nil { if f.GetFile() == nil {
t.Error("not cached") t.Error("not cached")
} }

View File

@ -8,14 +8,13 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"sync"
"github.com/OpenListTeam/OpenList/v4/internal/conf" "github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/net" "github.com/OpenListTeam/OpenList/v4/internal/net"
"github.com/OpenListTeam/OpenList/v4/pkg/http_range" "github.com/OpenListTeam/OpenList/v4/pkg/http_range"
"github.com/OpenListTeam/OpenList/v4/pkg/pool"
"github.com/OpenListTeam/OpenList/v4/pkg/utils" "github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/rclone/rclone/lib/mmap"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -154,49 +153,26 @@ func CacheFullAndHash(stream model.FileStreamer, up *model.UpdateProgress, hashT
type StreamSectionReader struct { type StreamSectionReader struct {
file model.FileStreamer file model.FileStreamer
off int64 off int64
bufPool *pool.Pool[[]byte] bufPool *sync.Pool
} }
func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *model.UpdateProgress) (*StreamSectionReader, error) { func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *model.UpdateProgress) (*StreamSectionReader, error) {
ss := &StreamSectionReader{file: file} ss := &StreamSectionReader{file: file}
if file.GetFile() != nil { if file.GetFile() == nil {
return ss, nil maxBufferSize = min(maxBufferSize, int(file.GetSize()))
} if maxBufferSize > conf.MaxBufferLimit {
_, err := file.CacheFullAndWriter(up, nil)
maxBufferSize = min(maxBufferSize, int(file.GetSize())) if err != nil {
if maxBufferSize > conf.MaxBufferLimit { return nil, err
_, err := file.CacheFullAndWriter(up, nil) }
if err != nil { } else {
return nil, err ss.bufPool = &sync.Pool{
} New: func() any {
return ss, nil return make([]byte, maxBufferSize)
} },
if conf.MmapThreshold > 0 && maxBufferSize >= conf.MmapThreshold { }
ss.bufPool = &pool.Pool[[]byte]{
New: func() []byte {
buf, err := mmap.Alloc(maxBufferSize)
if err == nil {
file.Add(utils.CloseFunc(func() error {
return mmap.Free(buf)
}))
} else {
buf = make([]byte, maxBufferSize)
}
return buf
},
}
} else {
ss.bufPool = &pool.Pool[[]byte]{
New: func() []byte {
return make([]byte, maxBufferSize)
},
} }
} }
file.Add(utils.CloseFunc(func() error {
ss.bufPool.Reset()
return nil
}))
return ss, nil return ss, nil
} }
@ -208,7 +184,7 @@ func (ss *StreamSectionReader) GetSectionReader(off, length int64) (*SectionRead
if off != ss.off { if off != ss.off {
return nil, fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off) return nil, fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
} }
tempBuf := ss.bufPool.Get() tempBuf := ss.bufPool.Get().([]byte)
buf = tempBuf[:length] buf = tempBuf[:length]
n, err := io.ReadFull(ss.file, buf) n, err := io.ReadFull(ss.file, buf)
if int64(n) != length { if int64(n) != length {

View File

@ -1,37 +0,0 @@
package pool
import "sync"
type Pool[T any] struct {
New func() T
MaxCap int
cache []T
mu sync.Mutex
}
func (p *Pool[T]) Get() T {
p.mu.Lock()
defer p.mu.Unlock()
if len(p.cache) == 0 {
return p.New()
}
item := p.cache[len(p.cache)-1]
p.cache = p.cache[:len(p.cache)-1]
return item
}
func (p *Pool[T]) Put(item T) {
p.mu.Lock()
defer p.mu.Unlock()
if p.MaxCap == 0 || len(p.cache) < int(p.MaxCap) {
p.cache = append(p.cache, item)
}
}
func (p *Pool[T]) Reset() {
p.mu.Lock()
defer p.mu.Unlock()
clear(p.cache)
p.cache = nil
}