ITPub博客

首页 > 应用开发 > IT综合 > Spring 4 + Websocket + Stomp + ActimeMQ 实现消息推送

Spring 4 + Websocket + Stomp + ActimeMQ 实现消息推送

原创 IT综合 作者:百联达 时间:2015-10-22 17:25:40 0 删除 编辑
一:代码结构

二: pom.xml文件
<project

: web.xml文件

: spring-servlet.xml文件

: WebSocketConfig.java文件
package com.gemdale.gmap.message.dispatch.center;
import java.util.List;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
import com.gemdale.gmap.common.util.ConfigureUtil;

/**
 * TODO:
 * 
 * @author gengchong
 * @date 2015年10月20日 下午5:13:17
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {


    /*
     * 服务器要监听的端口,message会从这里进来,要对这里加一个Handler
     * 这样在网页中就可以通过websocket连接上服务了
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket")//websocket 端点
                .setAllowedOrigins("*")
                .setHandshakeHandler(new HandshakeHandler())
                .addInterceptors(new HandshakeInterceptor()).withSockJS();
    }


    /*
     * 消息传输参数配置 
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.setMessageSizeLimit(8192) //设置消息字节数大小
        .setSendBufferSizeLimit(8192)//设置消息缓存大小
        .setSendTimeLimit(10000); //设置消息发送时间限制毫秒
    }


    /*
     * 输入通道参数设置
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(4) //设置消息输入通道的线程池线程数
        .maxPoolSize(8)//最大线程数
        .keepAliveSeconds(60);//线程活动时间
    }


    /*
     * 输出通道参数设置
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(4).maxPoolSize(8);
    }

    @Override
    public void addArgumentResolvers(List argumentResolvers) {
    }

    @Override
    public void addReturnValueHandlers(List returnValueHandlers) {
    }

    @Override
    public boolean configureMessageConverters(List messageConverters) {
        return true;
    }

    /*
     * 配置broker
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/topic") // 设置可以订阅的地址,也就是服务器可以发送的地址
                .setRelayHost(ConfigureUtil.getProperty("BrokerUrl")).setRelayPort(Integer.valueOf(ConfigureUtil.getProperty("BrokerPort"))) // 设置broker的地址及端口号
                .setSystemHeartbeatReceiveInterval(2000) // 设置心跳信息接收时间间隔
                .setSystemHeartbeatSendInterval(2000); // 设置心跳信息发送时间间隔
        registry.setApplicationDestinationPrefixes("/ws");
    }

}

: HandshakeHandler.java文件

/**
 * TODO:
 * 
 * @author gengchong
 * @date 2015年10月20日 下午5:02:35
 */
public class HandshakeHandler extends DefaultHandshakeHandler{
    
    private static Logger logger = Logger.getLogger(HandshakeHandler.class);
   
}

: HandshakeInterceptor.java文件

public class HandshakeInterceptor extends HttpSessionHandshakeInterceptor {
private static Logger logger = Logger.getLogger(HandshakeInterceptor.class);
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
            Map attributes) throws Exception {
        logger.info("============Before Handshake===========");
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
            Exception ex) {
        logger.info("============After Handshake==============");
        super.afterHandshake(request, response, wsHandler, ex);
    }
}

: GreetingController.java文件

public class GreetingController {
    
    @MessageMapping("/hello")
    @SendTo("/topic/gmapws")
    public Greeting greeting(HelloMessage message)
    {
        try {
            return new Greeting("Hello, "+message.getName() + " !");
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        
        return null;
    }


}

: HelloMessage .java文件

public class HelloMessage {


    private String name;


    /**
     * @return the name
     */
    public String getName() {
        return name;
    }


    /**
     * @param name the name to set
     */
    public void setName(String name) {
        this.name = name;
    }
    
}

: Greeting.java文件

public class Greeting {
 private String content;
 
 
 public Greeting(String content)
 {
     this.content=content;
 }


/**
 * @return the content
 */
public String getContent() {
    return content;
}


/**
 * @param content the content to set
 */
public void setContent(String content) {
    this.content = content;
}
 
}

: index.html文件



</project

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/28624388/viewspace-1815701/,如需转载,请注明出处,否则将追究法律责任。

请登录后发表评论 登录
全部评论
10年以上互联网经验,先后从事过制造业,证券业,物业行业和物流行业信息系统和互联网产品的研发,6年系统架构经验。最近关注Kubernetes微服务架构和Istio微服务治理框架。

注册时间:2013-02-05

  • 博文量
    320
  • 访问量
    1025614