Redis内存淘汰机制

Redis内存淘汰指的是用户存储的一些key可以被Redis主动地从实例中删除,从而产生读miss的情况,那么Redis为什么要有这种功能?

这就是我们需要探究的设计初衷,Redis最常见的两种应用场景为缓存和持久存储,首先要明确的一个问题是内存淘汰策略更适合于那种场景?是持久存储还是缓存?

内存的淘汰机制的初衷是为了更好地使用内存,用一定的缓存miss来换取内存的使用效率。

redis 官方提供的 conf

https://raw.github.com/antirez/redis/2.2/redis.conf

1.# maxmemory <bytes>

我们可以通过配置redis.conf中的maxmemory这个值来开启内存淘汰功能,至于这个值有什么意义,我们可以通过了解内存淘汰的过程来理解它的意义:

1. 客户端发起了需要申请更多内存的命令(如set)。

2. Redis检查内存使用情况,如果已使用的内存大于maxmemory则开始根据用户配置的不同淘汰策略来淘汰内存(key),从而换取一定的内存。

3. 如果上面都没问题,则这个命令执行成功。

 

2.# maxmemory-policy volatile-lru

redis 中的默认的过期策略是 volatile-lru 。

设置方式   

config set maxmemory-policy volatile-lru

maxmemory-policy 六种方式:

#volatile-lru – >使用LRU算法删除使用过期集合的key
#allkeys-lru – >在主键空间中,优先移除最近未使用的key。
#volatile-random – >在设置了过期时间的键空间中,随机移除某个key。
#allkeys-random – >在主键空间中,随机移除某个key。
#volatile-ttl – >设置了过期时间的键空间中,具有更早过期时间的key优先移除。
#novaiction – >当内存使用达到阈值的时候,所有引起申请内存的命令会报错。

 

 

Push to origin/master was rejected

错误如下:

Git Pull Failed: fatal: refusing to merge unrelated histories

意思是git拒绝合并两个不相干的东西
此时你需要在打开Git Bash,然后进入相应的目录,然后敲git命令:

$ git pull origin master --allow-unrelated-histories

出现类似于这种信息就说明pull成功了:

$ git pull origin master --allow-unrelated-histories
From gitee.com:alanro/suisuinian__server
 * branch master -> FETCH_HEAD
Merge made by the 'recursive' strategy.
 LICENSE | 191 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 191 insertions(+)
 create mode 100644 LICENSE

然后你可以利用git status查看一下当前仓库的状态,是不是所有的全部add并且commit,如果全部完成,那么此时你就可以将本地仓库中的推送到github中,使用如下的git命令:

$ git push -u origin master

搞定

简单处理GPS漂移计算方法

class test{
   
   private LocationPojo preLocation;

    private List<LocationPojo> nowLocation;

    private Long preTime;
   
