Kafka、SpringMVC整合例子

一、安装zookeeper

1.下载安装包:http://zookeeper.apache.org/releases.html#download;

2.进入Zookeeper配置目录,笔者的目录为D:\kafka\zookeeper-3.4.11\conf;

3. 将“zoo_sample.cfg”重命名为“zoo.cfg” ;

4. 编辑zoo.cfg配置文件,找到 dataDir=/tmp/zookeeper 并更改成您当前的路径;

5. 系统环境变量:

a. 在系统变量中添加ZOOKEEPER_HOME = D:\kafka\zookeeper-3.4.11

b. 编辑path系统变量,添加为路径%ZOOKEEPER_HOME%\bin;

6. 如需更换端口,可在zoo.cfg文件中修改Zookeeper的clientPort(默认端口2181);

7.打开新的cmd,输入zkServer,运行Zookeeper;

出现如下图片表示成功:

002

 

二、安装并运行Kafka

1.下载Kafka:http://kafka.apache.org/downloads.html

2. 进入Kafka配置目录,D:\kafka\kafka_2.12-1.0.1\config;

3. 编辑文件“server.properties” ;

4. 找到并编辑log.dirs=/tmp/kafka-logs 改成您当前可用的目录;

5. 找到并编辑zookeeper.connect=localhost:2181;

6. Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181。

运行Kafka的命令(在Kafka安装目录下执行):.\bin\windows\kafka-server-start.bat .\config\server.properties

003

 

 

注:请确保在启动Kafka服务器前,Zookeeper实例已经准备好并开始运行。

三、Kafka代码的实现

1.生产者配置文件:

/**
 * Builds the configuration map that is handed to the Kafka producer factory.
 *
 * <p>Only producer-side settings are included. The previous version also put
 * {@code group.id}, {@code key.deserializer} and {@code value.deserializer}
 * into this map; those are consumer settings and are not part of Kafka's
 * producer configuration, so they have been dropped here.
 *
 * @return a new mutable map of producer settings keyed by Kafka property name
 */
@Bean
public Map<String,Object> getDefaultFactoryArg(){
    Map<String,Object> arg = new HashMap<>();
    arg.put("bootstrap.servers",ConstantKafka.KAFKA_SERVER);
    arg.put("retries","1");
    arg.put("batch.size","16384");
    arg.put("linger.ms","1");
    arg.put("buffer.memory","33554432");
    // Keys and values are both sent as plain strings.
    arg.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    arg.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    return arg;
}

/**
 * Creates the producer factory from the settings returned by
 * {@link #getDefaultFactoryArg()}.
 *
 * @return a producer factory configured with the default producer settings
 */
@Bean
public DefaultKafkaProducerFactory defaultKafkaProducerFactory(){
    return new DefaultKafkaProducerFactory(getDefaultFactoryArg());
}

/**
 * Creates the {@code KafkaTemplate} used for sending messages.
 *
 * <p>The template is built on {@link #defaultKafkaProducerFactory()}, uses
 * {@code ConstantKafka.KAFKA_TOPIC1} as its default topic and reports send
 * results to {@link #kafkaProducerListener()}.
 *
 * @return the configured template
 */
@Bean
public KafkaTemplate kafkaTemplate(){
    KafkaTemplate kafkaOps = new KafkaTemplate(defaultKafkaProducerFactory());
    kafkaOps.setDefaultTopic(ConstantKafka.KAFKA_TOPIC1);
    kafkaOps.setProducerListener(kafkaProducerListener());
    return kafkaOps;
}

/**
 * Exposes the producer-side listener as a bean.
 *
 * @return a new {@code KafkaProducerListener}
 */
@Bean
public KafkaProducerListener kafkaProducerListener(){
    return new KafkaProducerListener();
}

2.消费者配置文件:

/**
 * Builds the configuration map that is handed to the Kafka consumer factory.
 *
 * <p>Fixes relative to the previous version:
 * <ul>
 *   <li>{@code auto.commit.interval.ms} was put twice ("1000", then "15000"),
 *       so the first value was silently overwritten. The second entry is now
 *       {@code session.timeout.ms}, which is the setting that carries the
 *       value 15000 in the Spring for Apache Kafka reference configuration.
 *       NOTE(review): confirm this was the intended key.</li>
 *   <li>{@code key.serializer} / {@code value.serializer} are producer
 *       settings and have been removed from the consumer configuration.</li>
 * </ul>
 *
 * @return a new mutable map of consumer settings keyed by Kafka property name
 */
@Bean
public Map<String,Object> getDefaultArgOfConsumer(){
    Map<String,Object> arg = new HashMap<>();
    arg.put("bootstrap.servers",ConstantKafka.KAFKA_SERVER);
    arg.put("group.id","100");
    // Auto-commit is switched off, so the interval below only matters if it is re-enabled.
    arg.put("enable.auto.commit","false");
    arg.put("auto.commit.interval.ms","1000");
    arg.put("session.timeout.ms","15000");
    // Keys and values are both read as plain strings.
    arg.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    arg.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    return arg;
}

/**
 * Creates the consumer factory from the settings returned by
 * {@link #getDefaultArgOfConsumer()}.
 *
 * @return a consumer factory configured with the default consumer settings
 */
@Bean
public DefaultKafkaConsumerFactory defaultKafkaConsumerFactory(){
    return new DefaultKafkaConsumerFactory(getDefaultArgOfConsumer());
}

/**
 * Exposes the consumer-side message listener as a bean.
 *
 * @return a new {@code KafkaConsumerMessageListener}
 */
@Bean
public KafkaConsumerMessageListener kafkaConsumerMessageListener(){
    return new KafkaConsumerMessageListener();
}

/**
 * Container properties for the "log" channel.
 *
 * <p>Subscribes to {@code ConstantKafka.KAFKA_TOPIC1} and routes records to
 * {@link #kafkaConsumerMessageListener()}.
 *
 * @return the container properties for the log topic
 */
@Bean
public ContainerProperties containerPropertiesOfLog(){
    ContainerProperties logProps = new ContainerProperties(ConstantKafka.KAFKA_TOPIC1);
    logProps.setMessageListener(kafkaConsumerMessageListener());
    return logProps;
}

/**
 * Container properties for the "other" channel.
 *
 * <p>Subscribes to {@code ConstantKafka.KAFKA_TOPIC2} and routes records to
 * {@link #kafkaConsumerMessageListener()}.
 *
 * @return the container properties for the other topic
 */
@Bean
public ContainerProperties containerPropertiesOfOther(){
    ContainerProperties otherProps = new ContainerProperties(ConstantKafka.KAFKA_TOPIC2);
    otherProps.setMessageListener(kafkaConsumerMessageListener());
    return otherProps;
}

/**
 * Listener container for the log topic, built from
 * {@link #defaultKafkaConsumerFactory()} and {@link #containerPropertiesOfLog()}.
 *
 * <p>NOTE(review): the bean is started through {@code initMethod = "doStart"};
 * confirm whether the container's regular lifecycle start would be sufficient.
 *
 * @return the listener container for the log topic
 */
@Bean(initMethod = "doStart")
public KafkaMessageListenerContainer kafkaMessageListenerContainerOfLog(){
    return new KafkaMessageListenerContainer(defaultKafkaConsumerFactory(),containerPropertiesOfLog());
}

/**
 * Listener container for the other topic, built from
 * {@link #defaultKafkaConsumerFactory()} and {@link #containerPropertiesOfOther()}.
 *
 * <p>NOTE(review): the bean is started through {@code initMethod = "doStart"};
 * confirm whether the container's regular lifecycle start would be sufficient.
 *
 * @return the listener container for the other topic
 */
@Bean(initMethod = "doStart")
public KafkaMessageListenerContainer kafkaMessageListenerContainerOfOther(){
    return new KafkaMessageListenerContainer(defaultKafkaConsumerFactory(),containerPropertiesOfOther());
}

