Skip to content

Commit

Permalink
Make Listener compatible with net.Listener
Browse files Browse the repository at this point in the history
Breaking changes:

1. Remove timeout argument from listener.Close
Connections that were already accepted should remain active after listener.Close().
Previously, those connections were broken after the timeout.
In the new implementation, the readLoop goroutine keeps running after
listener.Close() and stops automatically once all
connections are closed. The final conn/listener.Close() returns the
error from the underlying connection after the routine has stopped.

2. Unexpose listener implementation
Return as net.Listener interface instead of *udp.Listener.
  • Loading branch information
at-wat committed Feb 7, 2020
1 parent dca3dfb commit 5abf71d
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 110 deletions.
4 changes: 2 additions & 2 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func assertE2ECommunication(clientConfig, serverConfig *dtls.Config, serverPort
clientConn net.Conn
serverMutex sync.Mutex
serverConn net.Conn
serverListener *dtls.Listener
serverListener net.Listener
serverReady = make(chan struct{})
errChan = make(chan error)
clientChan = make(chan string)
Expand Down Expand Up @@ -141,7 +141,7 @@ func assertE2ECommunication(clientConfig, serverConfig *dtls.Config, serverPort
t.Fatal(err)
}

if err := serverListener.Close(2 * time.Second); err != nil {
if err := serverListener.Close(); err != nil {
t.Fatal(err)
}
}()
Expand Down
2 changes: 1 addition & 1 deletion examples/listen-psk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {
listener, err := dtls.Listen("udp", addr, config)
util.Check(err)
defer func() {
util.Check(listener.Close(5 * time.Second))
util.Check(listener.Close())
}()

fmt.Println("Listening")
Expand Down
2 changes: 1 addition & 1 deletion examples/listen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
listener, err := dtls.Listen("udp", addr, config)
util.Check(err)
defer func() {
util.Check(listener.Close(5 * time.Second))
util.Check(listener.Close())
}()

fmt.Println("Listening")
Expand Down
136 changes: 61 additions & 75 deletions internal/udp/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,36 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"
)

const receiveMTU = 8192
const closeRetryInterval = 100 * time.Millisecond

var errClosedListener = errors.New("udp: listener closed")

// Listener augments a connection-oriented Listener over a UDP PacketConn
type Listener struct {
pConn *net.UDPConn

lock sync.RWMutex
accepting bool
accepting atomic.Value // bool
acceptCh chan *Conn
doneCh chan struct{}
doneOnce sync.Once

conns map[string]*Conn
connLock sync.Mutex
conns map[string]*Conn
connWG sync.WaitGroup

readWG sync.WaitGroup
errClose atomic.Value // error
}

// Accept waits for and returns the next connection to the listener.
// You have to either close or read on all connections that are created.
func (l *Listener) Accept() (*Conn, error) {
select {
case c := <-l.acceptCh:
l.connWG.Add(1)
return c, nil

case <-l.doneCh:
Expand All @@ -41,41 +45,29 @@ func (l *Listener) Accept() (*Conn, error) {

// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
func (l *Listener) Close(shutdownTimeout time.Duration) error {
l.lock.Lock()
defer l.lock.Unlock()

func (l *Listener) Close() error {
var err error
l.doneOnce.Do(func() {
l.accepting = false
l.connWG.Done()
l.accepting.Store(false)
close(l.doneCh)
err = l.cleanupWithTimeout(shutdownTimeout)
})

return err
}
l.connLock.Lock()
nConns := len(l.conns)
l.connLock.Unlock()

func (l *Listener) cleanupWithTimeout(shutdownTimeout time.Duration) error {
timeoutTimer := time.NewTimer(shutdownTimeout)
for {
select {
case <-time.After(closeRetryInterval):
if len(l.conns) == 0 {
return l.cleanup()
if nConns == 0 {
// Wait if this is the final connection
l.readWG.Wait()
if errClose, ok := l.errClose.Load().(error); ok {
err = errClose
}
case <-timeoutTimer.C:
return l.cleanup()
} else {
err = nil
}
}
}
})

// cleanup closes the packet conn if it is no longer used
// The caller should hold the read lock.
func (l *Listener) cleanup() error {
if !l.accepting && len(l.conns) == 0 {
return l.pConn.Close()
}
return nil
return err
}

// Addr returns the listener's network address.
Expand All @@ -91,14 +83,23 @@ func Listen(network string, laddr *net.UDPAddr) (*Listener, error) {
}

l := &Listener{
pConn: conn,
acceptCh: make(chan *Conn),
conns: make(map[string]*Conn),
accepting: true,
doneCh: make(chan struct{}),
pConn: conn,
acceptCh: make(chan *Conn),
conns: make(map[string]*Conn),
doneCh: make(chan struct{}),
}
l.accepting.Store(true)
l.connWG.Add(1)
l.readWG.Add(2) // wait readLoop and Close execution routine

go l.readLoop()
go func() {
l.connWG.Wait()
if err := l.pConn.Close(); err != nil {
l.errClose.Store(err)
}
l.readWG.Done()
}()

return l, nil
}
Expand All @@ -108,9 +109,9 @@ func Listen(network string, laddr *net.UDPAddr) (*Listener, error) {
// It can therefore not be ended until all Conns are closed.
// 2. Creating a new Conn when receiving from a new remote.
func (l *Listener) readLoop() {
defer l.readWG.Done()
buf := make([]byte, receiveMTU)

readLoop:
for {
n, raddr, err := l.pConn.ReadFrom(buf)
if err != nil {
Expand All @@ -120,23 +121,18 @@ readLoop:
if err != nil {
continue
}
select {
case cBuf := <-conn.readCh:
n = copy(cBuf, buf[:n])
conn.sizeCh <- n
case <-conn.doneCh:
continue readLoop
}
cBuf := <-conn.readCh
n = copy(cBuf, buf[:n])
conn.sizeCh <- n
}
}

func (l *Listener) getConn(raddr net.Addr) (*Conn, error) {
l.lock.Lock()
defer l.lock.Unlock()

l.connLock.Lock()
defer l.connLock.Unlock()
conn, ok := l.conns[raddr.String()]
if !ok {
if !l.accepting {
if !l.accepting.Load().(bool) {
return nil, errClosedListener
}
conn = l.newConn(raddr)
Expand All @@ -155,7 +151,6 @@ type Conn struct {
readCh chan []byte
sizeCh chan int

lock sync.RWMutex
doneCh chan struct{}
doneOnce sync.Once
}
Expand Down Expand Up @@ -183,46 +178,37 @@ func (c *Conn) Read(p []byte) (int, error) {

// Write writes len(p) bytes from p to the DTLS connection
func (c *Conn) Write(p []byte) (n int, err error) {
c.lock.Lock()
l := c.listener
c.lock.Unlock()

if l == nil {
return 0, io.EOF
}

return l.pConn.WriteTo(p, c.rAddr)
return c.listener.pConn.WriteTo(p, c.rAddr)
}

// Close closes the conn and releases any Read calls
func (c *Conn) Close() error {
c.lock.Lock()
defer c.lock.Unlock()

var err error
c.doneOnce.Do(func() {
c.listener.connWG.Done()
close(c.doneCh)
c.listener.lock.Lock()
c.listener.connLock.Lock()
delete(c.listener.conns, c.rAddr.String())
err = c.listener.cleanup()
c.listener.lock.Unlock()
c.listener = nil
nConns := len(c.listener.conns)
c.listener.connLock.Unlock()

if nConns == 0 && !c.listener.accepting.Load().(bool) {
// Wait if this is the final connection
c.listener.readWG.Wait()
if errClose, ok := c.listener.errClose.Load().(error); ok {
err = errClose
}
} else {
err = nil
}
})

return err
}

// LocalAddr is a stub
func (c *Conn) LocalAddr() net.Addr {
c.lock.Lock()
l := c.listener
c.lock.Unlock()

if l == nil {
return nil
}

return l.pConn.LocalAddr()
return c.listener.pConn.LocalAddr()
}

// RemoteAddr is a stub
Expand Down
97 changes: 74 additions & 23 deletions internal/udp/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func stressDuplex(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = listener.Close(5 * time.Second)
err = listener.Close()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestListenerCloseTimeout(t *testing.T) {
t.Fatal(err)
}

err = listener.Close(500 * time.Millisecond)
err = listener.Close()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -149,24 +149,75 @@ func getConfig() (string, *net.UDPAddr) {
return "udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0}
}

// func TestConnClose(t *testing.T) {
// lim := test.TimeOut(time.Second * 5)
// defer lim.Stop()
//
// listener, ca, cb, err := pipe()
// if err != nil {
// t.Fatal(err)
// }
// err = ca.Close()
// if err != nil {
// t.Fatalf("Failed to close A side: %v\n", err)
// }
// err = cb.Close()
// if err != nil {
// t.Fatalf("Failed to close B side: %v\n", err)
// }
// err = listener.Close(1 * time.Second)
// if err != nil {
// t.Fatalf("Failed to close listener: %v\n", err)
// }
// }
// TestConnClose verifies that Conn and Listener shut down cleanly, that no
// goroutines are leaked, and that an error from the underlying PacketConn is
// propagated to whichever Close call ends up being the final one.
func TestConnClose(t *testing.T) {
	lim := test.TimeOut(time.Second * 5)
	defer lim.Stop()

	t.Run("Close", func(t *testing.T) {
		// Check for leaking routines
		report := test.CheckRoutines(t)
		defer report()

		listener, ca, cb, errPipe := pipe()
		if errPipe != nil {
			t.Fatal(errPipe)
		}
		if err := ca.Close(); err != nil {
			t.Errorf("Failed to close A side: %v", err)
		}
		if err := cb.Close(); err != nil {
			t.Errorf("Failed to close B side: %v", err)
		}
		if err := listener.Close(); err != nil {
			t.Errorf("Failed to close listener: %v", err)
		}
	})
	t.Run("CloseError1", func(t *testing.T) {
		// Check for leaking routines
		report := test.CheckRoutines(t)
		defer report()

		listener, ca, cb, errPipe := pipe()
		if errPipe != nil {
			t.Fatal(errPipe)
		}
		// Close listener.pConn to inject error.
		if err := listener.pConn.Close(); err != nil {
			t.Error(err)
		}

		// NOTE: labels match the variables (ca = A side, cb = B side),
		// consistent with the "Close" subtest above.
		if err := cb.Close(); err != nil {
			t.Errorf("Failed to close B side: %v", err)
		}
		if err := ca.Close(); err != nil {
			t.Errorf("Failed to close A side: %v", err)
		}
		// listener.Close is the final Close here, so it must surface the
		// injected pConn error.
		if err := listener.Close(); err == nil {
			t.Errorf("Error is not propagated to Listener.Close")
		}
	})
	t.Run("CloseError2", func(t *testing.T) {
		// Check for leaking routines
		report := test.CheckRoutines(t)
		defer report()

		listener, ca, cb, errPipe := pipe()
		if errPipe != nil {
			t.Fatal(errPipe)
		}
		// Close listener.pConn to inject error.
		if err := listener.pConn.Close(); err != nil {
			t.Error(err)
		}

		if err := cb.Close(); err != nil {
			t.Errorf("Failed to close B side: %v", err)
		}
		if err := listener.Close(); err != nil {
			t.Errorf("Failed to close listener: %v", err)
		}
		// ca.Close is the final Close here, so it must surface the
		// injected pConn error.
		if err := ca.Close(); err == nil {
			t.Errorf("Error is not propagated to Conn.Close")
		}
	})
}
Loading

0 comments on commit 5abf71d

Please sign in to comment.