ITPub博客

首页 > Linux操作系统 > Linux操作系统 > 使用JMS和ESB构建强大而可靠的SOA之二

使用JMS和ESB构建强大而可靠的SOA之二

原创 Linux操作系统 作者:lambor_zhou 时间:2009-02-03 22:36:22 0 删除 编辑
引言

  在本系列的第 1 部分, 我们向您介绍了SOA一系列概念,包括企业服务总线(Enterprise Service Bus,ESB)的概念,以及 Java Message Service (JMS) 之类可靠的标准化 API 可以如何帮助保证服务使用者、服务提供者和总线之间的通信服务质量。我们还了解了 WebSphere ESB 产品提供的 ESB 实现,该实现基于一个新编程模型,即服务组件体系结构(Service Component Architecture,SCA)。SCA 描述导出和导入绑定的概念,我们可以将此类绑定用于通过 JMS 与中介流组件进行交互。所需的所有构件都可以使用 IBM WebSphere Integration Developer 工具进行构建和部署。

  在第 2 部分,我们将开始应用这些概念,并演示如何构建实际的应用程序。

  用例

  在 实际的 SOA 业务应用程序——特别是涉及到异类 IT 基础设施的应用程序以及希望对这些 IT 基础设施提供的服务间的工作流进行组合以形成松散耦合的交互的应用程序中,服务使用者和提供者间的消息流并不需要采用同步方式处理。由于 JMS 作为面向消息的中间件标准得到了广泛的应用,因此经常用作同步和异步服务调用的首选协议。

  有很多典型的场景,其中服务间的连接可以使用 JMS 进行传输:电子政务、电子商务或工业制造就是其中的一些例子。

  示例 1

  一 个具体的用例就是处理会计系统中的文档。影响公司财务的文档(如购买货物的发票)在企业中多个系统流动。其中很多都必须采用允许以后进行审计的方式处理 (某些情况下,这是法律强制要求的做法)。这意味着此类文档的流通必须能够跟踪,而这又要求传输协议要十分可靠。而且,所涉及的系统之间的很多交互都实现 了具有异步性本质的工作流。因此,所选的协议既要支持消息的可靠事务性交换,也要支持各种消息传递模式(异步和同步调用以及发布/订阅消息)。

  同时,要对所有消息进行定向,使其通过企业服务总线,以便能够应用其他功能;如基于内容的动态路由或数据转换以及日记记录和日志记录。

  此会计示例代表了一个场景,在此场景中,解决方案采用面向服务的体系结构构建的,但并不一定会使用 Web 服务;在我们的例子中,服务通过交换普通 JMS 消息进行通信。为此类解决方案部署 ESB 仍然十分可行。

  示例 2

  让 我们看看另一个场景,制造行业的一个例子:某个公司制造需使用大量部件的产品,其中一些部件来自内部的工厂,而其他从外部供应商处购买。该公司希望加速其 库存周转,降低其库存水平,从而降低内部成本和提高其产品的上市时间。实现此目标的一个方法是与其业务合作伙伴(包括供应商)建立更紧密的关系。

  通常,制造商将利用生产计划系统(production planning system,PPS)来协调其内部生产与外部供应链。在 PPS 中,内部部件的库存较低时,将生成生产请求,以生产此部件。

  图 1. 生产计划系统——概略体系结构

  图 1. 生产计划系统——概略体系结构

  除 了使用内部工厂提供的部件外,该制造商还从外部供应商购买其他部件。为了提高所涉及各方的集成水平,该制造商需要将其 PPS 与其若干供应商系统集成,以便自动交换供求信息。为了将 PPS 系统连接到任意数量的供应商(每个供应商都采用自己独特的协议和数据格式),可能必须开发大量的代码。

  很多要集成的现有系统都使用 JMS 作为外部消息传递协议。其他系统(特别是最近构建的系统)可能支持 Web 服务。在此情况下,制造商决定建立 ESB 来集中处理协议和消息格式的转换,从而将对现有系统的任何影响降到最低。例如,外部接口使用 JMS 的现有系统可以与 Web 服务交互,并将处理不同协议的细节委托给 ESB。

  测试应用程序

  我们在上面了解了作为协议的 JMS 如何与基于 SOA 的解决方案相关(包括从头创建系统以及必须集成现有系统时的情况)。在这一部分中,我们将给出一个测试应用程序实现,以便重点了解使上述场景可行的 WebSphere ESB 功能。此实现包括:

  • JMS 客户机,将发送不同 JMS 消息格式的请求。
  • 服务提供者,接受 JMS 格式服务请求,并使用 JMS 消息进行响应。
  • 一个 WebSphere ESB 中介模块,包含用于执行实际消息中介操作的中介流。

  图 2 给出了测试应用程序概略结构。请注意,总共要使用四个队列和 ESB 进行通信。

  图 2. 测试应用程序

  图 2. 测试应用程序

  在演示如何构建实际中介流并说明如何插入自定义代码之前(我们将在本系列的第 3 部分进行讨论),我们将了解测试客户端和测试服务提供者(将分别发送和接收 JMS 消息)。

  另外,您将注意到,客户端和提供者代码中没有特定于 WebSphere ESB 的内容。设计就是这样的,因为我们假定服务使用者和服务提供者都不需要了解其间存在的 ESB 的任何信息。

  客户端

  我们将不会讨论将 EAR 文件导入到 WebSphere Integration Developer 中的详细步骤。应用程序符合普通 J2EE 标准,可以像任何其他 Web 应用程序一样导入和处理。另外,我们也不会对整个源代码进行详细说明,而仅重点讨论对我们示例重要的那些部分。最后,我们将假定会将给出的所有代码部署到 WebSphere ESB 服务器;例如,WID 工具中包含的测试服务器环境。

  测试客户端应用程序基于某个 Web 应用程序中的一组 JavaServer Page 和 Servlet,允许在一对 JMS 队列间发送和接收不同类型的 JMS 消息。该程序打包为 JMSTestClientV1_6.ear 文件,可以从本文的下载部分获得此文件。

  图 3. WebSphere Integration Developer 工作区中的 JMS 客户端项目

  图 3. WebSphere Integration Developer 工作区中的 JMS 客户端项目

  在图 3 中,可以看到将客户端项目导入到 WebSphere Integration Developer 工作区后的结构。请注意,其中包含两个 Servlet,分别命名为 ReadQueue 和 WriteQueue,我们将在下面对其进行更为详细的讨论。

