Hadoop2.0分布式集群的平台搭建

一、Hadoop集群安装前的准备
基础环境

四台Centos6.5
IP地址:
192.168.174.128
192.168.174.129
192.168.174.130
192.168.174.131
四台主机新建hadoop用户并实现ssh免密登陆
iptables关闭和selinuxdisabled

1.修改主机名和ip地址映射
为了后面操作方便,修改主机名分别为hadoop01、hadoop02、hadoop03、hadoop04。修改主机名只需修改/etc/sysconfig/network文件hostname行即可,这里博主不再复述。然后修改/etc/hosts文件,将ip地址和主机名的映射写入进去,这样,后面其它主机就可根据主机名去对应ip地址。
1553049668-2104-cf72d11967ef780df27ba5cdd994
2.安装JDK
Hadoop的核心组成之一MapReduce是基于java的,因此需要配置基本的java环境。JDK安装十分简单,前面也多次提到。下载jdk安装包,解压jdk到指定目录。

tar -zxvf jdk-8u181-linux-x64.tar.gz -C /usr/local/java

修改环境变量,进入/etc/profile

export JAVA_HOME=/usr/local/java/jdk1.8.0_181
export PATH=$PATH:$JAVA_HOME/bin

重新加载环境变量生效。JDK需在四个节点都安装配置
3.Zookeeper安装配置
Zookeeper是负责协调Hadoop一致性,是Hadoop实现HA的不可或缺的组件。根据Zookeeper的工作机制,需要在奇数个节点安装Zookeeper。本文在hadoop01、hadoop02、hadoop03三个节点安装Zookeeper。
下载zookeeper安装包,解压zookeeper安装包
1553049668-5318-127622313b4fc934cee2bd69890c
设置环境变量,修改/etc/profile

export ZOOKEEPER_HOME=/usr/local/zookeeper/zookeeper-3.4.6
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin

重新加载环境变量生效
进入zookeeper解压目录下的conf目录下,修改配置文件zoo.cfg,一开始并没有zoo.cfg文件,拷贝zoo_sample.cfg文件重命名为zoo.cfg即可。
1553049674-4206-bd6f9ed6dd0e4be48beee2c9b782
创建相应的data目录及datalog目录

mkdir  -p /opt/zookeeper/datalog

在每个data目录下新建myid文件,hadoop01的myid文件写入1,hadoop02的myid文件写入2,即server.后的数字。另外注意给/opt/zookeeper目录及其子目录给hadoop用户读写操作权限,因为后面使用zookeeper时是以hadoop用户使用的。
到这里zookeeper基本安装配置完成,以hadoop用户启动zookeeper服务

zkServer.sh start

查看zookeeper状态

zkServer.sh status

二、Hadoop安装配置
下载hadoop安装包,解压hadoop安装包
1553049668-7059-1c4d83e8df66cd8d0ac7e1b3e711
注意解压后的目录user和group应该为hadoop,道理与前面zookeeper一样,在Hadoop使用过程中使用者是hadoop用户。
设置环境变量,修改配置文件/etc/profile

export HADOOP_HOME=/usr/local/hadoop-2.6.4
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

注意hadoop需要配置bin和sbin,不然后面许多命令无法使用。重新加载环境变量生效。
然后就是修改hadoop的配置文件,进入hadoop安装目录下的etc/hadoop目录下,修改配置文件:hadoop-env.sh、core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml,其中配置文件mapred-site.xml在该目录下有一个样本mapred-site.xml.template,复制该文件重命名为mapred-site.xml即可。
修改配置文件hadoop-env.sh。主要是配置java目录
1553049668-2112-cbcc8aa520faab9fa11c0ead1971
修改配置文件core-site.xml

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://jsj/</value>
</property>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/usr/local/hdpdata</value>
</property>
<property>
  <name>ha.zookeeper.quorum</name>
  <value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
</property>
</configuration>

修改配置文件hdfs-site.xml,从配置文件名也可知这是关于HDFS的配置。

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
  <name>dfs.nameservices</name>
  <value>jsj</value>
</property>
<property>
  <name>dfs.ha.namenodes.jsj</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.jsj.nn1</name>
  <value>hadoop01:9000</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.jsj.nn2</name>
  <value>hadoop02:9000</value>
</property>
<property>
  <name>dfs.namenode.http-address.jsj.nn1</name>
  <value>hadoop01:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.jsj.nn2</name>
  <value>hadoop02:50070</value>
</property>
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://hadoop01:8485;hadoop02:8485;hadoop03:8485/jsj</value>
</property>
<property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/usr/local/journaldata</value>
</property>
<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>
<property>
  <name>dfs.client.failover.proxy.provider.jsj</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
  <name>dfs.ha.fencing.methods</name>
  <value>
  sshfence
  shell(/bin/true)
  </value>
</property>
<property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <value>/home/hadoop/.ssh/id_rsa</value>
</property>
<property>
  <name>dfs.ha.fencing.ssh.connect-timeout</name>
  <value>30000</value>
</property>
</configuration>

修改配置文件mapred-site.xml,即MapReduce相关配置。

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
   <name>mapreduce.framework.name</name>
   <value>yarn</value>
</property>
<property>
   <name>mapreduce.jobhistory.address</name>
   <value>hadoop03:10020</value>
