首页

通过对activemq源码中DefaultBroker核心类的分析,了解其内部实现的具体细节

标签:消息原理,activemq,broker设计原理,源码     发布时间:2017-12-04   

一、前言

通过对activemq包(3.2)中org.activemq.broker.impl.DefaultBroker的分析,了解消息核心类的实现,包括事务管理TransactionManager、容量监控DelegateCapacityMonitor、消息容器管理MessageContainerManager、安全适配SecurityAdapter及相关管理等。

二、源码说明

1.定义Broker、BrokerAdmin接口

package org.activemq.broker;

import java.io.File;
import java.util.Hashtable;
import java.util.Map;
import javax.jms.JMSException;
import javax.naming.Context;
import javax.transaction.xa.XAException;
import org.activemq.capacity.CapacityMonitor;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQXid;
import org.activemq.message.BrokerInfo;
import org.activemq.message.ConnectionInfo;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.MessageAck;
import org.activemq.message.ProducerInfo;
import org.activemq.security.SecurityAdapter;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.MessageContainerManager;
import org.activemq.service.RedeliveryPolicy;
import org.activemq.service.Service;
import org.activemq.store.PersistenceAdapter;

/**
 * Contract of a message broker: registration of clients, producers and
 * consumers, message dispatch and acknowledgement, local (String id) and XA
 * ({@link ActiveMQXid}) transaction control, and access to the broker's
 * collaborating components (persistence, security, container managers,
 * redelivery and dead-letter policies).
 *
 * <p>Interface members are implicitly {@code public abstract}; the modifiers are
 * therefore omitted.
 */
public interface Broker extends Service, CapacityMonitor {

    // ---------------------------------------------------------------- identity

    /** @return the administration view of this broker */
    BrokerAdmin getBrokerAdmin();

    /** @return the descriptor object for this broker */
    BrokerInfo getBrokerInfo();

    /** @return the broker name */
    String getBrokerName();

    /** @return the name of the cluster this broker belongs to */
    String getBrokerClusterName();

    /** @return the directory used for temporary files */
    File getTempDir();

    // ------------------------------------------- clients, producers, consumers

    /**
     * Registers a client connection.
     *
     * @param client the connecting client
     * @param info   the connection description
     * @throws JMSException if the client cannot be added
     */
    void addClient(BrokerClient client, ConnectionInfo info) throws JMSException;

    /**
     * Unregisters a client connection.
     *
     * @param client the departing client
     * @param info   the connection description
     * @throws JMSException if the client cannot be removed
     */
    void removeClient(BrokerClient client, ConnectionInfo info) throws JMSException;

    /**
     * Registers a message producer.
     *
     * @param client the owning client
     * @param info   the producer description
     * @throws JMSException if the producer cannot be added
     */
    void addMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException;

    /**
     * Unregisters a message producer.
     *
     * @param client the owning client
     * @param info   the producer description
     * @throws JMSException if the producer cannot be removed
     */
    void removeMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException;

    /**
     * Registers a message consumer.
     *
     * @param client the owning client
     * @param info   the consumer description
     * @throws JMSException if the consumer cannot be added
     */
    void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException;

    /**
     * Unregisters a message consumer.
     *
     * @param client the owning client
     * @param info   the consumer description
     * @throws JMSException if the consumer cannot be removed
     */
    void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException;

    /**
     * Adds a listener for consumer-info events.
     *
     * @param listener the listener to add
     */
    void addConsumerInfoListener(ConsumerInfoListener listener);

    /**
     * Removes a previously added consumer-info listener.
     *
     * @param listener the listener to remove
     */
    void removeConsumerInfoListener(ConsumerInfoListener listener);

    // --------------------------------------------------------------- messaging

    /**
     * Sends a message on behalf of a client.
     *
     * @param client  the sending client
     * @param message the message to send
     * @throws JMSException if the message cannot be sent
     */
    void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException;

    /**
     * Acknowledges a message on behalf of a client.
     *
     * @param client the acknowledging client
     * @param ack    the acknowledgement
     * @throws JMSException if the acknowledgement cannot be processed
     */
    void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException;

    /**
     * Deletes a subscription.
     *
     * @param clientId       id of the client owning the subscription
     * @param subscriberName name of the subscription
     * @throws JMSException if the subscription cannot be deleted
     */
    void deleteSubscription(String clientId, String subscriberName) throws JMSException;

    /**
     * Sends a message to a dead letter queue.
     *
     * @param deadLetterName name of the dead letter destination
     * @param message        the message to move
     * @throws JMSException if the message cannot be sent
     */
    void sendToDeadLetterQueue(String deadLetterName, ActiveMQMessage message) throws JMSException;

    // ------------------------------------------------------ local transactions

    /**
     * Starts a local transaction.
     *
     * @param client        the requesting client
     * @param transactionId the local transaction id
     * @throws JMSException on failure
     */
    void startTransaction(BrokerClient client, String transactionId) throws JMSException;

    /**
     * Commits a local transaction.
     *
     * @param client        the requesting client
     * @param transactionId the local transaction id
     * @throws JMSException on failure
     */
    void commitTransaction(BrokerClient client, String transactionId) throws JMSException;

    /**
     * Rolls back a local transaction.
     *
     * @param client        the requesting client
     * @param transactionId the local transaction id
     * @throws JMSException on failure
     */
    void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException;

    // --------------------------------------------------------- XA transactions

    /**
     * Starts an XA transaction.
     *
     * @param client the requesting client
     * @param xid    the XA transaction id
     * @throws XAException on failure
     */
    void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException;

    /**
     * Prepares an XA transaction.
     *
     * @param client the requesting client
     * @param xid    the XA transaction id
     * @return the result code of the prepare step
     * @throws XAException on failure
     */
    int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException;

    /**
     * Rolls back an XA transaction.
     *
     * @param client the requesting client
     * @param xid    the XA transaction id
     * @throws XAException on failure
     */
    void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException;

    /**
     * Commits an XA transaction.
     *
     * @param client   the requesting client
     * @param xid      the XA transaction id
     * @param onePhase flag passed through to the commit (presumably selects
     *                 one-phase commit; confirm against the transaction implementation)
     * @throws XAException on failure
     */
    void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException;

    /**
     * Lists prepared XA transactions.
     *
     * @param client the requesting client
     * @return the ids of the prepared XA transactions
     * @throws XAException on failure
     */
    ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException;

    // ----------------------------------------------------------- collaborators

    /** @return the persistence adapter in use */
    PersistenceAdapter getPersistenceAdapter();

    /** @param persistenceAdapter the persistence adapter to use */
    void setPersistenceAdapter(PersistenceAdapter persistenceAdapter);

