同步接收
receive()方法接收消息叫同步接收一個(gè)線程在工作,接收到消息后,執(zhí)行結(jié)束只能接收一次消息,如果想不間斷地接收消息,寫一個(gè)while true循環(huán)。
使用監(jiān)聽器接收消息,這種接收方式叫異步接收,兩個(gè)線程在工作,一個(gè)負(fù)責(zé)接收消息,一個(gè)負(fù)責(zé)處理消息。
為了實(shí)現(xiàn)不間斷的監(jiān)聽接收消息,在開發(fā)代碼的時(shí)候,我們不應(yīng)該關(guān)閉連接。
注意
在同一個(gè)consumer中,我們不能同時(shí)使用這2種接收方式;
比如在使用listener的情況下,當(dāng)調(diào)用receive()方法將會(huì)獲得一個(gè)Exception;
● 監(jiān)聽器監(jiān)聽指定目的地的消息
● 如果有消息,那么監(jiān)聽器回調(diào)onMessage方法,并將消息傳遞給該方法
● 在該方法中對(duì)消息進(jìn)行處理
● 拷貝QueueReceiver類,新的類名字為QueueListenerReceiver
● 將對(duì)消息的處理放到監(jiān)聽器的onMessage方法中
messageConsumer = session.createConsumer(destination,messageSelector);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
//判斷消息類型是否為文本消息
if(message instanceof TextMessage){
String text = ((TextMessage) message).getText();
System.out.println(text);
}else if(message instanceof ObjectMessage){
User user = (User) ((ObjectMessage) message).getObject();
System.out.println("接收的對(duì)象:"+user.getId() +"::"+ user.getName()+"::"+user.getAge());
}else if(message instanceof MapMessage){
System.out.println(((MapMessage) message).getString("school")
+"辦學(xué)"+((MapMessage) message).getInt("age") +"年了");
}else if(message instanceof BytesMessage){
boolean flag = ((BytesMessage) message).readBoolean();
String s = ((BytesMessage) message).readUTF();
System.out.println(flag +":::"+s);
}else if(message instanceof StreamMessage){
Long lo = ((StreamMessage) message).readLong();
String s = ((StreamMessage) message).readString();
System.out.println(lo +":::"+s);
}
//手動(dòng)確認(rèn)消息,如果不確認(rèn),消息不會(huì)被成功消費(fèi)
message.acknowledge();
}catch (Exception e){
e.printStackTrace();
}
}
});