</property>
<property>
   <name>mapreduce.jobhistory.webapp.address</name>
   <value>hadoop03:19888</value>
</property>
</configuration>

修改配置文件yarn-site.xml。yarn平台的相关配置

<configuration>

<!-- Site specific YARN configuration properties -->
<property>
   <name>yarn.log-aggregation-enable</name>
   <value>true</value>
</property>
<property>
   <name>yarn.resourcemanager.ha.enabled</name>
   <value>true</value>
</property>
<property>
   <name>yarn.resourcemanager.cluster-id</name>
   <value>abc</value>
</property>
<property>
   <name>yarn.resourcemanager.ha.rm-ids</name>
   <value>rm1,rm2</value>
</property>
<property>
   <name>yarn.resourcemanager.hostname.rm1</name>
   <value>hadoop01</value>
</property>
<property>
   <name>yarn.resourcemanager.hostname.rm2</name>
   <value>hadoop02</value>
</property>
<property>
   <name>yarn.resourcemanager.zk-address</name>
   <value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
</property>
<property>
   <name>yarn.nodemanager.aux-services</name>
   <value>mapreduce_shuffle</value>
</property>
</configuration>

最后修改slaves文件

hadoop02
hadoop03
hadoop04

至此,Hadoop集群相关配置文件配置完成,在hadoop01、hadoop02、hadoop03、hadoop04四个节点都完成相关配置。
配置文件修改完成并不代表Hadoop安装结束,还需要几个操作才能正常使用。
在hadoop01、hadoop02、hadoop03启动zookeeper服务。

zkServer.sh start

在hadoop01、hadoop02、hadoop03启动journalnode

hadoop-daemon.sh start journalnode

格式化hdfs,hadoop01执行

hdfs namenode -format

然后查看hadoop安装目录确保hdpdata和journaldata在hadoop01和hadoop02都有。没有从一个节点拷贝到另一个节点。
在hadoop01启动namenode

hadoop-daemon.sh start namenode

在Hadoop02执行

hdfs namenode -bootstrapStandby

格式化zkfc,Hadoop01执行

hdfs zkfc -formatZk

在hadoop01启动HDFS

start-dfs.sh

完成以上操作后,Hadoop应该可以正常对外做出服务。在浏览器输入hadoop01的ip地址,端口号为50070,查看HDFS的web界面是否正常对外做出服务。
1553049669-6024-f2359417ee5551e2196c1d8bd697
在hadoop01和hadoop02启动yarn平台

start-yarn.sh

访问hadoop01的ip地址的8088端口,查看yarn平台是否正常对外做出服务。
1553049674-6192-f59ff4b6d0e5ba538026527ee83e
Hadoop安装配置完成,关于配置文件的解释后期有时间再加上去。本文使用的安装包是在学习过程老师给的,Hadoop是开源的,相信相关安装包不难找到。

 

注:转自https://blog.51cto.com/13917261/2364868,原因是觉得写的不错。

JAVA的垃圾回收机制

一、 技术背景你要了解吧
谈谈JVM垃圾回收的前世今生的,说起垃圾回收GC,大部分人都把这项技术当做Java语言的伴生产物。事实上,GC的历史比Java久远,早在1960年Lisp这门语言中就使用了内存动态分配和垃圾回收技术。设计和优化C++这门语言。

二、 哪些内存需要回收?
都知道JVM的内存结构包括五大区域:程序计数器、虚拟机栈、本地方法栈、堆区、方法区。其中程序计数器、虚拟机栈、本地方法栈3个区域随线程而生、随线程而灭,因此这几个区域的内存分配和回收都具备确定性,就不需要过多考虑回收的问题,因为方法结束或者线程结束时,内存自然就跟随着回收了。而Java堆区方法区则不一样,这部分内存的分配和回收是动态的,正是垃圾收集器所需关注的部分。
垃圾收集器在对堆区和方法区进行回收前,首先要确定这些区域的对象哪些可以被回收,哪些暂时还不能回收,这就要用到判断对象是否存活的算法。

2.1 引用计数算法
2.1.1 算法分析
引用计数是垃圾收集器中的早期策略。在这种方法中,堆中每个对象实例都有一个引用计数。当一个对象被创建时,就将该对象实例分配给一个变量,该变量计数设置为1。当任何其它变量被赋值为这个对象的引用时,计数加1(a = b,则b引用的对象实例的计数器+1),但当一个对象实例的某个引用超过了生命周期或者被设置为一个新值时,对象实例的引用计数器减1。任何引用计数器为0的对象实例可以被当作垃圾收集。当一个对象实例被垃圾收集时,它引用的任何对象实例的引用计数器减1。
2.1.2 优缺点
优点:引用计数收集器可以很快的执行,交织在程序运行中。对程序需要不被长时间打断的实时环境比较有利。
缺点:无法检测出循环引用。如父对象有一个对子对象的引用,子对象反过来引用父对象。这样,他们的引用计数永远不可能为0。
2.1.3 是不是很无趣,来段代码压压惊

    public static void main(String[] args) {
        MyObject object1 = new MyObject();
        MyObject object2 = new MyObject();
          
        object1.object = object2;
        object2.object = object1;
          
        object1 = null;
        object2 = null;
    }
}

