Skip to content

Commit

Permalink
Fix routine leak on close
Browse files Browse the repository at this point in the history
Read from channel was blocked if closed when there is unread data.
Unblock channel read on close.
  • Loading branch information
at-wat authored and Sean-Der committed Dec 15, 2019
1 parent b4351bb commit 486ad1d
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ Check out the **[contributing wiki](https://github.com/pion/webrtc/wiki/Contribu
* [Hayden James](https://github.com/hjames9)
* [Jozef Kralik](https://github.com/jkralik)
* [Robert Eperjesi](https://github.com/epes)
* [Atsushi Watanabe](https://github.com/at-wat)

### License
MIT License - see [LICENSE](LICENSE) for full text
10 changes: 9 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type Conn struct {

bufferedPackets []*packet

connectionClosed *Closer // Closed on connection close and unblock read

connErr atomic.Value
log logging.LeveledLogger
}
Expand Down Expand Up @@ -128,6 +130,7 @@ func createConn(nextConn net.Conn, flightHandler flightHandler, handshakeMessage
}

handshakeDoneSignal := NewCloser()
connectionClosed := NewCloser()

c := &Conn{
nextConn: nextConn,
Expand Down Expand Up @@ -156,6 +159,7 @@ func createConn(nextConn net.Conn, flightHandler flightHandler, handshakeMessage
decrypted: make(chan []byte),
workerTicker: time.NewTicker(workerInterval),
handshakeDoneSignal: handshakeDoneSignal,
connectionClosed: connectionClosed,
log: logger,
}

Expand Down Expand Up @@ -624,7 +628,10 @@ func (c *Conn) handleIncomingPacket(buf []byte) (*alert, error) {
return &alert{alertLevelFatal, alertUnexpectedMessage}, fmt.Errorf("ApplicationData with epoch of 0")
}

c.decrypted <- content.data
select {
case c.decrypted <- content.data:
case <-c.connectionClosed.Done():
}
default:
return &alert{alertLevelFatal, alertUnexpectedMessage}, fmt.Errorf("unhandled contentType %d", content.contentType())
}
Expand Down Expand Up @@ -709,6 +716,7 @@ func (c *Conn) stopWithError(err error) {
c.workerTicker.Stop()

c.handshakeDoneSignal.Close()
c.connectionClosed.Close()
}

func (c *Conn) getConnErr() error {
Expand Down
27 changes: 27 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,33 @@ func stressDuplex(t *testing.T) {
}
}

func TestRoutineLeakOnClose(t *testing.T) {
// Limit runtime in case of deadlocks
lim := test.TimeOut(5 * time.Second)
defer lim.Stop()

// Check for leaking routines
report := test.CheckRoutines(t)
defer report()

ca, cb, err := pipeMemory()
if err != nil {
t.Fatal(err)
}

if _, err := ca.Write(make([]byte, 100)); err != nil {
t.Fatal(err)
}
if err := cb.Close(); err != nil {
t.Fatal(err)
}
if err := ca.Close(); err != nil {
t.Fatal(err)
}
// Packet is sent, but not read.
// inboundLoop routine should not be leaked.
}

func pipeMemory() (*Conn, *Conn, error) {
// In memory pipe
ca, cb := net.Pipe()
Expand Down

0 comments on commit 486ad1d

Please sign in to comment.