什么是集群
集群就是將相同的程序、功能,部署在兩臺(tái)或多臺(tái)服務(wù)器上,這些服務(wù)器對(duì)外提供的功能是完全一樣的。
集群是通過(guò)不斷橫向擴(kuò)展增加服務(wù)器的方式,以提高服務(wù)的能力。
● 集群可以解決單點(diǎn)故障問(wèn)題
● 集群可以提高系統(tǒng)的可用性
● 集群可以提高系統(tǒng)的服務(wù)能力
通過(guò)共享存儲(chǔ)目錄(kahaDB)來(lái)實(shí)現(xiàn)master和slave的主從信息同步;
所有ActiveMQ的broker都在不斷地獲取共享目錄的控制權(quán),哪個(gè)broker搶到了控制權(quán),它就成為master,它將鎖定該目錄,其他broker就只能成為slave。
當(dāng)master主出現(xiàn)故障后,剩下的slave從將再進(jìn)行爭(zhēng)奪共享目錄的控制權(quán),誰(shuí)搶到共享目錄的控制權(quán),誰(shuí)就成為主,其他沒有搶到控制權(quán)的稱為從。
由于他們是基于共享目錄,所以當(dāng)主出現(xiàn)故障后,其上沒有被消費(fèi)的消息在接下來(lái)產(chǎn)生的新的master主中可以繼續(xù)進(jìn)行消費(fèi)。
這種方式客戶端訪問(wèn)的都是主,從只是起到了一個(gè)備份訪問(wèn)的作用
(1) 架構(gòu)圖

(2) 實(shí)現(xiàn)步驟
A、 安裝多個(gè)ActiveMQ
因?yàn)锳ctiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復(fù)制多份,就相當(dāng)于安裝了多個(gè)ActiveMQ,我們這里復(fù)制3個(gè)ActiveMQ出來(lái)。
復(fù)制前,先將運(yùn)行的ActiveMQ停止。

B、 打開三個(gè)Xshell,分別連接不同的ActiveMQ方便操作

C、 配置每個(gè)activeMQ的conf /activemq.xml文件中的共享目錄
如果集群搭建在一臺(tái)機(jī)器上需要改端口,如果搭建在多臺(tái)上就不需要了
如果搭建在多臺(tái)服務(wù)器上,那么存放共享目錄的機(jī)器需要通過(guò)磁盤掛載的方式掛載到主從機(jī)器上。
● 修改三個(gè)ActiveMQ的共享目錄
persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<kahaDB directory="/opt/kahadb"/>
</persistenceAdapter>
● 修改完持久化目錄后,需要在/opt目錄下創(chuàng)建該目錄

D、 配置每個(gè)activeMQ的conf /activemq.xml文件中的端口
為了避免端口號(hào)的沖突,前三個(gè)地址端口+1,后兩個(gè)端口地址-1,可以將文件下載下來(lái)替換。
一個(gè)ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第二個(gè)ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第三個(gè)ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
● maximumConnections 最大連接數(shù);
● wireFormat.maxFrameSize 表示一個(gè)完整消息的最大數(shù)據(jù)量,單位byte;
● 0.0.0.0表示任意ip
E、 修改conf/jetty.xml文件的jetty服務(wù)器端口(管理控制臺(tái))
第一個(gè)ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
第二個(gè)ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
第三個(gè)ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8164"/>
</bean>
F、 啟動(dòng)三臺(tái)ActiveMQ,可以測(cè)試驗(yàn)證了
注意:?jiǎn)?dòng)后會(huì)有一段時(shí)間延時(shí),稍等一會(huì);
瀏覽器訪問(wèn)http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務(wù)器;
web控制臺(tái)能訪問(wèn)的是 master,不能訪問(wèn)的是 slave。
G、 修改11-activemq-java中的程序收發(fā)消息代碼
連接時(shí)使用故障轉(zhuǎn)義協(xié)議failover
failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)
修改BROKER_URL地址
public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";
為了看到效果,發(fā)消息和接收消息我們都是用循環(huán)方式
//發(fā)消息 沒有返回值,是非阻塞的
while(true){
messageProducer.send(message);
}
查看發(fā)送以及接收idea控制臺(tái)輸出,停止ActiveMQ主,看效果
注意:如果是事務(wù)消息,被中斷那么程序發(fā)送程序出錯(cuò),不能實(shí)現(xiàn),所以我們將消息改為非事務(wù)消息進(jìn)行測(cè)試,如果是非事務(wù)消息就注釋掉session.commit。
該方式與shared filesystem方式類似,只是共享的存儲(chǔ)介質(zhì)由文件系統(tǒng)改成了數(shù)據(jù)庫(kù)。
(1) 架構(gòu)圖