这段代码是用来验证引用计数算法不能检测出循环引用。最后面两句将object1和object2赋值为null,也就是说object1和object2指向的对象已经不可能再被访问,但是由于它们互相引用对方,导致它们的引用计数器都不为0,那么垃圾收集器就永远不会回收它们。

2.2 可达性分析算法
可达性分析算法是从离散数学中的图论引入的,程序把所有的引用关系看作一张图,从一个节点GC ROOT开始,寻找对应的引用节点,找到这个节点以后,继续寻找这个节点的引用节点,当所有的引用节点寻找完毕之后,剩余的节点则被认为是没有被引用到的节点,即无用的节点,无用的节点将会被判定为是可回收的对象。
1552823266-9938-5c8ddbe88c5b7
在Java语言中,可作为GC Roots的对象包括下面几种:
a) 虚拟机栈中引用的对象(栈帧中的本地变量表);
b) 方法区中类静态属性引用的对象;
c) 方法区中常量引用的对象;
d) 本地方法栈中JNI(Native方法)引用的对象。
2.3 Java中的引用你了解多少
无论是通过引用计数算法判断对象的引用数量,还是通过可达性分析算法判断对象的引用链是否可达,判定对象是否存活都与“引用”有关。在Java语言中,将引用又分为强引用、软引用、弱引用、虚引用4种,这四种引用强度依次逐渐减弱。

强引用
在程序代码中普遍存在的,类似 Object obj = new Object() 这类引用,只要强引用还存在,垃圾收集器永远不会回收掉被引用的对象。

软引用
用来描述一些还有用但并非必须的对象。对于软引用关联着的对象,在系统将要发生内存溢出异常之前,将会把这些对象列进回收范围之中进行第二次回收。如果这次回收后还没有足够的内存,才会抛出内存溢出异常。

弱引用
也是用来描述非必需对象的,但是它的强度比软引用更弱一些,被弱引用关联的对象只能生存到下一次垃圾收集发生之前。当垃圾收集器工作时,无论当前内存是否足够,都会回收掉只被弱引用关联的对象。

虚引用
也叫幽灵引用或幻影引用(名字真会取,很魔幻的样子),是最弱的一种引用关系。一个对象是否有虚引用的存在,完全不会对其生存时间构成影响,也无法通过虚引用来取得一个对象实例。它的作用是能在这个对象被收集器回收时收到一个系统通知。
不要被概念吓到,也别担心,还没跑题,再深入,可就不好说了。小编罗列这四个概念的目的是为了说明,无论引用计数算法还是可达性分析算法都是基于强引用而言的。

2.4 对象死亡(被回收)前的最后一次挣扎
即使在可达性分析算法中不可达的对象,也并非是“非死不可”,这时候它们暂时处于“缓刑”阶段,要真正宣告一个对象死亡,至少要经历两次标记过程。
第一次标记:如果对象在进行可达性分析后发现没有与GC Roots相连接的引用链,那它将会被第一次标记;
第二次标记:第一次标记后接着会进行一次筛选,筛选的条件是此对象是否有必要执行finalize()方法。在finalize()方法中没有重新与引用链建立关联关系的,将被进行第二次标记。
第二次标记成功的对象将真的会被回收,如果对象在finalize()方法中重新与引用链建立了关联关系,那么将会逃离本次回收,继续存活。

2.5 方法区如何判断是否需要回收
方法区存储内容是否需要回收的判断可就不一样咯。方法区主要回收的内容有:废弃常量和无用的类。对于废弃常量也可通过引用的可达性来判断,但是对于无用的类则需要同时满足下面3个条件:
1.该类所有的实例都已经被回收,也就是Java堆中不存在该类的任何实例;
2.加载该类的ClassLoader已经被回收;
3.该类对应的java.lang.Class对象没有在任何地方被引用,无法在任何地方通过反射访问该类的方法。

三、常用的垃圾收集算法
3.1 标记-清除算法
标记-清除算法采用从根集合(GC Roots)进行扫描,对存活的对象进行标记,标记完毕后,再扫描整个空间中未被标记的对象,进行回收,如下图所示。标记-清除算法不需要进行对象的移动,只需对不存活的对象进行处理,在存活对象比较多的情况下极为高效,但由于标记-清除算法直接回收不存活的对象,因此会造成内存碎片。
1552823267-7730-5c8ddbea01285
3.2 复制算法

复制算法的提出是为了克服句柄的开销和解决内存碎片的问题。它开始时把堆分成 一个对象 面和多个空闲面, 程序从对象面为对象分配空间,当对象满了,基于copying算法的垃圾 收集就从根集合(GC Roots)中扫描活动对象,并将每个 活动对象复制到空闲面(使得活动对象所占的内存之间没有空闲洞),这样空闲面变成了对象面,原来的对象面变成了空闲面,程序会在新的对象面中分配内存。
1552823266-9289-5c8ddbeb709a4
3.3 标记-整理算法
标记-整理算法采用标记-清除算法一样的方式进行对象的标记,但在清除时不同,在回收不存活的对象占用的空间后,会将所有的存活对象往左端空闲空间移动,并更新对应的指针。标记-整理算法是在标记-清除算法的基础上,又进行了对象的移动,因此成本更高,但是却解决了内存碎片的问题。具体流程见下图:
1552823271-1641-5c8ddbecdcb7b
3.4 分代收集算法
分代收集算法是目前大部分JVM的垃圾收集器采用的算法。它的核心思想是根据对象存活的生命周期将内存划分为若干个不同的区域。一般情况下将堆区划分为老年代(Tenured Generation)和新生代(Young Generation),在堆区之外还有一个代就是永久代(Permanet Generation)。老年代的特点是每次垃圾收集时只有少量对象需要被回收,而新生代的特点是每次垃圾回收时都有大量的对象需要被回收,那么就可以根据不同代的特点采取最适合的收集算法。
1552823267-3484-5c8ddbee8b29a
3.4.1 年轻代(Young Generation)的回收算法
a) 所有新生成的对象首先都是放在年轻代的。年轻代的目标就是尽可能快速的收集掉那些生命周期短的对象。

