From e40a742a0142fc12be7a4e5421354e7e65e9fe6f Mon Sep 17 00:00:00 2001 From: Hannu Shemeikka Date: Sun, 8 Mar 2020 13:25:56 +0200 Subject: [PATCH 1/2] Connection option for Consumer and Producer Added new keyword 'connection' to both Consumer's and Producer's inits. This accepts same values AMQP-library's Connection.open/2 'connection' keyword tells user that more options than just the connection string can be passed. The actual behavior is same as with 'uri' keyword. No actual logic changes were made. The old 'uri' keyword can still be used. --- README.md | 4 ++-- examples/consumer.ex | 2 +- examples/publisher.ex | 2 +- lib/consumer.ex | 11 ++++++++--- lib/publisher.ex | 16 +++++++++++++--- lib/rabbit_case.ex | 12 ++++++------ test/gen_rmq_consumer_test.exs | 4 ++-- test/gen_rmq_publisher_test.exs | 6 +++--- test/support/test_consumers.ex | 20 ++++++++++---------- test/support/test_publishers.ex | 4 ++-- 10 files changed, 48 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 1492e32..ac4e738 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ defmodule Consumer do exchange: "gen_rmq_exchange", routing_key: "#", prefetch_count: "10", - uri: "amqp://guest:guest@localhost:5672", + connection: "amqp://guest:guest@localhost:5672", retry_delay_function: fn attempt -> :timer.sleep(2000 * attempt) end ] end @@ -88,7 +88,7 @@ defmodule Publisher do def init() do [ exchange: "gen_rmq_exchange", - uri: "amqp://guest:guest@localhost:5672" + connection: "amqp://guest:guest@localhost:5672" ] end end diff --git a/examples/consumer.ex b/examples/consumer.ex index 170dab9..7826f21 100644 --- a/examples/consumer.ex +++ b/examples/consumer.ex @@ -42,7 +42,7 @@ defmodule ExampleConsumer do exchange: "example_exchange", routing_key: "routing_key.#", prefetch_count: "10", - uri: "amqp://guest:guest@localhost:5672" + connection: "amqp://guest:guest@localhost:5672" ] end diff --git a/examples/publisher.ex b/examples/publisher.ex index 10876f2..73f9570 100644 --- a/examples/publisher.ex +++ b/examples/publisher.ex @@ -26,7 +26,7 @@ defmodule ExamplePublisher do def init() do [ exchange: "example_exchange", - uri: "amqp://guest:guest@localhost:5672" + connection: "amqp://guest:guest@localhost:5672" ] end end diff --git a/lib/consumer.ex b/lib/consumer.ex index a0db9af..5b69230 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -27,7 +27,7 @@ defmodule GenRMQ.Consumer do ## Return values ### Mandatory: - `uri` - RabbitMQ uri + `connection` - RabbitMQ connection options. Accepts same options as AMQP-library's Connection.open(). `queue` - the name of the queue to consume. If it does not exist, it will be created. @@ -40,6 +40,8 @@ defmodule GenRMQ.Consumer do ### Optional: + `uri` - RabbitMQ uri. Deprecated. Please use `connection`. + `queue_ttl` - controls for how long a queue can be unused before it is automatically deleted. Unused means the queue has no consumers, the queue has not been redeclared, and basic.get has not been invoked @@ -75,6 +77,7 @@ defmodule GenRMQ.Consumer do ``` def init() do [ + connection: "amqp://guest:guest@localhost:5672", queue: "gen_rmq_in_queue", exchange: "gen_rmq_exchange", routing_key: "#", @@ -95,6 +98,7 @@ defmodule GenRMQ.Consumer do """ @callback init() :: [ + connection: list, queue: String.t(), exchange: GenRMQ.Binding.exchange(), routing_key: [String.t()] | String.t(), @@ -348,6 +352,7 @@ defmodule GenRMQ.Consumer do config_no_q_mp |> Keyword.put(:queue, queue_settings) + |> Keyword.put(:connection, Keyword.get(config, :connection, config[:uri])) end defp handle_message(payload, attributes, %{module: module} = state, false) do @@ -398,7 +403,7 @@ defmodule GenRMQ.Consumer do emit_connection_start_event(start_time, module, attempt, queue, exchange, routing_key) - case Connection.open(config[:uri]) do + case Connection.open(config[:connection]) do {:ok, conn} -> emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key) Process.monitor(conn.pid) @@ -407,7 +412,7 @@ defmodule GenRMQ.Consumer do {:error, e} -> Logger.error( "[#{module}]: Failed to connect to RabbitMQ with settings: " <> - "#{inspect(strip_key(config, :uri))}, reason #{inspect(e)}" + "#{inspect(strip_key(config, :connection))}, reason #{inspect(e)}" ) emit_connection_error_event(start_time, module, attempt, queue, exchange, routing_key, e) diff --git a/lib/publisher.ex b/lib/publisher.ex index 7bac897..39163a4 100644 --- a/lib/publisher.ex +++ b/lib/publisher.ex @@ -39,13 +39,15 @@ defmodule GenRMQ.Publisher do ## Return values ### Mandatory: - `uri` - RabbitMQ uri + `connection` - RabbitMQ connection options. Accepts same options as AMQP-library's Connection.open(). `exchange` - name or `{type, name}` of the target exchange. If it does not exist, it will be created. For valid exchange types see `GenRMQ.Binding`. ### Optional: + `uri` - RabbitMQ uri. Deprecated. Please use `connection`. + `app_id` - publishing application ID. By default it is `:gen_rmq`. `enable_confirmations` - activates publishing confirmations on the channel. Confirmations are disabled by default. @@ -57,7 +59,8 @@ defmodule GenRMQ.Publisher do def init() do [ exchange: "gen_rmq_exchange", - uri: "amqp://guest:guest@localhost:5672" + connection: "amqp://guest:guest@localhost:5672", + uri: "amqp://guest:guest@localhost:5672", app_id: :my_app_id, enable_confirmations: true, max_confirmation_wait_time: 5_000 @@ -67,6 +70,7 @@ defmodule GenRMQ.Publisher do """ @callback init() :: [ + connection: list, exchange: GenRMQ.Binding.exchange(), uri: String.t(), app_id: atom, @@ -319,6 +323,7 @@ defmodule GenRMQ.Publisher do ############################################################################## defp setup_publisher(%{module: module, config: config} = state) do + state = Map.put(state, :config, parse_config(config)) start_time = System.monotonic_time() exchange = config[:exchange] @@ -336,6 +341,11 @@ defmodule GenRMQ.Publisher do {:ok, %{channel: channel, module: module, config: config, conn: conn}} end + defp parse_config(config) do + config + |> Keyword.put(:connection, Keyword.get(config, :connection, config[:uri])) + end + defp emit_connection_down_event(module, reason) do start_time = System.monotonic_time() measurements = %{time: start_time} @@ -400,7 +410,7 @@ defmodule GenRMQ.Publisher do defp publish_result(error, _), do: error defp connect(%{module: module, config: config} = state) do - case Connection.open(config[:uri]) do + case Connection.open(config[:connection]) do {:ok, conn} -> Process.monitor(conn.pid) {:ok, conn} diff --git a/lib/rabbit_case.ex b/lib/rabbit_case.ex index b66e24b..9045da4 100644 --- a/lib/rabbit_case.ex +++ b/lib/rabbit_case.ex @@ -8,8 +8,8 @@ defmodule GenRMQ.RabbitCase do quote do use AMQP - def rmq_open(uri) do - AMQP.Connection.open(uri) + def rmq_open(connection) do + AMQP.Connection.open(connection) end def open_channel(connection), do: AMQP.Channel.open(connection) @@ -39,14 +39,14 @@ defmodule GenRMQ.RabbitCase do {:ok, Jason.decode!(payload), meta} end - def purge_queues(uri, queues) do - {:ok, conn} = rmq_open(uri) + def purge_queues(connection, queues) do + {:ok, conn} = rmq_open(connection) Enum.each(queues, &purge_queue(conn, &1)) AMQP.Connection.close(conn) end - def purge_queues!(uri, queues) do - {:ok, conn} = rmq_open(uri) + def purge_queues!(connection, queues) do + {:ok, conn} = rmq_open(connection) Enum.each(queues, &purge_queue!(conn, &1)) AMQP.Connection.close(conn) end diff --git a/test/gen_rmq_consumer_test.exs b/test/gen_rmq_consumer_test.exs index abd5dc2..d87abcd 100644 --- a/test/gen_rmq_consumer_test.exs +++ b/test/gen_rmq_consumer_test.exs @@ -18,10 +18,10 @@ defmodule GenRMQ.ConsumerTest do alias TestConsumer.WithMultiBindingExchange alias TestConsumer.RedeclaringExistingExchange - @uri "amqp://guest:guest@localhost:5672" + @connection "amqp://guest:guest@localhost:5672" setup_all do - {:ok, conn} = rmq_open(@uri) + {:ok, conn} = rmq_open(@connection) {:ok, rabbit_conn: conn} end diff --git a/test/gen_rmq_publisher_test.exs b/test/gen_rmq_publisher_test.exs index 317d65c..911f34c 100644 --- a/test/gen_rmq_publisher_test.exs +++ b/test/gen_rmq_publisher_test.exs @@ -9,19 +9,19 @@ defmodule GenRMQ.PublisherTest do alias TestPublisher.WithConfirmations alias TestPublisher.RedeclaringExistingExchange - @uri "amqp://guest:guest@localhost:5672" + @connection "amqp://guest:guest@localhost:5672" @exchange "gen_rmq_out_exchange" @out_queue "gen_rmq_out_queue" @invalid_queue "invalid_queue" setup_all do - {:ok, conn} = rmq_open(@uri) + {:ok, conn} = rmq_open(@connection) :ok = setup_out_queue(conn, @out_queue, @exchange) {:ok, rabbit_conn: conn, out_queue: @out_queue} end setup do - purge_queues(@uri, [@out_queue]) + purge_queues(@connection, [@out_queue]) end describe "start_link/2" do diff --git a/test/support/test_consumers.ex b/test/support/test_consumers.ex index db228d9..8286954 100644 --- a/test/support/test_consumers.ex +++ b/test/support/test_consumers.ex @@ -9,7 +9,7 @@ defmodule TestConsumer do exchange: "gen_rmq_in_exchange", routing_key: "#", prefetch_count: "10", - uri: "amqp://guest:guest@localhost:5672", + connection: "amqp://guest:guest@localhost:5672", queue_ttl: 1000 ] end @@ -39,7 +39,7 @@ defmodule TestConsumer do exchange: "gen_rmq_in_exchange", routing_key: "#", prefetch_count: "10", - uri: "amqp://guest:guest@localhost:5672", + connection: "amqp://guest:guest@localhost:5672", concurrency: false, queue_ttl: 1000 ] @@ -68,7 +68,7 @@ defmodule TestConsumer do exchange: "does_not_matter_exchange", routing_key: "#", prefetch_count: "10", - uri: "amqp://guest:guest@localhost:5672", + connection: "amqp://guest:guest@localhost:5672", reconnect: false, queue_ttl: 1000 ] @@ -92,7 +92,7 @@ defmodule TestConsumer do exchange: "gen_rmq_in_exchange_no_deadletter", routing_key: "#", prefetch_count: "10", - uri: "amqp://guest:guest@localhost:5672", + connection: "amqp://guest:guest@localhost:5672", queue_ttl: 1000, deadletter: false ] @@ -117,7 +117,7 @@ defmodule TestConsumer do exchange: "gen_rmq_in_exchange_custom_deadletter", routing_key: "#", prefetch_count: "10", - uri: "amqp://guest:guest@localhost:5672", + connection: "amqp://guest:guest@localhost:5672", queue_ttl: 1000, deadletter_queue: "dl_queue", deadletter_exchange: "dl_exchange", @@ -144,7 +144,7 @@ defmodule TestConsumer do exchange: "gen_rmq_in_exchange_with_prio", routing_key: "#", prefetch_count: "10", - uri: "amqp://guest:guest@localhost:5672", + connection: "amqp://guest:guest@localhost:5672", queue_ttl: 1000, queue_max_priority: 100 ] @@ -171,7 +171,7 @@ defmodule TestConsumer do exchange: {:topic, "gen_rmq_in_wt_exchange"}, routing_key: "#", prefetch_count: "10", - uri: "amqp://guest:guest@localhost:5672", + connection: "amqp://guest:guest@localhost:5672", queue_ttl: 1000 ] end @@ -201,7 +201,7 @@ defmodule TestConsumer do exchange: {:direct, "gen_rmq_in_wd_exchange"}, routing_key: "#", prefetch_count: "10", - uri: "amqp://guest:guest@localhost:5672", + connection: "amqp://guest:guest@localhost:5672", queue_ttl: 1000 ] end @@ -231,7 +231,7 @@ defmodule TestConsumer do exchange: {:fanout, "gen_rmq_in_wf_exchange"}, routing_key: "#", prefetch_count: "10", - uri: "amqp://guest:guest@localhost:5672", + connection: "amqp://guest:guest@localhost:5672", queue_ttl: 1000 ] end @@ -261,7 +261,7 @@ defmodule TestConsumer do exchange: {:direct, "gen_rmq_in_mb_exchange"}, routing_key: ["routing_key_1", "routing_key_2"], prefetch_count: "10", - uri: "amqp://guest:guest@localhost:5672", + connection: "amqp://guest:guest@localhost:5672", queue_ttl: 1000 ] end diff --git a/test/support/test_publishers.ex b/test/support/test_publishers.ex index cc5c040..a45dfac 100644 --- a/test/support/test_publishers.ex +++ b/test/support/test_publishers.ex @@ -6,7 +6,7 @@ defmodule TestPublisher do def init() do [ exchange: "gen_rmq_out_exchange", - uri: "amqp://guest:guest@localhost:5672", + connection: "amqp://guest:guest@localhost:5672", app_id: :my_app_id ] end @@ -34,7 +34,7 @@ defmodule TestPublisher do def init() do [ exchange: "gen_rmq_out_exchange", - uri: "amqp://guest:guest@localhost:5672", + connection: "amqp://guest:guest@localhost:5672", app_id: :my_app_id, enable_confirmations: true ] From 82598c28355def0f7ae935f1f3d6e7821550fa18 Mon Sep 17 00:00:00 2001 From: Hannu Shemeikka Date: Mon, 9 Mar 2020 19:54:36 +0200 Subject: [PATCH 2/2] Updated documentation Added link to AMQP-library's connection.open/2 for the connection-keyword. Added small comment to publisher's parse_config. --- lib/consumer.ex | 4 ++-- lib/publisher.ex | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/lib/consumer.ex b/lib/consumer.ex index 5b69230..b5ce072 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -27,7 +27,7 @@ defmodule GenRMQ.Consumer do ## Return values ### Mandatory: - `connection` - RabbitMQ connection options. Accepts same options as AMQP-library's Connection.open(). + `connection` - RabbitMQ connection options. Accepts same arguments as AMQP-library's [Connection.open/2](https://hexdocs.pm/amqp/AMQP.Connection.html#open/2). `queue` - the name of the queue to consume. If it does not exist, it will be created. @@ -98,7 +98,7 @@ defmodule GenRMQ.Consumer do """ @callback init() :: [ - connection: list, + connection: keyword | {String.t(), String.t()} | :undefined | keyword, queue: String.t(), exchange: GenRMQ.Binding.exchange(), routing_key: [String.t()] | String.t(), diff --git a/lib/publisher.ex b/lib/publisher.ex index 39163a4..22cafb9 100644 --- a/lib/publisher.ex +++ b/lib/publisher.ex @@ -39,7 +39,7 @@ defmodule GenRMQ.Publisher do ## Return values ### Mandatory: - `connection` - RabbitMQ connection options. Accepts same options as AMQP-library's Connection.open(). + `connection` - RabbitMQ connection options. Accepts same arguments as AMQP-library's [Connection.open/2](https://hexdocs.pm/amqp/AMQP.Connection.html#open/2). `exchange` - name or `{type, name}` of the target exchange. If it does not exist, it will be created. For valid exchange types see `GenRMQ.Binding`. @@ -70,7 +70,7 @@ defmodule GenRMQ.Publisher do """ @callback init() :: [ - connection: list, + connection: keyword | {String.t(), String.t()} | :undefined | keyword, exchange: GenRMQ.Binding.exchange(), uri: String.t(), app_id: atom, @@ -342,8 +342,9 @@ defmodule GenRMQ.Publisher do end defp parse_config(config) do - config - |> Keyword.put(:connection, Keyword.get(config, :connection, config[:uri])) + # Backwards compatibility support + # Use connection-keyword if it's set, otherwise use uri-keyword + Keyword.put(config, :connection, Keyword.get(config, :connection, config[:uri])) end defp emit_connection_down_event(module, reason) do