Skip to content
This repository was archived by the owner on Nov 27, 2023. It is now read-only.

Connection option for Consumer and Producer #149

Merged
merged 2 commits into from
Mar 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ defmodule Consumer do
exchange: "gen_rmq_exchange",
routing_key: "#",
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
connection: "amqp://guest:guest@localhost:5672",
retry_delay_function: fn attempt -> :timer.sleep(2000 * attempt) end
]
end
Expand Down Expand Up @@ -88,7 +88,7 @@ defmodule Publisher do
def init() do
[
exchange: "gen_rmq_exchange",
uri: "amqp://guest:guest@localhost:5672"
connection: "amqp://guest:guest@localhost:5672"
]
end
end
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ defmodule ExampleConsumer do
exchange: "example_exchange",
routing_key: "routing_key.#",
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672"
connection: "amqp://guest:guest@localhost:5672"
]
end

Expand Down
2 changes: 1 addition & 1 deletion examples/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule ExamplePublisher do
def init() do
[
exchange: "example_exchange",
uri: "amqp://guest:guest@localhost:5672"
connection: "amqp://guest:guest@localhost:5672"
]
end
end
11 changes: 8 additions & 3 deletions lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ defmodule GenRMQ.Consumer do
## Return values
### Mandatory:

`uri` - RabbitMQ uri
`connection` - RabbitMQ connection options. Accepts same arguments as AMQP-library's [Connection.open/2](https://hexdocs.pm/amqp/AMQP.Connection.html#open/2).

`queue` - the name of the queue to consume. If it does not exist, it will be created.

Expand All @@ -40,6 +40,8 @@ defmodule GenRMQ.Consumer do

### Optional:

`uri` - RabbitMQ uri. Deprecated. Please use `connection`.