b) 新生代内存按照8:1:1的比例分为一个eden区和两个survivor(survivor0,survivor1)区。一个Eden区,两个 Survivor区(一般而言)。大部分对象在Eden区中生成。回收时先将eden区存活对象复制到一个survivor0区,然后清空eden区,当这个survivor0区也存放满了时,则将eden区和survivor0区存活对象复制到另一个survivor1区,然后清空eden和这个survivor0区,此时survivor0区是空的,然后将survivor0区和survivor1区交换,即保持survivor1区为空, 如此往复。

c) 当survivor1区不足以存放 eden和survivor0的存活对象时,就将存活对象直接存放到老年代。若是老年代也满了就会触发一次Full GC,也就是新生代、老年代都进行回收。
d) 新生代发生的GC也叫做Minor GC,MinorGC发生频率比较高(不一定等Eden区满了才触发)。

3.4.2 年老代(Old Generation)的回收算法

a) 在年轻代中经历了N次垃圾回收后仍然存活的对象,就会被放到年老代中。因此,可以认为年老代中存放的都是一些生命周期较长的对象。
b) 内存比新生代也大很多(大概比例是1:2),当老年代内存满时触发Major GC即Full GC,Full GC发生频率比较低,老年代对象存活时间比较长,存活率标记高。

3.4.3 持久代(Permanent Generation)的回收算法

用于存放静态文件,如Java类、方法等。持久代对垃圾回收没有显著影响,但是有些应用可能动态生成或者调用一些class,例如Hibernate 等,在这种时候需要设置一个比较大的持久代空间来存放这些运行过程中新增的类。持久代也称方法区,具体的回收可参见上文2.5节。

四、常见的垃圾收集器
下面一张图是HotSpot虚拟机包含的所有收集器,图是借用过来滴:
5c8ddbf02226f
Serial收集器(复制算法)
新生代单线程收集器,标记和清理都是单线程,优点是简单高效。是client级别默认的GC方式,可以通过-XX:+UseSerialGC来强制指定。

Serial Old收集器(标记-整理算法)
老年代单线程收集器,Serial收集器的老年代版本。

ParNew收集器(停止-复制算法)
新生代收集器,可以认为是Serial收集器的多线程版本,在多核CPU环境下有着比Serial更好的表现。

Parallel Scavenge收集器(停止-复制算法)
并行收集器,追求高吞吐量,高效利用CPU。吞吐量一般为99%, 吞吐量= 用户线程时间/(用户线程时间+GC线程时间)。适合后台应用等对交互相应要求不高的场景。是server级别默认采用的GC方式,可用-XX:+UseParallelGC来强制指定,用-XX:ParallelGCThreads=4来指定线程数。

Parallel Old收集器(停止-复制算法)

Parallel Scavenge收集器的老年代版本,并行收集器,吞吐量优先。

CMS(Concurrent Mark Sweep)收集器(标记-清理算法)
高并发、低停顿,追求最短GC回收停顿时间,cpu占用比较高,响应时间快,停顿时间短,多核cpu 追求高响应时间的选择。

五、GC是什么时候触发的(面试最常见的问题之一)
由于对象进行了分代处理,因此垃圾回收区域、时间也不一样。GC有两种类型:Scavenge GC和Full GC。

5.1 Scavenge GC
一般情况下,当新对象生成,并且在Eden申请空间失败时,就会触发Scavenge GC,对Eden区域进行GC,清除非存活对象,并且把尚且存活的对象移动到Survivor区。然后整理Survivor的两个区。这种方式的GC是对年轻代的Eden区进行,不会影响到年老代。因为大部分对象都是从Eden区开始的,同时Eden区不会分配的很大,所以Eden区的GC会频繁进行。因而,一般在这里需要使用速度快、效率高的算法,使Eden去能尽快空闲出来。

5.2 Full GC
对整个堆进行整理,包括Young、Tenured和Perm。Full GC因为需要对整个堆进行回收,所以比Scavenge GC要慢,因此应该尽可能减少Full GC的次数。在对JVM调优的过程中,很大一部分工作就是对于Full GC的调节。有如下原因可能导致Full GC:
a) 年老代(Tenured)被写满;
b) 持久代(Perm)被写满;
c) System.gc()被显示调用;
d) 上一次GC之后Heap的各域分配策略动态变化;

结束语
内容的完整度和深度在一篇博文里面真的很难全部考虑,本文做了很大尝试,最后还是得投降。对于各个垃圾收集器的区别、运行过程中各内存区域参数的设置、GC日志的查看等内容后续再补上吧。文章概念很多,也借用了一些书籍和博文的经典总结,算是一个知识点整理后的输出吧,希望对大家有所裨益。

 

