在我们的记录系统中,我们可能想订阅不仅根据严重性的日志,还可以根据源发出日志。你可能知道这个概念从UNIX 系统日志工具,它记录路由基于两严重性(信息/警告/暴击...)和设备(AUTH / cron/ KERN ...)。
这会给我们很大的灵活性 - 我们可能想听听来自通过“cron”也是从"kern"来的所有日志的严重错误。
为了实现我们的测井系统,我们需要了解一个更复杂的话题交换(topic exchange)。
主题交流发送到一个主题交换的消息不能有任意 routing_key的 -它必须是一个单词列表,以点分隔。这句话可以是任何东西,但他们通常指定连接到该消息的某些功能。一些有效的路由主要例子:“ stock.usd.nyse “,”nyse.vmw “,” quick.orange.rabbit “。有很多的话可以在路由的关键,只要你喜欢,高达255字节的限制。
绑定键(bingding key)也必须以同样的形式。话题交换背后的逻辑 是类似于一个直接交换 -一个消息发送给特定的路由键(routing
key)将被传递到所有队列匹配结合键约束。然而,有两个重要的特殊情况,绑定键(banding keys):
这是最简单的一个例子解释:
在这个例子中,我们将发送描述动物的消息。该消息将被发送一个路由键包含三个字(两个点)。在路由键将第一个字形容速度快,第二个颜色和第三物种:“ 兼容的<speed>.<colour>.<species> “。
我们创建了三个绑定:Q1势必具有约束力键“ *.orange.* “和Q2与” *.*.rabbit “和” lazy.# “。
这些绑定可以概括为:
ActiveMQ持久化
消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送。
消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动以后首先要检查制定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。
ActiveMQ持久化方式:AMQ、KahaDB、JDBC、LevelDB。
1、AMQ
AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,如果一条消息的大小超过了32M,那么这个值必须设置大一点。当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本。默认配置如下:
<persistenceAdapter> <amqPersistenceAdapter directory="activemq-data"maxFileLength="32mb"/> </persistenceAdapter>
属性如下:
属性名称
默认值
描述
directory
activemq-data
消息文件和日志的存储目录
useNIO
true
使用NIO协议存储消息
syncOnWrite
false
同步写到磁盘,这个选项对性能影响非常大
maxFileLength
32Mb
一个消息文件的大小
persistentIndex
true
消息索引的持久化,如果为false,那么索引保存在内存中
maxCheckpointMessageAddSize
4kb
一个事务允许的最大消息量
cleanupInterval
30000
清除操作周期,单位ms
indexBinSize
1024
索引文件缓存页面数,缺省为1024,当amq扩充或者缩减存储时,会锁定整个broker,导致一定时间的阻塞,所以这个值应该调整到比较大,但是代码中实现会动态伸缩,调整效果并不理想。
indexKeySize
96
索引key的大小,key是消息ID
indexPageSize
16kb
索引的页大小
directoryArchive
archive
存储被归档的消息文件目录
archiveDataLogs
false
当为true时,归档的消息文件被移到directoryArchive,而不是直接删除
2、KahaDB
KahaDB是基于文件的本地数据库储存形式,虽然没有AMQ的速度快,但是它具有强扩展性,恢复的时间比AMQ短,从5.4版本之后KahaDB做为默认的持久化方式。默认配置如下:
<persistenceAdapter> <kahaDB directory="activemq-data"journalMaxFileLength="32mb"/> </persistenceAdapter>
KahaDB的属性如下:
属性名称
默认值
描述
directory
activemq-data
消息文件和日志的存储目录
indexWriteBatchSize
1000
一批索引的大小,当要更新的索引量到达这个值时,更新到消息文件中
indexCacheSize
10000
内存中,索引的页大小
enableIndexWriteAsync
false
索引是否异步写到消息文件中
journalMaxFileLength
32mb
一个消息文件的大小
enableJournalDiskSyncs
true
是否讲非事务的消息同步写入到磁盘
cleanupInterval
30000
清除操作周期,单位ms
checkpointInterval
5000
索引写入到消息文件的周期,单位ms
ignoreMissingJournalfiles
false
忽略丢失的消息文件,false,当丢失了消息文件,启动异常
checkForCorruptJournalFiles
false
检查消息文件是否损坏,true,检查发现损坏会尝试修复
checksumJournalFiles
false
产生一个checksum,以便能够检测journal文件是否损坏。
5.4版本之后有效的属性:
archiveDataLogs
false
当为true时,归档的消息文件被移到directoryArchive,而不是直接删除
directoryArchive
null
存储被归档的消息文件目录
databaseLockedWaitDelay
10000
在使用负载时,等待获得文件锁的延迟时间,单位ms
maxAsyncJobs
10000
同个生产者产生等待写入的异步消息最大量
concurrentStoreAndDispatchTopics
false
当写入消息的时候,是否转发主题消息
concurrentStoreAndDispatchQueues
true
当写入消息的时候,是否转发队列消息
5.6版本之后有效的属性:
archiveCorruptedIndex
false
是否归档错误的索引
每个KahaDB的实例都可以配置单独的适配器,如果没有目标队列提交给filteredKahaDB,那么意味着对所有的队列有效。如果一个队列没有对应的适配器,那么将会抛出一个异常。配置如下:
<persistenceAdapter> <mKahaDBdirectory="${activemq.base}/data/kahadb"> <filteredPersistenceAdapters> <!-- match all queues --> <filteredKahaDBqueue=">"> <persistenceAdapter> <kahaDBjournalMaxFileLength="32mb"/> </persistenceAdapter> </filteredKahaDB> <!-- match all destinations --> <filteredKahaDB> <persistenceAdapter> <kahaDBenableJournalDiskSyncs="false"/> </persistenceAdapter> </filteredKahaDB> </filteredPersistenceAdapters> </mKahaDB> </persistenceAdapter>
如果filteredKahaDB的perDestination属性设置为true,那么匹配的目标队列将会得到自己对应的KahaDB实例。配置如下:
<persistenceAdapter> <mKahaDBdirectory="${activemq.base}/data/kahadb"> <filteredPersistenceAdapters> <!-- kahaDB per destinations --> <filteredKahaDB perDestination="true"> <persistenceAdapter> <kahaDBjournalMaxFileLength="32mb" /> </persistenceAdapter> </filteredKahaDB> </filteredPersistenceAdapters> </mKahaDB> </persistenceAdapter>
3、JDBC
可以将消息存储到数据库中,例如:Mysql、SQL Server、Oracle、DB2。
配置JDBC适配器:
<persistenceAdapter> <jdbcPersistenceAdapterdataSource="#mysql-ds" createTablesOnStartup="false" /> </persistenceAdapter>
dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认
MQ简介:
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBMWEBSPHERE MQ。
MQ特点:
MQ的消费-生产者模型的一个典型的代表,一端往消息队列中不断的写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
使用场景:
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
JMS简介:
JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
定义:
JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。
简介:
JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java DatabaseConnectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
JMS和MQ的关系:
JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程。而MQ则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者;MQ的实现可以基于JMS,也可以基于其他规范或标准。
支持JMS的开源MQ:
目前选择的最多的是ActiveMQ。
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:
1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WSNotification,XMPP,AMQP
2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6. 支持通过JDBC和journal提供高速的消息持久化
7. 从设计上保证了高性能的集群,客户端-服务器,点对点
8. 支持Ajax
9. 支持与Axis的整合
10. 可以很容易得调用内嵌JMS provider,进行测试
11. ActiveMQ速度非常快;一般要比jbossMQ快10倍。
优点:是一个快速的开源消息组件(框架),支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,XA,和J2EE1.4容器无缝结合,并且支持轻量级容器和大多数跨语言客户端上的Java虚拟机。消息异步接受,减少软件多系统集成的耦合度。消息可靠接收,确保消息在中间件可靠保存,多个消息也可以组成原子事务。
缺点:ActiveMQ默认的配置性能偏低,需要优化配置,但是配置文件复杂,ActiveMQ本身不提供管理工具;示例代码少;主页上的文档看上去比较全面,但是缺乏一种有效的组织方式,文档只有片段,用户很难由浅入深进行了解,二、文档整体的专业性太强。在研究阶段可以通过查maillist、看Javadoc、分析源代码来了解。
ActiveMQ应用场景:
1、 不同语言应用集成
ActiveMQ 中间件用Java语言编写,因此自然提供Java客户端 API。但是ActiveMQ 也为C/C++、.NET、Perl、PHP、Python、Ruby 和一些其它语言提供客户端。在你考虑如何集成不同平台不同语言编写应用的时候,ActiveMQ 拥有巨大优势。在这样的例子中,多种客户端API通过ActiveMQ 发送和接受消息成为可能,无论使用的是什么语言。此外,ActiveMQ 还提供交叉语言功能,该功能整合这种功能,无需使用远程过程调用(RPC)确实是个优势,因为消息协助应用解耦。
2、 作为RPC的替代
使用RPC同步调用的应用十分普遍。假设大多数客户端服务器应用使用RPC,包括ATM、大多数WEB应用、信用卡系统、销售点系统等等。尽管很多系统很成功,但是转换使用异步消息可以带来很多好处,而且也不会放弃响应保证。使用同步请求的系统在规模上有较大的限制,因为请求会被阻塞,从而导致整个系统变慢。如果使用异步消息替代,可以很容易增加额外的消息接收者,使得消息能被并发消耗,从而加快请求处理。当然,你的系统应用间应该是解耦的。
3、 应用之间解耦
正如之前讨论的,紧耦合架构可以导致很多问题,尤其是如果他们是分布的。松耦合架构,在另一方面,证实了更少的依赖性,能够更好地处理不可预见的改变。不仅可以在系统中改变组件而不影响整个系统,而且组件交互也相当的简单。相比使用同步的系统(调用者必须等待被调用者返回信息),异步系统(调用方发送消息后就不管,即fire-and-forget)能够给我们带来事件驱动架构(event-driven architecture EDA)。
4、 作为事件驱动架构的主干
解耦,异步架构的系统允许通过代理器自己配置更多的客户端,内存等(即vertical scalability)来扩大系统,而不是增加更多的代理器(即horizontal scalability)。考虑如亚马逊这样繁忙的电子商务系统。当用户购买物品,事实上系统需要很多步骤去处理,包括下单,创建发票,付款,执行订单,运输等。但是用户下单后,会立即返回“谢谢你下单”的界面。不只是没有延迟,而且用户还会受到一封邮件表明订单已经收到。在亚马逊下单的例子就是一个多步处理的例子。每一步都由单独的服务去处理。当用户下单是,有一个同步的体积表单动作,但整个处理流程并不通过浏览器同步处理。相反地,订单马上被接受和反馈。而剩下的步骤就通过异步处理。如果在处理过程中出错,用户会通过邮件收到通知。这样的异步处理能提供高负载和高可用性。
5、 提高系统扩展性
很多使用事件驱动设计的系统是为了获得高可扩展性,例如电子商务,政府,制造业,线上游戏等。通过异步消息分开商业处理步骤给各个应用,能够带来很多可能性。考虑设计一个应用来完成一项特殊的任务。这就是面向服务的架构(service-oriented architecture SOA)。每一个服务完成一个功能并且只有一个功能。应用就通过服务组合起来,服务间使用异步消息和最终一致性。这样的设计便可以引入一个复杂事件处理概念(complex event processing CEP)。使用CEP,部件间的交互可以被记录追踪。在异步消息系统中,可以很容易在部件间增加一层处理。
其他开源JMS供应商;
jbossmq(jboss 4)
jboss messaging (jboss 5)
joram-4.3.21 2006-09-22
openjms-0.7.7-alpha-3.zipDecember 26,2005
mantamq
ubermq
SomnifugiJMS 2005-7-27
开源的JMSProvider大部分都已经停止发展了,剩下的几个都是找到了东家,和某种J2EE 服务器挂钩,比如jbossmq 与jboss,joram与jonas(objectweb
▪机器学习理论与实战(二)决策树 ▪Hibernate(四)——全面解析一对多关联映射 ▪我所理解的设计模式(C++实现)——解释器模... iis7站长之家