当前位置:  数据库>oracle

通过JMS监听Oracle AQ,在数据库变化时触发执行Java程序

    来源: 互联网  发布时间:2017-06-20

    本文导语: 环境说明 本实验环境基于Oracle 12C和JDK1.8,其中Oracle 12C支持多租户特性,相较于之前的Oracle版本,使用‘C##用户名‘表示用户,例如如果数据库用户叫kevin,则登陆时使用C##kevin进行登陆。 一、Oracle高级消息队列AQ Oracle AQ是Ora...

环境说明

本实验环境基于Oracle 12C和JDK1.8,其中Oracle 12C支持多租户特性,相较于之前的Oracle版本,使用‘C##用户名‘表示用户,例如如果数据库用户叫kevin,则登陆时使用C##kevin进行登陆。

一、Oracle高级消息队列AQ

Oracle AQ是Oracle中的消息队列,是Oracle中的一种高级应用,每个版本都在不断的加强,使用DBMS_AQ系统包进行相应的操作,是Oracle的默认组件,只要安装了Oracle数据库就可以使用。使用AQ可以在多个Oracle数据库、Oracle与Java、C等系统中进行数据传输。

下面分步骤说明如何创建Oracle AQ

1. 创建消息负荷payload

Oracle AQ中传递的消息被称为有效负荷(payloads),格式可以是用户自定义对象或XMLType或ANYDATA。本例中我们创建一个简单的对象类型用于传递消息。

create type demo_queue_payload_type as object (message varchar2(4000));
2. 创建队列表

队列表用于存储消息,在入队时自动存入表中,出队时自动删除。使用DBMS_AQADM包进行数据表的创建,只需要写表名,同时设置相应的属性。对于队列需要设置multiple_consumers为false,如果使用发布/订阅模式需要设置为true。

begin
  dbms_aqadm.create_queue_table(
    queue_table   => 'demo_queue_table',
    queue_payload_type => 'demo_queue_payload_type',
    multiple_consumers => false
  );
end;

执行完后可以查看oracle表中自动生成了demo_queue_table表,可以查看影响子段(含义比较清晰)。

3. 创建队列并启动

创建队列并启动队列:

begin
  dbms_aqadm.create_queue (
    queue_name  => 'demo_queue',
    queue_table => 'demo_queue_table'
  );

  dbms_aqadm.start_queue(
    queue_name  =>  'demo_queue'
  );
end;

至此,我们已经创建了队列有效负荷,队列表和队列。可以查看以下系统创建了哪些相关的对象:

SELECT object_name, object_type FROM user_objects WHERE object_name != 'DEMO_QUEUE_PAYLOAD_TYPE';

OBJECT_NAME OBJECT_TYPE
------------------------------ ---------------
DEMO_QUEUE_TABLE TABLE
SYS_C009392 INDEX
SYS_LOB0000060502C00030$$ LOB
AQ$_DEMO_QUEUE_TABLE_T INDEX
AQ$_DEMO_QUEUE_TABLE_I INDEX
AQ$_DEMO_QUEUE_TABLE_E QUEUE
AQ$DEMO_QUEUE_TABLE VIEW
DEMO_QUEUE QUEUE
  • 1

我们看到一个队列带出了一系列自动生成对象,有些是被后面直接用到的。不过有趣的是,创建了第二个队列。这就是所谓的异常队列(exception queue)。如果AQ无法从我们的队列接收消息,将记录在该异常队列中。

消息多次处理出错等情况会自动转移到异常的队列,对于异常队列如何处理目前笔者还没有找到相应的写法,因为我使用的场景并不要求消息必须一对一的被处理,只要起到通知的作用即可。所以如果消息转移到异常队列,可以执行清空队列表中的数据

delete from demo_queue_table;
4. 队列的停止和删除

如果需要删除或重建可以使用下面的方法进行操作:

BEGIN
   DBMS_AQADM.STOP_QUEUE(
      queue_name => 'demo_queue'
      );
   DBMS_AQADM.DROP_QUEUE(
      queue_name => 'demo_queue'
      );
   DBMS_AQADM.DROP_QUEUE_TABLE(
      queue_table => 'demo_queue_table'
      );
END;
5. 入队消息

入列操作是一个基本的事务操作(就像往队列表Insert),因此我们需要提交。

declare
  r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle RAW(16);
  o_payload demo_queue_payload_type;
begin
  o_payload := demo_queue_payload_type('what is you name ?');

  dbms_aq.enqueue(
    queue_name  => 'demo_queue',
    enqueue_options => r_enqueue_options,
    message_properties => r_message_properties,
    payload => o_payload,
    msgid => v_message_handle
  );

  commit;