转自:https://www.e-learn.cn/content/java/2069493

Java中常用的API

1. 字符串与数字之间的相互转换

① 将数字转化为字符串:

String s = Integer.toString(int n);

String s = Double.parseDouble(s);

 

② 将数字型的字符串转化为数字

int n = Integer.parseInt(String s)

double n = Double.parseDouble(s)

public class Main {
	public static void main(String[] args) {
		String s = "123456";
		int n1 = Integer.parseInt(s);
		System.out.println(n1);
		
		int n2 = 234812;
		s = Integer.toString(n2);
		
		System.out.println(s);
		double n3 = 23.45;
		
		s = Double.toString(n3);
		System.out.println(s);
		
		s = "45.87";
		System.out.println(Double.parseDouble(s));
	}
}

 

2. String对象与char数组的转换

① 字符串对象转为char类型的数组

char arr[] = s.toCharArray();

 

② 将char类型的数组转为字符串

String s = new String(char arr[]);

String s = String.valueOf(char arr[]);

public class Main {
	public static void main(String[] args) {
		String s = "123345";
		char arr[] = s.toCharArray();
		for(int i = 0; i < arr.length; i++){
			System.out.print(arr[i]);
		}
		System.out.print("\n");
		char arr2[] = {'a', 'b', 'c', 'd', 'e'};
		s = new String(arr2);
		System.out.println(s);
		
		char arr3[] = {'e', 'f', 'g', 'h', 'i'};
		s = String.valueOf(arr3);
		System.out.println(s);
	}
}

3. 根据索引获取字符和根据字符获取索引

① 根据索引获取字符:

char c = s.charAt(i);

 

② 获取字符的最后一个下标(可能有多个重复的字符):

int index = s.lastIndexOf(char c);

获取字符的第一个下标:

int index = s.indexOf(char c);

public class Main {
	public static void main(String[] args) {
		String s = "abbc";
		char c = s.charAt(3);
		System.out.println(c);
		System.out.println(s.indexOf('b'));
		System.out.println(s.lastIndexOf('b'));
	}
}

 

利用Sharding-Jdbc实现分表

看到了当当开源的Sharding-JDBC组件,它可以在几乎不修改代码的情况下完成分库分表的实现。摘抄其中一段介绍:

      Sharding-JDBC直接封装JDBC API,可以理解为增强版的JDBC驱动,旧代码迁移成本几乎为零:

  • 可适用于任何基于java的ORM框架,如:JPA, Hibernate, Mybatis, Spring JDBC Template或直接使用JDBC
  • 可基于任何第三方的数据库连接池,如:DBCP, C3P0, BoneCP, Druid等。
  • 理论上可支持任意实现JDBC规范的数据库。虽然目前仅支持MySQL,但已有支持Oracle,SQLServer,DB2等数据库的计划。

先做一个最简单的试用,不做分库,仅做分表。选择数据表bead_information,首先复制成三个表:bead_information_0、bead_information_1、bead_information_2

1552822689-9656-20180725

测试实现过程

前提:已经实现srping+mybatis对单库单表做增删改查的项目。

 1、修改pom.xml增加dependency

        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>sharding-jdbc-core</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>sharding-jdbc-config-spring</artifactId>
            <version>1.4.0</version>
        </dependency>

2、新建一个sharding-jdbc.xml文件,实现分库分表的配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:rdb="http://www.dangdang.com/schema/ddframe/rdb"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd 
                        http://www.springframework.org/schema/tx 
                        http://www.springframework.org/schema/tx/spring-tx.xsd
                        http://www.springframework.org/schema/context 
                        http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.dangdang.com/schema/ddframe/rdb 
                        http://www.dangdang.com/schema/ddframe/rdb/rdb.xsd">
    
 
    
     <!-- 配置数据源 -->
    <bean name="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
        <property name="url" value="jdbc:mysql://localhost:3306/beadhouse" />
        <property name="username" value="root" />
        <property name="password" value="123456" />
    </bean>
                 
    <rdb:strategy id="tableShardingStrategy" sharding-columns="id" algorithm-class="com.springdemo.utill.MemberSingleKeyTableShardingAlgorithm"/>
    
    <rdb:data-source id="shardingDataSource">
        <rdb:sharding-rule data-sources="dataSource">
            <rdb:table-rules>
                <rdb:table-rule logic-table="bead_information" actual-tables="bead_information_${0..2}"  table-strategy="tableShardingStrategy"/>
            </rdb:table-rules>
        </rdb:sharding-rule>
    </rdb:data-source>
    
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="shardingDataSource" />
    </bean>
</beans>

3、将文件引入spring配置文件中。

需要修改几个地方,把sqlSessionFactory和transactionManager原来关联的dataSource统一修改为shardingDataSource(这一步作用就是把数据源全部托管给sharding去管理)

1552822688-6796-20180725

 

4、实现分表(分库)逻辑,我们的分表逻辑类需要实现SingleKeyTableShardingAlgorithm接口的三个方法doBetweenSharding、doEqualSharding、doInSharding

(取模除数需要按照自己需求改变,我这里分3个表,所以除以3)