    /** @return the map of container managers */
    Map getContainerManagerMap();

    /**
     * Builds a naming context exposing the broker's destinations.
     *
     * @param environment the naming environment
     * @return the destination context
     */
    Context getDestinationContext(Hashtable environment);

    /** @return the container manager for persistent topics */
    MessageContainerManager getPersistentTopicContainerManager();

    /** @return the container manager for transient topics */
    MessageContainerManager getTransientTopicContainerManager();

    /** @return the container manager for persistent queues */
    MessageContainerManager getPersistentQueueContainerManager();

    /** @return the container manager for transient queues */
    MessageContainerManager getTransientQueueContainerManager();

    /** @return the security adapter in use */
    SecurityAdapter getSecurityAdapter();

    /** @param securityAdapter the security adapter to use */
    void setSecurityAdapter(SecurityAdapter securityAdapter);

    /** @return the redelivery policy in use */
    RedeliveryPolicy getRedeliveryPolicy();

    /** @param redeliveryPolicy the redelivery policy to use */
    void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy);

    /** @return the dead letter policy in use */
    DeadLetterPolicy getDeadLetterPolicy();

    /** @param deadLetterPolicy the dead letter policy to use */
    void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy);
}
package org.activemq.broker;@b@@b@import javax.jms.JMSException;@b@import org.activemq.message.ActiveMQDestination;@b@import org.activemq.service.MessageContainerAdmin;@b@@b@public abstract interface BrokerAdmin@b@{@b@  public abstract void createMessageContainer(ActiveMQDestination paramActiveMQDestination)@b@    throws JMSException;@b@@b@  public abstract void destoryMessageContainer(ActiveMQDestination paramActiveMQDestination)@b@    throws JMSException;@b@@b@  public abstract MessageContainerAdmin getMessageContainerAdmin(ActiveMQDestination paramActiveMQDestination)@b@    throws JMSException;@b@@b@  public abstract MessageContainerAdmin[] listMessageContainerAdmin()@b@    throws JMSException;@b@}

2.DefaultBroker

package org.activemq.broker.impl;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.jms.JMSException;
import javax.naming.Context;
import javax.transaction.xa.XAException;
import org.activemq.broker.Broker;
import org.activemq.broker.BrokerAdmin;
import org.activemq.broker.BrokerClient;
import org.activemq.broker.ConsumerInfoListener;
import org.activemq.capacity.DelegateCapacityMonitor;
import org.activemq.io.util.MemoryBoundedObjectManager;
import org.activemq.io.util.MemoryBoundedQueueManager;
import org.activemq.jndi.ReadOnlyContext;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQXid;
import org.activemq.message.BrokerInfo;
import org.activemq.message.ConnectionInfo;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.MessageAck;
import org.activemq.message.ProducerInfo;
import org.activemq.security.SecurityAdapter;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.MessageContainerAdmin;
import org.activemq.service.MessageContainerManager;
import org.activemq.service.RedeliveryPolicy;
import org.activemq.service.Transaction;
import org.activemq.service.TransactionManager;
import org.activemq.service.boundedvm.DurableQueueBoundedMessageManager;
import org.activemq.service.boundedvm.TransientQueueBoundedMessageManager;
import org.activemq.service.boundedvm.TransientTopicBoundedMessageManager;
import org.activemq.service.impl.DurableTopicMessageContainerManager;
import org.activemq.store.PersistenceAdapter;
import org.activemq.store.PersistenceAdapterFactory;
import org.activemq.store.TransactionStore;
import org.activemq.store.vm.VMPersistenceAdapter;
import org.activemq.store.vm.VMTransactionManager;
import org.activemq.util.Callback;
import org.activemq.util.ExceptionTemplate;
import org.activemq.util.JMSExceptionHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Default {@link Broker} implementation that also acts as its own
 * {@link BrokerAdmin}.
 *
 * <p>The broker wires together a persistence adapter, a transaction manager, a
 * set of message container managers (transient/persistent topic and queue), an
 * optional security adapter, redelivery and dead-letter policies, and advisory
 * support. Capacity monitoring is delegated to the memory manager passed at
 * construction time.
 *
 * <p>This listing was reconstructed from decompiled bytecode. Compiler-generated
 * residue (synthetic {@code access$NNN} accessors, {@code val$}/{@code this$0}
 * fields of anonymous classes, inlined constants) has been folded back into
 * ordinary source, and the {@code else} branch lost in both
 * {@code associateTransaction} overloads has been restored.
 */
public class DefaultBroker extends DelegateCapacityMonitor implements Broker, BrokerAdmin {

    private static final Log log = LogFactory.getLog(DefaultBroker.class);

    /** System property naming the message store directory. */
    protected static final String PROPERTY_STORE_DIRECTORY = "activemq.store.dir";

    /** System property naming the {@link PersistenceAdapterFactory} class to use. */
    protected static final String PERSISTENCE_ADAPTER_FACTORY = "activemq.persistenceAdapterFactory";

    protected static final Class[] NEWINSTANCE_PARAMETER_TYPES = { File.class };

    /** Memory limit used when no memory manager is supplied: 20 * 1024 * 1024. */
    private static final long DEFAULT_MAX_MEMORY_USAGE = 20971520L;

    private PersistenceAdapter persistenceAdapter;
    private TransactionManager transactionManager;
    /** Array view of {@link #containerManagerMap}; built lazily, reset when the map is replaced. */
    private MessageContainerManager[] containerManagers;
    private File tempDir;
    private MemoryBoundedObjectManager memoryManager;
    private MemoryBoundedQueueManager queueManager;
    private TransactionStore preparedTransactionStore;
    private Map containerManagerMap;
    private CopyOnWriteArrayList consumerInfoListeners;
    private MessageContainerManager persistentTopicMCM;
    private MessageContainerManager transientTopicMCM;
    private TransientQueueBoundedMessageManager transientQueueMCM;
    private DurableQueueBoundedMessageManager persistentQueueMCM;
    private SecurityAdapter securityAdapter;
    private RedeliveryPolicy redeliveryPolicy;
    private DeadLetterPolicy deadLetterPolicy;
    private AdvisorySupport advisory;
    /** Registered consumers: {@link ConsumerInfo} key mapped to the owning {@link BrokerClient}. */
    private Map messageConsumers;
    private BrokerInfo brokerInfo;
    private SynchronizedBoolean started;
    private BrokerContainerImpl brokerContainer;

    // ------------------------------------------------------------ construction