end;

通过SQL语句查看消息是否正常入队:

select * from aq$demo_queue_table;
select user_data from aq$demo_queue_table;
6. 出队消息

使用Oracle进行出队操作,我没有实验成功(不确定是否和DBMS_OUTPUT的执行权限有关),代码如下,读者可以进行调试:

declare
  r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle RAW(16);
  o_payload demo_queue_payload_type;
begin
  DBMS_AQ.DEQUEUE(
    queue_name => 'demo_queue',
    dequeue_options => r_dequeue_options,
    message_properties => r_message_properties,
    payload => o_payload,
    msgid => v_message_handle
  );


  DBMS_OUTPUT.PUT_LINE(
    '***** Browse message is [' || o_payload.message || ']****'
  );

end;
二、Java使用JMS监听并处理Oracle AQ队列

Java使用JMS进行相应的处理,需要使用Oracle提供的jar,在Oracle安装目录可以找到:在linux中可以使用find命令进行查找,例如

find `pwd` -name 'jmscommon.jar'

需要的jar为:

  • app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/jmscommon.jar
  • app/oracle/product/12.1.0/dbhome_1/jdbc/lib/ojdbc7.jar
  • app/oracle/product/12.1.0/dbhome_1/jlib/orai18n.jar
  • app/oracle/product/12.1.0/dbhome_1/jlib/jta.jar
  • app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/aqapi_g.jar
1. 创建连接参数类

实际使用时可以把参数信息配置在properties文件中,使用Spring进行注入。

package org.kevin.jms;
/**
 * 
 * @author 李文锴
 *  连接参数信息
 *
 */
public class JmsConfig {

    public String username = "c##kevin";
    public String password = "a111111111";
    public String jdbcUrl = "jdbc:oracle:thin:@127.0.0.1:1521:orcl";
    public String queueName = "demo_queue";
}
  • 1
2. 创建消息转换类

因为消息载荷是Oracle数据类型,需要提供一个转换工厂类将Oracle类型转换为Java类型。

package org.kevin.jms;

import java.sql.SQLException;

import oracle.jdbc.driver.OracleConnection;
import oracle.jdbc.internal.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.CustomDatum;
import oracle.sql.CustomDatumFactory;
import oracle.sql.Datum;
import oracle.sql.STRUCT;

/**
 * 
 * @author 李文锴 
 * 数据类型转换类
 *
 */
@SuppressWarnings("deprecation")
public class QUEUE_MESSAGE_TYPE implements CustomDatum, CustomDatumFactory {
    public static final String _SQL_NAME = "QUEUE_MESSAGE_TYPE";
    public static final int _SQL_TYPECODE = OracleTypes.STRUCT;

    MutableStruct _struct;
    // 12表示字符串
    static int[] _sqlType = { 12 };
    static CustomDatumFactory[] _factory = new CustomDatumFactory[1];
    static final QUEUE_MESSAGE_TYPE _MessageFactory = new QUEUE_MESSAGE_TYPE();

    public static CustomDatumFactory getFactory() {
        return _MessageFactory;
    }

    public QUEUE_MESSAGE_TYPE() {
        _struct = new MutableStruct(new Object[1], _sqlType, _factory);
    }

    public Datum toDatum(OracleConnection c) throws SQLException {
        return _struct.toDatum(c, _SQL_NAME);
    }

    public CustomDatum create(Datum d, int sqlType) throws SQLException {
        if (d == null)
            return null;
        QUEUE_MESSAGE_TYPE o = new QUEUE_MESSAGE_TYPE();
        o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
        return o;
    }

    public String getContent() throws SQLException {
        return (String) _struct.getAttribute(0);
    }

}
3. 主类进行消息处理
package org.kevin.jms;

import java.util.Properties;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;

import oracle.jms.AQjmsAdtMessage;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;

/**
 * 
 * @author 李文锴 消息处理类
 *
 */
public class Main {

    public static void main(String[] args) throws Exception {
        JmsConfig config = new JmsConfig();

        QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.jdbcUrl,
                new Properties());

        QueueConnection conn = queueConnectionFactory.createQueueConnection(config.username, config.password);
        AQjmsSession session = (AQjmsSession) conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        conn.start();

        Queue queue = (AQjmsDestination) session.getQueue(config.username, config.queueName);
        MessageConsumer consumer = session.createConsumer(queue, null, QUEUE_MESSAGE_TYPE.getFactory(), null, false);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                System.out.println("ok");

                AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;

                try {
                    QUEUE_MESSAGE_TYPE payload = (QUEUE_MESSAGE_TYPE) adtMessage.getAdtPayload();
                    System.out.println(payload.getContent());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        Thread.sleep(1000000);
    }

}