3.生产消息服务

/**
 * Service that publishes messages to Kafka through the injected
 * {@code KafkaTemplate} and converts the send outcome into a {@code ResultResp}.
 *
 * <p>Each message is serialized with {@code JsonUtil.ObjectToJson} and sent
 * under a key of the form {@code <role>_<msg.hashCode()>}. Sending is
 * synchronous: the method waits for the broker acknowledgement before returning.
 */
@Service
public class KafkaProducerServer implements IKafkaProducerServer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public static final String ROLE_LOG = "log";
    public static final String ROLE_web = "web";
    public static final String ROLE_APP = "app";

    /**
     * Sends a message and waits for the result.
     *
     * @param topic topic (channel) to publish to
     * @param msg message object; must not be {@code null}
     * @param isUsePartition whether to pick an explicit partition from the key
     * @param partitionNum number of partitions; must be &gt; 0 when
     *        {@code isUsePartition} is {@code true}, ignored otherwise
     * @param role sender role (e.g. app, web); {@code null} falls back to {@link #ROLE_LOG}
     * @return the outcome of the send, see {@link #checkProRecord}
     * @throws IllegalArgumentException if {@code msg} is {@code null}, or if
     *         {@code isUsePartition} is {@code true} and {@code partitionNum}
     *         is {@code null} or not positive
     * @throws IllegalServiceException declared by the interface; not thrown
     *         directly by this method
     */
    @Override
    public ResultResp<Void> send(String topic, Object msg, boolean isUsePartition, Integer partitionNum, String role) throws IllegalServiceException {
        // Fail fast with a clear message instead of an NPE / ArithmeticException further down.
        if (msg == null) {
            throw new IllegalArgumentException("msg must not be null");
        }
        if (isUsePartition && (partitionNum == null || partitionNum <= 0)) {
            throw new IllegalArgumentException(
                    "partitionNum must be > 0 when isUsePartition is true, but was " + partitionNum);
        }
        if (role == null){
            role = ROLE_LOG;
        }

        String key = role + "_" + msg.hashCode();
        String valueString = JsonUtil.ObjectToJson(msg, true);

        ListenableFuture<SendResult<String, Object>> result;
        if (isUsePartition) {
            // Explicit partition derived from the key.
            result = kafkaTemplate.send(topic, getPartitionIndex(key, partitionNum), key, valueString);
        } else {
            result = kafkaTemplate.send(topic, key, valueString);
        }
        return checkProRecord(result);
    }

    /**
     * Maps a key to a partition index in {@code [0, num)}.
     *
     * @param key message key; a {@code null} key yields a random partition
     * @param num number of partitions, must be &gt; 0
     * @return the partition index
     */
    private int getPartitionIndex(String key, int num) {
        if (key == null) {
            return new Random().nextInt(num);
        }
        // Take the remainder first and the absolute value second:
        // Math.abs(Integer.MIN_VALUE) is still negative, so abs-then-mod could
        // yield a negative index. For every other hash the result is the same
        // as Math.abs(hash) % num, so existing key-to-partition mapping is kept.
        return Math.abs(key.hashCode() % num);
    }

    /**
     * Waits for the send to complete and translates the outcome into a response.
     *
     * <p>Only the offset in the record metadata is inspected; the producer
     * record itself is not checked.
     *
     * @param res future returned by the template, may be {@code null}
     * @return a response carrying a success code when a non-negative offset was
     *         assigned, a "no offset" code otherwise, a send-error code when
     *         waiting failed, or the "no result" code when {@code res} is {@code null}
     */
    private ResultResp<Void> checkProRecord(ListenableFuture<SendResult<String, Object>> res) {
        ResultResp<Void> resp = new ResultResp<>();
        resp.setCode(ConstantKafka.KAFKA_NO_RESULT_CODE);
        resp.setInfo(ConstantKafka.KAFKA_NO_RESULT_MES);

        if (res == null) {
            return resp;
        }

        try {
            SendResult<String, Object> r = res.get();
            long offsetIndex = r.getRecordMetadata().offset();
            if (offsetIndex >= 0) {
                resp.setCode(ConstantKafka.SUCCESS_CODE);
                resp.setInfo(ConstantKafka.SUCCESS_MSG);
            } else {
                resp.setCode(ConstantKafka.KAFKA_NO_OFFSET_CODE);
                resp.setInfo(ConstantKafka.KAFKA_NO_OFFSET_MES);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            resp.setCode(ConstantKafka.KAFKA_SEND_ERROR_CODE);
            resp.setInfo(ConstantKafka.KAFKA_SEND_ERROR_MES + ":" + e.getMessage());
        } catch (ExecutionException e) {
            e.printStackTrace();
            resp.setCode(ConstantKafka.KAFKA_SEND_ERROR_CODE);
            resp.setInfo(ConstantKafka.KAFKA_SEND_ERROR_MES + ":" + e.getMessage());
        }

        return resp;
    }

}