    /**
     * Creates a broker backed by the given memory manager. All other
     * constructors delegate here.
     *
     * @param brokerName        the broker name
     * @param brokerClusterName the cluster name
     * @param memoryManager     memory manager; also installed as the capacity-monitor delegate
     */
    public DefaultBroker(String brokerName, String brokerClusterName, MemoryBoundedObjectManager memoryManager) {
        this.messageConsumers = new ConcurrentHashMap();
        this.started = new SynchronizedBoolean(false);
        this.brokerInfo = new LocalBrokerInfo(this);
        this.brokerInfo.setBrokerName(brokerName);
        this.brokerInfo.setClusterName(brokerClusterName);
        this.memoryManager = memoryManager;
        this.queueManager = new MemoryBoundedQueueManager(memoryManager);
        setDelegate(memoryManager);
        this.containerManagerMap = new ConcurrentHashMap();
        this.consumerInfoListeners = new CopyOnWriteArrayList();
        this.advisory = new AdvisorySupport(this);
    }

    /**
     * Creates a broker in the cluster named {@code "default"}.
     *
     * @param brokerName    the broker name
     * @param memoryManager memory manager to use
     */
    public DefaultBroker(String brokerName, MemoryBoundedObjectManager memoryManager) {
        this(brokerName, "default", memoryManager);
    }

    /**
     * Creates a broker with a default-sized memory manager.
     *
     * @param brokerName the broker name
     * @param cluserName the cluster name
     */
    public DefaultBroker(String brokerName, String cluserName) {
        this(brokerName, cluserName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
    }

    /**
     * Creates a broker in the {@code "default"} cluster with a default-sized memory manager.
     *
     * @param brokerName the broker name
     */
    public DefaultBroker(String brokerName) {
        this(brokerName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
    }

    /**
     * Creates a broker with a default-sized memory manager and an explicit persistence adapter.
     *
     * @param brokerName         the broker name
     * @param brokerClusterName  the cluster name
     * @param persistenceAdapter persistence adapter to use
     */
    public DefaultBroker(String brokerName, String brokerClusterName, PersistenceAdapter persistenceAdapter) {
        this(brokerName, brokerClusterName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
        this.persistenceAdapter = persistenceAdapter;
    }

    /**
     * Creates a broker in the {@code "default"} cluster with an explicit persistence adapter.
     *
     * @param brokerName         the broker name
     * @param persistenceAdapter persistence adapter to use
     */
    public DefaultBroker(String brokerName, PersistenceAdapter persistenceAdapter) {
        this(brokerName);
        this.persistenceAdapter = persistenceAdapter;
    }

    // --------------------------------------------------------------- lifecycle

    /** @return the current value of the started flag */
    public boolean isStarted() {
        return this.started.get();
    }

    /**
     * Starts the broker. The body runs only when the started flag flips from
     * {@code false} to {@code true}, so repeated calls are no-ops.
     *
     * <p>Missing collaborators are created with defaults (redelivery policy,
     * dead-letter policy, persistence adapter, VM transaction manager, default
     * container managers); then the persistence adapter, every container manager
     * and finally the transaction manager are started.
     *
     * @throws JMSException if a component fails to start
     */
    public void start() throws JMSException {
        if (!this.started.commit(false, true)) {
            return;
        }
        if (this.redeliveryPolicy == null) {
            this.redeliveryPolicy = new RedeliveryPolicy();
        }
        if (this.deadLetterPolicy == null) {
            this.deadLetterPolicy = new DeadLetterPolicy(this);
        }
        if (this.persistenceAdapter == null) {
            this.persistenceAdapter = createPersistenceAdapter();
        }
        this.persistenceAdapter.start();

        if (this.transactionManager == null) {
            this.preparedTransactionStore = this.persistenceAdapter.createTransactionStore();
            this.transactionManager = new VMTransactionManager(this, this.preparedTransactionStore);
        }

        if (this.containerManagerMap.isEmpty()) {
            makeDefaultContainerManagers();
        }
        // Forces the lazy array view to be built from the map.
        getContainerManagers();

        for (int i = 0; i < this.containerManagers.length; ++i) {
            this.containerManagers[i].setDeadLetterPolicy(this.deadLetterPolicy);
            this.containerManagers[i].start();
        }
        this.transactionManager.start();
    }

    /**
     * Stops the broker. The body runs only when the started flag flips from
     * {@code true} to {@code false}.
     *
     * <p>Container managers, the transaction manager and the persistence adapter
     * are each stopped through an {@link ExceptionTemplate} (presumably so that
     * one failure does not prevent the remaining components from being stopped);
     * {@code throwJMSException()} is invoked at the end to surface any failure.
     *
     * @throws JMSException if the template reports a failure
     */
    public void stop() throws JMSException {
        if (!this.started.commit(true, false)) {
            return;
        }
        ExceptionTemplate template = new ExceptionTemplate();

        if (this.containerManagers != null) {
            for (int i = 0; i < this.containerManagers.length; ++i) {
                // final so the anonymous callback can capture it
                final MessageContainerManager containerManager = this.containerManagers[i];
                template.run(new Callback() {
                    public void execute() throws Throwable {
                        containerManager.stop();
                    }
                });
            }
        }

        if (this.transactionManager != null) {
            template.run(new Callback() {
                public void execute() throws Throwable {
                    transactionManager.stop();
                }
            });
        }

        template.run(new Callback() {
            public void execute() throws Throwable {
                persistenceAdapter.stop();
            }
        });

        template.throwJMSException();
    }

    // ------------------------------------------- clients, producers, consumers

    /**
     * Authorizes the connection (when a security adapter is set) and registers
     * it with the advisory support.
     *
     * @param client the connecting client
     * @param info   the connection description
     * @throws JMSException if authorization or registration fails
     */
    public void addClient(BrokerClient client, ConnectionInfo info) throws JMSException {
        if (this.securityAdapter != null) {
            this.securityAdapter.authorizeConnection(client, info);
        }
        this.advisory.addConnection(client, info);
    }

    /**
     * Lets the transaction manager (if any) clean up after the client and
     * removes the connection from the advisory support.
     *
     * @param client the departing client
     * @param info   the connection description
     * @throws JMSException on failure
     */
    public void removeClient(BrokerClient client, ConnectionInfo info) throws JMSException {
        if (this.transactionManager != null) {
            this.transactionManager.cleanUpClient(client);
        }
        this.advisory.removeConnection(client, info);
    }

    /**
     * Authorizes the producer (when a security adapter is set) and registers it
     * with the advisory support.
     *
     * @param client the owning client
     * @param info   the producer description
     * @throws JMSException if authorization or registration fails
     */
    public void addMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
        if (this.securityAdapter != null) {
            this.securityAdapter.authorizeProducer(client, info);
        }
        this.advisory.addProducer(client, info);
    }

    /**
     * Removes the producer from the advisory support.
     *
     * @param client the owning client
     * @param info   the producer description
     * @throws JMSException on failure
     */
    public void removeMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
        this.advisory.removeProducer(client, info);
    }

    /**
     * Validates and authorizes the consumer, adds it to the advisory support and
     * to every container manager, notifies consumer-info listeners, and records
     * it in the consumer map.
     *
     * @param client the owning client
     * @param info   the consumer description; must carry a consumer id
     * @throws JMSException if validation, authorization or registration fails
     */
    public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        validateConsumer(info);
        if (this.securityAdapter != null) {
            this.securityAdapter.authorizeConsumer(client, info);
        }
        this.advisory.addAdvisory(client, info);
        MessageContainerManager[] array = getContainerManagers();
        for (int i = 0; i < array.length; ++i) {
            array[i].addMessageConsumer(client, info);
        }
        fireConsumerInfo(client, info);
        this.messageConsumers.put(info, client);
    }