清单 1. WriteQueue.java JMS 消息创建


//WriteQueue.java
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);

if (msg_type.equals("TextMessage")) {
TextMessage message = session.createTextMessage(msg_text);
producer.send(message);
}
else if (msg_type.equals("BytesMessage")) {
... ...
}
else if (msg_type.equals("StreamMessage")) {
... ...
}
else if (msg_type.equals("MapMessage")) {
... ...
}
else if (msg_type.equals("ObjectMessage")) {
... ...
}

  Servlet WriteQueue(清单 1)将产生发送到指定队列目的地的所有类型的 JMS 消息。它使用标准 JMS API 调用来根据 JSP 页插入的输入构造 JMS 消息。根据指示的 JMS 消息类型对输入进行了格式设置。上面的代码示例非常明白地说明了如何构建 TextMessage。可以在可下载源代码中找到与其他类型对应的代码。以下代码摘录演示了如何解析对恰当 JMS 目的地队列的引用:

清单 2. WriteQueue.java JMS 消息创建


InitialContext ic = null;
try {
//Get the jndi initial context
ic = new InitialContext();
Context myEnv = null;

//Get the context that is specific for this WAR
//This holds items such as resource references, ejb references, etc
//from the web.xml
myEnv = (Context) ic.lookup("java:comp/env");

//Get the resource reference for the JMS QueueConnectionFactory
//that is defined in the web.xml with the folllowing attributes:
// Name: jms/qcf Type: javax.jms.QueueConnectionFactory
// Authentication: Container
// WebSphere Bindings JNDI Name: jms/qcf
qcf = (QueueConnectionFactory) myEnv.lookup("jms/qcf");

//Get the resource reference for the JMS QueueConnectionFactory
//that is defined in the web.xml with the folllowing attributes:
// Name: jms/queue Type: javax.jms.Queue
// Authentication: Container
// WebSphere Bindings JNDI Name: jms/queue
queue = (Queue) myEnv.lookup("jms/queue_sender");

} catch (NamingException e2) {
...
}

  在清单 2,可以看到,使用了 jms/qcf 作为队列连接工厂的 JNDI 名称,而使用 jms/queue_sender 作为实际队列的 JNDI 名称。您将需要在运行应用程序前设置这两个名称(将在下面进行更为详细的说明)。