4.生产者监听服务

/**
 * Producer-side listener that writes the details of each send outcome to the log.
 */
public class KafkaProducerListener implements ProducerListener {

    protected final Logger logger = Logger.getLogger(KafkaProducerListener.class.getName());

    public KafkaProducerListener(){

    }

    /**
     * Logs topic, partition, key, value and record metadata of a successful send.
     *
     * @param topic topic the record was sent to
     * @param partition partition the record was sent to
     * @param key record key
     * @param value record value
     * @param recordMetadata metadata reported for the send
     */
    @Override
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        logger.info("-----------------kafka发送数据成功");
        logger.info("----------topic:"+topic);
        logger.info("----------partition:"+partition);
        logger.info("----------key:"+key);
        logger.info("----------value:"+value);
        logger.info("----------RecordMetadata:"+recordMetadata);
        logger.info("-----------------kafka发送数据结束");
    }

    /**
     * Logs topic, partition, key and value of a failed send (at info level)
     * and prints the exception's stack trace.
     *
     * @param topic topic the record was meant for
     * @param partition partition the record was meant for
     * @param key record key
     * @param value record value
     * @param e the failure
     */
    @Override
    public void onError(String topic, Integer partition, Object key, Object value, Exception e) {
        logger.info("-----------------kafka发送数据失败");
        logger.info("----------topic:"+topic);
        logger.info("----------partition:"+partition);
        logger.info("----------key:"+key);
        logger.info("----------value:"+value);
        logger.info("-----------------kafka发送数据失败结束");
        e.printStackTrace();
    }

    /**
     * Whether the producer listener is enabled for successful sends.
     * Always returns {@code false}.
     * NOTE(review): presumably this means {@link #onSuccess} is never invoked
     * by the template - confirm against the ProducerListener contract.
     *
     * @return {@code false}
     */
    @Override
    public boolean isInterestedInSuccess() {
        return false;
    }
}

5.消费者监听服务

/**
 * Consumer-side listener: logs every received record and hands records from
 * {@code ConstantKafka.KAFKA_TOPIC1} to {@link #doSaveLogs(String)}.
 */
public class KafkaConsumerMessageListener implements MessageListener<String,Object> {

    private final Logger logger = Logger.getLogger(KafkaConsumerMessageListener.class.getName());

    public KafkaConsumerMessageListener(){

    }

    /**
     * Receives a record; records of the log topic are processed as log entries.
     *
     * @param record the consumed record
     */
    @Override
    public void onMessage(ConsumerRecord<String, Object> record) {
        logger.info("=============kafka消息订阅=============");

        String topic = record.topic();
        String key = record.key();
        Object value = record.value();
        long offset = record.offset();
        int partition = record.partition();

        // A record may carry no value; skip persistence for it instead of
        // failing with a NullPointerException on value.toString().
        if (value != null && ConstantKafka.KAFKA_TOPIC1.equals(topic)){
            doSaveLogs(value.toString());
        }

        logger.info("-------------topic:"+topic);
        logger.info("-------------value:"+value);
        logger.info("-------------key:"+key);
        logger.info("-------------offset:"+offset);
        logger.info("-------------partition:"+partition);
        logger.info("=============kafka消息订阅=============");
    }

