博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMq-拦截创建消息队列
阅读量:6264 次
发布时间:2019-06-22

本文共 6531 字,大约阅读时间需要 21 分钟。

ActiveMQ拦截客户端创建/接收消息队列

1.创建插件

public class AuthPlugin implements BrokerPlugin{    private String mqName;//本MQ服务器名称    private JdbcTemplate jdbcTemplate;//数据库操作类        public AuthPlugin(JdbcTemplate jdbcTemplate,String mqName) {        this.jdbcTemplate=jdbcTemplate;        this.mqName=mqName;    }        @Override    public Broker installPlugin(Broker broker) throws Exception {        return new AuthBroker(broker,jdbcTemplate,mqName);    }}

2.修改apache-activemq\conf\activemq.xml

3.创建插件类

public class AuthBroker extends AbstractAuthenticationBroker{    private static Log log = LogFactory.getLog(AuthBroker.class);   //用户 对应的权限   private Map
> powers=new HashMap
>();//权限 private static final String ACTIVEMQ_ADVISORY_PRODUCER_QUEUE="ActiveMQ.Advisory.Producer.Queue.";//消息生产者前缀 private static final String ACTIVEMQ_ADVISORY_CONSUMER_QUEUE="ActiveMQ.Advisory.Consumer.Queue.";//消息消费者前缀 private JdbcTemplate jdbcTemplate;//数据库操作 private String mqName;//MQ服务器名称 public AuthBroker(Broker next,JdbcTemplate jdbcTemplate,String mqName) { super(next); this.jdbcTemplate=jdbcTemplate; this.mqName=mqName; } /** * 连接拦截器 */ @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { log.info("用户["+info.getUserName()+"]请求连接["+mqName+"]!"); SecurityContext securityContext = context.getSecurityContext(); if (securityContext == null) { securityContext = authenticate(info.getUserName(), info.getPassword(), null); context.setSecurityContext(securityContext); securityContexts.add(securityContext); } try { super.addConnection(context, info); } catch (Exception e) { securityContexts.remove(securityContext); context.setSecurityContext(null); throw e; } } /** * 认证 *

Title: authenticate

*/ @Override public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException { SecurityContext securityContext = null; Com com=getCom(username,password); //验证用户信息 if(com!=null&&com.getId()!=null){ securityContext = new SecurityContext(username) { @Override public Set
getPrincipals() { Set
groups = new HashSet
(); groups.add(new GroupPrincipal("users"));//默认加入了users的组 return groups; } };// log.info("用户:"+username+"验证成功!"); }else{ log.error("用户:"+username+"验证失败!"); throw new SecurityException("验证失败"); } return securityContext; } /** * 添加一个目标 *

Title: addDestination

* @see org.apache.activemq.broker.BrokerFilter#addDestination(org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.ActiveMQDestination, boolean) */ @Override public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception { boolean destStats = destination.getPhysicalName().regionMatches(true, 0, ACTIVEMQ_ADVISORY_PRODUCER_QUEUE, 0,ACTIVEMQ_ADVISORY_PRODUCER_QUEUE.length()); //发送消息者 if(destStats){ if(context.getSecurityContext()!=null){ //判断不是默认用户 if(!context.getSecurityContext().getUserName().equals(SecurityContext.BROKER_SECURITY_CONTEXT.getUserName())){ String queuesName=destination.getPhysicalName().replace(ACTIVEMQ_ADVISORY_PRODUCER_QUEUE, "");//得到消息队列名 if(powers.containsKey(context.getSecurityContext().getUserName())){ //判断该用户是否有权限 Map
map=powers.get(context.getSecurityContext().getUserName()); if(map!=null&&map.containsKey(queuesName)){ //判断是否有发送的权限 if(map.get(queuesName).getBindId()!=null&&map.get(queuesName).getComQueuesType()!=null){ if(map.get(queuesName).getComQueuesType().intValue()==QueuesComType.BOTH.getValue().intValue()||map.get(queuesName).getComQueuesType().intValue()==QueuesComType.SEND.getValue().intValue()){ return super.addDestination(context, destination, createIfTemporary); } } } throw new Exception("["+mqName+"-"+context.getUserName()+"]对消息队列["+queuesName+"]没有发送消息的权限"); }else{ throw new Exception("请登录后再操作!"); } } } }else{ boolean consumerStats = destination.getPhysicalName().regionMatches(true, 0, ACTIVEMQ_ADVISORY_CONSUMER_QUEUE, 0,ACTIVEMQ_ADVISORY_CONSUMER_QUEUE.length()); //消息接收者 if(consumerStats){ if(context.getSecurityContext()!=null){ //判断不是默认用户 if(!context.getSecurityContext().getUserName().equals(SecurityContext.BROKER_SECURITY_CONTEXT.getUserName())){ String queuesName=destination.getPhysicalName().replace(ACTIVEMQ_ADVISORY_CONSUMER_QUEUE, "");//得到消息队列名称 if(powers.containsKey(context.getSecurityContext().getUserName())){ //判断用户是否有对应的权限 Map
map=powers.get(context.getSecurityContext().getUserName()); if(map!=null&&map.containsKey(queuesName)){ if(map.get(queuesName).getBindId()!=null&&map.get(queuesName).getComQueuesType()!=null){ if(map.get(queuesName).getComQueuesType().intValue()==QueuesComType.BOTH.getValue().intValue()||map.get(queuesName).getComQueuesType().intValue()==QueuesComType.RECEIVE.getValue().intValue()){ return super.addDestination(context, destination, createIfTemporary); } } } throw new Exception("["+mqName+"-"+context.getUserName()+"]对消息队列["+queuesName+"]没有获取消息的权限"); }else{ throw new Exception("请登录后再操作!"); } } } } } return super.addDestination(context, destination, createIfTemporary); } /** * 监控发送消息 *

Title: send

* @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange, org.apache.activemq.command.Message) */ @Override public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { String userName=producerExchange.getConnectionContext().getUserName(); ActiveMQDestination msgDest = messageSend.getDestination(); String physicalName = msgDest.getPhysicalName(); }/** * 监控消息接收者 *

Title: acknowledge

* @see org.apache.activemq.broker.BrokerFilter#acknowledge(org.apache.activemq.broker.ConsumerBrokerExchange, org.apache.activemq.command.MessageAck) */ @Override public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { String userName=consumerExchange.getConnectionContext().getUserName(); String queues=ack.getDestination().getPhysicalName(); }}

 

转载于:https://www.cnblogs.com/huangzhex/p/6358214.html

你可能感兴趣的文章
实现淘宝和QQ ToolBar透明渐变效果
查看>>
源码安装lamp
查看>>
python --int 对象
查看>>
关于selenium的一些资料
查看>>
【高级内部资料】.NET数据批量写入性能分析 第二篇
查看>>
我为什么要写Sinon.JS
查看>>
在RHEL5下建立自己的YUM源服务器
查看>>
CCNP学习笔记8——OSPF
查看>>
为什么 (obj.foo)() 调用的执行环境中的 this 就是 obj ?
查看>>
对HTML语义化的理解
查看>>
Redis的内存管理
查看>>
CSS中元素空间占用
查看>>
IP san方式的共享存储的实现(target)
查看>>
rocketmq 延迟队列的实现
查看>>
Linux下的虚拟Bridge实现
查看>>
我的友情链接
查看>>
反向代理负载均衡
查看>>
SVN 精细版本控制
查看>>
如何理解Kubernetes认证和授权
查看>>
安全与加密之加密算法,CA,openssl,证书管理
查看>>