javamq取消息代碼 java mq消息隊列詳解

java如何獲取rabbitmq隊列中消息數(shù)量

下面是RabbitMQ的消息確認機制:“為了確保消息不會丟失,RabbitMQ支持消息確認機制??蛻舳嗽诮邮艿较⒉⑻幚硗旰?,可以發(fā)送一個ack消息給RabbitMQ,告訴它該消息可以安全的刪除了。假如客戶端在發(fā)送ack之前意外死掉了,那么RabbitMQ會將消息投遞到下一個consumer客戶端。如果有多個consumer客戶端,RabbitMQ在投遞消息時是輪詢的。RabbitMQ如何判斷客戶端死掉了?唯一根據(jù)是客戶端連接是否斷開。這里沒有超時機制,也就是說客戶端可以處理一個消息很長時間,只要沒斷開連接,RabbitMQ就一直等待ack消息?!蔽椰F(xiàn)在遇到的問題是這樣的:我這邊有幾條線程去消息隊列里取數(shù)據(jù),但是會有異常數(shù)據(jù)導(dǎo)致線程掛掉,就是上邊的“客戶端在發(fā)送ack之前意外死掉了”,RabbitMQ會將消息投遞到下一個consumer客戶端,這樣一條異常數(shù)據(jù)會把我的所有線程掛掉,我現(xiàn)在想實現(xiàn)這樣的功能:如果有異常數(shù)據(jù)導(dǎo)致進程掛掉,那么我不讓RabbitMQ將這條消息投遞到下一個consumer客戶端,而是放到另一個地方或者另外處理,請問該如何實現(xiàn)呢?

成都創(chuàng)新互聯(lián)公司專注于中大型企業(yè)的成都網(wǎng)站設(shè)計、做網(wǎng)站和網(wǎng)站改版、網(wǎng)站營銷服務(wù),追求商業(yè)策劃與數(shù)據(jù)分析、創(chuàng)意藝術(shù)與技術(shù)開發(fā)的融合,累計客戶上千,服務(wù)滿意度達97%。幫助廣大客戶順利對接上互聯(lián)網(wǎng)浪潮,準確優(yōu)選出符合自己需要的互聯(lián)網(wǎng)運用,我們將一直專注品牌網(wǎng)站建設(shè)和互聯(lián)網(wǎng)程序開發(fā),在前進的路上,與客戶一起成長!

java使用mq get api從mq中取數(shù)據(jù)怎樣觸發(fā)偵聽器連續(xù)取數(shù)據(jù)