`queue_ttl` - controls for how long a queue can be unused before it is
automatically deleted. Unused means the queue has no consumers,
the queue has not been redeclared, and basic.get has not been invoked
Expand Down Expand Up @@ -75,6 +77,7 @@ defmodule GenRMQ.Consumer do
```
def init() do
[
connection: "amqp://guest:guest@localhost:5672",
queue: "gen_rmq_in_queue",
exchange: "gen_rmq_exchange",
routing_key: "#",
Expand All @@ -95,6 +98,7 @@ defmodule GenRMQ.Consumer do

"""
@callback init() :: [
connection: keyword | {String.t(), String.t()} | :undefined | keyword,
queue: String.t(),
exchange: GenRMQ.Binding.exchange(),
routing_key: [String.t()] | String.t(),
Expand Down Expand Up @@ -348,6 +352,7 @@ defmodule GenRMQ.Consumer do

config_no_q_mp
|> Keyword.put(:queue, queue_settings)
|> Keyword.put(:connection, Keyword.get(config, :connection, config[:uri]))
end

defp handle_message(payload, attributes, %{module: module} = state, false) do
Expand Down Expand Up @@ -398,7 +403,7 @@ defmodule GenRMQ.Consumer do

emit_connection_start_event(start_time, module, attempt, queue, exchange, routing_key)

case Connection.open(config[:uri]) do
case Connection.open(config[:connection]) do
{:ok, conn} ->
emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key)
Process.monitor(conn.pid)
Expand All @@ -407,7 +412,7 @@ defmodule GenRMQ.Consumer do
{:error, e} ->
Logger.error(
"[#{module}]: Failed to connect to RabbitMQ with settings: " <>
"#{inspect(strip_key(config, :uri))}, reason #{inspect(e)}"
"#{inspect(strip_key(config, :connection))}, reason #{inspect(e)}"
)

emit_connection_error_event(start_time, module, attempt, queue, exchange, routing_key, e)
Expand Down
17 changes: 14 additions & 3 deletions lib/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ defmodule GenRMQ.Publisher do
## Return values
### Mandatory:

`uri` - RabbitMQ uri
`connection` - RabbitMQ connection options. Accepts same arguments as AMQP-library's [Connection.open/2](https://hexdocs.pm/amqp/AMQP.Connection.html#open/2).

`exchange` - name or `{type, name}` of the target exchange. If it does not exist, it will be created.
For valid exchange types see `GenRMQ.Binding`.

### Optional:

`uri` - RabbitMQ uri. Deprecated. Please use `connection`.

`app_id` - publishing application ID. By default it is `:gen_rmq`.

`enable_confirmations` - activates publishing confirmations on the channel. Confirmations are disabled by default.
Expand All @@ -57,7 +59,8 @@ defmodule GenRMQ.Publisher do
def init() do
[
exchange: "gen_rmq_exchange",
uri: "amqp://guest:guest@localhost:5672"
connection: "amqp://guest:guest@localhost:5672",
uri: "amqp://guest:guest@localhost:5672",
app_id: :my_app_id,
enable_confirmations: true,
max_confirmation_wait_time: 5_000
Expand All @@ -67,6 +70,7 @@ defmodule GenRMQ.Publisher do

"""
@callback init() :: [
connection: keyword | {String.t(), String.t()} | :undefined | keyword,
exchange: GenRMQ.Binding.exchange(),
uri: String.t(),
app_id: atom,
Expand Down Expand Up @@ -319,6 +323,7 @@ defmodule GenRMQ.Publisher do
##############################################################################

defp setup_publisher(%{module: module, config: config} = state) do
state = Map.put(state, :config, parse_config(config))
start_time = System.monotonic_time()
exchange = config[:exchange]

Expand All @@ -336,6 +341,12 @@ defmodule GenRMQ.Publisher do
{:ok, %{channel: channel, module: module, config: config, conn: conn}}
end

defp parse_config(config) do
# Backwards compatibility support
# Use connection-keyword if it's set, otherwise use uri-keyword
Keyword.put(config, :connection, Keyword.get(config, :connection, config[:uri]))
end

defp emit_connection_down_event(module, reason) do
start_time = System.monotonic_time()
measurements = %{time: start_time}
Expand Down Expand Up @@ -400,7 +411,7 @@ defmodule GenRMQ.Publisher do
defp publish_result(error, _), do: error

defp connect(%{module: module, config: config} = state) do
case Connection.open(config[:uri]) do
case Connection.open(config[:connection]) do
{:ok, conn} ->
Process.monitor(conn.pid)
{:ok, conn}
Expand Down
12 changes: 6 additions & 6 deletions lib/rabbit_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ defmodule GenRMQ.RabbitCase do
quote do
use AMQP

def rmq_open(uri) do
AMQP.Connection.open(uri)
def rmq_open(connection) do
AMQP.Connection.open(connection)
end

def open_channel(connection), do: AMQP.Channel.open(connection)
Expand Down Expand Up @@ -39,14 +39,14 @@ defmodule GenRMQ.RabbitCase do
{:ok, Jason.decode!(payload), meta}
end

def purge_queues(uri, queues) do
{:ok, conn} = rmq_open(uri)
def purge_queues(connection, queues) do
{:ok, conn} = rmq_open(connection)
Enum.each(queues, &purge_queue(conn, &1))
AMQP.Connection.close(conn)
end

def purge_queues!(uri, queues) do
{:ok, conn} = rmq_open(uri)
def purge_queues!(connection, queues) do
{:ok, conn} = rmq_open(connection)
Enum.each(queues, &purge_queue!(conn, &1))
AMQP.Connection.close(conn)
end
Expand Down
4 changes: 2 additions & 2 deletions test/gen_rmq_consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ defmodule GenRMQ.ConsumerTest do
alias TestConsumer.WithMultiBindingExchange
alias TestConsumer.RedeclaringExistingExchange

@uri "amqp://guest:guest@localhost:5672"
@connection "amqp://guest:guest@localhost:5672"

setup_all do
{:ok, conn} = rmq_open(@uri)
{:ok, conn} = rmq_open(@connection)
{:ok, rabbit_conn: conn}
end

Expand Down
6 changes: 3 additions & 3 deletions test/gen_rmq_publisher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ defmodule GenRMQ.PublisherTest do
alias TestPublisher.WithConfirmations
alias TestPublisher.RedeclaringExistingExchange

@uri "amqp://guest:guest@localhost:5672"
@connection "amqp://guest:guest@localhost:5672"
@exchange "gen_rmq_out_exchange"
@out_queue "gen_rmq_out_queue"
@invalid_queue "invalid_queue"

setup_all do
{:ok, conn} = rmq_open(@uri)
{:ok, conn} = rmq_open(@connection)
:ok = setup_out_queue(conn, @out_queue, @exchange)
{:ok, rabbit_conn: conn, out_queue: @out_queue}
end

setup do
purge_queues(@uri, [@out_queue])
purge_queues(@connection, [@out_queue])
end

describe "start_link/2" do
Expand Down
20 changes: 10 additions & 10 deletions test/support/test_consumers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule TestConsumer do
exchange: "gen_rmq_in_exchange",
routing_key: "#",
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
connection: "amqp://guest:guest@localhost:5672",
queue_ttl: 1000
]
end
Expand Down Expand Up @@ -39,7 +39,7 @@ defmodule TestConsumer do
exchange: "gen_rmq_in_exchange",
routing_key: "#",
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
connection: "amqp://guest:guest@localhost:5672",
concurrency: false,
queue_ttl: 1000
]
Expand Down Expand Up @@ -68,7 +68,7 @@ defmodule TestConsumer do
exchange: "does_not_matter_exchange",
routing_key: "#",
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
connection: "amqp://guest:guest@localhost:5672",
reconnect: false,
queue_ttl: 1000
]
Expand All @@ -92,7 +92,7 @@ defmodule TestConsumer do
exchange: "gen_rmq_in_exchange_no_deadletter",
routing_key: "#",
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
connection: "amqp://guest:guest@localhost:5672",
queue_ttl: 1000,
deadletter: false
]
Expand All @@ -117,7 +117,7 @@ defmodule TestConsumer do
exchange: "gen_rmq_in_exchange_custom_deadletter",
routing_key: "#",
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
connection: "amqp://guest:guest@localhost:5672",
queue_ttl: 1000,
deadletter_queue: "dl_queue",
deadletter_exchange: "dl_exchange",
Expand All @@ -144,7 +144,7 @@ defmodule TestConsumer do
exchange: "gen_rmq_in_exchange_with_prio",
routing_key: "#",
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
connection: "amqp://guest:guest@localhost:5672",
queue_ttl: 1000,
queue_max_priority: 100
]
Expand All @@ -171,7 +171,7 @@ defmodule TestConsumer do
exchange: {:topic, "gen_rmq_in_wt_exchange"},
routing_key: "#",
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
connection: "amqp://guest:guest@localhost:5672",
queue_ttl: 1000
]
end
Expand Down Expand Up @@ -201,7 +201,7 @@ defmodule TestConsumer do
exchange: {:direct, "gen_rmq_in_wd_exchange"},
routing_key: "#",
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
connection: "amqp://guest:guest@localhost:5672",
queue_ttl: 1000
]
end
Expand Down Expand Up @@ -231,7 +231,7 @@ defmodule TestConsumer do
exchange: {:fanout, "gen_rmq_in_wf_exchange"},
routing_key: "#",
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
connection: "amqp://guest:guest@localhost:5672",
queue_ttl: 1000
]
end
Expand Down Expand Up @@ -261,7 +261,7 @@ defmodule TestConsumer do
exchange: {:direct, "gen_rmq_in_mb_exchange"},
routing_key: ["routing_key_1", "routing_key_2"],
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
connection: "amqp://guest:guest@localhost:5672",
queue_ttl: 1000
]
end
Expand Down
4 changes: 2 additions & 2 deletions test/support/test_publishers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule TestPublisher do
def init() do
[
exchange: "gen_rmq_out_exchange",
uri: "amqp://guest:guest@localhost:5672",
connection: "amqp://guest:guest@localhost:5672",
app_id: :my_app_id
]
end
Expand Down Expand Up @@ -34,7 +34,7 @@ defmodule TestPublisher do
def init() do
[
exchange: "gen_rmq_out_exchange",
uri: "amqp://guest:guest@localhost:5672",
connection: "amqp://guest:guest@localhost:5672",
app_id: :my_app_id,
enable_confirmations: true
]
Expand Down