Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding ability to listen for notifications from postgres #677

Merged
merged 3 commits into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions spec/database_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
require "./spec_helper"

describe Avram::Database do
describe "listen" do
it "yields the payload from a notify" do
done = Channel(Nil).new
TestDatabase.listen("dinner_time") do |notification|
notification.channel.should eq "dinner_time"
notification.payload.should eq "Tacos"
done.send(nil)
end

TestDatabase.exec("SELECT pg_notify('dinner_time', 'Tacos')")
done.receive
end
end
end
6 changes: 6 additions & 0 deletions src/avram/connection.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ class Avram::Connection
try_connection!
end

def connect_listen(*channels : String, &block : PQ::Notification ->) : Nil
PG.connect_listen(@connection_string, *channels, &block)
rescue DB::ConnectionRefused
raise ConnectionError.new(connection_uri, database_class: @database_class)
end

def try_connection!
DB.open(@connection_string)
rescue DB::ConnectionRefused
Expand Down
27 changes: 26 additions & 1 deletion src/avram/database.cr
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@ abstract class Avram::Database
new.delete
end

# Listens for `pg_notify()` calls on each channel in `channels`
# Yields a `PQ::Notification` object with `channel`, `payload`, and `pid`.
#
# ```
# # pg_notify("callback", "123")
# AppDatabase.listen("callback", "jobs") do |notification|
# notification.channel # => "callback"
# notification.payload # => "123"
# end
# ```
def self.listen(*channels : String, &block : PQ::Notification ->) : Nil
new.listen(*channels) do |notification|
block.call(notification)
end
jwoertink marked this conversation as resolved.
Show resolved Hide resolved
end

@@database_info : DatabaseInfo?

def self.database_info : DatabaseInfo
Expand Down Expand Up @@ -131,11 +147,20 @@ abstract class Avram::Database
yield current_transaction.try(&.connection) || db
end

# :nodoc:
def listen(*channels : String, &block : PQ::Notification ->) : Nil
connection.connect_listen(*channels, &block)
end

private def connection : Avram::Connection
Avram::Connection.new(url, database_class: self.class)
end

private def db : DB::Database
@@db ||= @@lock.synchronize do
# check @@db again because a previous request could have set it after
# the first time it was checked
@@db || Avram::Connection.new(url, database_class: self.class).open
@@db || connection.open
end
end

Expand Down