{

//前面是準備管理器和隊列

MQQueueManager qMgr = new MQQueueManager(qManager);

int openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;

MQQueue queue = qMgr.accessQueue(qName, openOptions);

MQMessage rcvMessage = new MQMessage();

MQGetMessageOptions gmo = new MQGetMessageOptions();

gmo.options = gmo.options + MQConstants.MQGMO_WAIT + MQConstants.MQGMO_SYNCPOINT;

//讀取五秒超時,這里目的是要有個讀取阻塞,和Socket編程類似。

gmo.waitInterval = 5000;

queue.get(rcvMessage, gmo);

//后面就是操作消息的部分【略】

}catch(Exception e){{

//前面是準備管理器和隊列

MQQueueManager qMgr = new MQQueueManager(qManager);

int openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;

MQQueue queue = qMgr.accessQueue(qName, openOptions);

MQMessage rcvMessage = new MQMessage();

MQGetMessageOptions gmo = new MQGetMessageOptions();

gmo.options = gmo.options + MQConstants.MQGMO_WAIT + MQConstants.MQGMO_SYNCPOINT;

//讀取五秒超時,這里目的是要有個讀取阻塞,和Socket編程類似。

gmo.waitInterval = 5000;

queue.get(rcvMessage, gmo);

//后面就是操作消息的部分【略】

}catch(Exception e){

java怎么將mq接收的文件消息提取出來

WebSphere MQ 接收發(fā)送

添加mq jar

類介紹:

SendMSG:消息發(fā)送類。

Main():主方法。

SendMSG():消息發(fā)送方法。

方法描述:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

package test;

public class SendMSG{

MQEnvironment.hostname = "192.168.10.201";

//通道類型為服務(wù)器連接通道

MQEnvironment.channel = "tongdao";

MQEnvironment.CCSID = 1381;

//消息隊列端口號

MQEnvironment.port = 10618;

try{

//建立隊列管理器QM_SERVER為隊列管理器名稱

MQQueueManager qMgr = new MQQueueManager("test");

int openOptions = MQC.MQOO_INPUT_AS_Q_DEF|MQC.MQOO_OUTPUTMQC.MQOO_INQUIRE;//建立隊列INITQ隊列名稱INITQ為本地隊列

MQQueue queue = qMgr.accessQueue("wanghui",openOptions,null,null,null);

System.out.println("成功建立通道");

MQMessage message = new MQMessage();

message.format = MQC.MQFMT_STRING;

message.characterSet = 1381;

message.writeString("王輝");

message.expiry = -1;//設(shè)置消息用不過期

queue.put(message);//將消息放入隊列

queue.close();//關(guān)閉隊列

qMgr.disconnect();//斷開連接

}catch(EOFExceptione){

e.printStackTrace();

}catch(MQExceptione){

e.printStackTrace();

}catch(Exceptione){

e.printStackTrace();

}

}

ReceiveMSG:消息接收類。

Main():主方法。

ReceiveMSG():消息接收方法。

public class ReceiveMSG {

MQEnvironment.hostname="192.168.10.201";//通道類型為服務(wù)器連接通道

MQEnvironment.channel="tongdao";

MQEnvironment.CCSID=1381;

MQEnvironment.port=10618;

try{

//建立隊列管理器QM_SERVER為隊列管理器名稱

MQQueueManager qMgr = new MQQueueManager("test");

int openOptions=MQC.MQOO_INPUT_AS_Q_DEF|MQC.MQOO_OUTPUT|MQC.MQOO_INQUIRE;//建立隊列INITQ隊列名稱INITQ為本地隊列

MQQueue queue=qMgr.accessQueue("wanghui",openOptions,null,null,null);

System.out.println("成功建立通道");

MQMessage message= new MQMessage();

message.format=MQC.MQFMT_STRING;

message.characterSet=1381;

//從隊列中獲取消息

MQGetMessage Optionspmo=new MQGetMessageOptions();

queue.get(message,pmo);

Stringchars=message.readLine();

System.out.println(chars);

queue.close();//關(guān)閉隊列

qMgr.disconnect();//斷開連接

}catch(EOFExceptione){

e.printStackTrace();

}catch(MQExceptione){

e.printStackTrace();

}catch(Exceptione){

e.printStackTrace();

}

}

用java代碼如何設(shè)置activemq消息持久化到數(shù)據(jù)庫中?

ActiveMQ持久化消息的二種方式;

1、持久化為文件

這個裝ActiveMQ時默認就是這種,只要設(shè)置消息為持久化就可以了。涉及到的配置和代碼有:

persistenceAdapter

kahaDB directory="${activemq.base}/data/kahadb"/

/persistenceAdapter

producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue);

2、持久化為MySql

首先需要把MySql的驅(qū)動放到ActiveMQ的Lib目錄下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar

接下來修改配置文件

persistenceAdapter

jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/

/persistenceAdapter

在配置文件中的broker節(jié)點外增加

bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"

property name="driverClassName" value="com.mysql.jdbc.Driver"/

property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/

property name="username" value="activemq"/

property name="password" value="activemq"/

property name="maxActive" value="200"/

property name="poolPreparedStatements" value="true"/

/bean

從配置中可以看出數(shù)據(jù)庫的名稱是activemq,需要手動在MySql中增加這個庫。

然后重新啟動消息隊列,會發(fā)現(xiàn)多了3張表

1:activemq_acks

2:activemq_lock

3:activemq_msgs

網(wǎng)頁標題:javamq取消息代碼 java mq消息隊列詳解
當前地址:http://muchs.cn/article38/doesdpp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供服務(wù)器托管、企業(yè)網(wǎng)站制作外貿(mào)網(wǎng)站建設(shè)、網(wǎng)站設(shè)計公司、品牌網(wǎng)站制作、品牌網(wǎng)站設(shè)計

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

綿陽服務(wù)器托管