public class BlockingQueueConsumer
extends java.lang.Object
| Modifier and Type | Class and Description |
|---|---|
class |
BlockingQueueConsumer.DeclarationException |
private static class |
BlockingQueueConsumer.Delivery
Encapsulates an arbitrary message - simple "bean" holder structure.
|
private class |
BlockingQueueConsumer.InternalConsumer |
| Modifier and Type | Field and Description |
|---|---|
private AcknowledgeMode |
acknowledgeMode |
private ActiveObjectCounter<BlockingQueueConsumer> |
activeObjectCounter |
private java.util.concurrent.atomic.AtomicBoolean |
cancelled |
private java.util.concurrent.atomic.AtomicBoolean |
cancelReceived |
private com.rabbitmq.client.Channel |
channel |
private ConnectionFactory |
connectionFactory |
private BlockingQueueConsumer.InternalConsumer |
consumer |
private java.util.Map<java.lang.String,java.lang.Object> |
consumerArgs |
private java.util.Collection<java.lang.String> |
consumerTags |
private int |
declarationRetries |
private boolean |
defaultRequeuRejected |
private java.util.Set<java.lang.Long> |
deliveryTags |
private boolean |
exclusive |
private long |
failedDeclarationRetryInterval |
private long |
lastRetryDeclaration |
private static org.apache.commons.logging.Log |
logger |
private MessagePropertiesConverter |
messagePropertiesConverter |
private java.util.Set<java.lang.String> |
missingQueues |
private int |
prefetchCount |
private java.util.concurrent.BlockingQueue<BlockingQueueConsumer.Delivery> |
queue |
private java.lang.String[] |
queues |
private RabbitResourceHolder |
resourceHolder |
private long |
retryDeclarationInterval |
private com.rabbitmq.client.ShutdownSignalException |
shutdown |
private long |
shutdownTimeout |
private java.util.concurrent.CountDownLatch |
suspendClientThread |
private boolean |
transactional |
| Constructor and Description |
|---|
BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
AcknowledgeMode acknowledgeMode,
boolean transactional,
int prefetchCount,
boolean defaultRequeueRejected,
java.util.Map<java.lang.String,java.lang.Object> consumerArgs,
boolean exclusive,
java.lang.String... queues)
Create a consumer.
|
BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
AcknowledgeMode acknowledgeMode,
boolean transactional,
int prefetchCount,
boolean defaultRequeueRejected,
java.util.Map<java.lang.String,java.lang.Object> consumerArgs,
java.lang.String... queues)
Create a consumer.
|
BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
AcknowledgeMode acknowledgeMode,
boolean transactional,
int prefetchCount,
boolean defaultRequeueRejected,
java.lang.String... queues)
Create a consumer.
|
BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter,
AcknowledgeMode acknowledgeMode,
boolean transactional,
int prefetchCount,
java.lang.String... queues)
Create a consumer.
|
| Modifier and Type | Method and Description |
|---|---|
private void |
attemptPassiveDeclarations() |
private void |
checkMissingQueues() |
private void |
checkShutdown()
Check if we are in shutdown mode and if so throw an exception.
|
boolean |
commitIfNecessary(boolean locallyTransacted)
Perform a commit or message acknowledgement, as appropriate.
|
private void |
consumeFromQueue(java.lang.String queue) |
com.rabbitmq.client.Channel |
getChannel() |
java.lang.String |
getConsumerTag() |
private Message |
handle(BlockingQueueConsumer.Delivery delivery)
If this is a non-POISON non-null delivery simply return it.
|
Message |
nextMessage()
Main application-side API: wait for the next message delivery and return it.
|
Message |
nextMessage(long timeout)
Main application-side API: wait for the next message delivery and return it.
|
void |
rollbackOnExceptionIfNecessary(java.lang.Throwable ex)
Perform a rollback, handling rollback exceptions properly.
|
void |
setDeclarationRetries(int declarationRetries)
Set the number of retries after passive queue declaration fails.
|
void |
setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
Set the interval between passive queue declaration attempts in milliseconds.
|
void |
setQuiesce(long shutdownTimeout)
Stop receiving new messages; drain the queue of any prefetched messages.
|
void |
setRetryDeclarationInterval(long retryDeclarationInterval)
When consuming multiple queues, set the interval between declaration attempts when only
a subset of the queues were available (milliseconds).
|
void |
start() |
void |
stop() |
java.lang.String |
toString() |
private static org.apache.commons.logging.Log logger
private final java.util.concurrent.BlockingQueue<BlockingQueueConsumer.Delivery> queue
private volatile com.rabbitmq.client.ShutdownSignalException shutdown
private final java.lang.String[] queues
private final int prefetchCount
private final boolean transactional
private com.rabbitmq.client.Channel channel
private RabbitResourceHolder resourceHolder
private BlockingQueueConsumer.InternalConsumer consumer
private final java.util.concurrent.atomic.AtomicBoolean cancelled
private volatile long shutdownTimeout
private final java.util.concurrent.atomic.AtomicBoolean cancelReceived
private final AcknowledgeMode acknowledgeMode
private final ConnectionFactory connectionFactory
private final MessagePropertiesConverter messagePropertiesConverter
private final ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter
private final java.util.Map<java.lang.String,java.lang.Object> consumerArgs
private final boolean exclusive
private final java.util.Set<java.lang.Long> deliveryTags
private final boolean defaultRequeuRejected
private final java.util.concurrent.CountDownLatch suspendClientThread
private final java.util.Collection<java.lang.String> consumerTags
private final java.util.Set<java.lang.String> missingQueues
private long retryDeclarationInterval
private long failedDeclarationRetryInterval
private int declarationRetries
private long lastRetryDeclaration
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, java.lang.String... queues)
connectionFactory - The connection factory.
messagePropertiesConverter - The properties converter.
activeObjectCounter - The active object counter; used during shutdown.
acknowledgeMode - The acknowledge mode.
transactional - Whether the channel is transactional.
prefetchCount - The prefetch count.
queues - The queues.
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, java.lang.String... queues)
connectionFactory - The connection factory.
messagePropertiesConverter - The properties converter.
activeObjectCounter - The active object counter; used during shutdown.
acknowledgeMode - The acknowledge mode.
transactional - Whether the channel is transactional.
prefetchCount - The prefetch count.
defaultRequeueRejected - true to requeue rejected messages.
queues - The queues.
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, java.util.Map<java.lang.String,java.lang.Object> consumerArgs, java.lang.String... queues)
connectionFactory - The connection factory.
messagePropertiesConverter - The properties converter.
activeObjectCounter - The active object counter; used during shutdown.
acknowledgeMode - The acknowledge mode.
transactional - Whether the channel is transactional.
prefetchCount - The prefetch count.
defaultRequeueRejected - true to requeue rejected messages.
consumerArgs - The consumer arguments (e.g. x-priority).
queues - The queues.
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, java.util.Map<java.lang.String,java.lang.Object> consumerArgs, boolean exclusive, java.lang.String... queues)
connectionFactory - The connection factory.
messagePropertiesConverter - The properties converter.
activeObjectCounter - The active object counter; used during shutdown.
acknowledgeMode - The acknowledge mode.
transactional - Whether the channel is transactional.
prefetchCount - The prefetch count.
defaultRequeueRejected - true to requeue rejected messages.
consumerArgs - The consumer arguments (e.g. x-priority).
exclusive - true if the consumer is to be exclusive.
queues - The queues.
public com.rabbitmq.client.Channel getChannel()
public java.lang.String getConsumerTag()
public final void setQuiesce(long shutdownTimeout)
shutdownTimeout - how long (ms) to suspend the client thread.
public void setDeclarationRetries(int declarationRetries)
declarationRetries - The number of retries, default 3.
See Also: setFailedDeclarationRetryInterval(long)
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval)
failedDeclarationRetryInterval - the interval, default 5000.
See Also: setDeclarationRetries(int)
public void setRetryDeclarationInterval(long retryDeclarationInterval)
retryDeclarationInterval - the interval, default 60000.
private void checkShutdown()
private Message handle(BlockingQueueConsumer.Delivery delivery) throws java.lang.InterruptedException
Throws: java.lang.InterruptedException
public Message nextMessage() throws java.lang.InterruptedException, com.rabbitmq.client.ShutdownSignalException
java.lang.InterruptedException - if an interrupt is received while waiting
com.rabbitmq.client.ShutdownSignalException - if the connection is shut down while waiting
public Message nextMessage(long timeout) throws java.lang.InterruptedException, com.rabbitmq.client.ShutdownSignalException
timeout - timeout in milliseconds
java.lang.InterruptedException - if an interrupt is received while waiting
com.rabbitmq.client.ShutdownSignalException - if the connection is shut down while waiting
private void checkMissingQueues()
public void start()
throws AmqpException
Throws: AmqpException
private void consumeFromQueue(java.lang.String queue)
throws java.io.IOException
Throws: java.io.IOException
private void attemptPassiveDeclarations()
public void stop()
public java.lang.String toString()
Overrides: toString in class java.lang.Object
public void rollbackOnExceptionIfNecessary(java.lang.Throwable ex)
throws java.lang.Exception
ex - the thrown application exception or error
java.lang.Exception - in case of a rollback error
public boolean commitIfNecessary(boolean locallyTransacted)
throws java.io.IOException
locallyTransacted - Whether the channel is locally transacted.
java.io.IOException - Any IOException.