    /**
     * Validates the consumer, removes it from the advisory support and from every
     * container manager, notifies consumer-info listeners, and drops it from the
     * consumer map.
     *
     * @param client the owning client
     * @param info   the consumer description; must carry a consumer id
     * @throws JMSException if validation or removal fails
     */
    public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        validateConsumer(info);
        this.advisory.removeAdvisory(client, info);
        for (int i = 0; i < this.containerManagers.length; ++i) {
            this.containerManagers[i].removeMessageConsumer(client, info);
        }
        fireConsumerInfo(client, info);
        this.messageConsumers.remove(info);
    }

    // --------------------------------------------------------------- messaging

    /**
     * Sends a message. The message's transaction (if any) is associated for the
     * duration of the call and always dissociated afterwards.
     *
     * <p>For a composite destination the message is sent once per child
     * destination: the first child receives the original instance, each later
     * child a shallow copy. For a non-composite destination, temp-destination
     * advisories from non-broker connections are first handed to the advisory
     * support.
     *
     * @param client  the sending client
     * @param message the message to send
     * @throws JMSException if the broker is not started, the message has no
     *                      destination, a non-advisory message has no message id,
     *                      or dispatch fails
     */
    public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
        checkValid();
        ActiveMQDestination destination = message.getJMSActiveMQDestination();
        if (destination == null) {
            throw new JMSException("No destination specified for the Message");
        }
        if (message.getJMSMessageID() == null && !destination.isAdvisory()) {
            throw new JMSException("No messageID specified for the Message");
        }

        associateTransaction(message);
        try {
            if (destination.isComposite()) {
                boolean first = true;
                for (Iterator iter = destination.getChildDestinations().iterator(); iter.hasNext();) {
                    ActiveMQDestination childDestination = (ActiveMQDestination) iter.next();
                    if (first) {
                        first = false;
                    }
                    else {
                        message = message.shallowCopy();
                    }
                    message.setJMSDestination(childDestination);
                    doMessageSend(client, message);
                }
            }
            else {
                if (destination.isTempDestinationAdvisory() && !client.isBrokerConnection()) {
                    this.advisory.processTempDestinationAdvisory(client, message);
                }
                doMessageSend(client, message);
            }
        }
        finally {
            disAssociateTransaction();
        }
    }

    /**
     * Passes an acknowledgement to every container manager, with the ack's
     * transaction (if any) associated for the duration of the call.
     *
     * @param client the acknowledging client
     * @param ack    the acknowledgement
     * @throws JMSException on failure
     */
    public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
        associateTransaction(ack);
        try {
            for (int i = 0; i < this.containerManagers.length; ++i) {
                this.containerManagers[i].acknowledgeMessage(client, ack);
            }
        }
        finally {
            disAssociateTransaction();
        }
    }

    /**
     * Asks every container manager to delete the named subscription.
     *
     * @param clientId       id of the owning client
     * @param subscriberName name of the subscription
     * @throws JMSException on failure
     */
    public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
        for (int i = 0; i < this.containerManagers.length; ++i) {
            this.containerManagers[i].deleteSubscription(clientId, subscriberName);
        }
    }

    // ------------------------------------------------------ local transactions

    /**
     * Creates a local transaction.
     *
     * @param client        the requesting client
     * @param transactionId the local transaction id
     * @throws JMSException on failure
     */
    public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
        this.transactionManager.createLocalTransaction(client, transactionId);
    }

    /**
     * Commits a local transaction via {@code commit(true)}.
     *
     * @param client        the requesting client
     * @param transactionId the local transaction id
     * @throws JMSException on failure; an {@link XAException} is wrapped as the cause
     */
    public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
        try {
            Transaction transaction = this.transactionManager.getLocalTransaction(transactionId);
            transaction.commit(true);
        }
        catch (XAException e) {
            throw (JMSException) new JMSException(e.getMessage()).initCause(e);
        }
    }

    /**
     * Rolls back a local transaction.
     *
     * @param client        the requesting client
     * @param transactionId the local transaction id
     * @throws JMSException on failure; an {@link XAException} is wrapped as the cause
     */
    public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
        try {
            Transaction transaction = this.transactionManager.getLocalTransaction(transactionId);
            transaction.rollback();
        }
        catch (XAException e) {
            throw (JMSException) new JMSException(e.getMessage()).initCause(e);
        }
    }

    // --------------------------------------------------------- XA transactions

    /**
     * Creates an XA transaction.
     *
     * @param client the requesting client
     * @param xid    the XA transaction id
     * @throws XAException on failure
     */
    public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
        this.transactionManager.createXATransaction(client, xid);
    }

    /**
     * Prepares an XA transaction.
     *
     * @param client the requesting client
     * @param xid    the XA transaction id
     * @return the value returned by the transaction's {@code prepare()}
     * @throws XAException on failure
     */
    public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
        Transaction transaction = this.transactionManager.getXATransaction(xid);
        return transaction.prepare();
    }

    /**
     * Rolls back an XA transaction.
     *
     * @param client the requesting client
     * @param xid    the XA transaction id
     * @throws XAException on failure
     */
    public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
        Transaction transaction = this.transactionManager.getXATransaction(xid);
        transaction.rollback();
    }

    /**
     * Commits an XA transaction.
     *
     * @param client   the requesting client
     * @param xid      the XA transaction id
     * @param onePhase passed through to the transaction's {@code commit}
     * @throws XAException on failure
     */
    public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
        Transaction transaction = this.transactionManager.getXATransaction(xid);
        transaction.commit(onePhase);
    }

    /**
     * Lists the prepared XA transactions known to the transaction manager.
     *
     * @param client the requesting client
     * @return ids of the prepared XA transactions
     * @throws XAException on failure
     */
    public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
        return this.transactionManager.getPreparedXATransactions();
    }

    // --------------------------------------------------------------- accessors

    /**
     * Returns the temporary directory, initialising it on first use from the
     * {@code activemq.store.tempdir} system property (default {@code ActiveMQTemp}).
     *
     * @return the temporary directory
     */
    public File getTempDir() {
        if (this.tempDir == null) {
            String dirName = System.getProperty("activemq.store.tempdir", "ActiveMQTemp");
            this.tempDir = new File(dirName);
        }
        return this.tempDir;
    }

    /** @return the broker name held by the broker info */
    public String getBrokerName() {
        return this.brokerInfo.getBrokerName();
    }

    /** @return the cluster name held by the broker info */
    public String getBrokerClusterName() {
        return this.brokerInfo.getClusterName();
    }

    /** @param tempDir the temporary directory to use */
    public void setTempDir(File tempDir) {
        this.tempDir = tempDir;
    }

    /**
     * Returns the container managers as an array, building it from the
     * container manager map on first use.
     *
     * @return the container managers
     */
    public MessageContainerManager[] getContainerManagers() {
        if (this.containerManagers == null) {
            this.containerManagers = createContainerManagers();
        }
        return this.containerManagers;
    }

    /** @return the live container manager map */
    public Map getContainerManagerMap() {
        return this.containerManagerMap;
    }

    /**
     * Replaces the container manager map and discards the cached array so it is
     * rebuilt on the next {@link #getContainerManagers()} call.
     *
     * @param containerManagerMap the new map
     */
    public void setContainerManagerMap(Map containerManagerMap) {
        this.containerManagerMap = containerManagerMap;
        this.containerManagers = null;
    }

    /** @return the persistence adapter, or {@code null} if none has been set or created yet */
    public PersistenceAdapter getPersistenceAdapter() {
        return this.persistenceAdapter;
    }

    /** @param persistenceAdapter the persistence adapter to use */
    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
        this.persistenceAdapter = persistenceAdapter;
    }

    /** @return the transaction manager, or {@code null} if none has been set or created yet */
    public TransactionManager getTransactionManager() {
        return this.transactionManager;
    }

    /** @param transactionManager the transaction manager to use */
    public void setTransactionManager(TransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    /** @return the security adapter, or {@code null} when security checks are disabled */
    public SecurityAdapter getSecurityAdapter() {
        return this.securityAdapter;
    }

    /** @param securityAdapter the security adapter to use */
    public void setSecurityAdapter(SecurityAdapter securityAdapter) {
        this.securityAdapter = securityAdapter;
    }

    /** @return the redelivery policy */
    public RedeliveryPolicy getRedeliveryPolicy() {
        return this.redeliveryPolicy;
    }

    /** @param redeliveryPolicy the redelivery policy to use */
    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
        this.redeliveryPolicy = redeliveryPolicy;
    }

    /** @return the store for prepared transactions */
    public TransactionStore getPreparedTransactionStore() {
        return this.preparedTransactionStore;
    }

    /** @param preparedTransactionStore the store for prepared transactions */
    public void setPreparedTransactionStore(TransactionStore preparedTransactionStore) {
        this.preparedTransactionStore = preparedTransactionStore;
    }

    /** @return the dead letter policy */
    public DeadLetterPolicy getDeadLetterPolicy() {
        return this.deadLetterPolicy;
    }

    /** @param deadLetterPolicy the dead letter policy to use */
    public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
        this.deadLetterPolicy = deadLetterPolicy;
    }

    /** @return the memory manager's value limit */
    public long getMaximumMemoryUsage() {
        return this.memoryManager.getValueLimit();
    }

    /** @param maximumMemoryUsage new value limit for the memory manager */
    public void setMaximumMemoryUsage(long maximumMemoryUsage) {
        this.memoryManager.setValueLimit(maximumMemoryUsage);
    }

    /**
     * Builds a read-only naming context with one sub-context per container
     * manager, keyed by the manager's name in the container manager map and
     * populated from that manager's destinations.
     *
     * @param environment the naming environment
     * @return the read-only destination context
     */
    public Context getDestinationContext(Hashtable environment) {
        Map data = new ConcurrentHashMap();
        for (Iterator iter = this.containerManagerMap.entrySet().iterator(); iter.hasNext();) {
            Map.Entry entry = (Map.Entry) iter.next();
            String name = entry.getKey().toString();
            MessageContainerManager manager = (MessageContainerManager) entry.getValue();
            Context context = new ReadOnlyContext(environment, manager.getDestinations());
            data.put(name, context);
        }
        return new ReadOnlyContext(environment, data);
    }

    // ---------------------------------------------------------- implementation

    /**
     * Authorizes (when a security adapter is set) and dispatches one message to
     * the default container managers.
     *
     * <p>Topic messages always go to the transient topic manager, and additionally
     * (first) to the persistent topic manager when the message is persistent and
     * the destination is not temporary. Queue messages are offered to both the
     * transient and the persistent queue manager.
     *
     * @param client  the sending client
     * @param message the message to dispatch
     * @throws JMSException if authorization or dispatch fails
     */
    protected void doMessageSend(BrokerClient client, ActiveMQMessage message) throws JMSException {
        if (this.securityAdapter != null) {
            this.securityAdapter.authorizeSendMessage(client, message);
        }
        ActiveMQDestination dest = message.getJMSActiveMQDestination();
        if (dest.isTopic()) {
            if (message.isPersistent() && !dest.isTemporary()) {
                this.persistentTopicMCM.sendMessage(client, message);
            }
            this.transientTopicMCM.sendMessage(client, message);
        }
        else {
            this.transientQueueMCM.sendMessage(client, message);
            this.persistentQueueMCM.sendMessage(client, message);
        }
    }

    /**
     * Creates the persistence adapter. Tries, in order: the factory class named
     * by the {@link #PERSISTENCE_ADAPTER_FACTORY} system property (errors are
     * reported), the default factory class (errors are ignored), and finally an
     * in-memory {@link VMPersistenceAdapter}.
     *
     * @return a persistence adapter, never {@code null}
     * @throws JMSException if the configured factory cannot be loaded or used
     */
    protected PersistenceAdapter createPersistenceAdapter() throws JMSException {
        File directory = new File(getStoreDirectory());

        PersistenceAdapter answer = null;
        String property = System.getProperty(PERSISTENCE_ADAPTER_FACTORY);
        if (property != null) {
            answer = tryCreatePersistenceAdapter(property, directory, false);
        }
        if (answer == null) {
            answer = tryCreatePersistenceAdapter("org.activemq.broker.impl.DefaultPersistenceAdapterFactory", directory, true);
        }
        if (answer != null) {
            return answer;
        }

        log.warn("Default message store (journal+derby) could not be found in the classpath or property 'activemq.persistenceAdapterFactory' not specified so defaulting to use RAM based message persistence");
        return new VMPersistenceAdapter();
    }

    /**
     * Loads and instantiates a {@link PersistenceAdapterFactory} and asks it for
     * an adapter.
     *
     * @param className    fully qualified factory class name
     * @param directory    store directory handed to the factory
     * @param ignoreErrors when {@code true}, class-loading and non-I/O
     *                     instantiation failures yield {@code null} instead of an
     *                     exception; an {@link IOException} is always reported
     * @return the adapter, or {@code null} if it could not be created and errors are ignored
     * @throws JMSException if creation fails and the failure is not ignored
     */
    protected PersistenceAdapter tryCreatePersistenceAdapter(String className, File directory, boolean ignoreErrors)
            throws JMSException {
        Class adapterClass = loadClass(className, ignoreErrors);
        if (adapterClass != null) {
            try {
                PersistenceAdapterFactory factory = (PersistenceAdapterFactory) adapterClass.newInstance();
                PersistenceAdapter answer = factory.createPersistenceAdapter(directory, this.memoryManager);
                log.info("Persistence adapter created using: " + className);
                return answer;
            }
            catch (IOException cause) {
                throw createInstantiateAdapterException(className, cause);
            }
            catch (Throwable e) {
                if (!ignoreErrors) {
                    throw createInstantiateAdapterException(className, e);
                }
            }
        }
        return null;
    }

    /**
     * Builds the exception reported when a persistence adapter cannot be created.
     *
     * @param className the factory class name
     * @param e         the underlying failure
     * @return the exception to throw
     */
    protected JMSException createInstantiateAdapterException(String className, Throwable e) {
        return JMSExceptionHelper.newJMSException("Persistence adapter could not be created using: " + className + ". Reason: " + e, e);
    }

    /**
     * Loads a class, trying the thread context class loader first and this
     * class's own class loader second.
     *
     * @param name         fully qualified class name
     * @param ignoreErrors when {@code true}, a missing class yields {@code null}
     * @return the class, or {@code null} if not found and errors are ignored
     * @throws JMSException if the class is not found and errors are not ignored
     */
    protected Class loadClass(String name, boolean ignoreErrors) throws JMSException {
        try {
            return Thread.currentThread().getContextClassLoader().loadClass(name);
        }
        catch (ClassNotFoundException e) {
            try {
                return getClass().getClassLoader().loadClass(name);
            }
            catch (ClassNotFoundException e2) {
                if (ignoreErrors) {
                    log.trace("Could not find class: " + name + " on the classpath");
                    return null;
                }
                // Reports the first failure (context class loader), as the original does.
                throw JMSExceptionHelper.newJMSException("Could not find class: " + name + " on the classpath. Reason: " + e, e);
            }
        }
    }

    /**
     * Resolves the store directory from the {@link #PROPERTY_STORE_DIRECTORY}
     * system property, defaulting to {@code ActiveMQ/<sanitized broker name>}.
     *
     * @return the store directory path
     */
    protected String getStoreDirectory() {
        String defaultDirectory = "ActiveMQ" + File.separator + sanitizeString(getBrokerInfo().getBrokerName());
        return System.getProperty(PROPERTY_STORE_DIRECTORY, defaultDirectory);
    }

    /**
     * Copies the values of the container manager map into a new array.
     *
     * @return the container managers currently in the map
     */
    protected MessageContainerManager[] createContainerManagers() {
        int size = this.containerManagerMap.size();
        MessageContainerManager[] answer = new MessageContainerManager[size];
        this.containerManagerMap.values().toArray(answer);
        return answer;
    }

    /**
     * Creates the four default container managers (transient topic, persistent
     * topic, persistent queue, transient queue) and registers them in the
     * container manager map under their fixed names.
     */
    protected void makeDefaultContainerManagers() {
        this.transientTopicMCM = new TransientTopicBoundedMessageManager(this.queueManager);
        this.containerManagerMap.put("transientTopicContainer", this.transientTopicMCM);

        this.persistentTopicMCM = new DurableTopicMessageContainerManager(this.persistenceAdapter, this.redeliveryPolicy, this.deadLetterPolicy);
        this.containerManagerMap.put("persistentTopicContainer", this.persistentTopicMCM);

        this.persistentQueueMCM = new DurableQueueBoundedMessageManager(this.persistenceAdapter, this.queueManager, this.redeliveryPolicy, this.deadLetterPolicy);
        this.containerManagerMap.put("persistentQueueContainer", this.persistentQueueMCM);

        this.transientQueueMCM = new TransientQueueBoundedMessageManager(this.queueManager, this.redeliveryPolicy, this.deadLetterPolicy);
        this.containerManagerMap.put("transientQueueContainer", this.transientQueueMCM);
    }

    /**
     * Rejects a consumer description that has no consumer id.
     *
     * @param info the consumer description
     * @throws JMSException if {@code info.getConsumerId()} is {@code null}
     */
    protected void validateConsumer(ConsumerInfo info) throws JMSException {
        if (info.getConsumerId() == null) {
            throw new JMSException("No consumerId specified for the ConsumerInfo");
        }
    }

    /**
     * Verifies that the container manager array has been built, which
     * {@link #start()} does.
     *
     * @throws JMSException if the container managers are not yet available
     */
    protected void checkValid() throws JMSException {
        if (this.containerManagers == null) {
            throw new JMSException("This Broker has not yet been started. Ensure start() is called before invoking action methods");
        }
    }

    // --------------------------------------------------- consumer-info listeners

    /**
     * Adds a listener and immediately replays every currently registered
     * consumer to it. A {@code null} listener is ignored.
     *
     * @param l the listener to add
     */
    public void addConsumerInfoListener(ConsumerInfoListener l) {
        if (l == null) {
            return;
        }
        this.consumerInfoListeners.add(l);
        for (Iterator i = this.messageConsumers.entrySet().iterator(); i.hasNext();) {
            Map.Entry entry = (Map.Entry) i.next();
            ConsumerInfo info = (ConsumerInfo) entry.getKey();
            BrokerClient client = (BrokerClient) entry.getValue();
            l.onConsumerInfo(client, info);
        }
    }

    /** @param l the listener to remove */
    public void removeConsumerInfoListener(ConsumerInfoListener l) {
        this.consumerInfoListeners.remove(l);
    }

    /**
     * Notifies every registered listener of a consumer-info event.
     *
     * @param client the owning client
     * @param info   the consumer description
     */
    protected void fireConsumerInfo(BrokerClient client, ConsumerInfo info) {
        for (Iterator i = this.consumerInfoListeners.iterator(); i.hasNext();) {
            ConsumerInfoListener l = (ConsumerInfoListener) i.next();
            l.onConsumerInfo(client, info);
        }
    }

    // ------------------------------------------------- default container managers

    /** @return the persistent topic container manager; {@code null} until the defaults are created */
    public MessageContainerManager getPersistentTopicContainerManager() {
        return this.persistentTopicMCM;
    }

    /** @return the transient topic container manager; {@code null} until the defaults are created */
    public MessageContainerManager getTransientTopicContainerManager() {
        return this.transientTopicMCM;
    }

    /** @return the persistent queue container manager; {@code null} until the defaults are created */
    public MessageContainerManager getPersistentQueueContainerManager() {
        return this.persistentQueueMCM;
    }

    /** @return the transient queue container manager; {@code null} until the defaults are created */
    public MessageContainerManager getTransientQueueContainerManager() {
        return this.transientQueueMCM;
    }

    // ------------------------------------------------------------- BrokerAdmin

    /** @return this broker, which implements {@link BrokerAdmin} itself */
    public BrokerAdmin getBrokerAdmin() {
        return this;
    }

    /**
     * Asks every container manager to create a container for the destination.
     *
     * @param dest the destination
     * @throws JMSException on failure
     */
    public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
        for (int i = 0; i < this.containerManagers.length; ++i) {
            this.containerManagers[i].createMessageContainer(dest);
        }
    }

    /**
     * Asks every container manager to destroy the destination's container.
     * The misspelled name is dictated by {@link BrokerAdmin}.
     *
     * @param dest the destination
     * @throws JMSException on failure
     */
    public void destoryMessageContainer(ActiveMQDestination dest) throws JMSException {
        for (int i = 0; i < this.containerManagers.length; ++i) {
            this.containerManagers[i].destroyMessageContainer(dest);
        }
    }

    /**
     * Returns the first administration handle any container manager holds for
     * the destination.
     *
     * @param dest the destination
     * @return the handle, or {@code null} if no manager knows the destination
     * @throws JMSException on failure
     */
    public MessageContainerAdmin getMessageContainerAdmin(ActiveMQDestination dest) throws JMSException {
        for (int i = 0; i < this.containerManagers.length; ++i) {
            Map messageContainerAdmins = this.containerManagers[i].getMessageContainerAdmins();
            MessageContainerAdmin mca = (MessageContainerAdmin) messageContainerAdmins.get(dest);
            if (mca != null) {
                return mca;
            }
        }
        return null;
    }

    /**
     * Collects the administration handles of all container managers.
     *
     * @return all administration handles, possibly empty
     * @throws JMSException on failure
     */
    public MessageContainerAdmin[] listMessageContainerAdmin() throws JMSException {
        ArrayList l = new ArrayList();
        for (int i = 0; i < this.containerManagers.length; ++i) {
            Map messageContainerAdmins = this.containerManagers[i].getMessageContainerAdmins();
            for (Iterator iter = messageContainerAdmins.values().iterator(); iter.hasNext();) {
                MessageContainerAdmin mca = (MessageContainerAdmin) iter.next();
                l.add(mca);
            }
        }
        MessageContainerAdmin[] answer = new MessageContainerAdmin[l.size()];
        l.toArray(answer);
        return answer;
    }

    /**
     * Sends a message to a dead letter queue through the persistent queue
     * manager. The current context transaction is cleared for the duration of
     * the send and restored afterwards. Does nothing when no persistent queue
     * manager exists.
     *
     * @param deadLetterName name of the dead letter destination
     * @param expiredMessage the message to move
     * @throws JMSException on failure
     */
    public void sendToDeadLetterQueue(String deadLetterName, ActiveMQMessage expiredMessage) throws JMSException {
        if (this.persistentQueueMCM != null) {
            Transaction original = TransactionManager.getContexTransaction();
            try {
                TransactionManager.setContexTransaction(null);
                this.persistentQueueMCM.sendToDeadLetterQueue(deadLetterName, expiredMessage);
                log.debug(expiredMessage + " sent to DLQ: " + deadLetterName);
            }
            finally {
                TransactionManager.setContexTransaction(original);
            }
        }
    }

    // -------------------------------------------------- transaction association

    /**
     * Sets the context transaction for a message: its XA transaction when the
     * message is XA-transacted, its local transaction when it is part of a
     * non-XA transaction, otherwise {@code null}.
     *
     * @param message the message being processed
     * @throws JMSException if the transaction lookup fails; an
     *                      {@link XAException} is wrapped as the cause
     */
    private final void associateTransaction(ActiveMQMessage message) throws JMSException {
        Transaction transaction;
        if (message.isPartOfTransaction()) {
            if (message.isXaTransacted()) {
                try {
                    transaction = this.transactionManager.getXATransaction((ActiveMQXid) message.getTransactionId());
                }
                catch (XAException e) {
                    throw (JMSException) new JMSException(e.getMessage()).initCause(e);
                }
            }
            else {
                // Local lookup only for non-XA ids: an XA id is an ActiveMQXid, not a String.
                transaction = this.transactionManager.getLocalTransaction((String) message.getTransactionId());
            }
        }
        else {
            transaction = null;
        }
        TransactionManager.setContexTransaction(transaction);
    }

    /** Clears the context transaction. */
    private void disAssociateTransaction() {
        TransactionManager.setContexTransaction(null);
    }

    /**
     * Sets the context transaction for an acknowledgement: its XA transaction
     * when the ack is XA-transacted, its local transaction when it is part of a
     * non-XA transaction, otherwise {@code null}.
     *
     * @param ack the acknowledgement being processed
     * @throws JMSException if the transaction lookup fails; an
     *                      {@link XAException} is wrapped as the cause
     */
    private void associateTransaction(MessageAck ack) throws JMSException {
        Transaction transaction;
        if (ack.isPartOfTransaction()) {
            if (ack.isXaTransacted()) {
                try {
                    transaction = this.transactionManager.getXATransaction((ActiveMQXid) ack.getTransactionId());
                }
                catch (XAException e) {
                    throw (JMSException) new JMSException(e.getMessage()).initCause(e);
                }
            }
            else {
                // Local lookup only for non-XA ids: an XA id is an ActiveMQXid, not a String.
                transaction = this.transactionManager.getLocalTransaction((String) ack.getTransactionId());
            }
        }
        else {
            transaction = null;
        }
        TransactionManager.setContexTransaction(transaction);
    }

    // ------------------------------------------------------------------ helpers

    /**
     * Replaces {@code ':'}, {@code '/'} and {@code '\'} with {@code '_'}.
     *
     * @param in the text to sanitize, may be {@code null}
     * @return the sanitized text, or {@code null} when {@code in} is {@code null}
     */
    private String sanitizeString(String in) {
        String result = null;
        if (in != null) {
            result = in.replace(':', '_');
            result = result.replace('/', '_');
            result = result.replace('\\', '_');
        }
        return result;
    }

    /** @return the memory manager supplied at construction */
    public MemoryBoundedObjectManager getMemoryManager() {
        return this.memoryManager;
    }

    /** @return the queue manager built on top of the memory manager */
    public MemoryBoundedQueueManager getQueueManager() {
        return this.queueManager;
    }

    /** @return the broker name */
    public String getName() {
        return getBrokerName();
    }

    /** @return {@code "broker: "} followed by the broker name */
    public String toString() {
        return "broker: " + getName();
    }

    /** @return the broker info created at construction */
    public BrokerInfo getBrokerInfo() {
        return this.brokerInfo;
    }

    /** @param container the container that owns this broker */
    protected void setBrokercontainer(BrokerContainerImpl container) {
        this.brokerContainer = container;
    }

    /** @return the container that owns this broker, or {@code null} if none was set */
    protected BrokerContainerImpl getBrokerContainer() {
        return this.brokerContainer;
    }
}