(2) 實(shí)現(xiàn)步驟
A、 安裝多個(gè)ActiveMQ(已做)
因?yàn)锳ctiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復(fù)制多份,就相當(dāng)于安裝了多個(gè)ActiveMQ,我們這里復(fù)制3個(gè)ActiveMQ出來(lái)復(fù)制前,先將運(yùn)行的ActiveMQ停止。

B、 打開三個(gè)Xshell,分別連接不同的ActiveMQ方便操作(已做)

C、 配置每個(gè)activeMQ的conf /activemq.xml文件中的持久化適配器是jdbc數(shù)據(jù)庫(kù)方式
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
D、 配置每個(gè)數(shù)據(jù)庫(kù)連接池
注意:連接池的配置需要配置在的外面
<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&characterEncoding=utf8&useSSL=false"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
</bean>
E、 在每個(gè)ActiveMQ的lib目錄下加入mysql的驅(qū)動(dòng)包和數(shù)據(jù)庫(kù)連接池Druid包,該包在我提供的資料05-ActiveMQ\resources\lib下
可以通過(guò)Xftp或者rz命令上傳。
F、 啟動(dòng)MySQL數(shù)據(jù)庫(kù),并創(chuàng)建activemq數(shù)據(jù)庫(kù)

G、 配置每個(gè)activeMQ的conf /activemq.xml文件中的端口(已做)
如果集群搭建在一臺(tái)機(jī)器上需要改端口,如果搭建在多臺(tái)上就不需要了;
為了避免端口號(hào)的沖突,前三個(gè)地址端口+1,后兩個(gè)端口地址-1。
第一個(gè)ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第二個(gè)ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第三個(gè)ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
● maximumConnections 最大連接數(shù);
● wireFormat.maxFrameSize 表示一個(gè)完整消息的最大數(shù)據(jù)量,單位byte;
● 0.0.0.0表示任意ip
H、 修改conf/jetty.xml文件的jetty服務(wù)器端口(管理控制臺(tái)) (已做)
第一個(gè)ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
第二個(gè)ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
第三個(gè)ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8164"/>
</bean>
I、啟動(dòng)三臺(tái)ActiveMQ,可以測(cè)試驗(yàn)證了
瀏覽器訪問(wèn)http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務(wù)器;
web控制臺(tái)能訪問(wèn)的是 master,不能訪問(wèn)的是 slave。
J、 修改11-activemq-java中的程序收發(fā)消息類(已做)
連接時(shí)使用故障轉(zhuǎn)義協(xié)議failover
failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)
修改BROKER_URL地址
public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";
為了看到效果,發(fā)消息和接收消息我們都是用循環(huán)方式
//發(fā)消息 沒有返回值,是非阻塞的
while(true){
messageProducer.send(message);
session.commit();
}
查看發(fā)送以及接收idea控制臺(tái)輸出,停止ActiveMQ主,看效果
注意:如果是事務(wù)消息,被中斷那么程序發(fā)送程序出錯(cuò),不能實(shí)現(xiàn),所以我們將消息改為非事務(wù)消息進(jìn)行測(cè)試,如果是非事務(wù)消息就注釋掉session.commit。
3、Replicated LevelDB Store方式主從集群(常用)
基于可復(fù)制的LevelDB存儲(chǔ)方式的集群;
這種集群方式是ActiveMQ5.9版本以后新增的特性,它使用ZooKeeper從一組broker中協(xié)調(diào)選擇一個(gè)broker作為master主,其他broker作為slave從的模式。所有slave從節(jié)點(diǎn)通過(guò)復(fù)制master主節(jié)點(diǎn)的消息來(lái)實(shí)現(xiàn)消息同步,當(dāng)主出現(xiàn)故障后,沒有被消費(fèi)的消息在從服務(wù)器上也同步了一份,所以不會(huì)有消息的丟失。
LevelDB 是 Google開發(fā)的一套用于持久化數(shù)據(jù)的高性能kv數(shù)據(jù)庫(kù),ActiveMQ利用該數(shù)據(jù)庫(kù)進(jìn)行數(shù)據(jù)的存儲(chǔ)。
只有master 接受客戶端連接,slave不接受客戶端連接,Master的所有存儲(chǔ)操作都將被復(fù)制到slaves。
在這個(gè)模式中,需要有半數(shù)以上的broker是正常的,集群才是可用的,超過(guò)半數(shù)broker故障,ZooKeeper的選舉算法將不能選擇master,從而導(dǎo)致集群不可用。
(1)架構(gòu)圖