清单 3. ReadQueue.java JMS 处理


//ReadQueue.java
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
con.start();
Message msg = consumer.receive(3000L);

String exceptionMark = "";
while(msg != null){
exceptionMark = "";
if (msg.propertyExists("IsBusinessException")) {
if (msg.getBooleanProperty("IsBusinessException")) {
exceptionMark = "Exception ";
}
}

if(msg instanceof TextMessage){
TextMessage textMessage = (TextMessage) msg;
if (textMessage.getText() != null){
res.add(exceptionMark + "TextMessage#" + textMessage.getText());
}
}
else if (msg instanceof BytesMessage){
... ...
}
else if (msg instanceof StreamMessage){
... ...
}
else if (msg instanceof MapMessage){
... ...
}
else if (msg instanceof ObjectMessage){
... ...
}
}

  Servlet ReadQueue.java(清单 3 中为其代码摘录)将使用来自指定队列目的地的所有类型的 JMS 消息。它将接收和分析队列中存在的所有 JMS 消息,并将经过分析的内容发送到相应的 JSP 页,以便进行显示。如果 JMS 消息具有 IsBusinessException 属性,且其值为 true,则为异常消息。普通消息和异常消息将采用不同的方式显示。ReadQueue Servlet 还使用 jms/qcf 来解析到队列连接工厂,并使用 jms/queue_receiver 作为相应队列的名称。

  在 JMSTestClientV1_6.ear 企业应用程序安装到 WebSphere ESB 运行时期间,所有选项都将使用缺省值。可以使用该工具中包括的 WebSphere ESB 测试环境以及任何独立安装的运行时。安装后,需要在 WebSphere ESB 管理控制台中执行这些配置步骤:

  通过选择 Buses => SCA.APPLICATION.esbCell.Bus => Destinations 创建两个队列目的地。队列名称为 JMSCustomBinding_Export_Receive_Q 和 JMSCustomBinding_Export_Send_Q。让所有其他选项都采用缺省值。

  通过选择 Default messaging provider => JMS queue connection factory 创建队列连接工厂。输入或选择以下值:

  • Name: JMSCustomBindingFactory
  • JNDI name: jms/JMSCustomBindingFactory
  • Bus name: SCA.APPLICATION.esbCell.Bus

  选择 Default messaging provider => JMS queue,以创建两个队列目的地。第一个队列具有以下设置:

  • Name: JMSCustomBinding_Export_Receive_Q
  • JNDI name: jms/JMSCustomBinding_Export_Receive_Q
  • Queue name: JMSCustomBinding_Export_Receive_Q
  • Bus name: SCA.APPLICATION.esbCell.Bus

  第二个队列具有以下设置:

  • Name: JMSCustomBinding_Export_Send_Q
  • JNDI name: jms/JMSCustomBinding_Export_Send_Q
  • Queue name: JMSCustomBinding_Export_Send_Q
  • Bus name: SCA.APPLICATION.esbCell.Bus

  将与中介模块共享在第 1 步到第 3 步中创建的 JMS 资源。在第 2 步中创建的队列连接工厂还将由中介模块和服务提供者(将在下文中讨论)共享。因此,当处理这些部分时,将不需要再次创建这些构件。

  选择 Enterprise Applications => JMSTestClient => Map resource references to resources,以对资源引用进行映射。

  Reference binding: jms/queue_receiver

  JNDI name: jms/JMSCustomBinding_Export_Send_Q

  Reference binding: jms/queue_sender

  JNDI name: jms/JMSCustomBinding_Export_Receive_Q

  Reference binding: jms/qcf

  JNDI name: jms/JMSCustomBindingFactory

  Login configuration: esbNode/CommonEventInfrastructureJMSAuthAlias

  在缺省情况下,这些值应该已经配置。

  正确安装并配置了应用程序后,在服务器上运行该 Web 应用程序将出现图 4 中显示的内容。

  图 4. JMSTestClient 的初始网页

  图 4. JMSTestClient 的初始网页

  选择 Click here to post a TextMessage,将随即显示一个新页面(图 5)。

  图 5. JMSTestClient 的 TextMessage 输入网页

  图 5. JMSTestClient 的 TextMessage 输入网页

  请 暂时不要单击 Post Message 按钮。我们尚未安装所需的其他应用程序部分,因此单击此按钮将不会产生预期的结果。不过,完全可以选择带有 Click here 标签的链接来发送另一种类型的 JMS 消息,以试验不同类型的 JMS 消息及其各自的输入 JSP。