使用Oracle程序块进行入队操作,在没有启动Java时看到队列表中存在数据。启动Java后,控制台正确的输出的消息;通过Oracle程序块再次写入消息,发现控制台正确处理消息。Java的JMS监听不是立刻进行处理,可能存在几秒中的时间差,时间不等。

三、监控表记录变化通知Java

下面的例子创建一个数据表,然后在表中添加触发器,当数据变化后触发器调用存储过程给Oracle AQ发送消息,然后使用Java JMS对消息进行处理。

1. 创建表

创建student表,包含username和age两个子段,其中username时varchar2类型,age时number类型。

2. 创建存储过程

创建send_aq_msg存储过程,因为存储过程中调用dbms数据包,系统包在存储过程中执行需要进行授权(使用sys用户进行授权):

grant execute on dbms_aq to c##kevin;
  • 1

注意存储过程中包含commit语句。

create or replace 
PROCEDURE send_aq_msg (info IN VARCHAR2) as
  r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle RAW(16);
  o_payload demo_queue_payload_type;
begin
  o_payload := demo_queue_payload_type(info);

  dbms_aq.enqueue(
    queue_name  => 'demo_queue',
    enqueue_options => r_enqueue_options,
    message_properties => r_message_properties,
    payload => o_payload,
    msgid => v_message_handle
  );

  commit;
end send_aq_msg;
3. 创建触发器

在student表中创建触发器,当数据写入或更新时,如果age=18,则进行入队操作。需要调用存储过程发送消息,但触发器中不能包含事物提交语句,因此需要使用pragma autonomous_transaction;声明自由事物:

CREATE OR REPLACE TRIGGER STUDENT_TR 
AFTER INSERT OR UPDATE OF AGE ON STUDENT FOR EACH ROW 
DECLARE
pragma autonomous_transaction;
BEGIN
  if :new.age = 18 then
      send_aq_msg(:new.username);  
  end if;  
END;

创建完触发器后向执行插入或更新操作:

insert into student (username,age) values ('jack.lee.3k', 18);
update student set age=18 where username='jack003';

Java JMS可以正确的处理消息。


    
 
 
 
本站(WWW.)旨在分享和传播互联网科技相关的资讯和技术,将尽最大努力为读者提供更好的信息聚合和浏览方式。
本站(WWW.)站内文章除注明原创外,均为转载、整理或搜集自网络。欢迎任何形式的转载,转载请注明出处。












  • 相关文章推荐
  • 通过javascript实现DIV居中,兼容各浏览器版本
  • applet可以不通过数字签名,通过设置IE直接在本地访问本地文件吗
  • php通过socket_bind()设置IP地址代码示例
  • 我使用.net编译通过,但是使用g++编译不能通过。总是提示我undefined reference to ~myclass()
  • 通过javascript库JQuery实现页面跳转功能代码
  • 紧急求救!能通过jdbc怎样连接sqlsever 然后通过 for xml 关键字得到xml流吗?
  • c#通过委托delegate与Dictionary实现action选择器代码举例
  • 我想我的网站屏蔽掉通过某些网站过来的访问,我想通过htaccess 文件来做,请大家帮帮我。
  • linux下通过crond实现自动执行程序
  • 如何通过INTERNET访问通过共项一条线路上网的局域网中的机器???
  • IT科技资讯 iis7站长之家
  • 为什么g++编译通过了,而gcc却编译通过不了???
  • 通过docker run命令运行新的docker镜像
  • 请指点: 在windows下能否通过程序来获取linux下的用户列表,甚至通过自己写的windows程序界面增加修改linux的用户
  • 通过docker ps命令检查运行中的docker镜像
  • Jbuilder第一次编译说缺包,引入通过!然后把原来引入的注释,又通过!上帝,救我!
  • Session id实现通过Cookie来传输方法及代码参考
  • 红旗Linux主机可以通过127.0.0.1访问,但如何是连网的Win2000机器通过Linux的IP去访问Linux
  • 通过docker search命令搜索可用docker镜像
  • 请指点: 在windows下能否通过程序来获取linux下的用户列表,甚至通过自己写的windows程序界面增加修改linux的用户 100分相赠
  • Python3通过request.urlopen实现Web网页图片下载
  • 工作站Redhat Linux7.2如何通过NT4.0 proxy代理服务器上网,我不能通过其验证!请高手指点思路和方法!


  • 站内导航:


    特别声明:169IT网站部分信息来自互联网,如果侵犯您的权利,请及时告知,本站将立即删除!

    ©2012-2021,,E-mail:www_#163.com(请将#改为@)

    浙ICP备11055608号-3