3.容量监控

package org.activemq.capacity;

import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import java.util.Iterator;

/**
 * A {@link CapacityMonitor} that forwards every call to an optional delegate monitor.
 *
 * <p>While no delegate is set, setters are ignored and getters answer fixed defaults:
 * {@code 0} for the rounding factor, {@code 100} for capacities, current value and value limit,
 * and {@code null} for generated events. Listeners are also kept in a local list so that they
 * can be registered with a delegate that is installed later.
 */
public class DelegateCapacityMonitor
  implements CapacityMonitor
{
  String name;
  CapacityMonitor monitor;
  CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();

  /**
   * Creates a monitor with no name and no delegate.
   */
  public DelegateCapacityMonitor()
  {
  }

  /**
   * Creates a named monitor that forwards to the given delegate.
   *
   * @param name the name reported by {@link #getName()}
   * @param cm the delegate, may be {@code null}
   */
  public DelegateCapacityMonitor(String name, CapacityMonitor cm)
  {
    this.name = name;
    this.monitor = cm;
  }

  /**
   * Installs a new delegate and registers every locally known listener with it.
   *
   * <p>NOTE(review): listeners are not removed from a previously installed delegate - confirm
   * whether delegates are ever swapped at runtime.
   *
   * @param cm the new delegate, or {@code null} to run without one
   */
  public void setDelegate(CapacityMonitor cm)
  {
    this.monitor = cm;
    if (cm == null) {
      return;
    }
    Iterator registered = this.listeners.iterator();
    while (registered.hasNext()) {
      cm.addCapacityEventListener((CapacityMonitorEventListener) registered.next());
    }
  }

  /**
   * @return the name stored in this monitor (not taken from the delegate)
   */
  public String getName()
  {
    return this.name;
  }

  /**
   * @param newName the name to store in this monitor
   */
  public void setName(String newName)
  {
    this.name = newName;
  }

  /**
   * @return the delegate's rounding factor, or {@code 0} without a delegate
   */
  public int getRoundingFactor()
  {
    if (this.monitor == null) {
      return 0;
    }
    return this.monitor.getRoundingFactor();
  }

  /**
   * Forwards the rounding factor to the delegate; ignored without a delegate.
   *
   * @param newRoundingFactor the value to forward
   */
  public void setRoundingFactor(int newRoundingFactor)
  {
    if (this.monitor == null) {
      return;
    }
    this.monitor.setRoundingFactor(newRoundingFactor);
  }

  /**
   * Remembers the listener locally and, if a delegate is present, registers it there as well.
   *
   * @param l the listener to add
   */
  public void addCapacityEventListener(CapacityMonitorEventListener l)
  {
    this.listeners.add(l);
    if (this.monitor == null) {
      return;
    }
    this.monitor.addCapacityEventListener(l);
  }

  /**
   * Forgets the listener locally and, if a delegate is present, removes it there as well.
   *
   * @param l the listener to remove
   */
  public void removeCapacityEventListener(CapacityMonitorEventListener l)
  {
    this.listeners.remove(l);
    if (this.monitor == null) {
      return;
    }
    this.monitor.removeCapacityEventListener(l);
  }

  /**
   * @return the delegate's current capacity, or {@code 100} without a delegate
   */
  public int getCurrentCapacity()
  {
    if (this.monitor == null) {
      return 100;
    }
    return this.monitor.getCurrentCapacity();
  }

  /**
   * @return the delegate's rounded capacity, or {@code 100} without a delegate
   */
  public int getRoundedCapacity()
  {
    if (this.monitor == null) {
      return 100;
    }
    return this.monitor.getRoundedCapacity();
  }

  /**
   * @return the delegate's current value, or {@code 100} without a delegate
   */
  public long getCurrentValue()
  {
    if (this.monitor == null) {
      return 100L;
    }
    return this.monitor.getCurrentValue();
  }

  /**
   * Forwards the current value to the delegate; ignored without a delegate.
   *
   * @param newCurrentValue the value to forward
   */
  public void setCurrentValue(long newCurrentValue)
  {
    if (this.monitor == null) {
      return;
    }
    this.monitor.setCurrentValue(newCurrentValue);
  }

  /**
   * @return the delegate's value limit, or {@code 100} without a delegate
   */
  public long getValueLimit()
  {
    if (this.monitor == null) {
      return 100L;
    }
    return this.monitor.getValueLimit();
  }

  /**
   * Forwards the value limit to the delegate; ignored without a delegate.
   *
   * @param newValueLimit the limit to forward
   */
  public void setValueLimit(long newValueLimit)
  {
    if (this.monitor == null) {
      return;
    }
    this.monitor.setValueLimit(newValueLimit);
  }

  /**
   * @return the event generated by the delegate, or {@code null} without a delegate
   */
  public CapacityMonitorEvent generateCapacityMonitorEvent()
  {
    if (this.monitor == null) {
      return null;
    }
    return this.monitor.generateCapacityMonitorEvent();
  }
}
package org.activemq.capacity;@b@@b@public abstract interface CapacityMonitor@b@{@b@  public abstract String getName();@b@@b@  public abstract void setName(String paramString);@b@@b@  public abstract int getRoundingFactor();@b@@b@  public abstract void setRoundingFactor(int paramInt);@b@@b@  public abstract void addCapacityEventListener(CapacityMonitorEventListener paramCapacityMonitorEventListener);@b@@b@  public abstract void removeCapacityEventListener(CapacityMonitorEventListener paramCapacityMonitorEventListener);@b@@b@  public abstract int getCurrentCapacity();@b@@b@  public abstract int getRoundedCapacity();@b@@b@  public abstract long getCurrentValue();@b@@b@  public abstract void setCurrentValue(long paramLong);@b@@b@  public abstract long getValueLimit();@b@@b@  public abstract void setValueLimit(long paramLong);@b@@b@  public abstract CapacityMonitorEvent generateCapacityMonitorEvent();@b@@b@  public static class BasicCapacityMonitor@b@  {@b@  }@b@}
  • ◆ 相关内容