   public boolean test(List<LocationPojo> now,LocationPojo pre) {
      this.nowLocation = now;
      this.preLocation = pre;
      

        double distance = 0;//两点坐标点距离
        int tmp = 40;//精准度上行初始阀值(固定)
      
        int AccuracyThresholdUp = tmp;//精准度上行阀值
        int AccuracyThresholdDown = 30;//精准度下行阀值

        int stopCount = 0; //静止状态坐标计数
        int rectCountDown = 0; //坐标在30M围栏内计数
        int rectCountUp = 0;  //坐标在100M围栏外计数
      
        int notCheckUpCount = 0; //超出大围栏计数,不检查精准度

      /*
       *
       * 如果没有上一次的GPS数据,那么直接返回这次的GPS数据。
      **/
        if (this.preLocation() == null){
            this.preLocation(this.nowLocation.get(0));
            this.preTime = this.preLocation().getAddTime();//上一次记录的时间
            return true;
        }

        LocationPojo b = null;

        //循环计数(我这边是每次定位间隔是1秒,每次定位数据达到10条后进入计算,所以有这个循环)
      //就是用10条现在的GPS数据与上一次的GPS数据,进行数据计算。
        for (LocationPojo pojo:this.nowLocation){
            if (b == null){
                b = pojo;
            }

            //判断不是GPS数据,如果不是,改变阀值的上下值
            if (pojo.getProvider().equals(GPS.GPS)) {
                AccuracyThresholdUp = (int)(tmp * 1.5);//网络定位普遍在40以上,所以需要改变精准度的阀值。
            }else{
                AccuracyThresholdUp = tmp;//由于是循环的,所以每次都需要重新赋值。
            }

            //没有速度,或者有速度但是精准度很高,我会把这类的数据归于静止状态(GPS漂移数据)
            if (pojo.getSpeed() <= 0 || (pojo.getSpeed() > 0 && pojo.getAccuracy() > AccuracyThresholdDown)){
                stopCount++;
            }

            //测算距离(测算距离的方法有很多,我现在把它封装成工具类了)
            distance = CommUtils.getLocationDistance(pojo.getLatitude(),pojo.getLongitude(),preLocation.getLatitude(),preLocation.getLongitude());

            //优化速度精准度
            if(pojo.getSpeed() > 0 && distance > 0){
                //距离 / 时间 * 3.6 = 速度(KM)
//                float speed = CommUtils.fromatNumber(distance / ((pojo.getAddTime() - this.preTime) / 1000) * 3.6,null);
//                pojo.setSpeed(speed);
                pojo.setSpeed(CommUtils.formatNumber(pojo.getSpeed().doubleValue(),"#0.00").floatValue());
            }

            //latlnt电子围栏 30 - 100m
            //超出围栏(条件是:lat或者lnt与上一次坐标匹配大于[100m]并且精确度在30m以内,条件成立)
            if (distance > 100){
                notCheckUpCount++;

            //高精准度(GPS数据应该是可靠的)
                if(pojo.getAccuracy() < AccuracyThresholdUp){
                    rectCountUp++;

                    //如果上一次GPS精准度大于这一次,那么次数GPS数据是有效的。
                    if(pojo.getAccuracy() <= preLocation.getAccuracy()){
                        b = pojo;
                    }
                }
            }else if (distance > 30 && pojo.getAccuracy() < AccuracyThresholdUp){
                //如果在电子围栏内,并且精确度在30m以内,条件成立
                rectCountDown++;
                if(pojo.getAccuracy() <= preLocation.getAccuracy()){
                    b = pojo;
                }
            }
            
        }

        //a:在30米的围栏中必须有速度值,而且超出小围栏的计数>=5个,条件成立则正在移动(30M直径的正方形)
        //a1:在100米的围栏中有8个条数据均超出,不管有没有速度,条件均成立(也许他是坐飞机,也许他瞬移)
        double a = getNowLocation().size() * 0.5;
        double a1 = getNowLocation().size() * 0.8;
        if ((stopCount <= 5 && rectCountDown >= a) || rectCountUp >= a1 || (notCheckUpCount == getNowLocation().size() && rectCountUp >= a) || (stopCount >= a && rectCountUp >= a)){
            this.setPreLocation(b);
            this.setPreTime(b.getAddTime());
            return true;
        }
        return false;

    }
}

Spring Batch 批处理框架使用介绍

Spring Batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。Spring Batch是Spring的一个子项目,使用Java语言并基于Spring框架为基础开发,使得已经使用 Spring 框架的开发者或者企业更容易访问和利用企业服务。

Spring Batch 提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。对于大数据量和高性能的批处理任务,Spring Batch 同样提供了高级功能和特性来支持,比如分区功能、远程功能。总之,通过 Spring Batch 能够支持简单的、复杂的和大数据量的批处理作业。

Spring Batch 是一个批处理应用框架,不是调度框架,但需要和调度框架合作来构建完成的批处理任务。它只关注批处理任务相关的问题,如事务、并发、监控、执行等,并不提供相应的调度功能。如果需要使用调用框架,在商业软件和开源软件中已经有很多优秀的企业级调度框架(如 Quartz、Tivoli、Control-M、Cron 等)可以使用。