import java.util.Collection;
import java.util.LinkedHashSet;
import com.dangdang.ddframe.rdb.sharding.api.ShardingValue;
import com.dangdang.ddframe.rdb.sharding.api.strategy.table.SingleKeyTableShardingAlgorithm;
import com.google.common.collect.Range;
public class MemberSingleKeyTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> {

    @Override
    public Collection<String> doBetweenSharding(Collection<String> tableNames, ShardingValue<Integer> shardingValue) {
        Collection<String> result = new LinkedHashSet<String>(tableNames.size());
        Range<Integer> range = (Range<Integer>) shardingValue.getValueRange();
        for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) {
            Integer modValue = i % 3;
            String modStr = modValue < 3 ? "" + modValue : modValue.toString();
            for (String each : tableNames) {
                if (each.endsWith(modStr)) {
                    result.add(each);
                }
            }
        }
        return result;
    }
    @Override
    public String doEqualSharding(Collection<String> tableNames, ShardingValue<Integer> shardingValue) {
        Integer modValue = shardingValue.getValue() % 3;
        String modStr = modValue < 3 ? "" + modValue : modValue.toString();
        for (String each : tableNames) {
            if (each.endsWith(modStr)) {
                return each;
            }
        }
        throw new IllegalArgumentException();
    }
    @Override
    public Collection<String> doInSharding(Collection<String> tableNames, ShardingValue<Integer> shardingValue) {
        Collection<String> result = new LinkedHashSet<String>(tableNames.size());
        for (Integer value : shardingValue.getValues()) {
            Integer modValue = value % 3;
            String modStr = modValue < 3 ? "" + modValue : modValue.toString();
            for (String tableName : tableNames) {
                if (tableName.endsWith(modStr)) {
                    result.add(tableName);
                }
            }
        }
        return result;
    }
}

5、配置完成,可以实现增删改查测试。

理解Sharding jdbc原理

相比于Spring基于AbstractRoutingDataSource实现的分库分表功能,Sharding jdbc在单库单表扩展到多库多表时,兼容性方面表现的更好一点。例如,spring实现的分库分表sql写法如下:

select id, name, price, publish, intro
from book${tableIndex}
where id = #{id,jdbcType=INTEGER}

sql中的表名book需要加一个分表的后缀tableIndex,也就是需要在sql注入的参数中指定插入哪个表。相比,Sharding jdbc在这一块封装的更好一点。其sql中,根本不需要指定tableIndex,而是根据分库分表策略自动路由。

select id, name, price, publish, intro
from book
where id = #{id,jdbcType=INTEGER}

Sharding jdbc的这种特性,在水平扩展的时候无疑更具有吸引力。试想一下,一个项目开发一段时间后,单库单表数据量急剧上升,需要分库分表解决数据库的访问压力。而现有sql配置都是基于单库单表实现的,如果基于spring的AbstractRoutingDataSource实现,需要修改每一个相关表的sql,修改涉及较多地方,出错概率较大。而基于Sharding jdbc实现时,sql无需修改,只需要在spring中添加Sharding jdbc的相关配置即可,减少了修改面,大大简化分库分表的实现难度。

那么,Sharding jdbc是如何实现这种分库分表的逻辑呢?下面我们用一段简单、易懂的代码描述Sharding jdbc的原理。

通常我们在写一段访问数据库的数据时,逻辑是这样的:

ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(“application.xml”);
DataSource dataSource = ctx.getBean(“dataSource”, DataSource.class);
Connection connection = dataSource.getConnection();

String sql = “select id, name, price, publish, intro from book where id = 111″;
PreparedStatement ps = connection.prepareStatement(sql);
ResultSet rs = ps.executeQuery();
// handle ResultSet…

Sharding jdbc是基于JDBC协议实现的,当我们获得dataSource时,这个dataSource是Sharding jdbc自己定义的一个SpringShardingDataSource类型的数据源,该数据源在返回getConnection()及prepareStatement()时,分别返回ShardingConnection和ShardingPreparedStatement的实例对象。然后在executeQuery()时,ShardingPreparedStatement做了这样的一件事:

根据逻辑sql,经过分库分表策略逻辑计算,获得分库分表的路由结果SQLRouteResult;
SQLRouteResult中包含真实的数据源以及转换后的真正sql,利用真实的数据源去执行获得ResultSet;
将ResultSet列表封装成一个可以顺序读的ResultSet对象IteratorReducerResultSet。

class ShardingPreparedStatement implements PreparedStatement {

@Override
public ResultSet executeQuery() throws SQLException {
List<SQLRouteResult> routeResults = routeSql(logicSql);

List<ResultSet> resultSets = new ArrayList<>(routeResults.size());
for (SQLRouteResult routeResult : routeResults) {
PreparedStatement ps = routeResult.getDataSource().getConnection.prepareStatement(routeResult.getParsedSql());
ResultSet rs = ps.executeQuery();
resultSets.add(rs);
}

return new IteratorReducerResultSet(resultSets);
}
…..

}

其中,分库分表策略的sql路由过程,我们将Sharding jdbc中的相关代码全部抽出来,放到一起来观看这个过程的实现:

// 环境准备
@SuppressWarnings(“resource”)
ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(“application.xml”);
SpringShardingDataSource dataSource = ctx.getBean(SpringShardingDataSource.class);
Field field = SpringShardingDataSource.class.getSuperclass().getDeclaredField(“shardingContext”);
field.setAccessible(true);
ShardingContext sctx = (ShardingContext)field.get(dataSource);
ShardingRule shardingRule = sctx.getShardingRule();

String logicSql = “select id, name, price, publish, intro from book where id = ?”;
List<Object> parameters = new ArrayList<>();
parameters.add(2000);

// sql解析
MySqlStatementParser parser = new MySqlStatementParser(logicSql);
MySQLSelectVisitor visitor = new MySQLSelectVisitor();
SQLStatement statement = parser.parseStatement();
visitor.getParseContext().setShardingRule(shardingRule);
statement.accept(visitor);

SQLParsedResult parsedResult = visitor.getParseContext().getParsedResult();
if (visitor.getParseContext().isHasOrCondition()) {
new OrParser(statement, visitor).fillConditionContext(parsedResult);
}
visitor.getParseContext().mergeCurrentConditionContext();
System.out.println(“Parsed SQL result: ” + parsedResult);
System.out.println(“Parsed SQL: ” + visitor.getSQLBuilder());
parsedResult.getRouteContext().setSqlBuilder(visitor.getSQLBuilder());
parsedResult.getRouteContext().setSqlStatementType(SQLStatementType.SELECT);

// 分库分表路由
SQLRouteResult result = new SQLRouteResult(parsedResult.getRouteContext().getSqlStatementType(), parsedResult.getMergeContext(), parsedResult.getGeneratedKeyContext());
for (ConditionContext each : parsedResult.getConditionContexts()) {
Collection<Table> tables = parsedResult.getRouteContext().getTables();
final Set<String> logicTables = new HashSet<>();
tables.forEach(a -> logicTables.add(a.getName()));

SingleTableRouter router = new SingleTableRouter(shardingRule,
logicTables.iterator().next(),
each,
parsedResult.getRouteContext().getSqlStatementType());

RoutingResult routingResult = router.route();

// sql改写 –> routingResult.getSQLExecutionUnits()
// —> SingleRoutingTableFactor.replaceSQL(sqlBuilder).buildSQL()
// 结果合并
result.getExecutionUnits().addAll(routingResult.getSQLExecutionUnits(parsedResult.getRouteContext().getSqlBuilder()));
}
// amendSQLAccordingToRouteResult(parsedResult, parameters, result);
for (SQLExecutionUnit each : result.getExecutionUnits()) {
System.out.println(each.getDataSource() + ” ” + each.getSql() + ” ” + parameters);
}

  • 准备环境。由于Sharding jdbc分库分表中ShardingRule这个类是贯穿整个路由过程,我们在Spring中写好Sharding jdbc的配置,利用反射获取一个这个对象。(Sharding jdbc版本以及配置,在文章最后列出,方便debug这个过程)
  • sql解析。Sharding jdbc使用阿里的Druid库解析sql。在这个过程中,Sharding jdbc实现了一个自己的sql解析内容缓存容器SqlBuilder。当语法分析中解析到一个表名的时候,在SqlBuilder中缓存一个sql相关的逻辑表名的token。并且,Sharding jdbc会将sql按照语义解析为多个segment。例如,”select id, name, price, publish, intro from book where id = ?”将解析为,”select id, name, price, publish, intro | from | book | where | id = ?”。
  • 分库分表路由。根据ShardingRule中指定的分库分表列的参数值,以及分库分表策略,实行分库分表,得到一个RoutingResult 。RoutingResult 中包含一个真实数据源,以及逻辑表名和实际表名。
  • sql改写。在SqlBuilder中,查找sql中解析的segment,将和逻辑表名一致的segment替换成实际表名。(segment中可以标注该地方是不是表名)
    以上代码执行结果如下:

Parsed SQL result: SQLParsedResult(routeContext=RouteContext(tables=[Table(name=book, alias=Optional.absent())], sqlStatementType=null, sqlBuilder=null), generatedKeyContext=GeneratedKeyContext(columns=[], columnNameToIndexMap={}, valueTable={}, rowIndex=0, columnIndex=0, autoGeneratedKeys=0, columnIndexes=null, columnNames=null), conditionContexts=[ConditionContext(conditions={})], mergeContext=MergeContext(orderByColumns=[], groupByColumns=[], aggregationColumns=[], limit=null))
Parsed SQL: SELECT id, name, price, publish, intro FROM [Token(book)] WHERE id = ?
dataSource1 SELECT id, name, price, publish, intro FROM book_00 WHERE id = ? [2000]
dataSource2 SELECT id, name, price, publish, intro FROM book_02 WHERE id = ? [2000]
dataSource1 SELECT id, name, price, publish, intro FROM book_02 WHERE id = ? [2000]
dataSource2 SELECT id, name, price, publish, intro FROM book_01 WHERE id = ? [2000]
dataSource0 SELECT id, name, price, publish, intro FROM book_00 WHERE id = ? [2000]
dataSource0 SELECT id, name, price, publish, intro FROM book_01 WHERE id = ? [2000]
dataSource2 SELECT id, name, price, publish, intro FROM book_00 WHERE id = ? [2000]
dataSource1 SELECT id, name, price, publish, intro FROM book_01 WHERE id = ? [2000]
dataSource0 SELECT id, name, price, publish, intro FROM book_02 WHERE id = ? [2000]