    /**
     * Parses the JSON payload of a log record.
     *
     * @param data JSON text of the record value
     */
    private void doSaveLogs(String data){
        SocketIOPojo<BikeLogPojo> logs = JsonUtil.JsonToObject(data,SocketIOPojo.class);
        // Placeholder: write the parsed logs to the database here.
    }
}

 

测试图片:

004

 

 

在win系统搭建redis集群方法

我这边主要是搭建出来做测试用的,真实环境还是要使用linux。

环境需求:

Redis-win-3.2.100
Ruby-win-2.2.4-x64
Redis-3.2.2.gem(ruby驱动,需要对应redis的版本号)
Redis-trib.rb源码
1.安装Redis,并运行3个实例(Redis集群至少需要3个主节点,少于3个无法创建);

2.使用redis-trib.rb工具来创建Redis集群,由于该文件是用ruby语言写的,所以需要安装Ruby开发环境,以及驱动redis-xxxx.gem。

1.下载并安装Redis

GitHub路径如下:https://github.com/MSOpenTech/redis/releases/

Redis提供msi和zip格式的下载文件,这里下载zip格式3.2.100版本

将下载到的Redis-win-3.2.100.zip解压即可,为了方便使用,建议放在盘符根目录下,并修改目录名为Redis,如:C:\Redis 或者D:\Redis。当然也可以放在桌面,只要你喜欢。

通过配置文件来启动3个不同的Redis实例,由于Redis默认端口为6379,所以这里使用了7000、7001、7002来运行3个Redis实例。

Spring框架简介

1. Spring框架的作用

轻量:Spring是轻量级的,基本的版本大小为2MB
控制反转:Spring通过控制反转实现了松散耦合,对象们给出它们的依赖,而不是创建或查找依赖的对象们。
面向切面的编程AOP:Spring支持面向切面的编程,并且把应用业务逻辑和系统服务分开。
容器:Spring包含并管理应用中对象的生命周期和配置
MVC框架: Spring-MVC
事务管理:Spring提供一个一致的事务管理接口,下至本地事务,上至全局事务(JTA)都可以使用
异常处理:Spring提供方便的API,把具体技术相关的异常(例如由JDBC、Hibernate或JDO抛出的异常)转化为一致的unchecked异常

tx:annotation-driven注解方式EnableTransactionManagement

替换方法是用@EnableTransactionManagement

以下是官方文档:

org.springframework.transaction.annotation
Annotation Type EnableTransactionManagement


