Skip to content
This repository has been archived by the owner on Dec 25, 2019. It is now read-only.

Commit

Permalink
Merge pull request #19 from crossoverJie/use-mq
Browse files Browse the repository at this point in the history
	✨ Use MQ
  • Loading branch information
crossoverJie committed Nov 1, 2017
2 parents bccff96 + 5199eb6 commit b766f11
Show file tree
Hide file tree
Showing 18 changed files with 478 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ protected Object determineCurrentLookupKey() {
return DataSourceHolder.getDataSources();
}

@Override
public Logger getParentLogger() {
return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class DemoApiImpl implements DemoApi {
* @return
* @throws RuntimeException
*/
@Override
public String readMsg(int userId) throws RuntimeException {
return null;
}
Expand Down
8 changes: 7 additions & 1 deletion SSM-WEB/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.2</version>
<version>0.9.0.0</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -332,6 +332,12 @@
<artifactId>pagehelper</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>


</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.crossoverJie.controller;

/**
* Created by Administrator on 2016/8/7.
* Created by crossoverJie on 2016/8/7.
*/
import com.crossoverJie.pojo.Content;
import com.crossoverJie.service.ContentService;
Expand Down Expand Up @@ -121,4 +121,6 @@ public static synchronized void subOnlineCount() {
MyWebSocket.onlineCount--;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
@WebService
public class HelloWorldImpl implements HelloWorld {

public String say(String str) {
@Override
public String say(String str) {
return "Hello"+str;
}

Expand Down
45 changes: 28 additions & 17 deletions SSM-WEB/src/main/java/com/crossoverJie/kafka/KafkaMsgConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,24 @@ public class KafkaMsgConsumer {
private static final int MAXIMUM_POOL_SIZE = 4;
private static final int BLOCKING_QUEUE_CAPACITY = 4000;
private static final String KAFKA_CONFIG = "kafkaConfig";
private static final ExecutorService fixedThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(BLOCKING_QUEUE_CAPACITY));
private static final ExecutorService FIXED_THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(BLOCKING_QUEUE_CAPACITY));

//最后更新时间
/**
* 最后更新时间
*/
private static AtomicLong LAST_MESSAGE_TIME = new AtomicLong(DateUtil.getLongTime());

private static MsgIterator iter = null;
private static String topic;//主题名称
private static MsgIterator iterator = null;

/**
* 主题名称
*/
private static String topic;

/**
* 检测时间
*/
private final static int CHECK_MIN = 10 ;


static {
Expand All @@ -58,17 +69,17 @@ public class KafkaMsgConsumer {
}

private static void iteratorTopic() {
if (iter == null) {
iter = MsgUtil.consume(topic);
if (iterator == null) {
iterator = MsgUtil.consume(topic);
}
long i = 0L;
while (iter.hasNext()) {
while (iterator.hasNext()) {
i++;
if (i % 10000 == 0) {
LOGGER.info("consume i:" + i);
}
try {
String message = iter.next();
String message = iterator.next();
if (StringUtils.isEmpty(message)) {
continue;
}
Expand All @@ -90,13 +101,13 @@ private static void iteratorTopic() {

public static void main(String[] args) {
topic = System.getProperty("topic");
checkArguments(!StringUtils.isBlank(topic), "system property topic or log_path is must!");
checkArguments(!StringUtils.isBlank(topic), "system property topic or log_path is necessary!");
while (true) {
try {
iteratorTopic();
} catch (Exception e) {
MsgUtil.shutdownConsummer();
iter = null;
iterator = null;

LOGGER.error("KafkaMsgConsumer err:", e);
try {
Expand All @@ -105,19 +116,19 @@ public static void main(String[] args) {
LOGGER.error("Thread InterruptedException", e1);
}
} finally {
//此处关闭之后,由crontab每分钟检查一次,挂掉的话会重新拉起来
if (DateUtil.getLongTime() - LAST_MESSAGE_TIME.get() > 10 * 60) { //10分钟
fixedThreadPool.shutdown();
LOGGER.info("线程池是否关闭:" + fixedThreadPool.isShutdown());
//此处关闭之后,由crontab每分钟检查一次,挂掉的话会重新拉起来 10分钟检测一次
if (DateUtil.getLongTime() - LAST_MESSAGE_TIME.get() > 10 * 60) {
FIXED_THREAD_POOL.shutdown();
LOGGER.info("线程池是否关闭:" + FIXED_THREAD_POOL.isShutdown());
try {
//当前线程阻塞10ms后,去检测线程池是否终止,终止则返回true
while (!fixedThreadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
LOGGER.info("检测线程池是否终止:" + fixedThreadPool.isTerminated());
while (!FIXED_THREAD_POOL.awaitTermination(CHECK_MIN, TimeUnit.MILLISECONDS)) {
LOGGER.info("检测线程池是否终止:" + FIXED_THREAD_POOL.isTerminated());
}
} catch (InterruptedException e) {
LOGGER.error("等待线程池关闭错误", e);
}
LOGGER.info("线程池是否终止:" + fixedThreadPool.isTerminated());
LOGGER.info("线程池是否终止:" + FIXED_THREAD_POOL.isTerminated());
LOGGER.info("in 10 min dont have data break");
break;
}
Expand Down
52 changes: 52 additions & 0 deletions SSM-WEB/src/main/java/com/crossoverJie/kafka/Producer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.crossoverJie.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

/**
 * Kafka producer demo: loads producer settings from the properties file named
 * by the {@code product_path} system property, then sends a fixed batch of
 * test messages to the {@code topic_optimization} topic.
 *
 * @author crossoverJie
 */
public class Producer {
    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    /**
     * Path of the producer configuration file (system property {@code product_path}).
     */
    private static String consumerProPath;

    /** Number of test messages sent per run. */
    private static final int MESSAGE_COUNT = 100;

    public static void main(String[] args) throws IOException {
        // set up the producer
        consumerProPath = System.getProperty("product_path");
        KafkaProducer<String, String> producer = null;
        // try-with-resources closes the config stream; the original leaked
        // the FileInputStream.
        try (FileInputStream inputStream = new FileInputStream(new File(consumerProPath))) {
            Properties properties = new Properties();
            properties.load(inputStream);
            producer = new KafkaProducer<String, String>(properties);
        } catch (IOException e) {
            LOGGER.error("load config error", e);
        }

        // If the configuration failed to load, producer is null; bail out
        // instead of hitting NullPointerException in send()/close() below.
        if (producer == null) {
            return;
        }

        try {
            // send lots of messages
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                producer.send(new ProducerRecord<String, String>(
                        "topic_optimization", i + "", i + ""));
            }
        } catch (Throwable throwable) {
            // Log the full stack trace. The original
            // printf("%s", throwable.getStackTrace()) only printed the
            // array's toString(), losing the actual trace.
            LOGGER.error("send message error", throwable);
        } finally {
            producer.close();
        }

    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package com.crossoverJie.kafka.official;

import com.crossoverJie.util.StringUtil;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

/**
* Function:kafka官方消费
*
* @author crossoverJie
* Date: 2017/10/19 01:11
* @since JDK 1.8
*/
public class KafkaOfficialConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaOfficialConsumer.class);

    /**
     * Log file path (system property {@code log_path}).
     */
    private static String logPath;

    /**
     * Topic name (system property {@code topic}).
     */
    private static String topic;

    /**
     * Consumer configuration file path (system property {@code consumer_pro_path}).
     */
    private static String consumerProPath;

    /** Records per throughput-measurement batch. */
    private static final int BATCH_SIZE = 10000;

    /** Poll timeout in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 200L;


    /**
     * Validates the required system properties.
     *
     * @return true when a required property is missing or blank (caller aborts)
     */
    private static boolean initCheck() {
        topic = System.getProperty("topic");
        logPath = System.getProperty("log_path");
        consumerProPath = System.getProperty("consumer_pro_path");
        // Use the null-safe StringUtil.isEmpty for every property: the original
        // called logPath.isEmpty() directly, which throws NullPointerException
        // when -Dlog_path is absent, and never validated consumer_pro_path
        // even though the error message names it as required.
        if (StringUtil.isEmpty(topic) || StringUtil.isEmpty(logPath) || StringUtil.isEmpty(consumerProPath)) {
            LOGGER.error("system property topic ,consumer_pro_path, log_path is required !");
            return true;
        }
        return false;
    }

    /**
     * Builds a KafkaConsumer from the properties file at {@link #consumerProPath}
     * and subscribes it to {@link #topic}.
     *
     * @return the subscribed consumer, or null if the config file could not be read
     */
    private static KafkaConsumer<String, String> initKafkaConsumer() {
        KafkaConsumer<String, String> consumer = null;
        // try-with-resources closes the config stream; the original leaked it.
        try (FileInputStream inputStream = new FileInputStream(new File(consumerProPath))) {
            Properties properties = new Properties();
            properties.load(inputStream);
            consumer = new KafkaConsumer<String, String>(properties);
            consumer.subscribe(Arrays.asList(topic));
        } catch (IOException e) {
            LOGGER.error("加载consumer.props文件出错", e);
        }
        return consumer;
    }

    public static void main(String[] args) {
        if (initCheck()) {
            return;
        }

        int totalCount = 0;
        long totalMin = 0L;
        int count = 0;
        KafkaConsumer<String, String> consumer = initKafkaConsumer();
        // Config load failed: abort instead of NPE-ing on consumer.poll() below.
        if (consumer == null) {
            return;
        }

        long startTime = System.currentTimeMillis();
        // Consume messages forever, logging throughput every BATCH_SIZE records.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
            if (records.count() <= 0) {
                continue;
            }
            LOGGER.debug("本次获取:" + records.count());
            count += records.count();

            long endTime = System.currentTimeMillis();
            LOGGER.debug("count=" + count);
            if (count >= BATCH_SIZE) {
                totalCount += count;
                LOGGER.info("this consumer {} record,use {} milliseconds", count, endTime - startTime);
                totalMin += (endTime - startTime);
                startTime = System.currentTimeMillis();
                count = 0;
            }
            LOGGER.debug("end totalCount={},min={}", totalCount, totalMin);
        }
    }
}
Loading

0 comments on commit b766f11

Please sign in to comment.