A simple consumer server for RabbitMQ.
Add Coney as a dependency in your mix.exs file.

def deps do
  [{:coney, "~> 2.0"}]
end

After you are done, run mix deps.get in your shell to fetch and compile Coney.
# config/config.exs
config :coney,
  adapter: Coney.RabbitConnection,
  pool_size: 1,
  settings: %{
    url: "amqp://guest:guest@localhost", # or ["amqp://guest:guest@localhost", "amqp://guest:guest@other_host"]
    timeout: 1000
  },
  workers: [
    MyApplication.MyConsumer
  ]

# or, with the connection settings given inline

  workers: [
    %{
      connection: %{
        prefetch_count: 10,
        exchange: {:direct, "my_exchange", durable: true},
        queue: {"my_queue", durable: true},
        binding: [routing_key: "routing_key"]
      },
      worker: MyApplication.MyConsumer
    }
  ]
# config/test.exs
config :coney, adapter: Coney.FakeConnection, settings: %{}
# web/consumers/my_consumer.ex
defmodule MyApplication.MyConsumer do
  @behaviour Coney.Consumer

  def connection do
    %{
      prefetch_count: 10,
      exchange: {:direct, "my_exchange", durable: true},
      queue: {"my_queue", durable: true},
      binding: [routing_key: "routing_key"]
    }
  end

  def parse(payload, _meta) do
    String.to_integer(payload)
  end

  def process(number, _meta) do
    if number <= 10 do
      :ok
    else
      :reject
    end
  end

  def error_happened(exception, payload, _meta) do
    # Log the exception and payload, then put the message back on the queue.
    IO.inspect(exception)
    IO.puts("Exception raised with #{payload}")
    :redeliver
  end
end
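
As a rough illustration of how the callbacks above fit together, the sketch below runs the same parse and process logic by hand, with no broker involved. It assumes that Coney passes the result of parse/2 to process/2 and that an exception raised in a callback is what leads to error_happened/3. The module name Sketch.NumberConsumer, the handle/2 helper, and the empty meta map are made up for this example and are not part of Coney.

```elixir
# Hypothetical stand-in for MyApplication.MyConsumer. The Coney behaviour is
# left out so the snippet runs as a plain script without RabbitMQ.
defmodule Sketch.NumberConsumer do
  def parse(payload, _meta), do: String.to_integer(payload)

  def process(number, _meta) when number <= 10, do: :ok
  def process(_number, _meta), do: :reject

  # Assumed flow: parse first, then hand the parsed value to process.
  def handle(payload, meta \\ %{}) do
    payload
    |> parse(meta)
    |> process(meta)
  rescue
    # String.to_integer/1 raises ArgumentError on non-numeric input;
    # mirror the :redeliver returned by error_happened/3 above.
    ArgumentError -> :redeliver
  end
end

IO.inspect(Sketch.NumberConsumer.handle("7"))    # :ok
IO.inspect(Sketch.NumberConsumer.handle("42"))   # :reject
IO.inspect(Sketch.NumberConsumer.handle("oops")) # :redeliver
```

Note that always returning :redeliver for a payload that can never be parsed would, under the assumptions above, keep the same message cycling through the queue, so :reject may be the safer choice for malformed input.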
- :ok - ack the message.
- :reject - reject the message.
- :redeliver - return the message to the queue.
- {:reply, binary} - the response will be published to the reply exchange.
To use {:reply, binary}, add a response exchange to connection:
# web/consumers/my_consumer.ex
def connection do
  %{
    # ...
    respond_to: "response_exchange"
  }
end
The response will be published to the "response_exchange" exchange.
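
To make the reply flow concrete, here is a minimal sketch of a consumer that answers each message. It assumes the binary in {:reply, binary} is published unchanged to the exchange named in respond_to. The module name MyApplication.DoublingConsumer, the queue name "doubling_queue", and the doubling logic are hypothetical. The @behaviour line is commented out so the snippet compiles without Coney installed.

```elixir
defmodule MyApplication.DoublingConsumer do
  # @behaviour Coney.Consumer  # enable inside a project that depends on Coney

  def connection do
    %{
      prefetch_count: 10,
      exchange: {:direct, "my_exchange", durable: true},
      queue: {"doubling_queue", durable: true},
      binding: [routing_key: "routing_key"],
      respond_to: "response_exchange"
    }
  end

  def parse(payload, _meta), do: String.to_integer(payload)

  # The reply has to be a binary, so the integer is converted back to a string.
  def process(number, _meta), do: {:reply, Integer.to_string(number * 2)}

  def error_happened(_exception, _payload, _meta), do: :reject
end

IO.inspect(MyApplication.DoublingConsumer.process(21, %{})) # {:reply, "42"}
```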
Coney.ConnectionServer.publish("exchange", "message")
# or
Coney.ConnectionServer.publish("exchange", "routing_key", "message")
Bug reports and pull requests are welcome on GitHub at https://github.com/llxff/coney.
The library is available as open source under the terms of the MIT License.