pref(net): improve concurrent read and write buffer (#416)

* pref(net): improve concurrent read and write buffer

* chore
This commit is contained in:
j2rong4cn
2025-06-27 15:18:11 +08:00
committed by GitHub
parent d89d0a05b4
commit 23cfe8090b

View File

@ -619,6 +619,9 @@ type Buf struct {
ctx context.Context
off int
rw sync.Mutex
readSignal chan struct{}
readPending bool
}
// NewBuf is a buffer that can have 1 read & 1 write at the same time.
@ -628,9 +631,16 @@ func NewBuf(ctx context.Context, maxSize int) *Buf {
ctx: ctx,
buffer: bytes.NewBuffer(make([]byte, 0, maxSize)),
size: maxSize,
readSignal: make(chan struct{}, 1),
}
}
func (br *Buf) Reset(size int) {
br.rw.Lock()
defer br.rw.Unlock()
if br.buffer == nil {
return
}
br.buffer.Reset()
br.size = size
br.off = 0
@ -646,27 +656,34 @@ func (br *Buf) Read(p []byte) (n int, err error) {
if br.off >= br.size {
return 0, io.EOF
}
br.rw.Lock()
n, err = br.buffer.Read(p)
br.rw.Unlock()
if err == nil {
br.off += n
return n, err
}
if err != io.EOF {
return n, err
}
if n != 0 {
br.off += n
return n, nil
}
// n==0, err==io.EOF
// wait for new write for 200ms
select {
case <-br.ctx.Done():
return 0, br.ctx.Err()
case <-time.After(time.Millisecond * 200):
return 0, nil
for {
br.rw.Lock()
if br.buffer != nil {
n, err = br.buffer.Read(p)
} else {
err = io.ErrClosedPipe
}
br.rw.Unlock()
if err != nil && err != io.EOF {
return
}
if n > 0 {
br.off += n
return n, nil
}
br.rw.Lock()
br.readPending = true
br.rw.Unlock()
// n==0, err==io.EOF
select {
case <-br.ctx.Done():
return 0, br.ctx.Err()
case _, ok := <-br.readSignal:
if !ok {
return 0, io.ErrClosedPipe
}
continue
}
}
}
@ -676,10 +693,23 @@ func (br *Buf) Write(p []byte) (n int, err error) {
}
br.rw.Lock()
defer br.rw.Unlock()
if br.buffer == nil {
return 0, io.ErrClosedPipe
}
n, err = br.buffer.Write(p)
if br.readPending {
br.readPending = false
select {
case br.readSignal <- struct{}{}:
default:
}
}
return
}
func (br *Buf) Close() {
br.rw.Lock()
defer br.rw.Unlock()
br.buffer = nil
close(br.readSignal)
}