(2) 實(shí)現(xiàn)步驟
A、 安裝多個(gè)ActiveMQ(已做)
因?yàn)锳ctiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復(fù)制多份,就相當(dāng)于安裝了多個(gè)ActiveMQ,我們這里復(fù)制3個(gè)ActiveMQ出來(lái)。
復(fù)制前,先將運(yùn)行的ActiveMQ停止

B、 打開三個(gè)Xshell,分別連接不同的ActiveMQ方便操作(已做)

C、 配置每個(gè)activeMQ的conf /activemq.xml文件中的持久化適配器replicatedLevelDB方式
<persistenceAdapter>
<replicatedLevelDB
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="localhost:2181"/>
</persistenceAdapter>
參數(shù)說(shuō)明
● replicas :集群中存在的節(jié)點(diǎn)的數(shù)目
● bind :當(dāng)該節(jié)點(diǎn)成為master后,將使用該bind配置的ip和端口進(jìn)行數(shù)據(jù)復(fù)制
● zkAddress :ZooKeeper的地址
D、 啟動(dòng)ZooKeeper服務(wù)器

E、 啟動(dòng)三臺(tái)ActiveMQ,可以測(cè)試驗(yàn)證了
瀏覽器訪問(wèn)http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務(wù)器;
web控制臺(tái)能訪問(wèn)的是 master,不能訪問(wèn)的是 slave。
F、 修改11-activemq-java中的程序收發(fā)消息類(已做)
連接時(shí)使用故障轉(zhuǎn)義協(xié)議failover
ailover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)
修改BROKER_URL地址
public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";
為了看到效果,發(fā)消息和接收消息我們都是用循環(huán)方式
//發(fā)消息 沒有返回值,是非阻塞的
while(true){
messageProducer.send(message);
}
查看發(fā)送以及接收idea控制臺(tái)輸出,停止ActiveMQ主,看效果。
G、 把其中的一臺(tái)master關(guān)閉,留下兩臺(tái)運(yùn)行,觀察效果
H、 繼續(xù)關(guān)閉下一臺(tái)master,留下一臺(tái)運(yùn)行,觀察效果
I、 啟動(dòng)其中一臺(tái),讓兩個(gè)運(yùn)行,再觀察效果
(3) 總結(jié)
這種方式,不適合集群太大,也就是activemq不能太多,因?yàn)槎鄠€(gè)activemq之間需要復(fù)制消息,這個(gè)比較耗資源,占用網(wǎng)絡(luò),建議3、5臺(tái)。