From c4dcf2eff459b9da5e647c0a3b92fc9acd3585b7 Mon Sep 17 00:00:00 2001 From: "zhongqing.lin" Date: Tue, 20 Oct 2015 09:21:27 +0800 Subject: [PATCH 1/2] Add exchange and queue parameters. --- .gitignore | 31 +- .../jmeter/protocol/amqp/AMQPPublisher.java | 562 +++++----- .../jmeter/protocol/amqp/AMQPSampler.java | 962 +++++++++--------- .../protocol/amqp/gui/AMQPSamplerGui.java | 635 ++++++------ 4 files changed, 1136 insertions(+), 1054 deletions(-) diff --git a/.gitignore b/.gitignore index e4f8f4c..04ed201 100644 --- a/.gitignore +++ b/.gitignore @@ -1,15 +1,16 @@ -.classpath -.project -.settings/ -*\.sw[po] -.DS_Store -stats/ -target/ -gc.log -lib/ -/nbproject/private/ -/build/ -/dist/ -/ivy/ -*.iml -.idea +.classpath +.project +.settings/ +*\.sw[po] +.DS_Store +stats/ +target/ +gc.log +lib/ +/nbproject/private/ +/build/ +/dist/ +/ivy/ +*.iml +.idea +/bin/ diff --git a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java b/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java index 242de14..b81c8a5 100644 --- a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java +++ b/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java @@ -1,281 +1,281 @@ -package com.zeroclue.jmeter.protocol.amqp; - -import com.rabbitmq.client.AMQP; - -import java.io.IOException; -import java.security.*; -import java.util.*; - -import org.apache.commons.lang3.StringUtils; -import org.apache.jmeter.config.Arguments; -import org.apache.jmeter.samplers.Entry; -import org.apache.jmeter.samplers.Interruptible; -import org.apache.jmeter.samplers.SampleResult; -import org.apache.jmeter.testelement.property.TestElementProperty; -import org.apache.jorphan.logging.LoggingManager; -import org.apache.log.Logger; - -import com.rabbitmq.client.Channel; - -/** - * JMeter creates an instance of a sampler class for every occurrence of the - * element in every thread. [some additional copies may be created before the - * test run starts] - * - * Thus each sampler is guaranteed to be called by a single thread - there is no - * need to synchronize access to instance variables. - * - * However, access to class fields must be synchronized. - */ -public class AMQPPublisher extends AMQPSampler implements Interruptible { - - private static final long serialVersionUID = -8420658040465788497L; - - private static final Logger log = LoggingManager.getLoggerForClass(); - - //++ These are JMX names, and must not be changed - private final static String MESSAGE = "AMQPPublisher.Message"; - private final static String MESSAGE_ROUTING_KEY = "AMQPPublisher.MessageRoutingKey"; - private final static String MESSAGE_TYPE = "AMQPPublisher.MessageType"; - private final static String REPLY_TO_QUEUE = "AMQPPublisher.ReplyToQueue"; - private final static String CONTENT_TYPE = "AMQPPublisher.ContentType"; - private final static String CORRELATION_ID = "AMQPPublisher.CorrelationId"; - private final static String MESSAGE_ID = "AMQPPublisher.MessageId"; - private final static String HEADERS = "AMQPPublisher.Headers"; - - public static boolean DEFAULT_PERSISTENT = false; - private final static String PERSISTENT = "AMQPConsumer.Persistent"; - - public static boolean DEFAULT_USE_TX = false; - private final static String USE_TX = "AMQPConsumer.UseTx"; - - private transient Channel channel; - - public AMQPPublisher() { - super(); - } - - /** - * {@inheritDoc} - */ - @Override - public SampleResult sample(Entry e) { - SampleResult result = new SampleResult(); - result.setSampleLabel(getName()); - result.setSuccessful(false); - result.setResponseCode("500"); - - try { - initChannel(); - } catch (Exception ex) { - log.error("Failed to initialize channel : ", ex); - result.setResponseMessage(ex.toString()); - return result; - } - - String data = getMessage(); // Sampler data - - result.setSampleLabel(getTitle()); - /* - * Perform the sampling - */ - - // aggregate samples. - int loop = getIterationsAsInt(); - result.sampleStart(); // Start timing - try { - AMQP.BasicProperties messageProperties = getProperties(); - byte[] messageBytes = getMessageBytes(); - - for (int idx = 0; idx < loop; idx++) { - // try to force jms semantics. - // but this does not work since RabbitMQ does not sync to disk if consumers are connected as - // seen by iostat -cd 1. TPS value remains at 0. - - channel.basicPublish(getExchange(), getMessageRoutingKey(), messageProperties, messageBytes); - - } - - // commit the sample. - if (getUseTx()) { - channel.txCommit(); - } - - /* - * Set up the sample result details - */ - result.setSamplerData(data); - result.setResponseData(new String(messageBytes), null); - result.setDataType(SampleResult.TEXT); - - result.setResponseCodeOK(); - result.setResponseMessage("OK"); - result.setSuccessful(true); - } catch (Exception ex) { - log.debug(ex.getMessage(), ex); - result.setResponseCode("000"); - result.setResponseMessage(ex.toString()); - } - finally { - result.sampleEnd(); // End timimg - } - - return result; - } - - - private byte[] getMessageBytes() { - return getMessage().getBytes(); - } - - /** - * @return the message routing key for the sample - */ - public String getMessageRoutingKey() { - return getPropertyAsString(MESSAGE_ROUTING_KEY); - } - - public void setMessageRoutingKey(String content) { - setProperty(MESSAGE_ROUTING_KEY, content); - } - - /** - * @return the message for the sample - */ - public String getMessage() { - return getPropertyAsString(MESSAGE); - } - - public void setMessage(String content) { - setProperty(MESSAGE, content); - } - - /** - * @return the message type for the sample - */ - public String getMessageType() { - return getPropertyAsString(MESSAGE_TYPE); - } - - public void setMessageType(String content) { - setProperty(MESSAGE_TYPE, content); - } - - /** - * @return the reply-to queue for the sample - */ - public String getReplyToQueue() { - return getPropertyAsString(REPLY_TO_QUEUE); - } - - public void setReplyToQueue(String content) { - setProperty(REPLY_TO_QUEUE, content); - } - - public String getContentType() { - return getPropertyAsString(CONTENT_TYPE); - } - - public void setContentType(String contentType) { - setProperty(CONTENT_TYPE, contentType); - } - - /** - * @return the correlation identifier for the sample - */ - public String getCorrelationId() { - return getPropertyAsString(CORRELATION_ID); - } - - public void setCorrelationId(String content) { - setProperty(CORRELATION_ID, content); - } - - /** - * @return the message id for the sample - */ - public String getMessageId() { - return getPropertyAsString(MESSAGE_ID); - } - - public void setMessageId(String content) { - setProperty(MESSAGE_ID, content); - } - - public Arguments getHeaders() { - return (Arguments) getProperty(HEADERS).getObjectValue(); - } - - public void setHeaders(Arguments headers) { - setProperty(new TestElementProperty(HEADERS, headers)); - } - - public Boolean getPersistent() { - return getPropertyAsBoolean(PERSISTENT, DEFAULT_PERSISTENT); - } - - public void setPersistent(Boolean persistent) { - setProperty(PERSISTENT, persistent); - } - - public Boolean getUseTx() { - return getPropertyAsBoolean(USE_TX, DEFAULT_USE_TX); - } - - public void setUseTx(Boolean tx) { - setProperty(USE_TX, tx); - } - - @Override - public boolean interrupt() { - cleanup(); - return true; - } - - @Override - protected Channel getChannel() { - return channel; - } - - @Override - protected void setChannel(Channel channel) { - this.channel = channel; - } - - protected AMQP.BasicProperties getProperties() { - final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); - - final int deliveryMode = getPersistent() ? 2 : 1; - final String contentType = StringUtils.defaultIfEmpty(getContentType(), "text/plain"); - - builder.contentType(contentType) - .deliveryMode(deliveryMode) - .priority(0) - .correlationId(getCorrelationId()) - .replyTo(getReplyToQueue()) - .type(getMessageType()) - .headers(prepareHeaders()) - .build(); - if (getMessageId() != null && getMessageId().isEmpty()) { - builder.messageId(getMessageId()); - } - return builder.build(); - } - - protected boolean initChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException { - boolean ret = super.initChannel(); - if (getUseTx()) { - channel.txSelect(); - } - return ret; - } - - private Map prepareHeaders() { - Map result = new HashMap(); - Map source = getHeaders().getArgumentsAsMap(); - for (Map.Entry item : source.entrySet()) { - result.put(item.getKey(), item.getValue()); - } - return result; - } -} +package com.zeroclue.jmeter.protocol.amqp; + +import com.rabbitmq.client.AMQP; + +import java.io.IOException; +import java.security.*; +import java.util.*; + +import org.apache.commons.lang3.StringUtils; +import org.apache.jmeter.config.Arguments; +import org.apache.jmeter.samplers.Entry; +import org.apache.jmeter.samplers.Interruptible; +import org.apache.jmeter.samplers.SampleResult; +import org.apache.jmeter.testelement.property.TestElementProperty; +import org.apache.jorphan.logging.LoggingManager; +import org.apache.log.Logger; + +import com.rabbitmq.client.Channel; + +/** + * JMeter creates an instance of a sampler class for every occurrence of the + * element in every thread. [some additional copies may be created before the + * test run starts] + * + * Thus each sampler is guaranteed to be called by a single thread - there is no + * need to synchronize access to instance variables. + * + * However, access to class fields must be synchronized. + */ +public class AMQPPublisher extends AMQPSampler implements Interruptible { + + private static final long serialVersionUID = -8420658040465788497L; + + private static final Logger log = LoggingManager.getLoggerForClass(); + + //++ These are JMX names, and must not be changed + private final static String MESSAGE = "AMQPPublisher.Message"; + private final static String MESSAGE_ROUTING_KEY = "AMQPPublisher.MessageRoutingKey"; + private final static String MESSAGE_TYPE = "AMQPPublisher.MessageType"; + private final static String REPLY_TO_QUEUE = "AMQPPublisher.ReplyToQueue"; + private final static String CONTENT_TYPE = "AMQPPublisher.ContentType"; + private final static String CORRELATION_ID = "AMQPPublisher.CorrelationId"; + private final static String MESSAGE_ID = "AMQPPublisher.MessageId"; + private final static String HEADERS = "AMQPPublisher.Headers"; + + public static boolean DEFAULT_PERSISTENT = false; + private final static String PERSISTENT = "AMQPConsumer.Persistent"; + + public static boolean DEFAULT_USE_TX = false; + private final static String USE_TX = "AMQPConsumer.UseTx"; + + private transient Channel channel; + + public AMQPPublisher() { + super(); + } + + /** + * {@inheritDoc} + */ + @Override + public SampleResult sample(Entry e) { + SampleResult result = new SampleResult(); + result.setSampleLabel(getName()); + result.setSuccessful(false); + result.setResponseCode("500"); + + try { + initChannel(); + } catch (Exception ex) { + log.error("Failed to initialize channel : ", ex); + result.setResponseMessage(ex.toString()); + return result; + } + + String data = getMessage(); // Sampler data + + result.setSampleLabel(getTitle()); + /* + * Perform the sampling + */ + + // aggregate samples. + int loop = getIterationsAsInt(); + result.sampleStart(); // Start timing + try { + AMQP.BasicProperties messageProperties = getProperties(); + byte[] messageBytes = getMessageBytes(); + + for (int idx = 0; idx < loop; idx++) { + // try to force jms semantics. + // but this does not work since RabbitMQ does not sync to disk if consumers are connected as + // seen by iostat -cd 1. TPS value remains at 0. + + channel.basicPublish(getExchange(), getMessageRoutingKey(), messageProperties, messageBytes); + + } + + // commit the sample. + if (getUseTx()) { + channel.txCommit(); + } + + /* + * Set up the sample result details + */ + result.setSamplerData(data); + result.setResponseData(new String(messageBytes), null); + result.setDataType(SampleResult.TEXT); + + result.setResponseCodeOK(); + result.setResponseMessage("OK"); + result.setSuccessful(true); + } catch (Exception ex) { + log.debug(ex.getMessage(), ex); + result.setResponseCode("000"); + result.setResponseMessage(ex.toString()); + } + finally { + result.sampleEnd(); // End timimg + } + + return result; + } + + + private byte[] getMessageBytes() { + return getMessage().getBytes(); + } + + /** + * @return the message routing key for the sample + */ + public String getMessageRoutingKey() { + return getPropertyAsString(MESSAGE_ROUTING_KEY); + } + + public void setMessageRoutingKey(String content) { + setProperty(MESSAGE_ROUTING_KEY, content); + } + + /** + * @return the message for the sample + */ + public String getMessage() { + return getPropertyAsString(MESSAGE); + } + + public void setMessage(String content) { + setProperty(MESSAGE, content); + } + + /** + * @return the message type for the sample + */ + public String getMessageType() { + return getPropertyAsString(MESSAGE_TYPE); + } + + public void setMessageType(String content) { + setProperty(MESSAGE_TYPE, content); + } + + /** + * @return the reply-to queue for the sample + */ + public String getReplyToQueue() { + return getPropertyAsString(REPLY_TO_QUEUE); + } + + public void setReplyToQueue(String content) { + setProperty(REPLY_TO_QUEUE, content); + } + + public String getContentType() { + return getPropertyAsString(CONTENT_TYPE); + } + + public void setContentType(String contentType) { + setProperty(CONTENT_TYPE, contentType); + } + + /** + * @return the correlation identifier for the sample + */ + public String getCorrelationId() { + return getPropertyAsString(CORRELATION_ID); + } + + public void setCorrelationId(String content) { + setProperty(CORRELATION_ID, content); + } + + /** + * @return the message id for the sample + */ + public String getMessageId() { + return getPropertyAsString(MESSAGE_ID); + } + + public void setMessageId(String content) { + setProperty(MESSAGE_ID, content); + } + + public Arguments getHeaders() { + return (Arguments) getProperty(HEADERS).getObjectValue(); + } + + public void setHeaders(Arguments headers) { + setProperty(new TestElementProperty(HEADERS, headers)); + } + + public Boolean getPersistent() { + return getPropertyAsBoolean(PERSISTENT, DEFAULT_PERSISTENT); + } + + public void setPersistent(Boolean persistent) { + setProperty(PERSISTENT, persistent); + } + + public Boolean getUseTx() { + return getPropertyAsBoolean(USE_TX, DEFAULT_USE_TX); + } + + public void setUseTx(Boolean tx) { + setProperty(USE_TX, tx); + } + + @Override + public boolean interrupt() { + cleanup(); + return true; + } + + @Override + protected Channel getChannel() { + return channel; + } + + @Override + protected void setChannel(Channel channel) { + this.channel = channel; + } + + protected AMQP.BasicProperties getProperties() { + final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); + + final int deliveryMode = getPersistent() ? 2 : 1; + final String contentType = StringUtils.defaultIfEmpty(getContentType(), "text/plain"); + + builder.contentType(contentType) + .deliveryMode(deliveryMode) + .priority(0) + .correlationId(getCorrelationId()) + .replyTo(getReplyToQueue()) + .type(getMessageType()) + .headers(prepareHeaders()) + .build(); + if (getMessageId() != null && getMessageId().isEmpty()) { + builder.messageId(getMessageId()); + } + return builder.build(); + } + + protected boolean initChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException { + boolean ret = super.initChannel(); + if (getUseTx()) { + channel.txSelect(); + } + return ret; + } + + private Map prepareHeaders() { + Map result = new HashMap(); + Map source = getHeaders().getArgumentsAsMap(); + for (Map.Entry item : source.entrySet()) { + result.put(item.getKey(), item.getValue()); + } + return result; + } +} diff --git a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java b/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java index c39cf9b..2fed028 100644 --- a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java +++ b/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java @@ -1,471 +1,491 @@ -package com.zeroclue.jmeter.protocol.amqp; - -import java.io.IOException; -import java.util.*; -import java.security.*; - -import com.rabbitmq.client.*; -import org.apache.jmeter.samplers.AbstractSampler; -import org.apache.jmeter.testelement.ThreadListener; -import org.apache.jorphan.logging.LoggingManager; -import org.apache.log.Logger; - -import com.rabbitmq.client.AMQP.BasicProperties; -import org.apache.commons.lang3.StringUtils; - -public abstract class AMQPSampler extends AbstractSampler implements ThreadListener { - - public static final boolean DEFAULT_EXCHANGE_DURABLE = true; - public static final boolean DEFAULT_EXCHANGE_REDECLARE = false; - public static final boolean DEFAULT_QUEUE_REDECLARE = false; - - public static final int DEFAULT_PORT = 5672; - public static final String DEFAULT_PORT_STRING = Integer.toString(DEFAULT_PORT); - - public static final int DEFAULT_TIMEOUT = 1000; - public static final String DEFAULT_TIMEOUT_STRING = Integer.toString(DEFAULT_TIMEOUT); - - public static final int DEFAULT_ITERATIONS = 1; - public static final String DEFAULT_ITERATIONS_STRING = Integer.toString(DEFAULT_ITERATIONS); - - private static final Logger log = LoggingManager.getLoggerForClass(); - - - //++ These are JMX names, and must not be changed - protected static final String EXCHANGE = "AMQPSampler.Exchange"; - protected static final String EXCHANGE_TYPE = "AMQPSampler.ExchangeType"; - protected static final String EXCHANGE_DURABLE = "AMQPSampler.ExchangeDurable"; - protected static final String EXCHANGE_REDECLARE = "AMQPSampler.ExchangeRedeclare"; - protected static final String QUEUE = "AMQPSampler.Queue"; - protected static final String ROUTING_KEY = "AMQPSampler.RoutingKey"; - protected static final String VIRUTAL_HOST = "AMQPSampler.VirtualHost"; - protected static final String HOST = "AMQPSampler.Host"; - protected static final String PORT = "AMQPSampler.Port"; - protected static final String SSL = "AMQPSampler.SSL"; - protected static final String USERNAME = "AMQPSampler.Username"; - protected static final String PASSWORD = "AMQPSampler.Password"; - private static final String TIMEOUT = "AMQPSampler.Timeout"; - private static final String ITERATIONS = "AMQPSampler.Iterations"; - private static final String MESSAGE_TTL = "AMQPSampler.MessageTTL"; - private static final String MESSAGE_EXPIRES = "AMQPSampler.MessageExpires"; - private static final String QUEUE_DURABLE = "AMQPSampler.QueueDurable"; - private static final String QUEUE_REDECLARE = "AMQPSampler.Redeclare"; - private static final String QUEUE_EXCLUSIVE = "AMQPSampler.QueueExclusive"; - private static final String QUEUE_AUTO_DELETE = "AMQPSampler.QueueAutoDelete"; - private static final int DEFAULT_HEARTBEAT = 1; - - private transient ConnectionFactory factory; - private transient Connection connection; - - protected AMQPSampler(){ - factory = new ConnectionFactory(); - factory.setRequestedHeartbeat(DEFAULT_HEARTBEAT); - } - - protected boolean initChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException { - Channel channel = getChannel(); - - if(channel != null && !channel.isOpen()){ - log.warn("channel " + channel.getChannelNumber() - + " closed unexpectedly: ", channel.getCloseReason()); - channel = null; // so we re-open it below - } - - if(channel == null) { - channel = createChannel(); - setChannel(channel); - - //TODO: Break out queue binding - boolean queueConfigured = (getQueue() != null && !getQueue().isEmpty()); - - if(queueConfigured) { - if (getQueueRedeclare()) { - deleteQueue(); - } - - AMQP.Queue.DeclareOk declareQueueResp = channel.queueDeclare(getQueue(), queueDurable(), queueExclusive(), queueAutoDelete(), getQueueArguments()); - } - - if(!StringUtils.isBlank(getExchange())) { //Use a named exchange - if (getExchangeRedeclare()) { - deleteExchange(); - } - - AMQP.Exchange.DeclareOk declareExchangeResp = channel.exchangeDeclare(getExchange(), getExchangeType(), getExchangeDurable()); - if (queueConfigured) { - channel.queueBind(getQueue(), getExchange(), getRoutingKey()); - } - } - - log.info("bound to:" - +"\n\t queue: " + getQueue() - +"\n\t exchange: " + getExchange() - +"\n\t exchange(D)? " + getExchangeDurable() - +"\n\t routing key: " + getRoutingKey() - +"\n\t arguments: " + getQueueArguments() - ); - - } - return true; - } - - private Map getQueueArguments() { - Map arguments = new HashMap(); - - if(getMessageTTL() != null && !getMessageTTL().isEmpty()) - arguments.put("x-message-ttl", getMessageTTLAsInt()); - - if(getMessageExpires() != null && !getMessageExpires().isEmpty()) - arguments.put("x-expires", getMessageExpiresAsInt()); - - return arguments; - } - - protected abstract Channel getChannel(); - protected abstract void setChannel(Channel channel); - - /** - * @return a string for the sampleResult Title - */ - protected String getTitle() { - return this.getName(); - } - - protected int getTimeoutAsInt() { - if (getPropertyAsInt(TIMEOUT) < 1) { - return DEFAULT_TIMEOUT; - } - return getPropertyAsInt(TIMEOUT); - } - - public String getTimeout() { - return getPropertyAsString(TIMEOUT, DEFAULT_TIMEOUT_STRING); - } - - - public void setTimeout(String s) { - setProperty(TIMEOUT, s); - } - - public String getIterations() { - return getPropertyAsString(ITERATIONS, DEFAULT_ITERATIONS_STRING); - } - - public void setIterations(String s) { - setProperty(ITERATIONS, s); - } - - public int getIterationsAsInt() { - return getPropertyAsInt(ITERATIONS); - } - - public String getExchange() { - return getPropertyAsString(EXCHANGE); - } - - public void setExchange(String name) { - setProperty(EXCHANGE, name); - } - - - public boolean getExchangeDurable() { - return getPropertyAsBoolean(EXCHANGE_DURABLE); - } - - public void setExchangeDurable(boolean durable) { - setProperty(EXCHANGE_DURABLE, durable); - } - - - public String getExchangeType() { - return getPropertyAsString(EXCHANGE_TYPE); - } - - public void setExchangeType(String name) { - setProperty(EXCHANGE_TYPE, name); - } - - - public Boolean getExchangeRedeclare() { - return getPropertyAsBoolean(EXCHANGE_REDECLARE); - } - - public void setExchangeRedeclare(Boolean content) { - setProperty(EXCHANGE_REDECLARE, content); - } - - public String getQueue() { - return getPropertyAsString(QUEUE); - } - - public void setQueue(String name) { - setProperty(QUEUE, name); - } - - - public String getRoutingKey() { - return getPropertyAsString(ROUTING_KEY); - } - - public void setRoutingKey(String name) { - setProperty(ROUTING_KEY, name); - } - - - public String getVirtualHost() { - return getPropertyAsString(VIRUTAL_HOST); - } - - public void setVirtualHost(String name) { - setProperty(VIRUTAL_HOST, name); - } - - - public String getMessageTTL() { - return getPropertyAsString(MESSAGE_TTL); - } - - public void setMessageTTL(String name) { - setProperty(MESSAGE_TTL, name); - } - - protected Integer getMessageTTLAsInt() { - if (getPropertyAsInt(MESSAGE_TTL) < 1) { - return null; - } - return getPropertyAsInt(MESSAGE_TTL); - } - - - public String getMessageExpires() { - return getPropertyAsString(MESSAGE_EXPIRES); - } - - public void setMessageExpires(String name) { - setProperty(MESSAGE_EXPIRES, name); - } - - protected Integer getMessageExpiresAsInt() { - if (getPropertyAsInt(MESSAGE_EXPIRES) < 1) { - return null; - } - return getPropertyAsInt(MESSAGE_EXPIRES); - } - - - public String getHost() { - return getPropertyAsString(HOST); - } - - public void setHost(String name) { - setProperty(HOST, name); - } - - - public String getPort() { - return getPropertyAsString(PORT); - } - - public void setPort(String name) { - setProperty(PORT, name); - } - - protected int getPortAsInt() { - if (getPropertyAsInt(PORT) < 1) { - return DEFAULT_PORT; - } - return getPropertyAsInt(PORT); - } - - public void setConnectionSSL(String content) { - setProperty(SSL, content); - } - - public void setConnectionSSL(Boolean value) { - setProperty(SSL, value.toString()); - } - - public boolean connectionSSL() { - return getPropertyAsBoolean(SSL); - } - - - public String getUsername() { - return getPropertyAsString(USERNAME); - } - - public void setUsername(String name) { - setProperty(USERNAME, name); - } - - - public String getPassword() { - return getPropertyAsString(PASSWORD); - } - - public void setPassword(String name) { - setProperty(PASSWORD, name); - } - - /** - * @return the whether or not the queue is durable - */ - public String getQueueDurable() { - return getPropertyAsString(QUEUE_DURABLE); - } - - public void setQueueDurable(String content) { - setProperty(QUEUE_DURABLE, content); - } - - public void setQueueDurable(Boolean value) { - setProperty(QUEUE_DURABLE, value.toString()); - } - - public boolean queueDurable(){ - return getPropertyAsBoolean(QUEUE_DURABLE); - } - - /** - * @return the whether or not the queue is exclusive - */ - public String getQueueExclusive() { - return getPropertyAsString(QUEUE_EXCLUSIVE); - } - - public void setQueueExclusive(String content) { - setProperty(QUEUE_EXCLUSIVE, content); - } - - public void setQueueExclusive(Boolean value) { - setProperty(QUEUE_EXCLUSIVE, value.toString()); - } - - public boolean queueExclusive(){ - return getPropertyAsBoolean(QUEUE_EXCLUSIVE); - } - - /** - * @return the whether or not the queue should auto delete - */ - public String getQueueAutoDelete() { - return getPropertyAsString(QUEUE_AUTO_DELETE); - } - - public void setQueueAutoDelete(String content) { - setProperty(QUEUE_AUTO_DELETE, content); - } - - public void setQueueAutoDelete(Boolean value) { - setProperty(QUEUE_AUTO_DELETE, value.toString()); - } - - public boolean queueAutoDelete(){ - return getPropertyAsBoolean(QUEUE_AUTO_DELETE); - } - - - public Boolean getQueueRedeclare() { - return getPropertyAsBoolean(QUEUE_REDECLARE); - } - - public void setQueueRedeclare(Boolean content) { - setProperty(QUEUE_REDECLARE, content); - } - - protected void cleanup() { - try { - //getChannel().close(); // closing the connection will close the channel if it's still open - if(connection != null && connection.isOpen()) - connection.close(); - } catch (IOException e) { - log.error("Failed to close connection", e); - } - } - - @Override - public void threadFinished() { - log.info("AMQPSampler.threadFinished called"); - cleanup(); - } - - @Override - public void threadStarted() { - - } - - protected Channel createChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException { - log.info("Creating channel " + getVirtualHost()+":"+getPortAsInt()); - - if (connection == null || !connection.isOpen()) { - factory.setConnectionTimeout(getTimeoutAsInt()); - factory.setVirtualHost(getVirtualHost()); - factory.setUsername(getUsername()); - factory.setPassword(getPassword()); - if (connectionSSL()) { - factory.useSslProtocol("TLS"); - } - - log.info("RabbitMQ ConnectionFactory using:" - +"\n\t virtual host: " + getVirtualHost() - +"\n\t host: " + getHost() - +"\n\t port: " + getPort() - +"\n\t username: " + getUsername() - +"\n\t password: " + getPassword() - +"\n\t timeout: " + getTimeout() - +"\n\t heartbeat: " + factory.getRequestedHeartbeat() - +"\nin " + this - ); - - String[] hosts = getHost().split(","); - Address[] addresses = new Address[hosts.length]; - for (int i = 0; i < hosts.length; i++) { - addresses[i] = new Address(hosts[i], getPortAsInt()); - } - log.info("Using hosts: " + Arrays.toString(hosts) + " addresses: " + Arrays.toString(addresses)); - connection = factory.newConnection(addresses); - } - - Channel channel = connection.createChannel(); - if(!channel.isOpen()){ - log.fatalError("Failed to open channel: " + channel.getCloseReason().getLocalizedMessage()); - } - return channel; - } - - protected void deleteQueue() throws IOException, NoSuchAlgorithmException, KeyManagementException { - // use a different channel since channel closes on exception. - Channel channel = createChannel(); - try { - log.info("Deleting queue " + getQueue()); - channel.queueDelete(getQueue()); - } - catch(Exception ex) { - log.debug(ex.toString(), ex); - // ignore it. - } - finally { - if (channel.isOpen()) { - channel.close(); - } - } - } - - protected void deleteExchange() throws IOException, NoSuchAlgorithmException, KeyManagementException { - // use a different channel since channel closes on exception. - Channel channel = createChannel(); - try { - log.info("Deleting exchange " + getExchange()); - channel.exchangeDelete(getExchange()); - } - catch(Exception ex) { - log.debug(ex.toString(), ex); - // ignore it. - } - finally { - if (channel.isOpen()) { - channel.close(); - } - } - } -} +package com.zeroclue.jmeter.protocol.amqp; + +import java.io.IOException; +import java.util.*; +import java.security.*; + +import com.rabbitmq.client.*; + +import org.apache.jmeter.config.Arguments; +import org.apache.jmeter.samplers.AbstractSampler; +import org.apache.jmeter.testelement.ThreadListener; +import org.apache.jmeter.testelement.property.TestElementProperty; +import org.apache.jorphan.logging.LoggingManager; +import org.apache.log.Logger; + +import com.rabbitmq.client.AMQP.BasicProperties; + +import org.apache.commons.lang3.StringUtils; + +public abstract class AMQPSampler extends AbstractSampler implements ThreadListener { + + public static final boolean DEFAULT_EXCHANGE_DURABLE = true; + public static final boolean DEFAULT_EXCHANGE_REDECLARE = false; + public static final boolean DEFAULT_QUEUE_REDECLARE = false; + public static final boolean DEFAULT_EXCHANGE_AUTO_DELETE = false; + public static final boolean DEFAULT_EXCHANGE_INTERNAL = false; + + public static final int DEFAULT_PORT = 5672; + public static final String DEFAULT_PORT_STRING = Integer.toString(DEFAULT_PORT); + + public static final int DEFAULT_TIMEOUT = 1000; + public static final String DEFAULT_TIMEOUT_STRING = Integer.toString(DEFAULT_TIMEOUT); + + public static final int DEFAULT_ITERATIONS = 1; + public static final String DEFAULT_ITERATIONS_STRING = Integer.toString(DEFAULT_ITERATIONS); + + private static final Logger log = LoggingManager.getLoggerForClass(); + + // ++ These are JMX names, and must not be changed + protected static final String EXCHANGE = "AMQPSampler.Exchange"; + protected static final String EXCHANGE_TYPE = "AMQPSampler.ExchangeType"; + protected static final String EXCHANGE_DURABLE = "AMQPSampler.ExchangeDurable"; + protected static final String EXCHANGE_REDECLARE = "AMQPSampler.ExchangeRedeclare"; + protected static final String EXCHANGE_PARAMETERS = "AMQPSampler.ExchangeParameters"; + private static final String EXCHANGE_AUTO_DELETE = "AMQPSampler.ExchangeAutoDelete"; + private static final String EXCHANGE_INTERNAL = "AMQPSampler.ExchangeInternal"; + + protected static final String QUEUE = "AMQPSampler.Queue"; + protected static final String ROUTING_KEY = "AMQPSampler.RoutingKey"; + protected static final String VIRUTAL_HOST = "AMQPSampler.VirtualHost"; + protected static final String HOST = "AMQPSampler.Host"; + protected static final String PORT = "AMQPSampler.Port"; + protected static final String SSL = "AMQPSampler.SSL"; + protected static final String USERNAME = "AMQPSampler.Username"; + protected static final String PASSWORD = "AMQPSampler.Password"; + private static final String TIMEOUT = "AMQPSampler.Timeout"; + private static final String ITERATIONS = "AMQPSampler.Iterations"; + private static final String MESSAGE_TTL = "AMQPSampler.MessageTTL"; + private static final String MESSAGE_EXPIRES = "AMQPSampler.MessageExpires"; + private static final String QUEUE_DURABLE = "AMQPSampler.QueueDurable"; + private static final String QUEUE_REDECLARE = "AMQPSampler.Redeclare"; + private static final String QUEUE_EXCLUSIVE = "AMQPSampler.QueueExclusive"; + private static final String QUEUE_AUTO_DELETE = "AMQPSampler.QueueAutoDelete"; + protected static final String QUEUE_PARAMETERS = "AMQPSampler.QueueParameters"; + private static final int DEFAULT_HEARTBEAT = 1; + private transient ConnectionFactory factory; + private transient Connection connection; + + protected AMQPSampler() { + factory = new ConnectionFactory(); + factory.setRequestedHeartbeat(DEFAULT_HEARTBEAT); + } + + protected boolean initChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException { + Channel channel = getChannel(); + + if (channel != null && !channel.isOpen()) { + log.warn("channel " + channel.getChannelNumber() + " closed unexpectedly: ", channel.getCloseReason()); + channel = null; // so we re-open it below + } + + if (channel == null) { + channel = createChannel(); + setChannel(channel); + + // TODO: Break out queue binding + boolean queueConfigured = (getQueue() != null && !getQueue().isEmpty()); + + if (queueConfigured) { + if (getQueueRedeclare()) { + deleteQueue(); + } + + AMQP.Queue.DeclareOk declareQueueResp = channel.queueDeclare(getQueue(), queueDurable(), queueExclusive(), + queueAutoDelete(), getQueueArguments()); + } + + if (!StringUtils.isBlank(getExchange())) { // Use a named exchange + if (getExchangeRedeclare()) { + deleteExchange(); + } + + AMQP.Exchange.DeclareOk declareExchangeResp = channel.exchangeDeclare(getExchange(), getExchangeType(), + getExchangeDurable(), getAutoDelete(), getInternal(), getExchangeArguments()); + if (queueConfigured) { + channel.queueBind(getQueue(), getExchange(), getRoutingKey()); + } + } + + log.info("bound to:" + "\n\t queue: " + getQueue() + "\n\t exchange: " + getExchange() + "\n\t exchange(D)? " + + getExchangeDurable() + "\n\t routing key: " + getRoutingKey() + "\n\t arguments: " + getQueueArguments()); + + } + return true; + } + + private Map getQueueArguments() { + Map arguments = new HashMap(); + + if (getMessageTTL() != null && !getMessageTTL().isEmpty()) + arguments.put("x-message-ttl", getMessageTTLAsInt()); + + if (getMessageExpires() != null && !getMessageExpires().isEmpty()) + arguments.put("x-expires", getMessageExpiresAsInt()); + arguments.putAll(getQueueParameters().getArgumentsAsMap()); + return arguments; + } + + private Map getExchangeArguments() { + Map arguments = new HashMap(); + arguments.putAll(getExchangeParameters().getArgumentsAsMap()); + return arguments; + } + + protected abstract Channel getChannel(); + + protected abstract void setChannel(Channel channel); + + /** + * @return a string for the sampleResult Title + */ + protected String getTitle() { + return this.getName(); + } + + protected int getTimeoutAsInt() { + if (getPropertyAsInt(TIMEOUT) < 1) { + return DEFAULT_TIMEOUT; + } + return getPropertyAsInt(TIMEOUT); + } + + public boolean getAutoDelete() { + return getPropertyAsBoolean(EXCHANGE_AUTO_DELETE, DEFAULT_EXCHANGE_AUTO_DELETE); + } + + public void setExchangeAutoDelete(boolean autoDelete) { + setProperty(EXCHANGE_AUTO_DELETE, autoDelete); + } + + public boolean getInternal() { + return getPropertyAsBoolean(EXCHANGE_INTERNAL, DEFAULT_EXCHANGE_INTERNAL); + } + + public void setInternal(boolean internal) { + setProperty(EXCHANGE_INTERNAL, internal); + } + + public String getTimeout() { + return getPropertyAsString(TIMEOUT, DEFAULT_TIMEOUT_STRING); + } + + public void setTimeout(String s) { + setProperty(TIMEOUT, s); + } + + public String getIterations() { + return getPropertyAsString(ITERATIONS, DEFAULT_ITERATIONS_STRING); + } + + public void setIterations(String s) { + setProperty(ITERATIONS, s); + } + + public int getIterationsAsInt() { + return getPropertyAsInt(ITERATIONS); + } + + public Arguments getExchangeParameters() { + return (Arguments) getProperty(EXCHANGE_PARAMETERS).getObjectValue(); + } + + public void setExchangeParameters(Arguments parameters) { + setProperty(new TestElementProperty(EXCHANGE_PARAMETERS, parameters)); + } + + public Arguments getQueueParameters() { + return (Arguments) getProperty(QUEUE_PARAMETERS).getObjectValue(); + } + + public void setQueueParameters(Arguments parameters) { + setProperty(new TestElementProperty(QUEUE_PARAMETERS, parameters)); + } + + public String getExchange() { + return getPropertyAsString(EXCHANGE); + } + + public void setExchange(String name) { + setProperty(EXCHANGE, name); + } + + public boolean getExchangeDurable() { + return getPropertyAsBoolean(EXCHANGE_DURABLE); + } + + public void setExchangeDurable(boolean durable) { + setProperty(EXCHANGE_DURABLE, durable); + } + + public String getExchangeType() { + return getPropertyAsString(EXCHANGE_TYPE); + } + + public void setExchangeType(String name) { + setProperty(EXCHANGE_TYPE, name); + } + + public Boolean getExchangeRedeclare() { + return getPropertyAsBoolean(EXCHANGE_REDECLARE); + } + + public void setExchangeRedeclare(Boolean content) { + setProperty(EXCHANGE_REDECLARE, content); + } + + public String getQueue() { + return getPropertyAsString(QUEUE); + } + + public void setQueue(String name) { + setProperty(QUEUE, name); + } + + public String getRoutingKey() { + return getPropertyAsString(ROUTING_KEY); + } + + public void setRoutingKey(String name) { + setProperty(ROUTING_KEY, name); + } + + public String getVirtualHost() { + return getPropertyAsString(VIRUTAL_HOST); + } + + public void setVirtualHost(String name) { + setProperty(VIRUTAL_HOST, name); + } + + public String getMessageTTL() { + return getPropertyAsString(MESSAGE_TTL); + } + + public void setMessageTTL(String name) { + setProperty(MESSAGE_TTL, name); + } + + protected Integer getMessageTTLAsInt() { + if (getPropertyAsInt(MESSAGE_TTL) < 1) { + return null; + } + return getPropertyAsInt(MESSAGE_TTL); + } + + public String getMessageExpires() { + return getPropertyAsString(MESSAGE_EXPIRES); + } + + public void setMessageExpires(String name) { + setProperty(MESSAGE_EXPIRES, name); + } + + protected Integer getMessageExpiresAsInt() { + if (getPropertyAsInt(MESSAGE_EXPIRES) < 1) { + return null; + } + return getPropertyAsInt(MESSAGE_EXPIRES); + } + + public String getHost() { + return getPropertyAsString(HOST); + } + + public void setHost(String name) { + setProperty(HOST, name); + } + + public String getPort() { + return getPropertyAsString(PORT); + } + + public void setPort(String name) { + setProperty(PORT, name); + } + + protected int getPortAsInt() { + if (getPropertyAsInt(PORT) < 1) { + return DEFAULT_PORT; + } + return getPropertyAsInt(PORT); + } + + public void setConnectionSSL(String content) { + setProperty(SSL, content); + } + + public void setConnectionSSL(Boolean value) { + setProperty(SSL, value.toString()); + } + + public boolean connectionSSL() { + return getPropertyAsBoolean(SSL); + } + + public String getUsername() { + return getPropertyAsString(USERNAME); + } + + public void setUsername(String name) { + setProperty(USERNAME, name); + } + + public String getPassword() { + return getPropertyAsString(PASSWORD); + } + + public void setPassword(String name) { + setProperty(PASSWORD, name); + } + + /** + * @return the whether or not the queue is durable + */ + public String getQueueDurable() { + return getPropertyAsString(QUEUE_DURABLE); + } + + public void setQueueDurable(String content) { + setProperty(QUEUE_DURABLE, content); + } + + public void setQueueDurable(Boolean value) { + setProperty(QUEUE_DURABLE, value.toString()); + } + + public boolean queueDurable() { + return getPropertyAsBoolean(QUEUE_DURABLE); + } + + /** + * @return the whether or not the queue is exclusive + */ + public String getQueueExclusive() { + return getPropertyAsString(QUEUE_EXCLUSIVE); + } + + public void setQueueExclusive(String content) { + setProperty(QUEUE_EXCLUSIVE, content); + } + + public void setQueueExclusive(Boolean value) { + setProperty(QUEUE_EXCLUSIVE, value.toString()); + } + + public boolean queueExclusive() { + return getPropertyAsBoolean(QUEUE_EXCLUSIVE); + } + + /** + * @return the whether or not the queue should auto delete + */ + public String getQueueAutoDelete() { + return getPropertyAsString(QUEUE_AUTO_DELETE); + } + + public void setQueueAutoDelete(String content) { + setProperty(QUEUE_AUTO_DELETE, content); + } + + public void setQueueAutoDelete(Boolean value) { + setProperty(QUEUE_AUTO_DELETE, value.toString()); + } + + public boolean queueAutoDelete() { + return getPropertyAsBoolean(QUEUE_AUTO_DELETE); + } + + public Boolean getQueueRedeclare() { + return getPropertyAsBoolean(QUEUE_REDECLARE); + } + + public void setQueueRedeclare(Boolean content) { + setProperty(QUEUE_REDECLARE, content); + } + + protected void cleanup() { + try { + // getChannel().close(); // closing the connection will close the channel if it's still open + if (connection != null && connection.isOpen()) + connection.close(); + } catch (IOException e) { + log.error("Failed to close connection", e); + } + } + + @Override + public void threadFinished() { + log.info("AMQPSampler.threadFinished called"); + cleanup(); + } + + @Override + public void threadStarted() { + + } + + protected Channel createChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException { + log.info("Creating channel " + getVirtualHost() + ":" + getPortAsInt()); + + if (connection == null || !connection.isOpen()) { + factory.setConnectionTimeout(getTimeoutAsInt()); + factory.setVirtualHost(getVirtualHost()); + factory.setUsername(getUsername()); + factory.setPassword(getPassword()); + if (connectionSSL()) { + factory.useSslProtocol("TLS"); + } + + log.info("RabbitMQ ConnectionFactory using:" + "\n\t virtual host: " + getVirtualHost() + "\n\t host: " + getHost() + + "\n\t port: " + getPort() + "\n\t username: " + getUsername() + "\n\t password: " + getPassword() + + "\n\t timeout: " + getTimeout() + "\n\t heartbeat: " + factory.getRequestedHeartbeat() + "\nin " + this); + + String[] hosts = getHost().split(","); + Address[] addresses = new Address[hosts.length]; + for (int i = 0; i < hosts.length; i++) { + addresses[i] = new Address(hosts[i], getPortAsInt()); + } + log.info("Using hosts: " + Arrays.toString(hosts) + " addresses: " + Arrays.toString(addresses)); + connection = factory.newConnection(addresses); + } + + Channel channel = connection.createChannel(); + if (!channel.isOpen()) { + log.fatalError("Failed to open channel: " + channel.getCloseReason().getLocalizedMessage()); + } + return channel; + } + + protected void deleteQueue() throws IOException, NoSuchAlgorithmException, KeyManagementException { + // use a different channel since channel closes on exception. + Channel channel = createChannel(); + try { + log.info("Deleting queue " + getQueue()); + channel.queueDelete(getQueue()); + } catch (Exception ex) { + log.debug(ex.toString(), ex); + // ignore it. + } finally { + if (channel.isOpen()) { + channel.close(); + } + } + } + + protected void deleteExchange() throws IOException, NoSuchAlgorithmException, KeyManagementException { + // use a different channel since channel closes on exception. + Channel channel = createChannel(); + try { + log.info("Deleting exchange " + getExchange()); + channel.exchangeDelete(getExchange()); + } catch (Exception ex) { + log.debug(ex.toString(), ex); + // ignore it. + } finally { + if (channel.isOpen()) { + channel.close(); + } + } + } +} diff --git a/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPSamplerGui.java b/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPSamplerGui.java index 108b8ed..a4b30b7 100644 --- a/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPSamplerGui.java +++ b/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPSamplerGui.java @@ -1,287 +1,348 @@ -package com.zeroclue.jmeter.protocol.amqp.gui; - -import java.awt.*; - -import javax.swing.BorderFactory; -import javax.swing.JCheckBox; -import javax.swing.JPanel; - -import org.apache.jmeter.gui.util.VerticalPanel; -import org.apache.jmeter.samplers.gui.AbstractSamplerGui; -import org.apache.jmeter.testelement.TestElement; -import org.apache.jmeter.util.JMeterUtils; -import org.apache.jorphan.gui.JLabeledChoice; -import org.apache.jorphan.gui.JLabeledTextField; -import org.apache.jorphan.logging.LoggingManager; -import org.apache.log.Logger; - -import com.zeroclue.jmeter.protocol.amqp.AMQPSampler; - -public abstract class AMQPSamplerGui extends AbstractSamplerGui { - - private static final long serialVersionUID = 1L; - - private static final Logger log = LoggingManager.getLoggerForClass(); - - protected JLabeledTextField exchange = new JLabeledTextField("Exchange"); - private final JCheckBox exchangeRedeclare = new JCheckBox("Redeclare?", AMQPSampler.DEFAULT_EXCHANGE_REDECLARE); - protected JLabeledTextField queue = new JLabeledTextField("Queue"); - protected JLabeledTextField routingKey = new JLabeledTextField("Routing Key"); - protected JLabeledTextField virtualHost = new JLabeledTextField("Virtual Host"); - protected JLabeledTextField messageTTL = new JLabeledTextField("Message TTL"); - protected JLabeledTextField messageExpires = new JLabeledTextField("Expires"); - protected JLabeledChoice exchangeType = new JLabeledChoice("Exchange Type", new String[]{ "direct", "topic", "headers", "fanout"}); - private final JCheckBox exchangeDurable = new JCheckBox("Durable?", AMQPSampler.DEFAULT_EXCHANGE_DURABLE); - private final JCheckBox queueDurable = new JCheckBox("Durable?", true); - private final JCheckBox queueRedeclare = new JCheckBox("Redeclare?", AMQPSampler.DEFAULT_QUEUE_REDECLARE); - private final JCheckBox queueExclusive = new JCheckBox("Exclusive", true); - private final JCheckBox queueAutoDelete = new JCheckBox("Auto Delete?", true); - - protected JLabeledTextField host = new JLabeledTextField("Host"); - protected JLabeledTextField port = new JLabeledTextField("Port"); - protected JLabeledTextField timeout = new JLabeledTextField("Timeout"); - protected JLabeledTextField username = new JLabeledTextField("Username"); - protected JLabeledTextField password = new JLabeledTextField("Password"); - private final JCheckBox SSL = new JCheckBox("SSL?", false); - - private final JLabeledTextField iterations = new JLabeledTextField("Number of samples to Aggregate"); - - - - protected abstract void setMainPanel(JPanel panel); - - /** - * {@inheritDoc} - */ - @Override - public void configure(TestElement element) { - super.configure(element); - if (!(element instanceof AMQPSampler)) return; - AMQPSampler sampler = (AMQPSampler) element; - - exchange.setText(sampler.getExchange()); - exchangeType.setText(sampler.getExchangeType()); - exchangeDurable.setSelected(sampler.getExchangeDurable()); - exchangeRedeclare.setSelected(sampler.getExchangeRedeclare()); - queue.setText(sampler.getQueue()); - routingKey.setText(sampler.getRoutingKey()); - virtualHost.setText(sampler.getVirtualHost()); - messageTTL.setText(sampler.getMessageTTL()); - messageExpires.setText(sampler.getMessageExpires()); - queueDurable.setSelected(sampler.queueDurable()); - queueExclusive.setSelected(sampler.queueExclusive()); - queueAutoDelete.setSelected(sampler.queueAutoDelete()); - queueRedeclare.setSelected(sampler.getQueueRedeclare()); - - timeout.setText(sampler.getTimeout()); - iterations.setText(sampler.getIterations()); - - host.setText(sampler.getHost()); - port.setText(sampler.getPort()); - username.setText(sampler.getUsername()); - password.setText(sampler.getPassword()); - SSL.setSelected(sampler.connectionSSL()); - log.info("AMQPSamplerGui.configure() called"); - } - - /** - * {@inheritDoc} - */ - @Override - public void clearGui() { - exchange.setText("jmeterExchange"); - queue.setText("jmeterQueue"); - exchangeDurable.setSelected(AMQPSampler.DEFAULT_EXCHANGE_DURABLE); - exchangeRedeclare.setSelected(AMQPSampler.DEFAULT_EXCHANGE_REDECLARE); - routingKey.setText("jmeterRoutingKey"); - virtualHost.setText("/"); - messageTTL.setText(""); - messageExpires.setText(""); - exchangeType.setText("direct"); - queueDurable.setSelected(true); - queueExclusive.setSelected(false); - queueAutoDelete.setSelected(false); - queueRedeclare.setSelected(AMQPSampler.DEFAULT_QUEUE_REDECLARE); - - - timeout.setText(AMQPSampler.DEFAULT_TIMEOUT_STRING); - iterations.setText(AMQPSampler.DEFAULT_ITERATIONS_STRING); - - host.setText("localhost"); - port.setText(AMQPSampler.DEFAULT_PORT_STRING); - username.setText("guest"); - password.setText("guest"); - SSL.setSelected(false); - } - - /** - * {@inheritDoc} - */ - @Override - public void modifyTestElement(TestElement element) { - AMQPSampler sampler = (AMQPSampler) element; - sampler.clear(); - configureTestElement(sampler); - - sampler.setExchange(exchange.getText()); - sampler.setExchangeDurable(exchangeDurable.isSelected()); - sampler.setExchangeRedeclare(exchangeRedeclare.isSelected()); - sampler.setQueue(queue.getText()); - sampler.setRoutingKey(routingKey.getText()); - sampler.setVirtualHost(virtualHost.getText()); - sampler.setMessageTTL(messageTTL.getText()); - sampler.setMessageExpires(messageExpires.getText()); - sampler.setExchangeType(exchangeType.getText()); - sampler.setQueueDurable(queueDurable.isSelected()); - sampler.setQueueExclusive(queueExclusive.isSelected()); - sampler.setQueueAutoDelete(queueAutoDelete.isSelected()); - sampler.setQueueRedeclare(queueRedeclare.isSelected()); - - sampler.setTimeout(timeout.getText()); - sampler.setIterations(iterations.getText()); - - sampler.setHost(host.getText()); - sampler.setPort(port.getText()); - sampler.setUsername(username.getText()); - sampler.setPassword(password.getText()); - sampler.setConnectionSSL(SSL.isSelected()); - log.info("AMQPSamplerGui.modifyTestElement() called, set user/pass to " + username.getText() + "/" + password.getText() + " on sampler " + sampler); - } - - protected void init() { - setLayout(new BorderLayout(0, 5)); - setBorder(makeBorder()); - add(makeTitlePanel(), BorderLayout.NORTH); // Add the standard title - - JPanel mainPanel = new VerticalPanel(); - - mainPanel.add(makeCommonPanel()); - - iterations.setPreferredSize(new Dimension(50,25)); - mainPanel.add(iterations); - - add(mainPanel); - - setMainPanel(mainPanel); - } - - private Component makeCommonPanel() { - GridBagConstraints gridBagConstraints, gridBagConstraintsCommon; - - gridBagConstraintsCommon = new GridBagConstraints(); - gridBagConstraintsCommon.fill = GridBagConstraints.HORIZONTAL; - gridBagConstraintsCommon.anchor = GridBagConstraints.WEST; - gridBagConstraintsCommon.weightx = 0.5; - - gridBagConstraints = new GridBagConstraints(); - gridBagConstraints.insets = new java.awt.Insets(2, 2, 2, 2); - gridBagConstraints.fill = GridBagConstraints.NONE; - gridBagConstraints.anchor = GridBagConstraints.WEST; - gridBagConstraints.weightx = 0.5; - - JPanel commonPanel = new JPanel(new GridBagLayout()); - - JPanel exchangeSettings = new JPanel(new GridBagLayout()); - exchangeSettings.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "Exchange")); - gridBagConstraints.gridx = 0; - gridBagConstraints.gridy = 0; - exchangeSettings.add(exchange, gridBagConstraints); - - gridBagConstraints.gridx = 1; - gridBagConstraints.gridy = 0; - exchangeSettings.add(exchangeType, gridBagConstraints); - - gridBagConstraints.gridx = 0; - gridBagConstraints.gridy = 1; - exchangeSettings.add(exchangeDurable, gridBagConstraints); - - gridBagConstraints.gridx = 1; - gridBagConstraints.gridy = 1; - exchangeSettings.add(exchangeRedeclare, gridBagConstraints); - - JPanel queueSettings = new JPanel(new GridBagLayout()); - queueSettings.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "Queue")); - - - gridBagConstraints.gridx = 0; - gridBagConstraints.gridy = 0; - queueSettings.add(queue, gridBagConstraints); - - gridBagConstraints.gridx = 0; - gridBagConstraints.gridy = 1; - queueSettings.add(routingKey, gridBagConstraints); - - gridBagConstraints.gridx = 0; - gridBagConstraints.gridy = 2; - queueSettings.add(messageTTL, gridBagConstraints); - - gridBagConstraints.gridx = 0; - gridBagConstraints.gridy = 3; - queueSettings.add(messageExpires, gridBagConstraints); - - gridBagConstraints.gridx = 1; - gridBagConstraints.gridy = 1; - queueSettings.add(queueDurable, gridBagConstraints); - - gridBagConstraints.gridx = 1; - gridBagConstraints.gridy = 2; - queueSettings.add(queueExclusive, gridBagConstraints); - - gridBagConstraints.gridx = 1; - gridBagConstraints.gridy = 3; - queueSettings.add(queueAutoDelete, gridBagConstraints); - - gridBagConstraints.gridx = 2; - gridBagConstraints.gridy = 1; - queueSettings.add(queueRedeclare, gridBagConstraints); - - gridBagConstraintsCommon.gridx = 0; - gridBagConstraintsCommon.gridy = 0; - - JPanel exchangeQueueSettings = new VerticalPanel(); - exchangeQueueSettings.add(exchangeSettings); - exchangeQueueSettings.add(queueSettings); - - commonPanel.add(exchangeQueueSettings, gridBagConstraintsCommon); - - - JPanel serverSettings = new JPanel(new GridBagLayout()); - serverSettings.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "Connection")); - - gridBagConstraints.gridx = 0; - gridBagConstraints.gridy = 0; - serverSettings.add(virtualHost, gridBagConstraints); - - gridBagConstraints.gridx = 0; - gridBagConstraints.gridy = 1; - serverSettings.add(host, gridBagConstraints); - - gridBagConstraints.gridx = 0; - gridBagConstraints.gridy = 2; - serverSettings.add(port, gridBagConstraints); - - gridBagConstraints.gridx = 1; - gridBagConstraints.gridy = 2; - serverSettings.add(SSL, gridBagConstraints); - - gridBagConstraints.gridx = 0; - gridBagConstraints.gridy = 3; - serverSettings.add(username, gridBagConstraints); - - gridBagConstraints.gridx = 0; - gridBagConstraints.gridy = 4; - serverSettings.add(password, gridBagConstraints); - - gridBagConstraints.gridx = 0; - gridBagConstraints.gridy = 5; - serverSettings.add(timeout, gridBagConstraints); - - gridBagConstraintsCommon.gridx = 1; - gridBagConstraintsCommon.gridy = 0; - - commonPanel.add(serverSettings, gridBagConstraintsCommon); - - return commonPanel; - } - -} +package com.zeroclue.jmeter.protocol.amqp.gui; + +import java.awt.*; + +import javax.swing.BorderFactory; +import javax.swing.JCheckBox; +import javax.swing.JPanel; + +import org.apache.jmeter.config.Arguments; +import org.apache.jmeter.config.gui.ArgumentsPanel; +import org.apache.jmeter.gui.util.VerticalPanel; +import org.apache.jmeter.samplers.gui.AbstractSamplerGui; +import org.apache.jmeter.testelement.TestElement; +import org.apache.jmeter.util.JMeterUtils; +import org.apache.jorphan.gui.JLabeledChoice; +import org.apache.jorphan.gui.JLabeledTextField; +import org.apache.jorphan.logging.LoggingManager; +import org.apache.log.Logger; + +import com.zeroclue.jmeter.protocol.amqp.AMQPPublisher; +import com.zeroclue.jmeter.protocol.amqp.AMQPSampler; + +public abstract class AMQPSamplerGui extends AbstractSamplerGui { + + private static final long serialVersionUID = 1L; + + private static final Logger log = LoggingManager.getLoggerForClass(); + + protected JLabeledTextField exchange = new JLabeledTextField("Exchange"); + private final JCheckBox exchangeRedeclare = new JCheckBox("Redeclare?", AMQPSampler.DEFAULT_EXCHANGE_REDECLARE); + protected final JCheckBox exchangeAutoDelete = new JCheckBox("Auto delete?", AMQPSampler.DEFAULT_EXCHANGE_AUTO_DELETE); + protected final JCheckBox exchangeInternal = new JCheckBox("Internal?", AMQPSampler.DEFAULT_EXCHANGE_INTERNAL); + protected JLabeledTextField queue = new JLabeledTextField("Queue"); + protected JLabeledTextField routingKey = new JLabeledTextField("Routing Key"); + protected JLabeledTextField virtualHost = new JLabeledTextField("Virtual Host"); + protected JLabeledTextField messageTTL = new JLabeledTextField("Message TTL"); + protected JLabeledTextField messageExpires = new JLabeledTextField("Expires"); + protected JLabeledChoice exchangeType = new JLabeledChoice("Exchange Type", new String[] { "direct", "topic", "headers", + "fanout" }); + private final JCheckBox exchangeDurable = new JCheckBox("Durable?", AMQPSampler.DEFAULT_EXCHANGE_DURABLE); + private final JCheckBox queueDurable = new JCheckBox("Durable?", true); + private final JCheckBox queueRedeclare = new JCheckBox("Redeclare?", AMQPSampler.DEFAULT_QUEUE_REDECLARE); + private final JCheckBox queueExclusive = new JCheckBox("Exclusive", true); + private final JCheckBox queueAutoDelete = new JCheckBox("Auto Delete?", true); + + protected JLabeledTextField host = new JLabeledTextField("Host"); + protected JLabeledTextField port = new JLabeledTextField("Port"); + protected JLabeledTextField timeout = new JLabeledTextField("Timeout"); + protected JLabeledTextField username = new JLabeledTextField("Username"); + protected JLabeledTextField password = new JLabeledTextField("Password"); + private final JCheckBox SSL = new JCheckBox("SSL?", false); + + private final JLabeledTextField iterations = new JLabeledTextField("Number of samples to Aggregate"); + + protected ArgumentsPanel exchangeParameters = new ArgumentsPanel("Exchange parameters"); + protected ArgumentsPanel queueParameters = new ArgumentsPanel("Queue parameters"); + + protected abstract void setMainPanel(JPanel panel); + + /** + * {@inheritDoc} + */ + @Override + public void configure(TestElement element) { + super.configure(element); + if (!(element instanceof AMQPSampler)) + return; + AMQPSampler sampler = (AMQPSampler) element; + + exchange.setText(sampler.getExchange()); + exchangeType.setText(sampler.getExchangeType()); + exchangeDurable.setSelected(sampler.getExchangeDurable()); + exchangeRedeclare.setSelected(sampler.getExchangeRedeclare()); + queue.setText(sampler.getQueue()); + routingKey.setText(sampler.getRoutingKey()); + virtualHost.setText(sampler.getVirtualHost()); + messageTTL.setText(sampler.getMessageTTL()); + messageExpires.setText(sampler.getMessageExpires()); + queueDurable.setSelected(sampler.queueDurable()); + queueExclusive.setSelected(sampler.queueExclusive()); + queueAutoDelete.setSelected(sampler.queueAutoDelete()); + queueRedeclare.setSelected(sampler.getQueueRedeclare()); + exchangeAutoDelete.setSelected(sampler.getAutoDelete()); + exchangeInternal.setSelected(sampler.getInternal()); + + configureExchangeParameters(sampler); + configureQueueParameters(sampler); + + timeout.setText(sampler.getTimeout()); + iterations.setText(sampler.getIterations()); + + host.setText(sampler.getHost()); + port.setText(sampler.getPort()); + username.setText(sampler.getUsername()); + password.setText(sampler.getPassword()); + SSL.setSelected(sampler.connectionSSL()); + log.info("AMQPSamplerGui.configure() called"); + } + + private void configureExchangeParameters(AMQPSampler sampler) { + Arguments sampleHeaders = sampler.getExchangeParameters(); + if (sampleHeaders != null) { + exchangeParameters.configure(sampleHeaders); + } else { + exchangeParameters.clearGui(); + } + } + + private void configureQueueParameters(AMQPSampler sampler) { + Arguments sampleHeaders = sampler.getQueueParameters(); + if (sampleHeaders != null) { + queueParameters.configure(sampleHeaders); + } else { + queueParameters.clearGui(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void clearGui() { + exchange.setText("jmeterExchange"); + queue.setText("jmeterQueue"); + exchangeDurable.setSelected(AMQPSampler.DEFAULT_EXCHANGE_DURABLE); + exchangeRedeclare.setSelected(AMQPSampler.DEFAULT_EXCHANGE_REDECLARE); + routingKey.setText("jmeterRoutingKey"); + virtualHost.setText("/"); + messageTTL.setText(""); + messageExpires.setText(""); + exchangeType.setText("direct"); + queueDurable.setSelected(true); + queueExclusive.setSelected(false); + queueAutoDelete.setSelected(false); + exchangeAutoDelete.setSelected(false); + exchangeInternal.setSelected(false); + queueRedeclare.setSelected(AMQPSampler.DEFAULT_QUEUE_REDECLARE); + + exchangeParameters.clearGui(); + queueParameters.clearGui(); + + timeout.setText(AMQPSampler.DEFAULT_TIMEOUT_STRING); + iterations.setText(AMQPSampler.DEFAULT_ITERATIONS_STRING); + + host.setText("localhost"); + port.setText(AMQPSampler.DEFAULT_PORT_STRING); + username.setText("guest"); + password.setText("guest"); + SSL.setSelected(false); + } + + /** + * {@inheritDoc} + */ + @Override + public void modifyTestElement(TestElement element) { + AMQPSampler sampler = (AMQPSampler) element; + sampler.clear(); + configureTestElement(sampler); + + sampler.setExchange(exchange.getText()); + sampler.setExchangeDurable(exchangeDurable.isSelected()); + sampler.setExchangeRedeclare(exchangeRedeclare.isSelected()); + sampler.setExchangeAutoDelete(exchangeAutoDelete.isSelected()); + sampler.setInternal(exchangeInternal.isSelected()); + sampler.setQueue(queue.getText()); + sampler.setRoutingKey(routingKey.getText()); + sampler.setVirtualHost(virtualHost.getText()); + sampler.setMessageTTL(messageTTL.getText()); + sampler.setMessageExpires(messageExpires.getText()); + sampler.setExchangeType(exchangeType.getText()); + sampler.setQueueDurable(queueDurable.isSelected()); + sampler.setQueueExclusive(queueExclusive.isSelected()); + sampler.setQueueAutoDelete(queueAutoDelete.isSelected()); + sampler.setQueueRedeclare(queueRedeclare.isSelected()); + + sampler.setExchangeParameters((Arguments) exchangeParameters.createTestElement()); + sampler.setQueueParameters((Arguments) queueParameters.createTestElement()); + + sampler.setTimeout(timeout.getText()); + sampler.setIterations(iterations.getText()); + + sampler.setHost(host.getText()); + sampler.setPort(port.getText()); + sampler.setUsername(username.getText()); + sampler.setPassword(password.getText()); + sampler.setConnectionSSL(SSL.isSelected()); + log.info("AMQPSamplerGui.modifyTestElement() called, set user/pass to " + username.getText() + "/" + password.getText() + + " on sampler " + sampler); + } + + protected void init() { + setLayout(new BorderLayout(0, 5)); + setBorder(makeBorder()); + add(makeTitlePanel(), BorderLayout.NORTH); // Add the standard title + + JPanel mainPanel = new VerticalPanel(); + + mainPanel.add(makeCommonPanel()); + + iterations.setPreferredSize(new Dimension(50, 25)); + mainPanel.add(iterations); +// mainPanel.add(exchangeAutoDelete); +// mainPanel.add(exchangeInternal); +// mainPanel.add(exchangeParameters); +// mainPanel.add(queueParameters); + add(mainPanel); + + setMainPanel(mainPanel); + } + + private Component makeCommonPanel() { + GridBagConstraints gridBagConstraints, gridBagConstraintsCommon; + + gridBagConstraintsCommon = new GridBagConstraints(); + gridBagConstraintsCommon.fill = GridBagConstraints.HORIZONTAL; + gridBagConstraintsCommon.anchor = GridBagConstraints.WEST; + gridBagConstraintsCommon.weightx = 0.5; + + gridBagConstraints = new GridBagConstraints(); + gridBagConstraints.insets = new java.awt.Insets(2, 2, 2, 2); + gridBagConstraints.fill = GridBagConstraints.NONE; + gridBagConstraints.anchor = GridBagConstraints.WEST; + gridBagConstraints.weightx = 0.5; + + JPanel commonPanel = new JPanel(new GridBagLayout()); + + // =====================Exchange settings========================== + JPanel exchangeSettings = new JPanel(new GridBagLayout()); + exchangeSettings.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "Exchange")); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 0; + exchangeSettings.add(exchange, gridBagConstraints); + + gridBagConstraints.gridx = 1; + gridBagConstraints.gridy = 0; + exchangeSettings.add(exchangeType, gridBagConstraints); + + gridBagConstraints.gridx = 2; + gridBagConstraints.gridy = 0; + exchangeSettings.add(exchangeDurable, gridBagConstraints); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 1; + exchangeSettings.add(exchangeRedeclare, gridBagConstraints); + + gridBagConstraints.gridx = 1; + gridBagConstraints.gridy = 1; + exchangeSettings.add(exchangeAutoDelete, gridBagConstraints); + + gridBagConstraints.gridx = 2; + gridBagConstraints.gridy = 1; + exchangeSettings.add(exchangeInternal, gridBagConstraints); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 2; + exchangeSettings.add(exchangeParameters, gridBagConstraints); + // ==================================================== + + JPanel queueSettings = new JPanel(new GridBagLayout()); + queueSettings.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "Queue")); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 0; + queueSettings.add(queue, gridBagConstraints); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 1; + queueSettings.add(routingKey, gridBagConstraints); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 2; + queueSettings.add(messageTTL, gridBagConstraints); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 3; + queueSettings.add(messageExpires, gridBagConstraints); + + gridBagConstraints.gridx = 1; + gridBagConstraints.gridy = 1; + queueSettings.add(queueDurable, gridBagConstraints); + + gridBagConstraints.gridx = 1; + gridBagConstraints.gridy = 2; + queueSettings.add(queueExclusive, gridBagConstraints); + + gridBagConstraints.gridx = 1; + gridBagConstraints.gridy = 3; + queueSettings.add(queueAutoDelete, gridBagConstraints); + + gridBagConstraints.gridx = 2; + gridBagConstraints.gridy = 1; + queueSettings.add(queueRedeclare, gridBagConstraints); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 4; + queueSettings.add(queueParameters, gridBagConstraints); + + gridBagConstraintsCommon.gridx = 0; + gridBagConstraintsCommon.gridy = 0; + + JPanel exchangeQueueSettings = new VerticalPanel(); + exchangeQueueSettings.add(exchangeSettings); + exchangeQueueSettings.add(queueSettings); + + commonPanel.add(exchangeQueueSettings, gridBagConstraintsCommon); + + JPanel serverSettings = new JPanel(new GridBagLayout()); + serverSettings.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "Connection")); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 0; + serverSettings.add(virtualHost, gridBagConstraints); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 1; + serverSettings.add(host, gridBagConstraints); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 2; + serverSettings.add(port, gridBagConstraints); + + gridBagConstraints.gridx = 1; + gridBagConstraints.gridy = 2; + serverSettings.add(SSL, gridBagConstraints); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 3; + serverSettings.add(username, gridBagConstraints); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 4; + serverSettings.add(password, gridBagConstraints); + + gridBagConstraints.gridx = 0; + gridBagConstraints.gridy = 5; + serverSettings.add(timeout, gridBagConstraints); + + gridBagConstraintsCommon.gridx = 1; + gridBagConstraintsCommon.gridy = 0; + + commonPanel.add(serverSettings, gridBagConstraintsCommon); + + return commonPanel; + } + +} From d7db140aaa90ec7b400f1ad563c4de096be10822 Mon Sep 17 00:00:00 2001 From: "zhongqing.lin" Date: Tue, 27 Oct 2015 13:37:57 +0800 Subject: [PATCH 2/2] Change queue parameter type integer to long. --- .../jmeter/protocol/amqp/AMQPSampler.java | 43 +++++++++++++------ 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java b/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java index 2fed028..a6cb3b2 100644 --- a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java +++ b/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java @@ -2,6 +2,8 @@ import java.io.IOException; import java.util.*; +import java.util.Map.Entry; +import java.util.regex.Pattern; import java.security.*; import com.rabbitmq.client.*; @@ -66,6 +68,8 @@ public abstract class AMQPSampler extends AbstractSampler implements ThreadListe private transient ConnectionFactory factory; private transient Connection connection; + private Pattern digitalP = Pattern.compile("\\d+"); + protected AMQPSampler() { factory = new ConnectionFactory(); factory.setRequestedHeartbeat(DEFAULT_HEARTBEAT); @@ -115,14 +119,25 @@ protected boolean initChannel() throws IOException, NoSuchAlgorithmException, Ke } private Map getQueueArguments() { - Map arguments = new HashMap(); - if (getMessageTTL() != null && !getMessageTTL().isEmpty()) - arguments.put("x-message-ttl", getMessageTTLAsInt()); + Map arguments = new HashMap(); + if (getMessageTTL() != null && !getMessageTTL().isEmpty()) { + Long ttl = getMessageTTLAsLong(); + if (ttl != null) { + arguments.put("x-message-ttl", getMessageTTLAsLong()); + } + } if (getMessageExpires() != null && !getMessageExpires().isEmpty()) - arguments.put("x-expires", getMessageExpiresAsInt()); - arguments.putAll(getQueueParameters().getArgumentsAsMap()); + arguments.put("x-expires", getMessageExpiresAsLong()); + Map map = getQueueParameters().getArgumentsAsMap(); + for (Entry entry : map.entrySet()) { + if (digitalP.matcher(entry.getValue()).matches()) { + arguments.put(entry.getKey(), new Long(entry.getValue())); + } else { + arguments.put(entry.getKey(), entry.getValue()); + } + } return arguments; } @@ -153,7 +168,7 @@ protected int getTimeoutAsInt() { public boolean getAutoDelete() { return getPropertyAsBoolean(EXCHANGE_AUTO_DELETE, DEFAULT_EXCHANGE_AUTO_DELETE); } - + public void setExchangeAutoDelete(boolean autoDelete) { setProperty(EXCHANGE_AUTO_DELETE, autoDelete); } @@ -161,7 +176,7 @@ public void setExchangeAutoDelete(boolean autoDelete) { public boolean getInternal() { return getPropertyAsBoolean(EXCHANGE_INTERNAL, DEFAULT_EXCHANGE_INTERNAL); } - + public void setInternal(boolean internal) { setProperty(EXCHANGE_INTERNAL, internal); } @@ -266,11 +281,12 @@ public void setMessageTTL(String name) { setProperty(MESSAGE_TTL, name); } - protected Integer getMessageTTLAsInt() { - if (getPropertyAsInt(MESSAGE_TTL) < 1) { + protected Long getMessageTTLAsLong() { + String ttl = getMessageTTL(); + if (ttl == null || ttl.isEmpty()) { return null; } - return getPropertyAsInt(MESSAGE_TTL); + return getPropertyAsLong(MESSAGE_TTL); } public String getMessageExpires() { @@ -281,11 +297,11 @@ public void setMessageExpires(String name) { setProperty(MESSAGE_EXPIRES, name); } - protected Integer getMessageExpiresAsInt() { - if (getPropertyAsInt(MESSAGE_EXPIRES) < 1) { + protected Long getMessageExpiresAsLong() { + if (getPropertyAsLong(MESSAGE_EXPIRES) < 1) { return null; } - return getPropertyAsInt(MESSAGE_EXPIRES); + return getPropertyAsLong(MESSAGE_EXPIRES); } public String getHost() { @@ -433,6 +449,7 @@ protected Channel createChannel() throws IOException, NoSuchAlgorithmException, factory.setVirtualHost(getVirtualHost()); factory.setUsername(getUsername()); factory.setPassword(getPassword()); + factory.setNetworkRecoveryInterval(30000); if (connectionSSL()) { factory.useSslProtocol("TLS"); }