Skip to content

Commit

Permalink
Adding ability to listen for notifications from postgres (#677)
Browse files Browse the repository at this point in the history
* Adding ability to listen for notifications from postgres

* Using a channel to ensure the listen spec is actually ran and checked

* Update src/avram/database.cr

Co-authored-by: Matthew McGarvey <[email protected]>

Co-authored-by: Matthew McGarvey <[email protected]>
  • Loading branch information
jwoertink and matthewmcgarvey authored May 26, 2021
1 parent 02b786a commit dcd0ee2
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 1 deletion.
17 changes: 17 additions & 0 deletions spec/database_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
require "./spec_helper"

describe Avram::Database do
describe "listen" do
it "yields the payload from a notify" do
done = Channel(Nil).new
TestDatabase.listen("dinner_time") do |notification|
notification.channel.should eq "dinner_time"
notification.payload.should eq "Tacos"
done.send(nil)
end

TestDatabase.exec("SELECT pg_notify('dinner_time', 'Tacos')")
done.receive
end
end
end
6 changes: 6 additions & 0 deletions src/avram/connection.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ class Avram::Connection
try_connection!
end

def connect_listen(*channels : String, &block : PQ::Notification ->) : Nil
PG.connect_listen(@connection_string, *channels, &block)
rescue DB::ConnectionRefused
raise ConnectionError.new(connection_uri, database_class: @database_class)
end

def try_connection!
DB.open(@connection_string)
rescue DB::ConnectionRefused
Expand Down
25 changes: 24 additions & 1 deletion src/avram/database.cr
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ abstract class Avram::Database
new.delete
end

# Listens for `pg_notify()` calls on each channel in `channels`
# Yields a `PQ::Notification` object with `channel`, `payload`, and `pid`.
#
# ```
# # pg_notify("callback", "123")
# AppDatabase.listen("callback", "jobs") do |notification|
# notification.channel # => "callback"
# notification.payload # => "123"
# end
# ```
def self.listen(*channels : String, &block : PQ::Notification ->) : Nil
new.listen(*channels, &block)
end

@@database_info : DatabaseInfo?

def self.database_info : DatabaseInfo
Expand Down Expand Up @@ -131,11 +145,20 @@ abstract class Avram::Database
yield current_transaction.try(&.connection) || db
end

# :nodoc:
def listen(*channels : String, &block : PQ::Notification ->) : Nil
connection.connect_listen(*channels, &block)
end

private def connection : Avram::Connection
Avram::Connection.new(url, database_class: self.class)
end

private def db : DB::Database
@@db ||= @@lock.synchronize do
# check @@db again because a previous request could have set it after
# the first time it was checked
@@db || Avram::Connection.new(url, database_class: self.class).open
@@db || connection.open
end
end

Expand Down

0 comments on commit dcd0ee2

Please sign in to comment.