(integer) 1
redis 127.0.0.1:6379> hset uset user3 8000
(integer) 1
redis 127.0.0.1:6379> hset user user3 8000
(integer) 1
redis 127.0.0.1:6379> hset user user4 6000 user5 7000
(error) ERR wrong number of arguments for 'hset' command
redis 127.0.0.1:6379> hmset user user4 6000 user5 7000
OK
redis 127.0.0.1:6379> hgetall user
1) "user2"
2) "9000"
3) "user3"
4) "8000"
5) "user4"
6) "6000"
7) "user5"
8) "7000"
redis 127.0.0.1:6379> hmget user user2 user5
1) "9000"
2) "7000"
redis 127.0.0.1:6379>
redis 127.0.0.1:6379> keys *user*
1) "user.uid.69999"
2) "user.uid.u123456"
3) "user"
4) "user.uid.63806380"
redis 127.0.0.1:6379> type user
hash
redis 127.0.0.1:6379> hgetall user
1) "user2"
2) "9000"
3) "user3"
4) "8000"
5) "user4"
6) "6000"
7) "user5"
8) "7000"
redis 127.0.0.1:6379>
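The session above relies on a few hash semantics: `hset` returns 1 when it creates a field, `hmset` takes several field/value pairs at once, and `hgetall` returns a flat field, value, field, value list. The `hset` error for two pairs is what older servers report; as far as I know, `hset` accepts multiple pairs from Redis 4.0.0 onward. The following is only an in-memory sketch of those semantics; `HashStore` is a hypothetical class, not part of any Redis client.

```python
class HashStore:
    """Tiny in-memory stand-in for the Redis hash commands used above."""

    def __init__(self):
        self._hashes = {}

    def hset(self, key, field, value):
        """Set one field; return 1 if the field is new, 0 if it was updated."""
        fields = self._hashes.setdefault(key, {})
        created = field not in fields
        fields[field] = str(value)
        return 1 if created else 0

    def hmset(self, key, *pairs):
        """Set several field/value pairs in one call."""
        if not pairs or len(pairs) % 2:
            raise ValueError("wrong number of arguments for 'hmset' command")
        for field, value in zip(pairs[0::2], pairs[1::2]):
            self.hset(key, field, value)
        return "OK"

    def hmget(self, key, *fields):
        """Return the values of the requested fields (None when missing)."""
        stored = self._hashes.get(key, {})
        return [stored.get(field) for field in fields]

    def hgetall(self, key):
        """Return a flat [field, value, ...] list, as redis-cli prints it.

        Insertion order is a property of this sketch; a real server does
        not promise any particular field order.
        """
        flat = []
        for field, value in self._hashes.get(key, {}).items():
            flat.extend([field, value])
        return flat
```

Replaying the commands from the session against this model gives the same replies, which makes it a convenient way to reason about the return values without a running server.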
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements. See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<!-- Messages are produced at localhost:7001, forwarded to localhost:7000, and then forwarded to the remote broker at 114.113.145.63:61616, where they are processed. -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <!-- Allows us to use system properties as variables in this configuration file -->
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
      <value>file:${activemq.base}/conf/credentials.properties</value>
    </property>
  </bean>

  <bean id="mainBroker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
    <property name="brokerName" value="mainBroker"/>
    <property name="persistent" value="false"/>
    <property name="transportConnectorURIs">
      <list>
        <!-- Listen address of the first forwarding destination -->
        <value>tcp://localhost:7000</value>
      </list>
    </property>
    <property name="jmsBridgeConnectors">
      <list>
        <bean class="org.apache.activemq.network.jms.JmsQueueConnector">
          <property name="outboundQueueConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
              <!-- Second forwarding destination -->
              <property name="brokerURL" value="tcp://114.113.145.63:61616"/>
            </bean>
          </property>
          <property name="outboundQueueBridges">
            <list>
              <bean class="org.apache.activemq.network.jms.OutboundQueueBridge">
                <constructor-arg value="messages.input"/>
              </bean>
            </list>
          </property>
        </bean>
      </list>
    </property>
  </bean>

  <bean id="bridgedBroker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
    <property name="brokerName" value="bridgedBroker"/>
    <property name="persistent" value="false"/>
    <property name="transportConnectorURIs">
      <list>
        <!-- Address that producers connect to -->
        <value>tcp://localhost:7001</value>
      </list>
    </property>
    <property name="jmsBridgeConnectors">
      <list>
        <bean class="org.apache.activemq.network.jms.JmsQueueConnector">
          <property name="outboundQueueConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
              <!-- First forwarding destination -->
              <property name="brokerURL" value="tcp://localhost:7000"/>
            </bean>
          </property>
          <property name="outboundQueueBridges">
            <list>
              <bean class="org.apache.activemq.network.jms.OutboundQueueBridge">
                <!-- Forward the messages on this queue -->
                <constructor-arg value="messages.input"/>
              </bean>
            </list>
          </property>
        </bean>
      </list>
    </property>
  </bean>

  <!--
      Enable web consoles, REST and Ajax APIs and demos
      It also includes Camel (with its web console), see ${ACTIVEMQ_HOME}/conf/camel.xml for more info
      Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
  -->
  <import resource="jetty.xml"/>
</beans>
<!-- END SNIPPET: example -->
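The two JmsQueueConnector beans form a forwarding chain for the messages.input queue. A small model can make the hops explicit. This is only a sketch: `BRIDGES` and `route` are hypothetical names, and the table is filled in by hand from the listen addresses and brokerURL values in the configuration above rather than parsed from it.

```python
# Each broker's listen address mapped to the brokerURL its bridge forwards to.
# Values are copied by hand from the Spring configuration above.
BRIDGES = {
    "tcp://localhost:7001": "tcp://localhost:7000",        # bridgedBroker -> mainBroker
    "tcp://localhost:7000": "tcp://114.113.145.63:61616",  # mainBroker -> remote broker
}


def route(start):
    """Follow the bridge table from `start` until a broker has no outbound bridge."""
    hops = [start]
    while hops[-1] in BRIDGES:
        next_hop = BRIDGES[hops[-1]]
        if next_hop in hops:
            # Two bridges pointing at each other would forward forever.
            raise ValueError("bridge loop detected at " + next_hop)
        hops.append(next_hop)
    return hops


if __name__ == "__main__":
    print(" -> ".join(route("tcp://localhost:7001")))
```

A message produced on port 7001 therefore passes through two brokers before it reaches the remote one, so all three must be up for end-to-end delivery.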
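If messages are consumed only through brokerB, the setup works as a bridge.
If messages are consumed through either brokerA or brokerB, the setup provides load sharing.
The key part of the configuration is:
<networkConnectors>
  <networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/>
</networkConnectors>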
ActiveMQ already makes this easy: ready-made configuration files ship in the conf directory.
activemq-static-network-broker1.xml
Key excerpt:
<networkConnectors>
</networkConnectors>
<transportConnectors>
  <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
activemq-static-network-broker2.xml
Key excerpt:
<networkConnectors>
  <networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/>
</networkConnectors>
<transportConnectors>
  <transportConnector name="openwire" uri="tcp://0.0.0.0:61618"/>
</transportConnectors>
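The difference between the two sample files comes down to which port each broker listens on and which one opens the network connection. The sketch below extracts just those two facts from the excerpts. It makes simplifying assumptions: the excerpts are wrapped in a plain `<broker>` element without the XML namespace that the real files declare, and `summarize` is a hypothetical helper.

```python
import xml.etree.ElementTree as ET

# Excerpts from the two sample files, wrapped in a namespace-free <broker>
# element so that plain tag names can be used for lookup.
BROKER1 = """<broker>
  <networkConnectors/>
  <transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
  </transportConnectors>
</broker>"""

BROKER2 = """<broker>
  <networkConnectors>
    <networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/>
  </networkConnectors>
  <transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61618"/>
  </transportConnectors>
</broker>"""


def summarize(xml_text):
    """Return the listen URIs and outgoing network links of one broker excerpt."""
    root = ET.fromstring(xml_text)
    listens = [t.get("uri") for t in root.iter("transportConnector")]
    links = [
        {"uri": n.get("uri"), "duplex": n.get("duplex") == "true"}
        for n in root.iter("networkConnector")
    ]
    return {"listens": listens, "links": links}


if __name__ == "__main__":
    for name, text in (("broker1", BROKER1), ("broker2", BROKER2)):
        print(name, summarize(text))
```

Only broker2 declares a network connector; because it is marked duplex, that single connection carries messages in both directions, which is why broker1 needs no connector of its own.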
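All we need to do is make one copy of the ActiveMQ installation in a separate folder, which gives us two ActiveMQ installations.
In the first installation, rename activemq-static-network-broker1.xml to activemq.xml; in the second, rename activemq-static-network-broker2.xml to activemq.xml.
Start the two ActiveMQ instances. Their log output is shown below, broker1 first.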
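The copy and rename steps can be scripted. The sketch below is one way to do it under stated assumptions: the directory layout (`conf/` inside the installation) follows the text above, `clone_install` and `use_config` are hypothetical helper names, and the sample file is copied over conf/activemq.xml instead of renamed so the original sample stays available.

```python
import shutil
from pathlib import Path


def clone_install(install_dir, copy_dir):
    """Copy a whole ActiveMQ installation; copy_dir must not exist yet."""
    return Path(shutil.copytree(install_dir, copy_dir))


def use_config(install_dir, sample_name):
    """Install one of the shipped sample files as conf/activemq.xml.

    Any existing conf/activemq.xml is overwritten.
    """
    conf = Path(install_dir) / "conf"
    target = conf / "activemq.xml"
    shutil.copyfile(conf / sample_name, target)
    return target
```

Typical use would be `second = clone_install(first, "activemq-2")`, followed by `use_config(first, "activemq-static-network-broker1.xml")` and `use_config(second, "activemq-static-network-broker2.xml")`, where `first` is wherever ActiveMQ was unpacked.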
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
INFO | ActiveMQ 5.5.0 JMS Message Broker (static-broker1) is starting
INFO | For help or more information please see: http://activemq.apache.org/
INFO | Listening for connections at: tcp://SONGJIE:61616
INFO | Connector openwire Started
INFO | ActiveMQ JMS Message Broker (static-broker1, ID:SONGJIE-60899-1369726682642-0:1) started
INFO | Connector vm://static-broker1 Started
INFO | Started responder end of duplex bridge NC@ID:SONGJIE-60962-1369726697807-0:1
INFO | Network connection between vm://static-broker1#0 and tcp:///127.0.0.1:60965(static-broker1) has been established.

INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1100/jmxrmi
INFO | KahaDB is version 3
INFO | Recovering from the journal ...
INFO | Recovery replayed 2 operations from the journal in 0.019 seconds.
INFO | ActiveMQ 5.5.0 JMS Message Broker (static-broker1) is starting
INFO | For help or more information please see: http://activemq.apache.org/
INFO | Listening for connections at: tcp://SONGJIE:61618
INFO | Connector openwire Started
INFO | Establishing network connection from vm://static-broker1?async=false&network=true to tcp://localhost:61616
INFO | Connector vm://static-broker1 Started
INFO | Network Connector DiscoveryNetworkConnector:NC:BrokerService[static-broker1] Started
INFO | ActiveMQ JMS Message Broker (static-broker1, ID:SONGJIE-60962-1369726697807-0:1) started
INFO | Network connection between vm://static-broker1#0 and tcp://localhost/127.0.0.1:61616(static-broker1) has been established.
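As the logs show, both brokers started successfully and the network connection between them was established.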
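Reading the startup logs by eye works for two brokers, but the same check can be automated. The sketch below looks for the "Network connection between ... has been established" line that both logs above end with. `bridge_established` is a hypothetical helper, and relying on this exact wording is an assumption that holds for the 5.5.0 output shown here but may not hold for other versions.

```python
def bridge_established(log_text):
    """Return True if the log reports an established network connection."""
    return any(
        "Network connection between" in line and "has been established" in line
        for line in log_text.splitlines()
    )


if __name__ == "__main__":
    sample = (
        "INFO | Connector vm://static-broker1 Started\n"
        "INFO | Network connection between vm://static-broker1#0 and "
        "tcp://localhost/127.0.0.1:61616(static-broker1) has been established.\n"
    )
    print(bridge_established(sample))
```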
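Now to verify the behavior.
One producer sends to broker1 on port 61616, one message every 3 seconds, 20 messages in total.
There are two consumers: one consumes from broker1 on port 61616, the other consumes from broker2 on port 61618.
Start the producer, the first consumer, and the second consumer in that order. Wait until the first consumer has begun consuming before starting the second, so that we can check whether a message already consumed by the first consumer is consumed again by the second.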
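The duplicate check can be done mechanically by comparing the message IDs that appear in the two consumers' logs. The sketch below assumes each logged message contains a `messageId = ID:...` field, as in the ActiveMQObjectMessage dumps in this post; `message_ids` and `consumed_by_both` are hypothetical helper names.

```python
import re

# Matches the messageId field of a logged ActiveMQ message, up to the comma.
MESSAGE_ID = re.compile(r"messageId = (ID:[^,\s]+)")


def message_ids(log_text):
    """Return the distinct message IDs in one consumer log, in first-seen order.

    A consumer may print the same message more than once, so repeats
    inside a single log are collapsed.
    """
    seen = []
    for message_id in MESSAGE_ID.findall(log_text):
        if message_id not in seen:
            seen.append(message_id)
    return seen


def consumed_by_both(first_log, second_log):
    """Return the message IDs that appear in both consumers' logs."""
    return sorted(set(message_ids(first_log)) & set(message_ids(second_log)))
```

An empty result from `consumed_by_both` means no message was delivered to both consumers; comparing the combined ID count with the 20 messages sent also shows whether anything was lost.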
Log of the first consumer (the first 10 entries are omitted):
11 2013-5-28 15:39:40
ActiveMQObjectMessage {commandId = 21, responseRequired = false, messageId = ID:SONGJIE-61158-1369726746561-0:1:1:1:13, originalDestination = null, originalTransactionId = TX:ID:SONGJIE-61158-1369726746561-0:1:13, producerId = ID:SONGJIE-60899-1369726682642-2:1:1:1, destination = queue://TOOL.DEFAULT, transactionId = null, expiration = 0, timestamp = 1369726783382, arrival = 0, brokerInTime = 1369726786422, brokerOutTime = 1369726786422, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@f4d6b3, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false}
###ActiveMQObjectMessage {commandId = 21, responseRequired = false, messageId = ID:SONGJIE-61158-1369726746561-0:1:1:1:13, originalDestination = null, originalTransactionId = TX:ID:SONGJIE-61158-1369726746561-0:1:13, producerId = ID:SONGJIE-60899-1369726682642-2:1:1:1, destination = queue://TOOL.DEFAULT, transactionId = null, expiration = 0, timestamp = 1369726783382, arrival = 0, brokerInTime = 1369726786422, brokerOutTime = 1369726786422, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@f4d6b3, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false}
org.apache.activemq.command.ActiveMQObjectMessage
12 2013-5-28 15:39:43
ActiveMQObjectMessage {commandId = 22, responseRequired = false, messageId = ID:SONGJIE-61158-1369726746561-0:1:1:1:14, originalDestination = null, originalTransactionId = TX:ID:SONGJIE-61158-1369726746561-0:1:14, producerId = ID:SONGJIE-60899-1369726682642-2:1:1:1, destination = queue://TOOL.DEFAULT, transactionId = null, expiration = 0, timestamp = 1369726786421, arrival = 0, brokerInTime = 1369726789439, brokerOutTime = 1369726789440, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@85f3d6, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false}
###ActiveMQObjectMessage {commandId = 22, responseRequired = false, messageId = ID:SONGJIE-61158-1369726746561-0:1:1:1:14, originalDestination = null, originalTransactionId = TX:ID:SONGJIE-61158-1369726746561-0:1:14, producerId = ID:SONGJIE-60899-1369726682642-2:1:1:1, destination = queue://TOOL.DEFAULT, transactionId = null, expiration = 0, timestamp = 1369726786421, arrival = 0, brokerInTime = 1369726789439, brokerOutTime = 1369726789440, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@85f3d6, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false}
org.apache.activemq.command.ActiveMQObjectMessage
13 2013-5-28 15:39:46
ActiveMQObjectMessage {commandId = 23, responseRequired = false, messageId = ID:SONGJIE-61158-1369726746561-0:1:1:1:15, originalDestination = null, originalTransactionId = TX:ID:SONGJIE-61158-1369726746561-0:1:15, producerId = ID:SONGJIE-60899-1369726682642-2:1:1:1, destination = queue://TOOL.DEFAULT, transactionId = null, expiration = 0, timestamp = 1369726789438, arrival = 0, brokerInTime = 1369726792472, brokerOutTime = 1369726792473, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@17af435, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false}
###ActiveMQObjectMessage {commandId = 23, responseRequired = false, messageId = ID:SONGJIE-61158-1369726746561-0:1:1:1:15, originalDestination = null, originalTransactionId = TX:ID:SONGJIE-61158-1369726746561-0:1:15, producerId = ID:SONGJIE-60899-1369726682642-2:1:1:1, destination = queue://TOOL.DEFAULT, transactionId = null, expiration = 0, timestamp = 1369726789438, arrival = 0, brokerInTime = 1369726792472, brokerOutTime = 1369726792473, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@17af435, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, read