1.现在实现Spring Batch方法,首先是配置:

@Configuration
@EnableBatchProcessing
public class SpringBatchConfiguration {

    @Resource
    private DataSource dataSource;

    @Resource
    private PlatformTransactionManager transactionManager;

    /**
     * 任务仓库
     * @return
     */
    @Bean
    public JobRepository jobRepository() throws Exception{
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setDatabaseType("mysql");
        return jobRepositoryFactoryBean.getObject();
    }

    /**
     * 任务加载器
     * @return
     */
    @Bean
    public SimpleJobLauncher jobLauncher() throws Exception{
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(this.jobRepository());
        return launcher;
    }

    @Bean
    public TestJobListener testJobListener(){
        return new TestJobListener();
    }

}

2.TestJobListener监听器

public class TestJobListener implements JobExecutionListener {

    private final Logger logger = Logger.getLogger(this.getClass().getName());

    private long time = 0L;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        this.time = System.currentTimeMillis();
        logger.info(">>job start");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        logger.info(">>job end("+(System.currentTimeMillis()-time)+")");
    }
}

3.读操作代码

public class TestItemReader2 extends FlatFileItemReader<BikeKey> {

    public TestItemReader2(){

    }

    public void setData(String path,LineMapper<BikeKey> lineMapper){
        this.setResource(new FileSystemResource(path));
        this.setLineMapper(lineMapper);
    }

}

//LineMapper代码
@Component
public class TestItemLineMapper implements LineMapper<BikeKey> {

    @Override
    public BikeKey mapLine(String s, int i) throws Exception {
        System.out.println("mapLine..."+s+" i:"+i);

        String[] args = s.split(",");

        // 创建DeviceCommand对象
        BikeKey bikeKey = new BikeKey();
        bikeKey.setId(null);
        bikeKey.setStatus(0);
        bikeKey.setKeySn(args[1]);
        return bikeKey;
    }
}

4.写操作代码

@Component
public class TestItemWriter implements ItemWriter<BikeKey> {

    @Resource
    private IBikeKeyService bikeKeyService;

    @Override
    public void write(List<? extends BikeKey> list) throws Exception {
        for (int i=0;i<list.size();i++){
            System.out.println("write..."+list.size()+" i:"+i);
            bikeKeyService.insert(list.get(i));
        }
    }
}

5.处理过程代码

@Component
public class TestItemProcessor implements ItemProcessor<BikeKey,BikeKey> {
    @Override
    public BikeKey process(BikeKey bikeKey) throws Exception {
        System.out.println("process...");
        bikeKey.setKeyCreateTime((int) (DateUtil.getTime()/1000));
        return bikeKey;
    }
}

6.工作统一调用代码

@Component
public class TestDoImport {

    @Resource
    private JobLauncher jobLauncher;

    @Resource
    private JobBuilderFactory jobBuilderFactory;

    @Resource
    private StepBuilderFactory stepBuilderFactory;

    @Resource
    private TestJobListener jobListener;

    @Resource
    private TestItemLineMapper lineMapper;

    /**
     * 读操作
     */
    private TestItemReader2 reader;

    /**
     * 写操作
     */
    @Resource
    private TestItemWriter writer;

    /**
     * 处理过程
     */
    @Resource
    private TestItemProcessor processor;

    public TestDoImport(){

    }

    public void doImport(){

        /**
         * 批量任务的参数
         */
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("TIME",System.currentTimeMillis())
                .toJobParameters();
        try {
            /**
             * 设置数据路径
             */
            reader = new TestItemReader2();
            reader.setData("d:/bike_key.csv",lineMapper);

            /**
             * 执行任务
             */
            jobLauncher.run(this.getJob(jobBuilderFactory,this.getStep(stepBuilderFactory,reader,writer,processor)),jobParameters);
        } catch (JobExecutionAlreadyRunningException e) {
            e.printStackTrace();
        } catch (JobRestartException e) {
            e.printStackTrace();
        } catch (JobInstanceAlreadyCompleteException e) {
            e.printStackTrace();
        } catch (JobParametersInvalidException e) {
            e.printStackTrace();
        }

    }

