mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 02:45:22 +00:00
THRIFT-4537: TSimpleServer can exit AcceptLoop() without releasing lock
Client: go This closes #1523.
This commit is contained in:
parent
930428438c
commit
8a83b041d2
@ -125,26 +125,38 @@ func (p *TSimpleServer) Listen() error {
|
||||
return p.serverTransport.Listen()
|
||||
}
|
||||
|
||||
func (p *TSimpleServer) innerAccept() (int32, error) {
|
||||
client, err := p.serverTransport.Accept()
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
closed := atomic.LoadInt32(&p.closed)
|
||||
if closed != 0 {
|
||||
return closed, nil
|
||||
}
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if client != nil {
|
||||
p.wg.Add(1)
|
||||
go func() {
|
||||
defer p.wg.Done()
|
||||
if err := p.processRequests(client); err != nil {
|
||||
log.Println("error processing request:", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (p *TSimpleServer) AcceptLoop() error {
|
||||
for {
|
||||
client, err := p.serverTransport.Accept()
|
||||
p.mu.Lock()
|
||||
if atomic.LoadInt32(&p.closed) != 0 {
|
||||
return nil
|
||||
}
|
||||
closed, err := p.innerAccept()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if client != nil {
|
||||
p.wg.Add(1)
|
||||
go func() {
|
||||
defer p.wg.Done()
|
||||
if err := p.processRequests(client); err != nil {
|
||||
log.Println("error processing request:", err)
|
||||
}
|
||||
}()
|
||||
if closed != 0 {
|
||||
return nil
|
||||
}
|
||||
p.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -21,7 +21,8 @@ package thrift
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
"errors"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
type mockServerTransport struct {
|
||||
@ -122,6 +123,34 @@ func TestWaitRace(t *testing.T) {
|
||||
|
||||
serv := NewTSimpleServer2(proc, trans)
|
||||
go serv.Serve()
|
||||
time.Sleep(1)
|
||||
runtime.Gosched()
|
||||
serv.Stop()
|
||||
}
|
||||
|
||||
func TestNoHangDuringStopFromDanglingLockAcquireDuringAcceptLoop(t *testing.T) {
|
||||
proc := &mockProcessor{
|
||||
ProcessFunc: func(in, out TProtocol) (bool, TException) {
|
||||
return false, nil
|
||||
},
|
||||
}
|
||||
|
||||
trans := &mockServerTransport{
|
||||
ListenFunc: func() error {
|
||||
return nil
|
||||
},
|
||||
AcceptFunc: func() (TTransport, error) {
|
||||
return nil, errors.New("no sir")
|
||||
},
|
||||
CloseFunc: func() error {
|
||||
return nil
|
||||
},
|
||||
InterruptFunc: func() error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
serv := NewTSimpleServer2(proc, trans)
|
||||
go serv.Serve()
|
||||
runtime.Gosched()
|
||||
serv.Stop()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user