Files
OpenList/internal/stream/limit.go
Kuingsmile fdcc2f136e chore: change module name to OpenListTeam/OpenList (#2)
* Enable blank issue

* chore(README.md): update docs (temporally)

* Update FUNDING.yml

* chore: purge README.md

* chore: change module name to OpenListTeam/OpenList

* fix: fix link errors

* chore: remove v3 in module name

* fix: resolve some conficts

* fix: resolve conficts

* docs: update with latest file

---------

Co-authored-by: ShenLin <773933146@qq.com>
Co-authored-by: Hantong Chen <cxwdyx620@gmail.com>
Co-authored-by: joshua <i@joshua.su>
Co-authored-by: Hantong Chen <70561268+cxw620@users.noreply.github.com>
2025-06-12 22:02:46 +08:00

154 lines
2.8 KiB
Go

package stream
import (
"context"
"io"
"time"
"github.com/OpenListTeam/OpenList/internal/model"
"github.com/OpenListTeam/OpenList/pkg/http_range"
"github.com/OpenListTeam/OpenList/pkg/utils"
"golang.org/x/time/rate"
)
// Limiter is the rate-limiter contract consumed by the throttling wrappers in
// this package. Its signatures are expressed in terms of rate.Limit and
// rate.Reservation from golang.org/x/time/rate; the method set looks like it
// mirrors that package's *rate.Limiter (NOTE(review): confirm), so refer to
// the rate package for the precise semantics of each method.
type Limiter interface {
	// Configuration and token inspection.
	Limit() rate.Limit
	Burst() int
	TokensAt(t time.Time) float64
	Tokens() float64

	// Immediate admission checks.
	Allow() bool
	AllowN(t time.Time, n int) bool

	// Reservations.
	Reserve() *rate.Reservation
	ReserveN(t time.Time, n int) *rate.Reservation

	// Context-aware waiting. WaitN is what the read/write wrappers in this
	// file call to throttle a transfer of n bytes.
	Wait(ctx context.Context) error
	WaitN(ctx context.Context, n int) error

	// Reconfiguration.
	SetLimit(newLimit rate.Limit)
	SetLimitAt(t time.Time, newLimit rate.Limit)
	SetBurst(newBurst int)
	SetBurstAt(t time.Time, newBurst int)
}
// Package-level limiters, one per traffic direction and peer kind. They are
// only declared here and therefore start out nil; nothing in this file
// assigns them.
// NOTE(review): presumably populated during application start-up from the
// configured bandwidth limits — confirm against the bootstrap code.
var (
	// Limiters for traffic exchanged with clients.
	ClientDownloadLimit, ClientUploadLimit Limiter

	// Limiters for traffic exchanged with upstream servers.
	ServerDownloadLimit, ServerUploadLimit Limiter
)
// RateLimitReader wraps an io.Reader and charges every successful Read
// against Limiter before the bytes are handed back to the caller.
//
// Limiter may be nil, in which case reads are not throttled. Ctx may be nil,
// in which case no cancellation check is made and limiter waits run under
// context.Background().
type RateLimitReader struct {
	io.Reader
	Limiter Limiter
	Ctx     context.Context
}

// Read reads from the wrapped Reader and then blocks until Limiter has
// accounted for the n bytes that were read.
//
// It returns (0, Ctx.Err()) without touching the underlying reader when Ctx
// is set and utils.IsCanceled reports it as canceled. An error from the
// underlying Read is returned as-is, together with its n, and is not
// throttled. An error from the limiter (for example context cancellation
// while waiting) is returned together with the n bytes already placed in p.
func (r *RateLimitReader) Read(p []byte) (n int, err error) {
	if r.Ctx != nil && utils.IsCanceled(r.Ctx) {
		return 0, r.Ctx.Err()
	}
	n, err = r.Reader.Read(p)
	if err != nil {
		return n, err
	}
	return n, r.throttle(n)
}

// throttle blocks until Limiter has granted n tokens. It is a no-op when no
// Limiter is configured.
func (r *RateLimitReader) throttle(n int) error {
	if r.Limiter == nil {
		return nil
	}
	// Fall back to a background context locally instead of writing it into
	// r.Ctx, so that Read never mutates the struct's exported fields.
	ctx := r.Ctx
	if ctx == nil {
		ctx = context.Background()
	}
	// A single WaitN call is rejected when n exceeds the limiter's burst, so
	// a read larger than the burst is paid for in burst-sized instalments.
	// With a non-positive burst the request is passed through unchanged and
	// the limiter decides whether it is acceptable.
	burst := r.Limiter.Burst()
	for {
		chunk := n
		if burst > 0 && chunk > burst {
			chunk = burst
		}
		if err := r.Limiter.WaitN(ctx, chunk); err != nil {
			return err
		}
		n -= chunk
		if n <= 0 {
			return nil
		}
	}
}

// Close closes the wrapped Reader when it also implements io.Closer and
// returns that Close error; otherwise it does nothing and returns nil.
func (r *RateLimitReader) Close() error {
	closer, ok := r.Reader.(io.Closer)
	if !ok {
		return nil
	}
	return closer.Close()
}
// RateLimitWriter wraps an io.Writer and charges every successful Write
// against Limiter before returning to the caller.
//
// Limiter may be nil, in which case writes are not throttled. Ctx may be nil,
// in which case no cancellation check is made and limiter waits run under
// context.Background().
type RateLimitWriter struct {
	io.Writer
	Limiter Limiter
	Ctx     context.Context
}

// Write writes p to the wrapped Writer and then blocks until Limiter has
// accounted for the n bytes that were written.
//
// It returns (0, Ctx.Err()) without touching the underlying writer when Ctx
// is set and utils.IsCanceled reports it as canceled. An error from the
// underlying Write is returned as-is, together with its n, and is not
// throttled. An error from the limiter (for example context cancellation
// while waiting) is returned together with the n bytes already written.
func (w *RateLimitWriter) Write(p []byte) (n int, err error) {
	if w.Ctx != nil && utils.IsCanceled(w.Ctx) {
		return 0, w.Ctx.Err()
	}
	n, err = w.Writer.Write(p)
	if err != nil {
		return n, err
	}
	return n, w.throttle(n)
}

// throttle blocks until Limiter has granted n tokens. It is a no-op when no
// Limiter is configured.
func (w *RateLimitWriter) throttle(n int) error {
	if w.Limiter == nil {
		return nil
	}
	// Fall back to a background context locally instead of writing it into
	// w.Ctx, so that Write never mutates the struct's exported fields.
	ctx := w.Ctx
	if ctx == nil {
		ctx = context.Background()
	}
	// A single WaitN call is rejected when n exceeds the limiter's burst, so
	// a write larger than the burst is paid for in burst-sized instalments.
	// With a non-positive burst the request is passed through unchanged and
	// the limiter decides whether it is acceptable.
	burst := w.Limiter.Burst()
	for {
		chunk := n
		if burst > 0 && chunk > burst {
			chunk = burst
		}
		if err := w.Limiter.WaitN(ctx, chunk); err != nil {
			return err
		}
		n -= chunk
		if n <= 0 {
			return nil
		}
	}
}

// Close closes the wrapped Writer when it also implements io.Closer and
// returns that Close error; otherwise it does nothing and returns nil.
func (w *RateLimitWriter) Close() error {
	closer, ok := w.Writer.(io.Closer)
	if !ok {
		return nil
	}
	return closer.Close()
}
// RateLimitFile wraps a model.File and charges every successful Read and
// ReadAt against Limiter. Any other methods promoted from the embedded
// model.File are passed through without throttling.
//
// Limiter may be nil, in which case reads are not throttled. Ctx may be nil,
// in which case no cancellation check is made and limiter waits run under
// context.Background().
type RateLimitFile struct {
	model.File
	Limiter Limiter
	Ctx     context.Context
}

// Read reads from the wrapped file and then blocks until Limiter has
// accounted for the n bytes that were read.
//
// It returns (0, Ctx.Err()) without touching the file when Ctx is set and
// utils.IsCanceled reports it as canceled. An error from the underlying Read
// is returned as-is, together with its n, and is not throttled. An error from
// the limiter is returned together with the n bytes already placed in p.
func (r *RateLimitFile) Read(p []byte) (n int, err error) {
	if r.Ctx != nil && utils.IsCanceled(r.Ctx) {
		return 0, r.Ctx.Err()
	}
	n, err = r.File.Read(p)
	if err != nil {
		return n, err
	}
	return n, r.throttle(n)
}

// ReadAt reads from the wrapped file at offset off and then blocks until
// Limiter has accounted for the n bytes that were read. Cancellation and
// error handling are the same as for Read.
func (r *RateLimitFile) ReadAt(p []byte, off int64) (n int, err error) {
	if r.Ctx != nil && utils.IsCanceled(r.Ctx) {
		return 0, r.Ctx.Err()
	}
	n, err = r.File.ReadAt(p, off)
	if err != nil {
		return n, err
	}
	return n, r.throttle(n)
}

// throttle blocks until Limiter has granted n tokens. It is a no-op when no
// Limiter is configured.
func (r *RateLimitFile) throttle(n int) error {
	if r.Limiter == nil {
		return nil
	}
	// Fall back to a background context locally instead of writing it into
	// r.Ctx: io.ReaderAt allows parallel ReadAt calls on the same value, and
	// assigning the field from each of them would be a data race.
	ctx := r.Ctx
	if ctx == nil {
		ctx = context.Background()
	}
	// A single WaitN call is rejected when n exceeds the limiter's burst, so
	// a read larger than the burst is paid for in burst-sized instalments.
	// With a non-positive burst the request is passed through unchanged and
	// the limiter decides whether it is acceptable.
	burst := r.Limiter.Burst()
	for {
		chunk := n
		if burst > 0 && chunk > burst {
			chunk = burst
		}
		if err := r.Limiter.WaitN(ctx, chunk); err != nil {
			return err
		}
		n -= chunk
		if n <= 0 {
			return nil
		}
	}
}
// RateLimitRangeReadCloser decorates a model.RangeReadCloserIF so that every
// stream obtained through RangeRead is throttled by Limiter. All other
// methods are promoted unchanged from the embedded value.
type RateLimitRangeReadCloser struct {
	model.RangeReadCloserIF
	Limiter Limiter
}

// RangeRead asks the embedded RangeReadCloserIF for the requested range and
// returns the resulting stream wrapped in a RateLimitReader that uses this
// value's Limiter and the supplied ctx. If the embedded RangeRead fails, its
// error is returned and the reader is nil.
func (rrc *RateLimitRangeReadCloser) RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
	src, err := rrc.RangeReadCloserIF.RangeRead(ctx, httpRange)
	if err != nil {
		return nil, err
	}
	limited := &RateLimitReader{Reader: src, Limiter: rrc.Limiter, Ctx: ctx}
	return limited, nil
}