    /**
     * 获取一个工作实例
     * @param jobs
     * @param step
     * @return
     */
    private Job getJob(JobBuilderFactory jobs, Step step){
        return jobs
                .get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .listener(jobListener)//监听整个过程
                .build();
    }

    /**
     * 获取一个步骤实例
     * @param stepBuilderFactory
     * @param reader 读
     * @param writer 写
     * @param processor 过程
     * @return
     */
    private Step getStep(StepBuilderFactory stepBuilderFactory, ItemReader<BikeKey> reader, ItemWriter<BikeKey> writer, ItemProcessor<BikeKey,BikeKey> processor){
        return stepBuilderFactory
                .get("step1")
                .<BikeKey,BikeKey> chunk(1)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}

 

运行日志:

23-May-2018 13:42:03.817 信息 [http-nio-8080-exec-6] com.mymvc.system.batch.listener.TestJobListener.beforeJob >>job start
mapLine…,a0011 i:1
process…
write…1 i:0
mapLine…,a0022 i:2
process…
write…1 i:0
mapLine…,a0033 i:3
process…
write…1 i:0
mapLine…,a0044 i:4
process…
write…1 i:0
mapLine…,a0055 i:5
process…
write…1 i:0
mapLine…,a0016 i:6
process…
write…1 i:0
mapLine…,a0027 i:7
process…
write…1 i:0
mapLine…,a0038 i:8
process…
write…1 i:0
mapLine…,a0049 i:9
process…
write…1 i:0
mapLine…,a00510 i:10
process…
write…1 i:0
mapLine…,a00111 i:11
process…
write…1 i:0
mapLine…,a00212 i:12
process…
write…1 i:0
mapLine…,a00313 i:13
process…
write…1 i:0
mapLine…,a00414 i:14
process…
write…1 i:0
mapLine…,a00515 i:15
process…
write…1 i:0
mapLine…,a00116 i:16
process…
write…1 i:0
mapLine…,a00217 i:17
process…
write…1 i:0
mapLine…,a00318 i:18
process…
write…1 i:0
mapLine…,a00419 i:19
process…
write…1 i:0
mapLine…,a00520 i:20
process…
write…1 i:0
23-May-2018 13:42:05.252 信息 [http-nio-8080-exec-6] com.mymvc.system.batch.listener.TestJobListener.afterJob >>job end(1435)

 

 

 

 

 

Spring Batch 批处理框架介绍

前言

在大型的企业应用中,或多或少都会存在大量的任务需要处理,如邮件批量通知所有将要过期的会员等等。而在批量处理任务的过程中,又需要注意很多细节,如任务异常、性能瓶颈等等。那么,使用一款优秀的框架总比我们自己重复地造轮子要好得多一些。

我所在的物联网云平台部门就有这么一个需求,需要实现批量下发命令给百万设备。为了防止枯燥乏味,下面就让我们先通过Spring Batch框架简单地实现一下这个功能,再来详细地介绍这款框架!

小试牛刀

Demo代码:https://github.com/wudashan/spring-batch-demo.git

引入依赖

首先我们需要引入对Spring Batch的依赖,在pom.xml文件加入下面的代码:

1
2
3
4
5
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-core</artifactId>
    <version>3.0.8.RELEASE</version>
</dependency>

装载Bean

其次,我们需要在resources目录下,创建applicationContext.xml文件,用于自动注入我们需要的类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!-- 事务管理器 -->
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
    <!-- 任务仓库 -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager"/>
    </bean>
    <!-- 任务加载器 -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>
</beans>

有了上面声明的transactionManager、jobRepository、jobLauncher,我们就可以执行批量任务啦!不过,我们还需要创建一个任务。在Spring Batch框架中,一个任务Job由一个或者多个步骤Step,而步骤又由读操作Reader、处理操作Processor、写操作Writer