@Target(value=TYPE)
@Retention(value=RUNTIME)
@Documented
@Import(value=TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement

Enables Spring’s annotation-driven transaction management capability, similar to the support found in Spring’s <tx:*> XML namespace. To be used on @Configuration classes as follows:

 /**
  * Example configuration class: enables annotation-driven transaction
  * management and declares the repository, data source and transaction
  * manager beans.
  */
 @Configuration
 @EnableTransactionManagement
 public class AppConfig {
     @Bean
     public FooRepository fooRepository() {
         // configure and return a class having @Transactional methods
         return new JdbcFooRepository(dataSource());
     }

     @Bean
     public DataSource dataSource() {
         // configure and return the necessary JDBC DataSource
     }

     // Transaction manager built on the same dataSource() bean as the repository.
     @Bean
     public PlatformTransactionManager txManager() {
         return new DataSourceTransactionManager(dataSource());
     }
 }

For reference, the example above can be compared to the following Spring XML configuration:

 <beans>
     <tx:annotation-driven/>
     <bean id="fooRepository" class="com.foo.JdbcFooRepository">
         <constructor-arg ref="dataSource"/>
     </bean>
     <bean id="dataSource" class="com.vendor.VendorDataSource"/>
     <bean id="transactionManager" class="org.sfwk...DataSourceTransactionManager">
         <constructor-arg ref="dataSource"/>
     </bean>
 </beans>
 

In both of the scenarios above, @EnableTransactionManagement and <tx:annotation-driven/> are responsible for registering the necessary Spring components that power annotation-driven transaction management, such as the TransactionInterceptor and the proxy- or AspectJ-based advice that weave the interceptor into the call stack when JdbcFooRepository's @Transactional methods are invoked.

A minor difference between the two examples lies in the naming of the PlatformTransactionManager bean: In the @Bean case, the name is “txManager” (per the name of the method); in the XML case, the name is “transactionManager”. The <tx:annotation-driven/> is hard-wired to look for a bean named “transactionManager” by default, however @EnableTransactionManagement is more flexible; it will fall back to a by-type lookup for any PlatformTransactionManager bean in the container. Thus the name can be “txManager”, “transactionManager”, or “tm”: it simply does not matter.

For those that wish to establish a more direct relationship between @EnableTransactionManagement and the exact transaction manager bean to be used, the TransactionManagementConfigurer callback interface may be implemented – notice the implements clause and the @Override-annotated method below:

 /**
  * Example configuration class that additionally implements
  * TransactionManagementConfigurer to name the transaction manager explicitly.
  */
 @Configuration
 @EnableTransactionManagement
 public class AppConfig implements TransactionManagementConfigurer {
     @Bean
     public FooRepository fooRepository() {
         // configure and return a class having @Transactional methods
         return new JdbcFooRepository(dataSource());
     }

     @Bean
     public DataSource dataSource() {
         // configure and return the necessary JDBC DataSource
     }

     @Bean
     public PlatformTransactionManager txManager() {
         return new DataSourceTransactionManager(dataSource());
     }

     // Returns the txManager() bean as the manager for annotation-driven transactions.
     @Override
     public PlatformTransactionManager annotationDrivenTransactionManager() {
         return txManager();
     }
 }

This approach may be desirable simply because it is more explicit, or it may be necessary in order to distinguish between two PlatformTransactionManager beans present in the same container. As the name suggests, the annotationDrivenTransactionManager() will be the one used for processing @Transactional methods. See TransactionManagementConfigurer Javadoc for further details.

The mode() attribute controls how advice is applied; if the mode is AdviceMode.PROXY (the default), then the other attributes control the behavior of the proxying.

If the mode() is set to AdviceMode.ASPECTJ, then the proxyTargetClass() attribute is obsolete. Note also that in this case the spring-aspects module JAR must be present on the classpath.

Since:
3.1
Author:
Chris Beams
See Also:
TransactionManagementConfigurer, TransactionManagementConfigurationSelector, ProxyTransactionManagementConfiguration, AspectJTransactionManagementConfiguration

Optional Element Summary
 AdviceMode mode
Indicate how transactional advice should be applied.
 int order
Indicate the ordering of the execution of the transaction advisor when multiple advices are applied at a specific joinpoint.
 boolean proxyTargetClass
Indicate whether subclass-based (CGLIB) proxies are to be created (true) as opposed to standard Java interface-based proxies (false).

 

proxyTargetClass

public abstract boolean proxyTargetClass
Indicate whether subclass-based (CGLIB) proxies are to be created (true) as opposed to standard Java interface-based proxies (false). The default is false. Applicable only if mode() is set to AdviceMode.PROXY. Note that setting this attribute to true will affect all Spring-managed beans requiring proxying, not just those marked with @Transactional. For example, other beans marked with Spring’s @Async annotation will be upgraded to subclass proxying at the same time. This approach has no negative impact in practice unless one is explicitly expecting one type of proxy vs another, e.g. in tests.

Default:
false

mode

public abstract AdviceMode mode
Indicate how transactional advice should be applied. The default is AdviceMode.PROXY.
See Also:
AdviceMode
Default:
org.springframework.context.annotation.AdviceMode.PROXY

order

public abstract int order
Indicate the ordering of the execution of the transaction advisor when multiple advices are applied at a specific joinpoint. The default is Ordered.LOWEST_PRECEDENCE.
Default:
2147483647

 

https://docs.spring.io/spring/docs/4.0.x/javadoc-api/org/springframework/transaction/annotation/EnableTransactionManagement.html

更多文档:

https://docs.spring.io/spring/docs/4.0.x/javadoc-api/overview-summary.html

Java多线程,线程池ThreadPoolExecutor使用详解

ThreadPoolExecutor.class构造方法参数讲解

参数名 作用
corePoolSize 核心线程池大小
maximumPoolSize 最大线程池大小
keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以通过allowCoreThreadTimeOut(true)使核心线程也适用该存活时间
TimeUnit keepAliveTime时间单位
workQueue 阻塞任务队列
threadFactory 新建线程工厂
RejectedExecutionHandler 当提交任务数超过maximumPoolSize与workQueue容量之和时,任务会交给RejectedExecutionHandler来处理

1.当线程池中的线程数小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
4.当workQueue已满且线程数已达到maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

示例代码:

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class testThreadPool {

   /**
    * 线程计数 
    */
   private static int count = 0;
   public static void main(String[] args) {
      
      /**
       * 通常会使用一个队列,进行排序
       */
      BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(25);
      
      /**
       * 如果使用无界的队列,当出现攻击时,容易导致内存泄漏。
       */
      //LinkedBlockingDeque<Runnable> queue = new LinkedBlockingDeque<>();
      
      /**
       * 线程池
       */
      ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 30, TimeUnit.MINUTES, queue);
      
      /**
       * 重写Handler抛出来的线程重新加进去
       */
      pool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
         
         @Override
         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // TODO Auto-generated method stub
            while (true) {
               if (queue.size() < 24) {
                  System.out.println("++++++++++++++Runnable");
                  executor.execute(r);
                  break;
               }
            }
         }
      });

      /**
       * 定义一个定时器,每2秒创建10个进程
       */
      Timer timer = new Timer();
      timer.schedule(new TimerTask() {
         
         @Override
         public void run() {
            // TODO Auto-generated method stub
            
            for(int i = 0;i<10;i++) {
               count++;
               doWork work = new doWork();
               pool.execute(work);
               
            }
            
            if (count >= 200) {
               timer.cancel();
            }
         }
      }, 0,2000);
      
      /**
       * 定时检查线程的状况
       */
      new Timer().schedule(new TimerTask() {
         @Override
         public void run() {
            // TODO Auto-generated method stub
            System.err.println(">>当前队列:"+queue.size());
            System.err.println(">>线程数:"+count);
            System.err.println(">>当前ActiveCount:"+pool.getActiveCount());
            System.err.println(">>当前TaskCount:"+pool.getTaskCount());
            System.err.println(">>当前PoolSize:"+pool.getPoolSize());
         }
      }, 1000,1000);
      
   }
}

/**
 * Simulated unit of work for the thread-pool demo: sleeps for 3 seconds and
 * then prints the id and name of the thread that ran it.
 */
class doWork implements Runnable{

   /**
    * Runs the simulated work. If the thread is interrupted while sleeping,
    * the work is abandoned and the interrupt flag is restored.
    */
   @Override
   public void run() {
      try {
         // Simulate a real workload by sleeping for 3 seconds.
         Thread.sleep(3000);
         System.out.println("线程id:"+Thread.currentThread().getId()+" 线程名称:"+Thread.currentThread().getName());
      } catch (InterruptedException e) {
         // Thread.sleep clears the interrupt flag when it throws; set it again
         // so the owner of this thread (e.g. the pool) can still see the interruption.
         Thread.currentThread().interrupt();
         System.err.println("doWork interrupted: " + Thread.currentThread().getName());
      }
   }

}

 

API文档:

http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html

1345678