服务提供者

  支持 JMS 的服务提供者在 WebSphere Integration Developer 中作为常规消息驱动 Bean(message-driven bean,MDB)实现,此类构件将接收来自已定义的 JMS 对象的 JMS 消息,然后直接将相同的消息发送回响应队列。在下载 ZIP 文件中提供了该应用程序,文件名为 JMSTestMDBV1_6.ear。将此 EAR 文件导入到工具中后,将会注意到一个名为 JMSMDBServiceEJB 的 EJB 项目,其中包含所需的 MDB JMSServiceMDB,如图 6 中所示。

  图 6. WebSphere Integration Developer 工作区中的 JMS 服务提供者

  图 6. WebSphere Integration Developer 工作区中的 JMS 服务提供者

  此 MDB 的 onMessage() 方法充当服务提供者实现。以下代码摘录演示了该方法实现的主要内容(应在本文包含的下载文件中对完整源代码进行分析):

清单 4. JMSServiceMDBBean 的 onMessage 方法实现


public void onMessage(javax.jms.Message msg) {
// get the message in the correct format
ObjectMessage bjMsg = null;
try {
InitialContext context = new InitialContext();
Queue q = (Queue) context.lookup("java:comp/env/test/sender");
QueueConnectionFactory qcf = (QueueConnectionFactory) context.lookup("java:comp/env/test/qcf");
QueueConnection conn = qcf.createQueueConnection();
conn.start();
QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender sender = session.createSender(q);

String MessageID = msg.getJMSMessageID();
msg.setJMSCorrelationID(MessageID);

if (msg instanceof TextMessage) {
System.out.println("MDB: TextMessage Received");

System.out.println("MDB: TextMessage Content:");
String payload = ((TextMessage)msg).getText();
System.out.println(payload);

if(payload.equals("exception"))
{
msg.clearProperties();
msg.setBooleanProperty("IsBusinessException",true);
}
}
else if (msg instanceof ObjectMessage) {
......
}
else if (msg instanceof BytesMessage){
......
}
else if (msg instanceof StreamMessage){
......
}
else if (msg instanceof MapMessage){
......
} catch (Exception e) {
e.printStackTrace();
}
}

  在该方法实现的第一部分,我们通过查找和创建相关 JMS 资源(如消息队列、队列工厂、队列连接、队列会话以及队列发送器),以构建 JMS 消息传输上下文。它们将由 MDB 用于发送应答消息(请记住,我们示例中的服务提供者直接将接收到的消息发送回应答队列)。所有这些都是使用常规 JMS API 实现的。

  在第二部分中,将根据其 JMS 消息格式处理接收到的 JMS 消息(作为方法参数传递)。该 MDB 支持所有五种类型的消息,这五种类型在 JMS 规范中进行了定义:

  • Text
  • Object
  • Stream
  • Map
  • Byte

  尽管格式不同,但对每条消息都采用相同的处理机制:print MDB: XXXMessage Received ,提取消息内容,然后使用追加的实际内容执行 print MDB: XXXMessage Content:(其中,XXX 可以为上面列出的消息类型之一)。最后,会将原始消息直接发送回应答队列。(请注意,常规 JMS 应用程序将从请求消息中的 ReplyTo 字段中读取响应队列的名称)。

  代码中有一段特殊的代码,用于处理允许测试异常情况的 Text 消息:会将包含字符串“Exception”的消息视为导致提供者中出现异常的请求。在此情况下,我们会向 JMS 响应消息添加名为 IsBusinessException 的 Header 字段,并将其值设置为“true”。在第 3 部分中,我们将演示如何在中介模块中处理此类异常消息,以及如何将其转换为错误,以发送回原始客户端。

  在 WebSphere ESB 服务器运行时中安装了 JMSTestMDBV1_6.ear EAR 文件后,需要在 WebSphere ESB 管理控制台中执行以下配置步骤:

  •   选择 Buses => SCA.APPLICATION.esbCell.Bus => Destinations,以创建两个队列目的地。响应的队列名称为 JMSCustomBinding_Import_Receive_Q 和 JMSCustomBinding_Import_Send_Q。让所有其他选项都采用缺省值。
  •   对于此 MDB 所使用的队列连接工厂,将使用已在 JMS 客户端应用程序安装过程的第 2 步中创建的队列连接工厂。
  •   选择 Default messaging =>JMS activation specification,并采用以下设置,从而创建 JMS 激活规范:

  Name: JMSCustomBinding_Import_Send_AS

  JNDI name: jms/JMSCustomBinding_Import_Send_AS

  Destination JNDI name: jms/JMSCustomBinding_Import_Send_Q

  Bus name: SCA.APPLICATION.esbCell.Bus

  • 选择 Default messaging provider => JMS queue,以创建两个队列目的地。第一个队列具有以下设置:

  Name: JMSCustomBinding_Import_Receive_Q

  JNDI name: jms/JMSCustomBinding_Import_Receive_Q

  Queue name: JMSCustomBinding_Import_Receive_Q

  Bus name: SCA.APPLICATION.esbCell.Bus

  第二个队列具有以下设置:

  Name: JMSCustomBinding_Import_Send_Q

  JNDI name: jms/JMSCustomBinding_Import_Send_Q

  Queue name: JMSCustomBinding_Import_Send_Q

  Bus name: SCA.APPLICATION.esbCell.Bus

  • 选择 Enterprise Applications => JMSTestMDB => Bind message destination references to administrated object,以绑定消息目的地引用。

  Message destination object: test/sender

  JNDI name: jms/JMSCustomBinding_Import_Receive_Q

  Message destination object: test/qcf

  JNDI name: jms/JMSCustomBindingFactory

  在缺省情况下,这些值应该已经配置。

  • 选择 Enterprise Applications => JMSTestMDB => Provide listening bindings for message-driven beans,以提供侦听绑定。

  Activation specification

  JNDI name: jms/JMSCustomBinding_Import_Send_AS

  Destination JNDI name: jms/JMSCustomBinding_Import_Send_Q

  在缺省情况下,这些值也应该已经配置。

  完成这些步骤后,测试应用程序的客户端和服务提供者就均已就绪,可以供使用了。唯一剩下的就是开发和安装用于将其连接起来的部件:中介模块,该模块运行于 WebSphere ESB 上,并使用自定义绑定处理传入和传出 JMS 消息。我们将把这部分内容留到本系列的最后一部分讨论。

  结束语

  在本文中,我们向您展示了一些用例示例,这些示例利用 WebSphere ESB 来处理服务使用者和服务提供者(二者均使用 JMS 作为基础消息传递机制)之间交换的消息。

  我们还演示了如何构建允许对创建的中介模块进行灵活测试的测试应用程序。这包括用于将消息发送到中介模块的基于 JSP 的 JMS 客户端,以及用于从模块接收消息的消息驱动 Bean (MDB)。

  在本系列的最后一部分,我们将讨论最有意义的部分:我们将演示如何构建中介模块本身,并说明部署以处理不同类型的 JMS 消息的自定义中介代码。

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/9665688/viewspace-545180/,如需转载,请注明出处,否则将追究法律责任。

请登录后发表评论 登录
全部评论

注册时间:2008-11-18

  • 博文量
    28
  • 访问量
    23412