Clusters
Overview
Apache ActiveMQ Artemis clusters allow Apache ActiveMQ Artemis servers to be grouped together in order to share message processing load. Each active node in the cluster is an active Apache ActiveMQ Artemis server which manages its own messages and handles its own connections.
The cluster is formed by each node declaring cluster connections to other nodes in the core configuration file broker.xml.
When a node forms a cluster connection to another node, internally it creates a core bridge (as described in Core Bridges) connection between it and the other node. This is done transparently behind the scenes; you don't have to declare an explicit bridge for each node. These cluster connections allow messages to flow between the nodes of the cluster to balance load.
Nodes can be connected together to form a cluster in many different topologies. We will discuss a couple of the more common topologies later in this chapter.
We'll also discuss client side load balancing, where we can balance client connections across the nodes of the cluster, and we'll consider message redistribution where Apache ActiveMQ Artemis will redistribute messages between nodes to avoid starvation.
Another important part of clustering is server discovery where servers can broadcast their connection details so clients or other servers can connect to them with the minimum of configuration.
Once a cluster node has been configured it is common to simply copy that configuration to other nodes to produce a symmetric cluster. However, care must be taken when copying the Apache ActiveMQ Artemis files. Do not copy the Apache ActiveMQ Artemis data (i.e. the bindings, journal, and large-messages directories) from one node to another. When a node is started for the first time and initializes its journal files it also persists a special identifier to the journal directory. This id must be unique among nodes in the cluster or the cluster will not form properly.
Server discovery
Server discovery is a mechanism by which servers can propagate their connection details to:
Messaging clients. A messaging client wants to be able to connect to the servers of the cluster without having specific knowledge of which servers in the cluster are up at any one time.
Other servers. Servers in a cluster want to be able to create cluster connections to each other without having prior knowledge of all the other servers in the cluster.
This information, let's call it the Cluster Topology, is sent over normal Apache ActiveMQ Artemis connections to clients and over cluster connections to other servers. This being the case, we need a way of establishing the initial connection. This can be done using dynamic discovery techniques like UDP and JGroups, or by providing a list of initial connectors.
Dynamic Discovery
Server discovery uses UDP multicast or JGroups to broadcast server connection settings.
Broadcast Groups
A broadcast group is the means by which a server broadcasts connectors over the network. A connector defines a way in which a client (or other server) can make connections to the server. For more information on what a connector is, please see Configuring the Transport.
The broadcast group takes a set of connector pairs and broadcasts them on the network. Each connector pair contains connection settings for a live and backup server (if one exists). Depending on which broadcasting technique you configure for the cluster, it uses either UDP or JGroups to broadcast the connector pair information.
Broadcast groups are defined in the server configuration file broker.xml. There can be many broadcast groups per Apache ActiveMQ Artemis server. All broadcast groups must be defined in a broadcast-groups element.
Let's take a look at an example broadcast group from broker.xml that defines a UDP broadcast group:
<broadcast-groups>
<broadcast-group name="my-broadcast-group">
<local-bind-address>172.16.9.3</local-bind-address>
<local-bind-port>5432</local-bind-port>
<group-address>231.7.7.7</group-address>
<group-port>9876</group-port>
<broadcast-period>2000</broadcast-period>
<connector-ref>netty-connector</connector-ref>
</broadcast-group>
</broadcast-groups>
Some of the broadcast group parameters are optional and you'll normally use the defaults, but we specify them all in the above example for clarity. Let's discuss each one in turn:
name attribute. Each broadcast group in the server must have a unique name.
local-bind-address. This is the local bind address that the datagram socket is bound to. If you have multiple network interfaces on your server, you would specify which one you wish to use for broadcasts by setting this property. If this property is not specified then the socket will be bound to the wildcard address, an IP address chosen by the kernel. This is a UDP specific attribute.
local-bind-port. If you want to specify a local port to which the datagram socket is bound you can specify it here. Normally you would just use the default value of -1 which signifies that an anonymous port should be used. This parameter is always specified in conjunction with local-bind-address. This is a UDP specific attribute.
group-address. This is the multicast address to which the data will be broadcast. It is a class D IP address in the range 224.0.0.0 to 239.255.255.255, inclusive. The address 224.0.0.0 is reserved and is not available for use. This parameter is mandatory. This is a UDP specific attribute.
group-port. This is the UDP port number used for broadcasting. This parameter is mandatory. This is a UDP specific attribute.
broadcast-period. This is the period in milliseconds between consecutive broadcasts. This parameter is optional, the default value is 2000 milliseconds.
connector-ref. This specifies the connector and optional backup connector that will be broadcasted (see Configuring the Transport for more information on connectors).
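The range rule given for group-address can be checked before a value goes into broker.xml. The following is a minimal sketch using only the Java standard library; the class and method names are hypothetical and are not part of Apache ActiveMQ Artemis. It assumes the value is an IPv4 literal, so no name lookup takes place.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper, not an Artemis class: checks a candidate group-address.
final class GroupAddressCheck {

    // True if the IPv4 literal lies in 224.0.0.0-239.255.255.255
    // and is not the reserved 224.0.0.0 itself.
    static boolean isUsableGroupAddress(String ipv4Literal) {
        try {
            InetAddress address = InetAddress.getByName(ipv4Literal);
            byte[] raw = address.getAddress();
            if (raw.length != 4) {
                return false; // this sketch only considers IPv4
            }
            boolean reservedBase = (raw[0] & 0xFF) == 224
                    && raw[1] == 0 && raw[2] == 0 && raw[3] == 0;
            return address.isMulticastAddress() && !reservedBase;
        } catch (UnknownHostException e) {
            return false; // not a parseable address
        }
    }

    public static void main(String[] args) {
        System.out.println(isUsableGroupAddress("231.7.7.7"));
    }
}
```

The address from the example above, 231.7.7.7, passes this check, while a unicast address such as the local-bind-address 172.16.9.3 does not.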
Here is another example broadcast group that defines a JGroups broadcast group:
<broadcast-groups>
<broadcast-group name="my-broadcast-group">
<jgroups-file>test-jgroups-file_ping.xml</jgroups-file>
<jgroups-channel>activemq_broadcast_channel</jgroups-channel>
<broadcast-period>2000</broadcast-period>
<connector-ref>netty-connector</connector-ref>
</broadcast-group>
</broadcast-groups>
To use JGroups to broadcast, you must specify two attributes, jgroups-file and jgroups-channel, as described below:
jgroups-file attribute. This is the name of the JGroups configuration file. It will be used to initialize JGroups channels. Make sure the file is in the Java resource path so that Apache ActiveMQ Artemis can load it.
jgroups-channel attribute. The name that JGroups channels connect to for broadcasting.
Note:
The JGroups attributes (jgroups-file and jgroups-channel) and UDP specific attributes described above are exclusive of each other. Only one set can be specified in a broadcast group configuration. Don't mix them!
The following is an example of a JGroups file:
<config xmlns="urn:org:jgroups"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
<TCP loopback="true"
recv_buf_size="20000000"
send_buf_size="640000"
discard_incompatible_packets="true"
max_bundle_size="64000"
max_bundle_timeout="30"
enable_bundling="true"
use_send_queues="false"
sock_conn_timeout="300"
thread_pool.enabled="true"
thread_pool.min_threads="1"
thread_pool.max_threads="10"
thread_pool.keep_alive_time="5000"
thread_pool.queue_enabled="false"
thread_pool.queue_max_size="100"
thread_pool.rejection_policy="run"
oob_thread_pool.enabled="true"
oob_thread_pool.min_threads="1"
oob_thread_pool.max_threads="8"
oob_thread_pool.keep_alive_time="5000"
oob_thread_pool.queue_enabled="false"
oob_thread_pool.queue_max_size="100"
oob_thread_pool.rejection_policy="run"/>
<FILE_PING location="../file.ping.dir"/>
<MERGE2 max_interval="30000"
min_interval="10000"/>
<FD_SOCK/>
<FD timeout="10000" max_tries="5" />
<VERIFY_SUSPECT timeout="1500" />
<BARRIER />
<pbcast.NAKACK
use_mcast_xmit="false"
retransmit_timeout="300,600,1200,2400,4800"
discard_delivered_msgs="true"/>
<UNICAST timeout="300,600,1200" />
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
max_bytes="400000"/>
<pbcast.GMS print_local_addr="true" join_timeout="3000"
view_bundling="true"/>
<FC max_credits="2000000"
min_threshold="0.10"/>
<FRAG2 frag_size="60000" />
<pbcast.STATE_TRANSFER/>
<pbcast.FLUSH timeout="0"/>
</config>
As shown, the file content defines a JGroups protocol stack. If you want Apache ActiveMQ Artemis to use this stack for channel creation, make sure the value of jgroups-file in your broadcast-group/discovery-group configuration is the name of this JGroups configuration file. For example, if the above stack configuration is stored in a file named "jgroups-stacks.xml" then your jgroups-file should look like this:
<jgroups-file>jgroups-stacks.xml</jgroups-file>
Discovery Groups
While the broadcast group defines how connector information is broadcast from a server, a discovery group defines how connector information is received from a broadcast endpoint (a UDP multicast address or JGroups channel).
A discovery group maintains a list of connector pairs - one for each broadcast by a different server. As it receives broadcasts on the broadcast endpoint from a particular server it updates its entry in the list for that server.
If it has not received a broadcast from a particular server for a length of time it will remove that server's entry from its list.
Discovery groups are used in two places in Apache ActiveMQ Artemis:
By cluster connections so they know how to obtain an initial connection to download the topology
By messaging clients so they know how to obtain an initial connection to download the topology
Although a discovery group will always accept broadcasts, its current list of available live and backup servers is only ever used when an initial connection is made. From then on, server discovery is done over the normal Apache ActiveMQ Artemis connections.
Note:
Each discovery group must be configured with a broadcast endpoint (UDP or JGroups) that matches its broadcast group counterpart. For example, if the broadcast group is configured using UDP, the discovery group must also use UDP, and the same multicast address.
Defining Discovery Groups on the Server
For cluster connections, discovery groups are defined in the server side configuration file broker.xml. All discovery groups must be defined inside a discovery-groups element. There can be many discovery groups defined per Apache ActiveMQ Artemis server. Let's look at an example:
<discovery-groups>
<discovery-group name="my-discovery-group">
<local-bind-address>172.16.9.7</local-bind-address>
<group-address>231.7.7.7</group-address>
<group-port>9876</group-port>
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups>
We'll consider each parameter of the discovery group:
name attribute. Each discovery group must have a unique name per server.
local-bind-address. If you are running with multiple network interfaces on the same machine, you may want to specify that the discovery group listens only on a specific interface. To do this you can specify the interface address with this parameter. This parameter is optional. This is a UDP specific attribute.
group-address. This is the multicast IP address of the group to listen on. It should match the group-address in the broadcast group that you wish to listen from. This parameter is mandatory. This is a UDP specific attribute.
group-port. This is the UDP port of the multicast group. It should match the group-port in the broadcast group that you wish to listen from. This parameter is mandatory. This is a UDP specific attribute.
refresh-timeout. This is the period the discovery group waits after receiving the last broadcast from a particular server before removing that server's connector pair entry from its list. You would normally set this to a value significantly higher than the broadcast-period on the broadcast group, otherwise servers might intermittently disappear from the list even though they are still broadcasting, due to slight differences in timing. This parameter is optional, the default value is 10000 milliseconds (10 seconds).
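The interplay between broadcast-period and refresh-timeout can be illustrated with a small model of the list a discovery group maintains. This is only a sketch of the behaviour described above, not the Artemis implementation; the class name, the use of a plain server name as the key, and the explicit clock values are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of a discovery group's list, not Artemis code.
final class DiscoveryListSketch {
    private final long refreshTimeoutMillis;
    // Last time (in milliseconds) a broadcast was seen from each server.
    private final Map<String, Long> lastSeenMillis = new HashMap<>();

    DiscoveryListSketch(long refreshTimeoutMillis) {
        this.refreshTimeoutMillis = refreshTimeoutMillis;
    }

    // Called for every broadcast received; refreshes the sender's entry.
    void onBroadcast(String serverId, long nowMillis) {
        lastSeenMillis.put(serverId, nowMillis);
    }

    // Drops entries not refreshed within refresh-timeout, then returns the rest.
    Set<String> liveEntries(long nowMillis) {
        lastSeenMillis.values().removeIf(seen -> nowMillis - seen > refreshTimeoutMillis);
        return new TreeSet<>(lastSeenMillis.keySet());
    }
}
```

With a refresh timeout of 10000 and a server that keeps broadcasting every 2000 milliseconds, the entry is refreshed several times per timeout window, so a single late or lost broadcast does not remove the server from the list.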
Here is another example that defines a JGroups discovery group:
<discovery-groups>
<discovery-group name="my-broadcast-group">
<jgroups-file>test-jgroups-file_ping.xml</jgroups-file>
<jgroups-channel>activemq_broadcast_channel</jgroups-channel>
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups>
To receive broadcasts from JGroups channels, you must specify two attributes, jgroups-file and jgroups-channel, as described below:
jgroups-file attribute. This is the name of the JGroups configuration file. It will be used to initialize JGroups channels. Make sure the file is in the Java resource path so that Apache ActiveMQ Artemis can load it.
jgroups-channel attribute. The name that JGroups channels connect to for receiving broadcasts.
Note:
The JGroups attributes (jgroups-file and jgroups-channel) and UDP specific attributes described above are exclusive of each other. Only one set can be specified in a discovery group configuration. Don't mix them!
Discovery Groups on the Client Side
Let's discuss how to configure an Apache ActiveMQ Artemis client to use discovery to discover a list of servers to which it can connect. The way to do this differs depending on whether you're using JMS or the core API.
Configuring client discovery
Use the udp URL scheme and a host:port combination that matches the group-address and group-port of the corresponding broadcast-group on the server:
udp://231.7.7.7:9876
The element discovery-group-ref specifies the name of a discovery group defined in broker.xml.
Connections created using this URI will be load-balanced across the list of servers that the discovery group maintains by listening on the multicast address specified in the discovery group configuration.
The aforementioned refreshTimeout parameter can be set directly in the URI.
There is also a URL parameter named initialWaitTimeout. If the corresponding JMS connection factory or core session factory is used immediately after creation then it may not have had enough time to receive broadcasts from all the nodes in the cluster. On first usage, the connection factory will make sure it waits this long since creation before creating the first connection. The default value for this parameter is 10000 milliseconds.
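As a sketch of how such a discovery URI might be assembled, the snippet below builds the udp URL from the group address and port and appends the two parameters named above. Only the Java standard library is used; the helper class is hypothetical, and handing the resulting string to a connection factory is left out because that step needs the Artemis client library.

```java
import java.net.URI;

// Hypothetical helper, not an Artemis class: builds a client discovery URI.
final class DiscoveryUriSketch {

    // groupAddress and groupPort must match the broadcast group on the server.
    static URI discoveryUri(String groupAddress, int groupPort,
                            long refreshTimeoutMillis, long initialWaitTimeoutMillis) {
        return URI.create("udp://" + groupAddress + ":" + groupPort
                + "?refreshTimeout=" + refreshTimeoutMillis
                + "&initialWaitTimeout=" + initialWaitTimeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println(discoveryUri("231.7.7.7", 9876, 10000, 10000));
    }
}
```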
Discovery using static Connectors
Sometimes it may be impossible to use UDP on the network you are using. In this case it's possible to configure a connection with an initial list of possible servers. This could be just one server that you know will always be available or a list of servers where at least one will be available.
This doesn't mean that you have to know where all your servers are going to be hosted. You can configure these servers to connect to the reliable servers. Once they are connected, their connection details will be propagated via the server they connect to.
Configuring a Cluster Connection
For cluster connections there is no extra configuration needed. You just need to make sure that any connectors are defined in the usual manner (see Configuring the Transport for more information on connectors). These are then referenced by the cluster connection configuration.
Configuring a Client Connection
A static list of possible servers can also be used by a normal client.
Configuring client discovery
A list of servers to be used for the initial connection attempt can be specified in the connection URI using a syntax with (), e.g.:
(tcp://myhost:61616,tcp://myhost2:61616)?reconnectAttempts=5
The brackets are expanded, so the same query string can be appended once after the closing bracket for ease.
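The bracket expansion can be pictured with the following sketch, which turns the bracketed form into one URI per server with the shared query appended to each. It is an illustration only, not the parser Artemis uses; the class name is hypothetical, and it assumes the individual entries carry no query string of their own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of the bracket expansion, not the Artemis parser.
final class StaticListUriSketch {

    // "(a,b)?q" becomes ["a?q", "b?q"]; anything else is returned unchanged.
    static List<String> expand(String uri) {
        int close = uri.indexOf(')');
        if (!uri.startsWith("(") || close < 0) {
            return Collections.singletonList(uri);
        }
        String sharedSuffix = uri.substring(close + 1); // e.g. "?reconnectAttempts=5"
        List<String> expanded = new ArrayList<>();
        for (String member : uri.substring(1, close).split(",")) {
            expanded.add(member.trim() + sharedSuffix);
        }
        return expanded;
    }
}
```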
Server-Side Message Load Balancing
If cluster connections are defined between nodes of a cluster, then Apache ActiveMQ Artemis will load balance messages arriving at a particular node from a client.
Let's take a simple example of a cluster of four nodes A, B, C, and D arranged in a symmetric cluster (described in the Symmetrical Clusters section). We have a queue called OrderQueue deployed on each node of the cluster.
We have client Ca connected to node A, sending orders to the server. We also have order processor clients Pa, Pb, Pc, and Pd connected to each of the nodes A, B, C, D. If no cluster connection was defined on node A, then as order messages arrive on node A they will all end up in the OrderQueue on node A, so will only get consumed by the order processor client attached to node A, Pa.
If we define a cluster connection on node A, then as order messages arrive on node A, instead of all of them going into the local OrderQueue instance, they are distributed in a round-robin fashion between all the nodes of the cluster. The messages are forwarded from the receiving node to other nodes of the cluster. This is all done on the server side; the client maintains a single connection to node A.
For example, messages arriving on node A might be distributed in the following order between the nodes: B, D, C, A, B, D, C, A, B, D. The exact order depends on the order the nodes started up, but the algorithm used is round robin.
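The round-robin order in this example can be reproduced with a few lines of Java. This is a simplified model that ignores consumers and message filters, not the broker's routing code; the class name is hypothetical, and the node order is simply passed in to stand for the start-up order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of round-robin distribution, not Artemis routing code.
final class RoundRobinDistributorSketch {
    private final List<String> nodes;
    private int next; // index of the node that receives the next message

    RoundRobinDistributorSketch(List<String> nodesInStartupOrder) {
        if (nodesInStartupOrder.isEmpty()) {
            throw new IllegalArgumentException("at least one node is required");
        }
        this.nodes = new ArrayList<>(nodesInStartupOrder);
    }

    // Returns the node for the next message and advances the cursor.
    String route() {
        String target = nodes.get(next);
        next = (next + 1) % nodes.size();
        return target;
    }
}
```

Constructed with the order B, D, C, A, ten calls to route() yield B, D, C, A, B, D, C, A, B, D, which is the sequence given above.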
Apache ActiveMQ Artemis cluster connections can be configured to always blindly load balance messages in a round robin fashion irrespective of whether there are any matching consumers on other nodes, but they can be a bit cleverer than that and also be configured to only distribute to other nodes if they have matching consumers. We'll look at both these cases in turn with some examples, but first we'll discuss configuring cluster connections in general.
Configuring Cluster Connections
Cluster connections group servers into clusters so that messages can be load balanced between the nodes of the cluster. Let's take a look at a typical cluster connection. Cluster connections are always defined in broker.xml inside a cluster-connection element. There can be zero or more cluster connections defined per Apache ActiveMQ Artemis server.
<cluster-connections>
<cluster-connection name="my-cluster">
<address></address>
<connector-ref>netty-connector</connector-ref>
<check-period>1000</check-period>
<connection-ttl>5000</connection-ttl>
<min-large-message-size>50000</min-large-message-size>
<call-timeout>5000</call-timeout>
<retry-interval>500</retry-interval>
<retry-interval-multiplier>1.0</retry-interval-multiplier>
<max-retry-interval>5000</max-retry-interval>
<initial-connect-attempts>-1</initial-connect-attempts>
<reconnect-attempts>-1</reconnect-attempts>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<confirmation-window-size>32000</confirmation-window-size>
<call-failover-timeout>30000</call-failover-timeout>
<notification-interval>1000</notification-interval>
<notification-attempts>2</notification-attempts>
<discovery-group-ref discovery-group-name="my-discovery-group"/>
</cluster-connection>
</cluster-connections>
In the above cluster connection all parameters have been explicitly specified. The following shows all the available configuration options:
address. Each cluster connection only applies to addresses that match the specified address field. An address is matched on the cluster connection when it begins with the string specified in this field. The address field on a cluster connection also supports comma separated lists and an exclude syntax !. To prevent an address from being matched on this cluster connection, prepend a cluster connection address string with !.
In the case shown above the cluster connection will load balance messages sent to all addresses (since it's empty).
The address can be any value and you can have many cluster connections with different values of address, simultaneously balancing messages for those addresses, potentially to different clusters of servers. By having multiple cluster connections on different addresses a single Apache ActiveMQ Artemis Server can effectively take part in multiple clusters simultaneously.
Be careful not to have multiple cluster connections with overlapping values of address, e.g. "europe" and "europe.news", since this could result in the same messages being distributed between more than one cluster connection, possibly resulting in duplicate deliveries.
Examples:
- 'eu' matches all addresses starting with 'eu'
- '!eu' matches all addresses except for those starting with 'eu'
- 'eu.uk,eu.de' matches all addresses starting with either 'eu.uk' or 'eu.de'
- 'eu,!eu.uk' matches all addresses starting with 'eu' but not those starting with 'eu.uk'
Note:
- Address exclusion always takes precedence over address inclusion.
- Address matching on cluster connections does not support wild-card matching.
connector-ref. This is the connector which will be sent to other nodes in the cluster so they have the correct cluster topology. This parameter is mandatory.
check-period. The period (in milliseconds) used to check if the cluster connection has failed to receive pings from another server. Default is 30000.
connection-ttl. This is how long a cluster connection should stay alive if it stops receiving messages from a specific node in the cluster. Default is 60000.
min-large-message-size. If the message size (in bytes) is larger than this value then it will be split into multiple segments when sent over the network to other cluster members. Default is 102400.
call-timeout. When a packet is sent via a cluster connection and is a blocking call, i.e. for acknowledgements, this is how long it will wait (in milliseconds) for the reply before throwing an exception. Default is 30000.
retry-interval. We mentioned before that, internally, cluster connections cause bridges to be created between the nodes of the cluster. If the cluster connection is created and the target node has not been started, or say, is being rebooted, then the cluster connections from other nodes will retry connecting to the target until it comes back up, in the same way as a bridge does.
This parameter determines the interval in milliseconds between retry attempts. It has the same meaning as the retry-interval on a bridge (as described in Core Bridges).
This parameter is optional and its default value is 500 milliseconds.
retry-interval-multiplier. This is a multiplier used to increase the retry-interval after each reconnect attempt. Default is 1.
max-retry-interval. The maximum delay (in milliseconds) for retries. Default is 2000.
initial-connect-attempts. The number of times the system will try to connect to a node in the cluster initially. If this limit is reached, the node will be considered permanently down and the system will not route messages to it. Default is -1 (infinite retries).
reconnect-attempts. The number of times the system will try to reconnect to a node in the cluster. If this limit is reached, the node will be considered permanently down and the system will stop routing messages to it. Default is -1 (infinite retries).
use-duplicate-detection. Internally cluster connections use bridges to link the nodes, and bridges can be configured to add a duplicate id property in each message that is forwarded. If the target node of the bridge crashes and then recovers, messages might be resent from the source node. By enabling duplicate detection any duplicate messages will be filtered out and ignored on receipt at the target node.
This parameter has the same meaning as use-duplicate-detection on a bridge. For more information on duplicate detection, please see Duplicate Detection. Default is true.
message-load-balancing. This parameter determines if/how messages will be distributed between other nodes of the cluster. It can be one of three values: OFF, STRICT, or ON_DEMAND (default). This parameter replaces the deprecated forward-when-no-consumers parameter.
If this is set to OFF then messages will never be forwarded to another node in the cluster.
If this is set to STRICT then each incoming message will be round robin'd even though the same queues on the other nodes of the cluster may have no consumers at all, or they may have consumers that have non matching message filters (selectors). Note that Apache ActiveMQ Artemis will not forward messages to other nodes if there are no queues of the same name on the other nodes, even if this parameter is set to STRICT. Using STRICT is like setting the legacy forward-when-no-consumers parameter to true.
If this is set to ON_DEMAND then Apache ActiveMQ Artemis will only forward messages to other nodes of the cluster if the address to which they are being forwarded has queues which have consumers, and if those consumers have message filters (selectors) at least one of those selectors must match the message. Using ON_DEMAND is like setting the legacy forward-when-no-consumers parameter to false.
Keep in mind that this message forwarding/balancing is what we call "initial distribution." It is different from redistribution, which is discussed below. This distinction is important because redistribution is configured differently and has unique semantics (e.g. it does not support filters (selectors)).
Default is ON_DEMAND.
max-hops. When a cluster connection decides the set of nodes to which it might load balance a message, those nodes do not have to be directly connected to it via a cluster connection. Apache ActiveMQ Artemis can be configured to also load balance messages to nodes which might be connected to it only indirectly with other Apache ActiveMQ Artemis servers as intermediates in a chain.
This allows Apache ActiveMQ Artemis to be configured in more complex topologies and still provide message load balancing. We'll discuss this more later in this chapter.
The default value for this parameter is 1, which means messages are only load balanced to other Apache ActiveMQ Artemis servers which are directly connected to this server. This parameter is optional.
confirmation-window-size. The size (in bytes) of the window used for sending confirmations from the server connected to. Once the server has received confirmation-window-size bytes it notifies its client. Default is 1048576. A value of -1 means no window.
producer-window-size. The size for producer flow control over the cluster connection. It is disabled by default through the cluster connection bridge, but you may want to set a value if you are using really large messages in the cluster. A value of -1 means no window.
call-failover-timeout. Similar to call-timeout but used when a call is made during a failover attempt. Default is -1 (no timeout).
notification-interval. How often (in milliseconds) the cluster connection should broadcast itself when attaching to the cluster. Default is 1000.
notification-attempts. How many times the cluster connection should broadcast itself when connecting to the cluster. Default is 2.
discovery-group-ref. This parameter determines which discovery group is used to obtain the list of other servers in the cluster that this cluster connection will make connections to.
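The matching rules described for the address option (prefix match, comma separated lists, ! exclusion, exclusion taking precedence) can be summarised in a short sketch. It is a model written from the rules and examples above, not the code Artemis uses; the class and method names are hypothetical.

```java
// Hypothetical model of the address option's matching rules, not Artemis code.
final class ClusterAddressMatcherSketch {

    // addressField is the cluster connection's address value;
    // address is the name of the address a message was sent to.
    static boolean matches(String addressField, String address) {
        boolean hasInclude = false;
        boolean included = false;
        for (String raw : addressField.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty() || entry.equals("!")) {
                continue; // nothing to match against
            }
            if (entry.startsWith("!")) {
                if (address.startsWith(entry.substring(1))) {
                    return false; // exclusion takes precedence over inclusion
                }
            } else {
                hasInclude = true;
                if (address.startsWith(entry)) {
                    included = true;
                }
            }
        }
        // With no include entries (empty field or exclusions only),
        // every address that was not excluded matches.
        return !hasInclude || included;
    }
}
```

For instance, with the field 'eu,!eu.uk' the address eu.de matches, while eu.uk.london does not.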
Alternatively, if you would like your cluster connections to use a static list of servers for discovery, you can configure it like this:
<cluster-connection name="my-cluster">
...
<static-connectors>
<connector-ref>server0-connector</connector-ref>
<connector-ref>server1-connector</connector-ref>
</static-connectors>
</cluster-connection>
Here we have defined two servers, at least one of which we know will be available. There may be many more servers in the cluster, but these will be discovered via one of these connectors once an initial connection has been made.
Cluster User Credentials
When creating connections between nodes of a cluster to form a cluster connection, Apache ActiveMQ Artemis uses a cluster user and cluster password which are defined in broker.xml:
<cluster-user>ACTIVEMQ.CLUSTER.ADMIN.USER</cluster-user>
<cluster-password>CHANGE ME!!</cluster-password>
Warning
It is imperative that these values are changed from their default, or remote clients will be able to make connections to the server using the default values. If they are not changed from the default, Apache ActiveMQ Artemis will detect this and pester you with a warning on every start-up.
Client-Side Load balancing
With Apache ActiveMQ Artemis client-side load balancing, subsequent sessions created using a single session factory can be connected to different nodes of the cluster. This allows sessions to spread smoothly across the nodes of a cluster and not be "clumped" on any particular node.
The load balancing policy to be used by the client factory is configurable. Apache ActiveMQ Artemis provides four out-of-the-box load balancing policies, and you can also implement your own and use that.
The out-of-the-box policies are:
Round Robin. With this policy the first node is chosen randomly then each subsequent node is chosen sequentially in the same order.
For example nodes might be chosen in the order B, C, D, A, B, C, D, A, B or D, A, B, C, D, A, B, C, D or C, D, A, B, C, D, A, B, C.
Use org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy as the <connection-load-balancing-policy-class-name>.
Random. With this policy each node is chosen randomly.
Use org.apache.activemq.artemis.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy as the <connection-load-balancing-policy-class-name>.
Random Sticky. With this policy the first node is chosen randomly and then re-used for subsequent connections.
Use org.apache.activemq.artemis.api.core.client.loadbalance.RandomStickyConnectionLoadBalancingPolicy as the <connection-load-balancing-policy-class-name>.
First Element. With this policy the "first" (i.e. 0th) node is always returned.
Use org.apache.activemq.artemis.api.core.client.loadbalance.FirstElementConnectionLoadBalancingPolicy as the <connection-load-balancing-policy-class-name>.
You can also implement your own policy by implementing the interface org.apache.activemq.artemis.api.core.client.loadbalance.ConnectionLoadBalancingPolicy.
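To make the policy descriptions concrete, here is a standalone sketch of three of them. It deliberately does not depend on the Artemis client library: the nested Policy interface is a local stand-in that assumes a policy boils down to picking an index below a given bound, so check the real ConnectionLoadBalancingPolicy interface before adapting it. The sketch is not thread-safe.

```java
import java.util.Random;

// Standalone illustration; Policy is a hypothetical stand-in, not the Artemis interface.
final class LoadBalancingPolicySketch {

    interface Policy {
        // Returns an index in the range 0 (inclusive) to max (exclusive).
        int select(int max);
    }

    // First pick is random, every later pick is the next index in sequence.
    static final class RoundRobin implements Policy {
        private final Random random = new Random();
        private boolean first = true;
        private int position;

        public int select(int max) {
            if (first) {
                position = random.nextInt(max);
                first = false;
            } else {
                position = (position + 1) % max;
            }
            return position;
        }
    }

    // First pick is random and is then re-used while it is still in range.
    static final class RandomSticky implements Policy {
        private final Random random = new Random();
        private int position = -1;

        public int select(int max) {
            if (position < 0 || position >= max) {
                position = random.nextInt(max);
            }
            return position;
        }
    }

    // Always returns the first (0th) element.
    static final class FirstElement implements Policy {
        public int select(int max) {
            return 0;
        }
    }
}
```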
Specifying which load balancing policy to use differs depending on whether you are using JMS or the core API. If you don't specify a policy then the default will be used, which is org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy.
The parameter connectionLoadBalancingPolicyClassName can be set on the URI to configure what load balancing policy to use:
tcp://localhost:61616?connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy
The set of servers over which the factory load balances can be determined in one of two ways:
Specifying servers explicitly in the URL. This also requires setting the useTopologyForLoadBalancing parameter to false on the URL.
Using discovery. This is the default behavior.
Specifying Members of a Cluster Explicitly
Sometimes you want to define a cluster more explicitly, that is, control which servers connect to each other in the cluster. This is typically used to form non-symmetrical clusters such as chain clusters or ring clusters. This can only be done using a static list of connectors and is configured as follows:
<cluster-connection name="my-cluster">
<address/>
<connector-ref>netty-connector</connector-ref>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>STRICT</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors allow-direct-connections-only="true">
<connector-ref>server1-connector</connector-ref>
</static-connectors>
</cluster-connection>
In this example we have set the attribute allow-direct-connections-only to true, which means that the only server this server can create a cluster connection to is server1-connector. This means you can explicitly create any cluster topology you want.
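The connector-ref elements in the cluster connection above refer to connectors by name, and those connectors must be defined elsewhere in broker.xml. A minimal sketch of such definitions follows; the host names and ports are assumptions chosen for illustration and are not taken from this chapter:

```xml
<connectors>
   <!-- Connector describing how other nodes reach this server (assumed host and port) -->
   <connector name="netty-connector">tcp://localhost:61616</connector>
   <!-- Connector pointing at the one server this node is allowed to cluster with (assumed host and port) -->
   <connector name="server1-connector">tcp://server1.example.com:61616</connector>
</connectors>
```

Listing further connector-ref entries inside static-connectors would let the node connect directly to additional servers.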
Message Redistribution
Another important part of clustering is message redistribution. Earlier we learned how server side message load balancing round robins messages across the cluster. If message-load-balancing is OFF or ON_DEMAND then messages won't be forwarded to nodes which don't have matching consumers. This is great and ensures that messages aren't moved to a queue which has no consumers to consume them. However, there is a situation it doesn't solve: What happens if the consumers on a queue close after the messages have been sent to the node? If there are no consumers on the queue the message won't get consumed and we have a starvation situation.
This is where message redistribution comes in. With message redistribution Apache ActiveMQ Artemis can be configured to automatically redistribute messages from queues which have no consumers back to other nodes in the cluster which do have matching consumers. To enable this functionality message-load-balancing must be ON_DEMAND.
Message redistribution can be configured to kick in immediately after the last consumer on a queue is closed, or to wait a configurable delay after the last consumer on a queue is closed before redistributing. By default message redistribution is disabled.
Message redistribution can be configured on a per address basis, by specifying the redistribution delay in the address settings. For more information on configuring address settings, please see Configuring Addresses and Queues via Address Settings.
Here's an address settings snippet from broker.xml showing how message redistribution is enabled for a set of queues:
<address-settings>
   <address-setting match="#">
      <redistribution-delay>0</redistribution-delay>
   </address-setting>
</address-settings>
The above address-settings block would set a redistribution-delay of 0 for any queue which is bound to any address. So the above would enable instant (no delay) redistribution for all addresses.
The attribute match can be an exact match or it can be a string that conforms to the Apache ActiveMQ Artemis wildcard syntax (described in Wildcard Syntax).
The element redistribution-delay defines the delay in milliseconds after the last consumer is closed on a queue before redistributing messages from that queue to other nodes of the cluster which do have matching consumers. A delay of zero means the messages will be immediately redistributed. A value of -1 signifies that messages will never be redistributed. The default value is -1.
It often makes sense to introduce a delay before redistributing, since it is common for a consumer to close and another one to be created on the same queue shortly afterwards. In such a case you probably don't want to redistribute immediately because the new consumer will arrive shortly.
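As an illustration of a non-zero delay, the sketch below waits five seconds after the last consumer closes before redistributing. The orders.# match and the 5000 millisecond value are assumptions chosen for the example, not recommendations:

```xml
<address-settings>
   <!-- Hypothetical wildcard match covering addresses under an "orders" prefix -->
   <address-setting match="orders.#">
      <!-- Delay in milliseconds after the last consumer closes before messages are redistributed -->
      <redistribution-delay>5000</redistribution-delay>
   </address-setting>
</address-settings>
```

A consumer that reconnects within the delay finds its messages still on the local queue, so no redistribution is needed.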
Redistribution and filters (selectors)
Although "initial distribution" (described above) does support filters (selectors), redistribution does not support filters. Consider this scenario:
- A cluster of 2 nodes - A and B - using a redistribution-delay of 0 and a message-load-balancing of ON_DEMAND.
- A and B each have the queue foo.
- A producer sends a message which is routed to queue foo on node A. The message has a property named myProperty with a value of 10.
- A consumer connects to queue foo on node A with the filter myProperty=5. This filter doesn't match the message.
- A consumer connects to queue foo on node B with the filter myProperty=10. This filter does match the message.
Despite the fact that the filter of the consumer on queue foo on node B matches the message, the message will not be redistributed from node A to node B because a consumer for the queue exists on node A.
Not supporting redistribution based on filters was an explicit design decision in order to avoid two main problems - queue scanning and unnecessary redistribution.
From a performance perspective a consumer with a filter on a queue is already costly due to the scanning that the broker must do on the queue to find matching messages. In general, this is a bit of an anti-pattern as it turns the broker into something akin to a database where you can "select" the data you want using a filter. If brokers are configured in a cluster and a consumer with a filter connects and no matches are found after scanning the local queue then potentially every instance of that queue in the cluster would need to be scanned. This turns into a bit of a scalability nightmare with lots of consumers (especially short-lived consumers) with filters connecting & disconnecting frequently. The time & computing resources used for queue scanning would go through the roof.
It is also possible to get into a pathological situation where short-lived consumers with filters connect to nodes around the cluster and messages get redistributed back and forth between nodes without ever actually being consumed.
One common use-case for consumers with filters (selectors) on queues is request/reply using a correlation ID. Following the standard pattern can be problematic in a cluster due to the lack of redistribution based on filters already described. However, there is a simple way to ensure an application using this request/reply pattern gets its reply even when using a correlation ID filter in a cluster: create the consumer before the request is sent. This will ensure that when the reply is sent it will be routed to the proper cluster node since "initial distribution" (described above) does support filters.
For example, in the scenario outlined above if steps 3 and 5 were switched (i.e. if the consumers were created before the message was sent) then the consumer on node B would in fact receive the message.
Cluster topologies
Apache ActiveMQ Artemis clusters can be connected together in many different topologies. Let's consider the two most common ones here.
Symmetric cluster
A symmetric cluster is probably the most common cluster topology.
With a symmetric cluster every node in the cluster is connected to every other node in the cluster. In other words every node in the cluster is no more than one hop away from every other node.
To form a symmetric cluster every node in the cluster defines a cluster connection with the attribute max-hops set to 1. Typically the cluster connection will use server discovery in order to know what other servers in the cluster it should connect to, although it is possible to explicitly define each target server too in the cluster connection if, for example, UDP is not available on your network.
With a symmetric cluster each node knows about all the queues that exist on all the other nodes and what consumers they have. With this knowledge it can determine how to load balance and redistribute messages around the nodes.
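When creating a symmetric cluster, don't forget the warning in the Overview: do not copy the data directories (bindings, journal, and large-messages) from one node to another.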
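A cluster connection for one node of a symmetric cluster might look like the following sketch. The names my-cluster, netty-connector and my-discovery-group are assumptions, and the connector and discovery group are assumed to be defined elsewhere in broker.xml:

```xml
<cluster-connections>
   <cluster-connection name="my-cluster">
      <!-- Connector this node advertises to the other nodes (assumed name) -->
      <connector-ref>netty-connector</connector-ref>
      <message-load-balancing>ON_DEMAND</message-load-balancing>
      <!-- Every node is directly connected to every other node, so one hop is enough -->
      <max-hops>1</max-hops>
      <!-- Discovery group used to find the other nodes (assumed name) -->
      <discovery-group-ref discovery-group-name="my-discovery-group"/>
   </cluster-connection>
</cluster-connections>
```

Because the cluster is symmetric, the same fragment can typically be reused on every node.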
Chain cluster
With a chain cluster, each node in the cluster is not directly connected to every other node. Instead the nodes form a chain, with a node on each end of the chain and all other nodes connecting only to the previous and next nodes in the chain.
An example of this would be a three node chain consisting of nodes A, B and C. Node A is hosted in one network and has many producer clients connected to it sending order messages. Due to corporate policy, the order consumer clients need to be hosted in a different network, and that network is only accessible via a third network. In this setup node B acts as a mediator with no producers or consumers on it. Any messages arriving on node A will be forwarded to node B, which will in turn forward them to node C where they can get consumed. Node A does not need to directly connect to C, but all the nodes can still act as a part of the cluster.
To set up a cluster in this way, node A would define a cluster connection that connects to node B, and node B would define a cluster connection that connects to node C. In this case we only want cluster connections in one direction since we're only moving messages from node A->B->C and never from C->B->A.
For this topology we would set max-hops to 2. With a value of 2 the knowledge of which queues and consumers exist on node C would be propagated from node C to node B to node A. Node A would then know to distribute messages to node B when they arrive. Even though node B has no consumers itself, node A would know that a further hop away is node C, which does have consumers.
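The one-directional chain described above could be sketched with static connectors as follows. The connector names and the choice of ON_DEMAND are assumptions, the two fragments belong to different broker.xml files, and each referenced connector is assumed to be defined on the node that uses it:

```xml
<!-- Node A's broker.xml (fragment): cluster connection to node B only -->
<cluster-connection name="chain-cluster">
   <connector-ref>node-a-connector</connector-ref>
   <message-load-balancing>ON_DEMAND</message-load-balancing>
   <!-- Two hops so that node A learns about consumers on node C via node B -->
   <max-hops>2</max-hops>
   <static-connectors allow-direct-connections-only="true">
      <connector-ref>node-b-connector</connector-ref>
   </static-connectors>
</cluster-connection>

<!-- Node B's broker.xml (fragment): cluster connection to node C only -->
<cluster-connection name="chain-cluster">
   <connector-ref>node-b-connector</connector-ref>
   <message-load-balancing>ON_DEMAND</message-load-balancing>
   <max-hops>2</max-hops>
   <static-connectors allow-direct-connections-only="true">
      <connector-ref>node-c-connector</connector-ref>
   </static-connectors>
</cluster-connection>
```

Node C defines no cluster connection back towards node B in this sketch, since messages only flow from A to B to C.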
Scaling Down
Apache ActiveMQ Artemis supports scaling down a cluster with no message loss (even for non-durable messages). This is especially useful in certain environments (e.g. the cloud) where the size of a cluster may change relatively frequently. When scaling up a cluster (i.e. adding nodes) there is no risk of message loss, but when scaling down a cluster (i.e. removing nodes) the messages on those nodes would be lost unless the broker sent them to another node in the cluster. Apache ActiveMQ Artemis can be configured to do just that.
The simplest way to enable this behavior is to set scale-down to true. If the server is clustered and scale-down is true then when the server is shut down gracefully (i.e. stopped without crashing) it will find another node in the cluster and send all of its messages (both durable and non-durable) to that node. The messages are processed in order and go to the back of the respective queues on the other node (just as if the messages were sent from an external client for the first time).
If more control over where the messages go is required then specify scale-down-group-name. Messages will only be sent to another node in the cluster that uses the same scale-down-group-name as the server being shut down.
Warning
Take care if cluster nodes are grouped together with different scale-down-group-name values. If all the nodes in a single group are shut down then the messages from that group will be lost.
If the server is using multiple cluster-connection elements then use scale-down-clustername to identify the name of the cluster-connection which should be used for scaling down.