实际上,我们可以用更通俗易懂的代码表示sql改写的这个过程:

String logicSql = “select id, name, price, publish, intro from book where id = 111″;
MySqlStatementParser parser = new MySqlStatementParser(logicSql);
SQLStatement statement = parser.parseStatement();
MySQLSimpleVisitor visitor = new MySQLSimpleVisitor();
statement.accept(visitor);

String logicTable = “book”;
String realTable = “book_00″;
String token = “\\$\\{” + logicTable + “\\}”;

String sqlBuilder = visitor.getAppender().toString();
String sql = sqlBuilder.replaceAll(token, realTable);

System.out.println(sqlBuilder);
System.out.println(sql);

 

结果如下:

SELECT id, name, price, publish, intro
FROM ${book}
WHERE id = 111
SELECT id, name, price, publish, intro
FROM book_00
WHERE id = 111

以上,大致将Sharding jdbc的原理及实现过程介绍了一下,如果想要了解正真的实现过程和细节,还需要对照代码仔细推敲。

本文的实现环境:

<dependency>
<groupId>com.dangdang</groupId>
<artifactId>sharding-jdbc-core</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>sharding-jdbc-config-spring</artifactId>
<version>1.4.0</version>
</dependency>

application.xml:

<?xml version=”1.0″ encoding=”UTF-8″?>
<beans xmlns=”http://www.springframework.org/schema/beans”
xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”
xmlns:tx=”http://www.springframework.org/schema/tx”
xmlns:context=”http://www.springframework.org/schema/context”
xmlns:rdb=”http://www.dangdang.com/schema/ddframe/rdb”
xsi:schemaLocation=”
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.dangdang.com/schema/ddframe/rdb
http://www.dangdang.com/schema/ddframe/rdb/rdb.xsd”>

<context:property-placeholder location=”classpath:jdbc.properties” ignore-unresolvable=”true” />

<bean id=”dataSource0″ class=”org.springframework.jdbc.datasource.DriverManagerDataSource”>
<property name=”driverClassName” value=”com.mysql.jdbc.Driver” />
<property name=”url” value=”${jdbc.mysql.url0}” />
<property name=”username” value=”${jdbc.mysql.username0}” />
<property name=”password” value=”${jdbc.mysql.password0}” />
</bean>

<bean id=”dataSource1″ class=”org.springframework.jdbc.datasource.DriverManagerDataSource”>
<property name=”driverClassName” value=”${driver}” />
<property name=”url” value=”${jdbc.mysql.url1}” />
<property name=”username” value=”${jdbc.mysql.username1}” />
<property name=”password” value=”${jdbc.mysql.password1}” />
</bean>

<bean id=”dataSource2″ class=”org.springframework.jdbc.datasource.DriverManagerDataSource”>
<property name=”driverClassName” value=”${driver}” />
<property name=”url” value=”${jdbc.mysql.url2}” />
<property name=”username” value=”${jdbc.mysql.username2}” />
<property name=”password” value=”${jdbc.mysql.password2}” />
</bean>

<!– sharding jdbc –>
<rdb:strategy id=”tableShardingStrategy” sharding-columns=”id”
algorithm-class=”com.wy.sharding.MemberSingleKeyTableShardingAlgorithm” />

<rdb:data-source id=”shardingDataSource”>
<rdb:sharding-rule data-sources=”dataSource0,dataSource1,dataSource2″>
<rdb:table-rules>
<rdb:table-rule logic-table=”book”
actual-tables=”book_0${0..2}”
table-strategy=”tableShardingStrategy”/>
</rdb:table-rules>
</rdb:sharding-rule>
</rdb:data-source>
</beans>

MemberSingleKeyTableShardingAlgorithm.java:

public class MemberSingleKeyTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> {

public String doEqualSharding(Collection<String> availableTargetNames, ShardingValue<Integer> shardingValue) {
String routeDBSuffix = getRouteDBSuffix(shardingValue.getValue());
for (String each : availableTargetNames) {
if (each.endsWith(routeDBSuffix)) {
return each;
}
}
throw new IllegalArgumentException();
}

public Collection<String> doInSharding(Collection<String> availableTargetNames, ShardingValue<Integer> shardingValue) {
Collection<String> result = new LinkedHashSet<String>(availableTargetNames.size());
for (int value : shardingValue.getValues()) {
String routeDBSuffix = getRouteDBSuffix(value);
for (String tableName : availableTargetNames) {
if (tableName.endsWith(routeDBSuffix)) {
result.add(tableName);
}
}
}
return result;
}

public Collection<String> doBetweenSharding(Collection<String> availableTargetNames,
ShardingValue<Integer> shardingValue) {
Collection<String> result = new LinkedHashSet<String>(availableTargetNames.size());
Range<Integer> range = (Range<Integer>) shardingValue.getValueRange();
for (int i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) {
String routeDBSuffix = getRouteDBSuffix(i);
for (String each : availableTargetNames) {
if (each.endsWith(routeDBSuffix)) {
result.add(each);
}
}
}
return result;
}

public String getRouteDBSuffix(Integer shardingCode) {
int modValue = shardingCode % 3;
return “0” + modValue;
}

}

 

1212223242533
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |