美团的领域驱动设计

至少30年以前,一些软件设计人员就已经意识到领域建模和设计的重要性,并形成一种思潮,Eric Evans将其定义为领域驱动设计(Domain-Driven Design,简称DDD)。在互联网开发“小步快跑,迭代试错”的大环境下,DDD似乎是一种比较“古老而缓慢”的思想。然而,由于互联网公司也逐渐深入实体经济,业务日益复杂,我们在开发中也越来越多地遇到传统行业软件开发中所面临的问题。本文就先来讲一下这些问题,然后再尝试在实践中用DDD的思想来解决这些问题。

过度耦合

业务初期,我们的功能大都非常简单,普通的CRUD就能满足,此时系统是清晰的。随着迭代的不断演化,业务逻辑变得越来越复杂,我们的系统也越来越冗杂。模块彼此关联,谁都很难说清模块的具体功能意图是啥。修改一个功能时,往往光回溯该功能需要的修改点就需要很长时间,更别提修改带来的不可预知的影响面。

下图是一个常见的系统耦合病例。

服务耦合示意图

服务耦合示意图

订单服务接口中提供了查询、创建订单相关的接口,也提供了订单评价、支付、保险的接口。同时我们的表也是一个订单大表,包含了非常多字段。在我们维护代码时,牵一发而动全身,很可能只是想改下评价相关的功能,却影响到了创单核心路径。虽然我们可以通过测试保证功能完备性,但当我们在订单领域有大量需求同时并行开发时,改动重叠、恶性循环、疲于奔命修改各种问题。

上述问题,归根到底在于系统架构不清晰,划分出来的模块内聚度低、高耦合。

有一种解决方案,按照演进式设计的理论,让系统的设计随着系统实现的增长而增长。我们不需要作提前设计,就让系统伴随业务成长而演进。这当然是可行的,敏捷实践中的重构、测试驱动设计及持续集成可以对付各种混乱问题。重构——保持行为不变的代码改善清除了不协调的局部设计,测试驱动设计确保对系统的更改不会导致系统丢失或破坏现有功能,持续集成则为团队提供了同一代码库。

在这三种实践中,重构是克服演进式设计中大杂烩问题的主力,通过在单独的类及方法级别上做一系列小步重构来完成。我们可以很容易重构出一个独立的类来放某些通用的逻辑,但是你会发现你很难给它一个业务上的含义,只能给予一个技术维度描绘的含义。这会带来什么问题呢?新同学并不总是知道对通用逻辑的改动或获取来自该类。显然,制定项目规范并不是好的idea。我们又闻到了代码即将腐败的味道。

事实上,你可能意识到问题之所在。在解决现实问题时,我们会将问题映射到脑海中的概念模型,在模型中解决问题,再将解决方案转换为实际的代码。上述问题在于我们解决了设计到代码之间的重构,但提炼出来的设计模型,并不具有实际的业务含义,这就导致在开发新需求时,其他同学并不能很自然地将业务问题映射到该设计模型。设计似乎变成了重构者的自娱自乐,代码继续腐败,重新重构……无休止的循环。

用DDD则可以很好地解决领域模型到设计模型的同步、演化,最后再将反映了领域的设计模型转为实际的代码。

注:模型是我们解决实际问题所抽象出来的概念模型,领域模型则表达与业务相关的事实;设计模型则描述了所要构建的系统。

贫血症和失忆症

贫血领域对象

贫血领域对象(Anemic Domain Object)是指仅用作数据载体,而没有行为和动作的领域对象。

在我们习惯了J2EE的开发模式后,Action/Service/DAO这种分层模式,会很自然地写出过程式代码,而学到的很多关于OO理论的也毫无用武之地。使用这种开发方式,对象只是数据的载体,没有行为。以数据为中心,以数据库ER设计作驱动。分层架构在这种开发模式下,可以理解为是对数据移动、处理和实现的过程。

以笔者最近开发的系统抽奖平台为例:

  • 场景需求

奖池里配置了很多奖项,我们需要按运营预先配置的概率抽中一个奖项。 实现非常简单,生成一个随机数,匹配符合该随机数生成概率的奖项即可。

  • 贫血模型实现方案

先设计奖池和奖项的库表配置。

抽奖ER图

抽奖ER图
  • 设计AwardPool和Award两个对象,只有简单的get和set属性的方法
class AwardPool {
    int awardPoolId;
    List<Award> awards;
    public List<Award> getAwards() {
        return awards;
    }
  
    public void setAwards(List<Award> awards) {
        this.awards = awards;
    }
    ......
}

class Award {
   int awardId;
   int probability;//概率
  
   ......
}
  • Service代码实现

设计一个LotteryService,在其中的drawLottery()方法写服务逻辑

AwardPool awardPool = awardPoolDao.getAwardPool(poolId);//sql查询,将数据映射到AwardPool对象
for (Award award : awardPool.getAwards()) {
   //寻找到符合award.getProbability()概率的award
}
  • 按照我们通常思路实现,可以发现:在业务领域里非常重要的抽奖,我的业务逻辑都是写在Service中的,Award充其量只是个数据载体,没有任何行为。简单的业务系统采用这种贫血模型和过程化设计是没有问题的,但在业务逻辑复杂了,业务逻辑、状态会散落到在大量方法中,原本的代码意图会渐渐不明确,我们将这种情况称为由贫血症引起的失忆症。

更好的是采用领域模型的开发方式,将数据和行为封装在一起,并与现实世界中的业务对象相映射。各类具备明确的职责划分,将领域逻辑分散到领域对象中。继续举我们上述抽奖的例子,使用概率选择对应的奖品就应当放到AwardPool类中。

软件系统复杂性应对

解决复杂和大规模软件的武器可以被粗略地归为三类:抽象、分治和知识。

分治 把问题空间分割为规模更小且易于处理的若干子问题。分割后的问题需要足够小,以便一个人单枪匹马就能够解决他们;其次,必须考虑如何将分割后的各个部分装配为整体。分割得越合理越易于理解,在装配成整体时,所需跟踪的细节也就越少。即更容易设计各部分的协作方式。评判什么是分治得好,即高内聚低耦合。

抽象 使用抽象能够精简问题空间,而且问题越小越容易理解。举个例子,从北京到上海出差,可以先理解为使用交通工具前往,但不需要一开始就想清楚到底是高铁还是飞机,以及乘坐他们需要注意什么。

知识 顾名思义,DDD可以认为是知识的一种。

DDD提供了这样的知识手段,让我们知道如何抽象出限界上下文以及如何去分治。

与微服务架构相得益彰

微服务架构众所周知,此处不做赘述。我们创建微服务时,需要创建一个高内聚、低耦合的微服务。而DDD中的限界上下文则完美匹配微服务要求,可以将该限界上下文理解为一个微服务进程。

上述是从更直观的角度来描述两者的相似处。

在系统复杂之后,我们都需要用分治来拆解问题。一般有两种方式,技术维度和业务维度。技术维度是类似MVC这样,业务维度则是指按业务领域来划分系统。

微服务架构更强调从业务维度去做分治来应对系统复杂度,而DDD也是同样的着重业务视角。 如果两者在追求的目标(业务维度)达到了上下文的统一,那么在具体做法上有什么联系和不同呢?

我们将架构设计活动精简为以下三个层面:

  • 业务架构——根据业务需求设计业务模块及其关系
  • 系统架构——设计系统和子系统的模块
  • 技术架构——决定采用的技术及框架

以上三种活动在实际开发中是有先后顺序的,但不一定孰先孰后。在我们解决常规套路问题时,我们会很自然地往熟悉的分层架构套(先确定系统架构),或者用PHP开发很快(先确定技术架构),在业务不复杂时,这样是合理的。

跳过业务架构设计出来的架构关注点不在业务响应上,可能就是个大泥球,在面临需求迭代或响应市场变化时就很痛苦。

DDD的核心诉求就是将业务架构映射到系统架构上,在响应业务变化调整业务架构时,也随之变化系统架构。而微服务追求业务层面的复用,设计出来的系统架构和业务一致;在技术架构上则系统模块之间充分解耦,可以自由地选择合适的技术架构,去中心化地治理技术和数据。

可以参见下图来更好地理解双方之间的协作关系:

DDD与微服务关系

DDD与微服务关系

我们将通过上文提到的抽奖平台,来详细介绍我们如何通过DDD来解构一个中型的基于微服务架构的系统,从而做到系统的高内聚、低耦合。

首先看下抽奖系统的大致需求: 运营——可以配置一个抽奖活动,该活动面向一个特定的用户群体,并针对一个用户群体发放一批不同类型的奖品(优惠券,激活码,实物奖品等)。 用户-通过活动页面参与不同类型的抽奖活动。

设计领域模型的一般步骤如下:

  1. 根据需求划分出初步的领域和限界上下文,以及上下文之间的关系;
  2. 进一步分析每个上下文内部,识别出哪些是实体,哪些是值对象;
  3. 对实体、值对象进行关联和聚合,划分出聚合的范畴和聚合根;
  4. 为聚合根设计仓储,并思考实体或值对象的创建方式;
  5. 在工程中实践领域模型,并在实践中检验模型的合理性,倒推模型中不足的地方并重构。

战略建模

战略和战术设计是站在DDD的角度进行划分。战略设计侧重于高层次、宏观上去划分和集成限界上下文,而战术设计则关注更具体使用建模工具来细化上下文。

领域

现实世界中,领域包含了问题域和解系统。一般认为软件是对现实世界的部分模拟。在DDD中,解系统可以映射为一个个限界上下文,限界上下文就是软件对于问题域的一个特定的、有限的解决方案。

限界上下文

限界上下文

一个由显示边界限定的特定职责。领域模型便存在于这个边界之内。在边界内,每一个模型概念,包括它的属性和操作,都具有特殊的含义。

一个给定的业务领域会包含多个限界上下文,想与一个限界上下文沟通,则需要通过显示边界进行通信。系统通过确定的限界上下文来进行解耦,而每一个上下文内部紧密组织,职责明确,具有较高的内聚性。

一个很形象的隐喻:细胞质所以能够存在,是因为细胞膜限定了什么在细胞内,什么在细胞外,并且确定了什么物质可以通过细胞膜。

划分限界上下文

划分限界上下文,不管是Eric Evans还是Vaughn Vernon,在他们的大作里都没有怎么提及。

显然我们不应该按技术架构或者开发任务来创建限界上下文,应该按照语义的边界来考虑。

我们的实践是,考虑产品所讲的通用语言,从中提取一些术语称之为概念对象,寻找对象之间的联系;或者从需求里提取一些动词,观察动词和对象之间的关系;我们将紧耦合的各自圈在一起,观察他们内在的联系,从而形成对应的界限上下文。形成之后,我们可以尝试用语言来描述下界限上下文的职责,看它是否清晰、准确、简洁和完整。简言之,限界上下文应该从需求出发,按领域划分。

前文提到,我们的用户划分为运营和用户。其中,运营对抽奖活动的配置十分复杂但相对低频。用户对这些抽奖活动配置的使用是高频次且无感知的。根据这样的业务特点,我们首先将抽奖平台划分为C端抽奖和M端抽奖管理平台两个子域,让两者完全解耦。

抽奖平台领域

抽奖平台领域

在确认了M端领域和C端的限界上下文后,我们再对各自上下文内部进行限界上下文的划分。下面我们用C端进行举例。

产品的需求概述如下:

1. 抽奖活动有活动限制,例如用户的抽奖次数限制,抽奖的开始和结束的时间等;
2. 一个抽奖活动包含多个奖品,可以针对一个或多个用户群体;
3. 奖品有自身的奖品配置,例如库存量,被抽中的概率等,最多被一个用户抽中的次数等等;
4. 用户群体有多种区别方式,如按照用户所在城市区分,按照新老客区分等;
5. 活动具有风控配置,能够限制用户参与抽奖的频率。

根据产品的需求,我们提取了一些关键性的概念作为子域,形成我们的限界上下文。

C端抽奖领域

C端抽奖领域

首先,抽奖上下文作为整个领域的核心,承担着用户抽奖的核心业务,抽奖中包含了奖品和用户群体的概念。

  • 在设计初期,我们曾经考虑划分出抽奖和发奖两个领域,前者负责选奖,后者负责将选中的奖品发放出去。但在实际开发过程中,我们发现这两部分的逻辑紧密连接,难以拆分。并且单纯的发奖逻辑足够简单,仅仅是调用第三方服务进行发奖,不足以独立出来成为一个领域。

对于活动的限制,我们定义了活动准入的通用语言,将活动开始/结束时间,活动可参与次数等限制条件都收拢到活动准入上下文中。

对于抽奖的奖品库存量,由于库存的行为与奖品本身相对解耦,库存关注点更多是库存内容的核销,且库存本身具备通用性,可以被奖品之外的内容使用,因此我们定义了独立的库存上下文。

由于C端存在一些刷单行为,我们根据产品需求定义了风控上下文,用于对活动进行风控。 最后,活动准入、风控、抽奖等领域都涉及到一些次数的限制,因此我们定义了计数上下文。

可以看到,通过DDD的限界上下文划分,我们界定出抽奖、活动准入、风控、计数、库存等五个上下文,每个上下文在系统中都高度内聚。

上下文映射图

在进行上下文划分之后,我们还需要进一步梳理上下文之间的关系。

康威(梅尔·康威)定律

任何组织在设计一套系统(广义概念上的系统)时,所交付的设计方案在结构上都与该组织的沟通结构保持一致。

康威定律告诉我们,系统结构应尽量的与组织结构保持一致。这里,我们认为团队结构(无论是内部组织还是团队间组织)就是组织结构,限界上下文就是系统的业务结构。因此,团队结构应该和限界上下文保持一致。

梳理清楚上下文之间的关系,从团队内部的关系来看,有如下好处:

  1. 任务更好拆分,一个开发人员可以全身心的投入到相关的一个单独的上下文中;
  2. 沟通更加顺畅,一个上下文可以明确自己对其他上下文的依赖关系,从而使得团队内开发直接更好的对接。

从团队间的关系来看,明确的上下文关系能够带来如下帮助:

  1. 每个团队在它的上下文中能够更加明确自己领域内的概念,因为上下文是领域的解系统;
  2. 对于限界上下文之间发生交互,团队与上下文的一致性,能够保证我们明确对接的团队和依赖的上下游。

限界上下文之间的映射关系

  • 合作关系(Partnership):两个上下文紧密合作的关系,一荣俱荣,一损俱损。
  • 共享内核(Shared Kernel):两个上下文依赖部分共享的模型。
  • 客户方-供应方开发(Customer-Supplier Development):上下文之间有组织的上下游依赖。
  • 遵奉者(Conformist):下游上下文只能盲目依赖上游上下文。
  • 防腐层(Anticorruption Layer):一个上下文通过一些适配和转换与另一个上下文交互。
  • 开放主机服务(Open Host Service):定义一种协议来让其他上下文来对本上下文进行访问。
  • 发布语言(Published Language):通常与OHS一起使用,用于定义开放主机的协议。
  • 大泥球(Big Ball of Mud):混杂在一起的上下文关系,边界不清晰。
  • 另谋他路(SeparateWay):两个完全没有任何联系的上下文。

上文定义了上下文映射间的关系,经过我们的反复斟酌,抽奖平台上下文的映射关系图如下:

上下文映射关系

上下文映射关系

由于抽奖,风控,活动准入,库存,计数五个上下文都处在抽奖领域的内部,所以它们之间符合“一荣俱荣,一损俱损”的合作关系(PartnerShip,简称PS)。

同时,抽奖上下文在进行发券动作时,会依赖券码、平台券、外卖券三个上下文。抽奖上下文通过防腐层(Anticorruption Layer,ACL)对三个上下文进行了隔离,而三个券上下文通过开放主机服务(Open Host Service)作为发布语言(Published Language)对抽奖上下文提供访问机制。

通过上下文映射关系,我们明确的限制了限界上下文的耦合性,即在抽奖平台中,无论是上下文内部交互(合作关系)还是与外部上下文交互(防腐层),耦合度都限定在数据耦合(Data Coupling)的层级。

战术建模——细化上下文

梳理清楚上下文之间的关系后,我们需要从战术层面上剖析上下文内部的组织关系。首先看下DDD中的一些定义。

实体

当一个对象由其标识(而不是属性)区分时,这种对象称为实体(Entity)。

例:最简单的,公安系统的身份信息录入,对于人的模拟,即认为是实体,因为每个人是独一无二的,且其具有唯一标识(如公安系统分发的身份证号码)。

在实践上建议将属性的验证放到实体中。

值对象

当一个对象用于对事务进行描述而没有唯一标识时,它被称作值对象(Value Object)。

例:比如颜色信息,我们只需要知道{“name”:“黑色”,”css”:“#000000”}这样的值信息就能够满足要求了,这避免了我们对标识追踪带来的系统复杂性。

值对象很重要,在习惯了使用数据库的数据建模后,很容易将所有对象看作实体。使用值对象,可以更好地做系统优化、精简设计。

它具有不变性、相等性和可替换性。

在实践中,需要保证值对象创建后就不能被修改,即不允许外部再修改其属性。在不同上下文集成时,会出现模型概念的公用,如商品模型会存在于电商的各个上下文中。在订单上下文中如果你只关注下单时商品信息快照,那么将商品对象视为值对象是很好的选择。

聚合根

Aggregate(聚合)是一组相关对象的集合,作为一个整体被外界访问,聚合根(Aggregate Root)是这个聚合的根节点。

聚合是一个非常重要的概念,核心领域往往都需要用聚合来表达。其次,聚合在技术上有非常高的价值,可以指导详细设计。

聚合由根实体,值对象和实体组成。

如何创建好的聚合?

  • 边界内的内容具有一致性:在一个事务中只修改一个聚合实例。如果你发现边界内很难接受强一致,不管是出于性能或产品需求的考虑,应该考虑剥离出独立的聚合,采用最终一致的方式。
  • 设计小聚合:大部分的聚合都可以只包含根实体,而无需包含其他实体。即使一定要包含,可以考虑将其创建为值对象。
  • 通过唯一标识来引用其他聚合或实体:当存在对象之间的关联时,建议引用其唯一标识而非引用其整体对象。如果是外部上下文中的实体,引用其唯一标识或将需要的属性构造值对象。 如果聚合创建复杂,推荐使用工厂方法来屏蔽内部复杂的创建逻辑。

聚合内部多个组成对象的关系可以用来指导数据库创建,但不可避免存在一定的抗阻。如聚合中存在List<值对象>,那么在数据库中建立1:N的关联需要将值对象单独建表,此时是有id的,建议不要将该id暴露到资源库外部,对外隐蔽。

领域服务

一些重要的领域行为或操作,可以归类为领域服务。它既不是实体,也不是值对象的范畴。

当我们采用了微服务架构风格,一切领域逻辑的对外暴露均需要通过领域服务来进行。如原本由聚合根暴露的业务逻辑也需要依托于领域服务。

领域事件

领域事件是对领域内发生的活动进行的建模。

抽奖平台的核心上下文是抽奖上下文,接下来介绍下我们对抽奖上下文的建模。

抽奖上下文

抽奖上下文

在抽奖上下文中,我们通过抽奖(DrawLottery)这个聚合根来控制抽奖行为,可以看到,一个抽奖包括了抽奖ID(LotteryId)以及多个奖池(AwardPool),而一个奖池针对一个特定的用户群体(UserGroup)设置了多个奖品(Award)。

另外,在抽奖领域中,我们还会使用抽奖结果(SendResult)作为输出信息,使用用户领奖记录(UserLotteryLog)作为领奖凭据和存根。

谨慎使用值对象

在实践中,我们发现虽然一些领域对象符合值对象的概念,但是随着业务的变动,很多原有的定义会发生变更,值对象可能需要在业务意义具有唯一标识,而对这类值对象的重构往往需要较高成本。因此在特定的情况下,我们也要根据实际情况来权衡领域对象的选型。

DDD工程实现

在对上下文进行细化后,我们开始在工程中真正落地DDD。

模块

模块(Module)是DDD中明确提到的一种控制限界上下文的手段,在我们的工程中,一般尽量用一个模块来表示一个领域的限界上下文。

如代码中所示,一般的工程中包的组织方式为{com.公司名.组织架构.业务.上下文.*},这样的组织结构能够明确的将一个上下文限定在包的内部。

import com.company.team.bussiness.lottery.*;//抽奖上下文
import com.company.team.bussiness.riskcontrol.*;//风控上下文
import com.company.team.bussiness.counter.*;//计数上下文
import com.company.team.bussiness.condition.*;//活动准入上下文
import com.company.team.bussiness.stock.*;//库存上下文

代码演示1 模块的组织

对于模块内的组织结构,一般情况下我们是按照领域对象、领域服务、领域资源库、防腐层等组织方式定义的。

import com.company.team.bussiness.lottery.domain.valobj.*;//领域对象-值对象
import com.company.team.bussiness.lottery.domain.entity.*;//领域对象-实体
import com.company.team.bussiness.lottery.domain.aggregate.*;//领域对象-聚合根
import com.company.team.bussiness.lottery.service.*;//领域服务
import com.company.team.bussiness.lottery.repo.*;//领域资源库
import com.company.team.bussiness.lottery.facade.*;//领域防腐层

代码演示2 模块的组织

每个模块的具体实现,我们将在下文中展开。

领域对象

前文提到,领域驱动要解决的一个重要的问题,就是解决对象的贫血问题。这里我们用之前定义的抽奖(DrawLottery)聚合根和奖池(AwardPool)值对象来具体说明。

抽奖聚合根持有了抽奖活动的id和该活动下的所有可用奖池列表,它的一个最主要的领域功能就是根据一个抽奖发生场景(DrawLotteryContext),选择出一个适配的奖池,即chooseAwardPool方法。

chooseAwardPool的逻辑是这样的:DrawLotteryContext会带有用户抽奖时的场景信息(抽奖得分或抽奖时所在的城市),DrawLottery会根据这个场景信息,匹配一个可以给用户发奖的AwardPool。

package com.company.team.bussiness.lottery.domain.aggregate;
import ...;
  
public class DrawLottery {
    private int lotteryId; //抽奖id
    private List<AwardPool> awardPools; //奖池列表
  
    //getter & setter
    public void setLotteryId(int lotteryId) {
        if(id<=0){
            throw new IllegalArgumentException("非法的抽奖id"); 
        }
        this.lotteryId = lotteryId;
    }
  
    //根据抽奖入参context选择奖池
    public AwardPool chooseAwardPool(DrawLotteryContext context) {
        if(context.getMtCityInfo()!=null) {
            return chooseAwardPoolByCityInfo(awardPools, context.getMtCityInfo());
        } else {
            return chooseAwardPoolByScore(awardPools, context.getGameScore());
        }
    }
     
    //根据抽奖所在城市选择奖池
    private AwardPool chooseAwardPoolByCityInfo(List<AwardPool> awardPools, MtCifyInfo cityInfo) {
        for(AwardPool awardPool: awardPools) {
            if(awardPool.matchedCity(cityInfo.getCityId())) {
                return awardPool;
            }
        }
        return null;
    }
  
    //根据抽奖活动得分选择奖池
    private AwardPool chooseAwardPoolByScore(List<AwardPool> awardPools, int gameScore) {...}
}

代码演示3 DrawLottery

在匹配到一个具体的奖池之后,需要确定最后给用户的奖品是什么。这部分的领域功能在AwardPool内。

package com.company.team.bussiness.lottery.domain.valobj;
import ...;
  
public class AwardPool {
    private String cityIds;//奖池支持的城市
    private String scores;//奖池支持的得分
    private int userGroupType;//奖池匹配的用户类型
    private List<Awrad> awards;//奖池中包含的奖品
  
    //当前奖池是否与城市匹配
    public boolean matchedCity(int cityId) {...}
  
    //当前奖池是否与用户得分匹配
    public boolean matchedScore(int score) {...}
  
    //根据概率选择奖池
    public Award randomGetAward() {
        int sumOfProbablity = 0;
        for(Award award: awards) {
            sumOfProbability += award.getAwardProbablity();
        }
        int randomNumber = ThreadLocalRandom.current().nextInt(sumOfProbablity);
        range = 0;
        for(Award award: awards) {
            range += award.getProbablity();
            if(randomNumber<range) {
                return award;
            }
        }
        return null;
    }
}

代码演示4 AwardPool

与以往的仅有getter、setter的业务对象不同,领域对象具有了行为,对象更加丰满。同时,比起将这些逻辑写在服务内(例如**Service),领域功能的内聚性更强,职责更加明确。

资源库

领域对象需要资源存储,存储的手段可以是多样化的,常见的无非是数据库,分布式缓存,本地缓存等。资源库(Repository)的作用,就是对领域的存储和访问进行统一管理的对象。在抽奖平台中,我们是通过如下的方式组织资源库的。

//数据库资源
import com.company.team.bussiness.lottery.repo.dao.AwardPoolDao;//数据库访问对象-奖池
import com.company.team.bussiness.lottery.repo.dao.AwardDao;//数据库访问对象-奖品
import com.company.team.bussiness.lottery.repo.dao.po.AwardPO;//数据库持久化对象-奖品
import com.company.team.bussiness.lottery.repo.dao.po.AwardPoolPO;//数据库持久化对象-奖池
  
import com.company.team.bussiness.lottery.repo.cache.DrawLotteryCacheAccessObj;//分布式缓存访问对象-抽奖缓存访问
import com.company.team.bussiness.lottery.repo.repository.DrawLotteryRepository;//资源库访问对象-抽奖资源库

代码演示5 Repository组织结构

资源库对外的整体访问由Repository提供,它聚合了各个资源库的数据信息,同时也承担了资源存储的逻辑(例如缓存更新机制等)。

在抽奖资源库中,我们屏蔽了对底层奖池和奖品的直接访问,而是仅对抽奖的聚合根进行资源管理。代码示例中展示了抽奖资源获取的方法(最常见的Cache Aside Pattern)。

比起以往将资源管理放在服务中的做法,由资源库对资源进行管理,职责更加明确,代码的可读性和可维护性也更强。

package com.company.team.bussiness.lottery.repo;
import ...;
  
@Repository
public class DrawLotteryRepository {
    @Autowired
    private AwardDao awardDao;
    @Autowired
    private AwardPoolDao awardPoolDao;
    @AutoWired
    private DrawLotteryCacheAccessObj drawLotteryCacheAccessObj;
  
    public DrawLottery getDrawLotteryById(int lotteryId) {
        DrawLottery drawLottery = drawLotteryCacheAccessObj.get(lotteryId);
        if(drawLottery!=null){
            return drawLottery;
        }
        drawLottery = getDrawLotteyFromDB(lotteryId);
        drawLotteryCacheAccessObj.add(lotteryId, drawLottery);
        return drawLottery;
    }
  
    private DrawLottery getDrawLotteryFromDB(int lotteryId) {...}
}

代码演示6 DrawLotteryRepository

防腐层

亦称适配层。在一个上下文中,有时需要对外部上下文进行访问,通常会引入防腐层的概念来对外部上下文的访问进行一次转义。

有以下几种情况会考虑引入防腐层:

  • 需要将外部上下文中的模型翻译成本上下文理解的模型。
  • 不同上下文之间的团队协作关系,如果是供奉者关系,建议引入防腐层,避免外部上下文变化对本上下文的侵蚀。
  • 该访问本上下文使用广泛,为了避免改动影响范围过大。

如果内部多个上下文对外部上下文需要访问,那么可以考虑将其放到通用上下文中。

在抽奖平台中,我们定义了用户城市信息防腐层(UserCityInfoFacade),用于外部的用户城市信息上下文(微服务架构下表现为用户城市信息服务)。

以用户信息防腐层举例,它以抽奖请求参数(LotteryContext)为入参,以城市信息(MtCityInfo)为输出。

package com.company.team.bussiness.lottery.facade;
import ...;
  
@Component
public class UserCityInfoFacade {
    @Autowired
    private LbsService lbsService;//外部用户城市信息RPC服务
     
    public MtCityInfo getMtCityInfo(LotteryContext context) {
        LbsReq lbsReq = new LbsReq();
        lbsReq.setLat(context.getLat());
        lbsReq.setLng(context.getLng());
        LbsResponse resp = lbsService.getLbsCityInfo(lbsReq);
        return buildMtCifyInfo(resp);
    }
  
    private MtCityInfo buildMtCityInfo(LbsResponse resp) {...}
}

代码演示7 UserCityInfoFacade

领域服务

上文中,我们将领域行为封装到领域对象中,将资源管理行为封装到资源库中,将外部上下文的交互行为封装到防腐层中。此时,我们再回过头来看领域服务时,能够发现领域服务本身所承载的职责也就更加清晰了,即就是通过串联领域对象、资源库和防腐层等一系列领域内的对象的行为,对其他上下文提供交互的接口。

我们以抽奖服务为例(issueLottery),可以看到在省略了一些防御性逻辑(异常处理,空值判断等)后,领域服务的逻辑已经足够清晰明了。

package com.company.team.bussiness.lottery.service.impl
import ...;
  
@Service
public class LotteryServiceImpl implements LotteryService {
    @Autowired
    private DrawLotteryRepository drawLotteryRepo;
    @Autowired
    private UserCityInfoFacade UserCityInfoFacade;
    @Autowired
    private AwardSendService awardSendService;
    @Autowired
    private AwardCounterFacade awardCounterFacade;
  
    @Override
    public IssueResponse issueLottery(LotteryContext lotteryContext) {
        DrawLottery drawLottery = drawLotteryRepo.getDrawLotteryById(lotteryContext.getLotteryId());//获取抽奖配置聚合根
        awardCounterFacade.incrTryCount(lotteryContext);//增加抽奖计数信息
        AwardPool awardPool = lotteryConfig.chooseAwardPool(bulidDrawLotteryContext(drawLottery, lotteryContext));//选中奖池
        Award award = awardPool.randomChooseAward();//选中奖品
        return buildIssueResponse(awardSendService.sendAward(award, lotteryContext));//发出奖品实体
    }
  
    private IssueResponse buildIssueResponse(AwardSendResponse awardSendResponse) {...}
}

代码演示8 LotteryService

数据流转

数据流转

数据流转

在抽奖平台的实践中,我们的数据流转如上图所示。 首先领域的开放服务通过信息传输对象(DTO)来完成与外界的数据交互;在领域内部,我们通过领域对象(DO)作为领域内部的数据和行为载体;在资源库内部,我们沿袭了原有的数据库持久化对象(PO)进行数据库资源的交互。同时,DTO与DO的转换发生在领域服务内,DO与PO的转换发生在资源库内。

与以往的业务服务相比,当前的编码规范可能多造成了一次数据转换,但每种数据对象职责明确,数据流转更加清晰。

上下文集成

通常集成上下文的手段有多种,常见的手段包括开放领域服务接口、开放HTTP服务以及消息发布-订阅机制。

在抽奖系统中,我们使用的是开放服务接口进行交互的。最明显的体现是计数上下文,它作为一个通用上下文,对抽奖、风控、活动准入等上下文都提供了访问接口。 同时,如果在一个上下文对另一个上下文进行集成时,若需要一定的隔离和适配,可以引入防腐层的概念。这一部分的示例可以参考前文的防腐层代码示例。

分离领域

接下来讲解在实施领域模型的过程中,如何应用到系统架构中。

我们采用的微服务架构风格,与Vernon在《实现领域驱动设计》并不太一致,更具体差异可阅读他的书体会。

如果我们维护一个从前到后的应用系统:

下图中领域服务是使用微服务技术剥离开来,独立部署,对外暴露的只能是服务接口,领域对外暴露的业务逻辑只能依托于领域服务。而在Vernon著作中,并未假定微服务架构风格,因此领域层暴露的除了领域服务外,还有聚合、实体和值对象等。此时的应用服务层是比较简单的,获取来自接口层的请求参数,调度多个领域服务以实现界面层功能。

DDD-分层

DDD-分层

随着业务发展,业务系统快速膨胀,我们的系统属于核心时:

应用服务虽然没有领域逻辑,但涉及到了对多个领域服务的编排。当业务规模庞大到一定程度,编排本身就富含了业务逻辑(除此之外,应用服务在稳定性、性能上所做的措施也希望统一起来,而非散落各处),那么此时应用服务对于外部来说是一个领域服务,整体看起来则是一个独立的限界上下文。

此时应用服务对内还属于应用服务,对外已是领域服务的概念,需要将其暴露为微服务。

DDD-系统架构图

DDD-系统架构图

注:具体的架构实践可按照团队和业务的实际情况来,此处仅为作者自身的业务实践。除分层架构外,如CQRS架构也是不错的选择

以下是一个示例。我们定义了抽奖、活动准入、风险控制等多个领域服务。在本系统中,我们需要集成多个领域服务,为客户端提供一套功能完备的抽奖应用服务。这个应用服务的组织如下:

package ...;
  
import ...;
  
@Service
public class LotteryApplicationService {
    @Autowired
    private LotteryRiskService riskService;
    @Autowired
    private LotteryConditionService conditionService;
    @Autowired
    private LotteryService lotteryService;
     
    //用户参与抽奖活动
    public Response<PrizeInfo, ErrorData> participateLottery(LotteryContext lotteryContext) {
        //校验用户登录信息
        validateLoginInfo(lotteryContext);
        //校验风控 
        RiskAccessToken riskToken = riskService.accquire(buildRiskReq(lotteryContext));
        ...
        //活动准入检查
        LotteryConditionResult conditionResult = conditionService.checkLotteryCondition(otteryContext.getLotteryId(),lotteryContext.getUserId());
        ...
        //抽奖并返回结果
        IssueResponse issueResponse = lotteryService.issurLottery(lotteryContext);
        if(issueResponse!=null && issueResponse.getCode()==IssueResponse.OK) {
            return buildSuccessResponse(issueResponse.getPrizeInfo());
        } else {   
            return buildErrorResponse(ResponseCode.ISSUE_LOTTERY_FAIL, ResponseMsg.ISSUE_LOTTERY_FAIL)
        }
    }
  
    private void validateLoginInfo(LotteryContext lotteryContext){...}
    private Response<PrizeInfo, ErrorData> buildErrorResponse (int code, String msg){...}
    private Response<PrizeInfo, ErrorData> buildSuccessResponse (PrizeInfo prizeInfo){...}
} 

代码演示9 LotteryApplicationService

在本文中,我们采用了分治的思想,从抽象到具体阐述了DDD在互联网真实业务系统中的实践。通过领域驱动设计这个强大的武器,我们将系统解构的更加合理。

但值得注意的是,如果你面临的系统很简单或者做一些SmartUI之类,那么你不一定需要DDD。尽管本文对贫血模型、演进式设计提出了些许看法,但它们在特定范围和具体场景下会更高效。读者需要针对自己的实际情况,做一定取舍,适合自己的才是最好的。

引用地址:https://tech.meituan.com/2017/12/22/ddd-in-practice.html

WebSocket: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method

使用WebSocket做APP的通知服务,但是批量推送的时候报错:The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method

2021/08/05 09:45:34.364 INFO [task-33261] Base : ===> 收到消息转发给APP(userId=107):{“messageId”:”c73f2e6f-3c36-48a6-91ae-a59854aa992e”}
2021/08/05 09:45:34.365 INFO [task-33261] Base : 发送数据 -> 开始
2021/08/05 09:45:34.389 ERROR [task-33261] o.s.a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected exception occurred invoking async method: public void com.ikonke.ws.system.bus.EventSenderListener.onApplicationEvent(com.ikonke.ws.system.bus.event.EventSender)
java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1258) ~[tomcat-embed-websocket-9.0.41.jar!/:9.0.41]
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.textStart(WsRemoteEndpointImplBase.java:1220) ~[tomcat-embed-websocket-9.0.41.jar!/:9.0.41]
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendString(WsRemoteEndpointImplBase.java:191) ~[tomcat-embed-websocket-9.0.41.jar!/:9.0.41]
at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText(WsRemoteEndpointBasic.java:37) ~[tomcat-embed-websocket-9.0.41.jar!/:9.0.41]
at com.ikonke.ws.socket.WSHandler.sendMessage(WSHandler.java:104) ~[classes!/:0.0.1-SNAPSHOT]
at com.ikonke.ws.system.bus.EventSenderListener.onApplicationEvent(EventSenderListener.java:69) ~[classes!/:0.0.1-SNAPSHOT]
at com.ikonke.ws.system.bus.EventSenderListener.onApplicationEvent(EventSenderListener.java:25) ~[classes!/:0.0.1-SNAPSHOT]
at com.ikonke.ws.system.bus.EventSenderListener$$FastClassBySpringCGLIB$$a5414582.invoke(<generated>) ~[classes!/:0.0.1-SNAPSHOT]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771) ~[spring-aop-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) ~[spring-aop-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:88) ~[spring-aop-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at org.springframework.cloud.sleuth.instrument.async.TraceAsyncAspect.traceBackgroundThread(TraceAsyncAspect.java:70) ~[spring-cloud-sleuth-core-2.2.6.RELEASE.jar!/:2.2.6.RELEASE]
at sun.reflect.GeneratedMethodAccessor78.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644) ~[spring-aop-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633) ~[spring-aop-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) ~[spring-aop-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) ~[spring-aop-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95) ~[spring-aop-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) ~[spring-aop-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.2.12.RELEASE.jar!/:5.2.12.RELEASE]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_171]
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68) ~[spring-cloud-sleuth-core-2.2.6.RELEASE.jar!/:2.2.6.RELEASE]
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68) ~[spring-cloud-sleuth-core-2.2.6.RELEASE.jar!/:2.2.6.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_171]

TEXT_FULL_WRITING的状态是当前会话正在输出数据,那么需要将send的方法改为同步,如下:

public synchronized void sendMessage(Session session, String message) {
    try {
        logger("发送数据 ->  开始");
        session.getBasicRemote().sendText(message);
        logger("发送数据 -> 成功");
    } catch (IOException e) {
        e.printStackTrace();
    }
}

 

HTTP状态码(记录)

HTTP状态码总的分为五类:

1开头:信息状态码

2开头:成功状态码

3开头:重定向状态码

4开头:客户端错误状态码

5开头:服务端错误状态码

 1XX:信息状态码

状态码 含义 描述
100 继续 初始的请求已经接受,请客户端继续发送剩余部分
101 切换协议 请求这要求服务器切换协议,服务器已确定切换

 2XX:成功状态码

状态码 含义 描述
200 成功 服务器已成功处理了请求
201 已创建 请求成功并且服务器创建了新的资源
202 已接受 服务器已接受请求,但尚未处理
203 非授权信息 服务器已成功处理请求,但返回的信息可能来自另一个来源
204 无内容 服务器成功处理了请求,但没有返回任何内容
205 重置内容 服务器处理成功,用户终端应重置文档视图
206 部分内容 服务器成功处理了部分GET请求

3XX:重定向状态码

状态码 含义 描述
300 多种选择 针对请求,服务器可执行多种操作
301 永久移动 请求的页面已永久跳转到新的url
302 临时移动 服务器目前从不同位置的网页响应请求,但请求仍继续使用原有位置来进行以后的请求
303 查看其他位置 请求者应当对不同的位置使用单独的GET请求来检索响应时,服务器返回此代码
304 未修改 自从上次请求后,请求的网页未修改过
305 使用代理 请求者只能使用代理访问请求的网页
307 临时重定向 服务器目前从不同位置的网页响应请求,但请求者应继续使用原有位置来进行以后的请求

4XX:客户端错误状态码

状态码 含义 描述
400 错误请求 服务器不理解请求的语法
401 未授权 请求要求用户的身份演验证
403 禁止 服务器拒绝请求
404 未找到 服务器找不到请求的页面
405 方法禁用 禁用请求中指定的方法
406 不接受 无法使用请求的内容特性响应请求的页面
407 需要代理授权 请求需要代理的身份认证
408 请求超时 服务器等候请求时发生超时
409 冲突 服务器在完成请求时发生冲突
410 已删除 客户端请求的资源已经不存在
411 需要有效长度 服务器不接受不含有效长度表头字段的请求
412 未满足前提条件 服务器未满足请求者在请求中设置的其中一个前提条件
413 请求实体过大 由于请求实体过大,服务器无法处理,因此拒绝请求
414 请求url过长 请求的url过长,服务器无法处理
415 不支持格式 服务器无法处理请求中附带媒体格式
416 范围无效 客户端请求的范围无效
417 未满足期望 服务器无法满足请求表头字段要求

 

5XX:服务端错误状态码

状态码 含义 描述
500 服务器错误 服务器内部错误,无法完成请求
501 尚未实施 服务器不具备完成请求的功能
502 错误网关 服务器作为网关或代理出现错误
503 服务不可用 服务器目前无法使用
504 网关超时 网关或代理服务器,未及时获取请求
505 不支持版本 服务器不支持请求中使用的HTTP协议版本

ConcurrentLinkedDeque类详解

一、Queue接口定义

boolean add(E e);//入列 会抛出异常

boolean offer(E e);//入列

E remove();//出列 会抛出异常

E poll();//出列

E element();//读取 会抛出异常

E peek();//读取

Queue是一种具有FIFO特点的数据结构,元素只能在队首进行“入列”操作,在队尾进行“出列”操作。Queue的接口非常简单,一共只有三种类型的操作:入列、出列、读取。

每种操作类型,都给出了两种方法,区别就是其中一种操作在队列的状态不满足某些要求时,会抛出异常;另一种,则直接返回特殊值(如null)。

再来看下JDK中QueueDeque这两种数据结构的接口定义,看看Deque和Queue相比有哪些增强:

二、Deque接口定义

Deque(double-ended queue)是一种双端队列,也就是说可以在任意一端进行“入列”,也可以在任意一端进行“出列”。

Queue接口的所有方法Deque都具备,只不过队首/队尾都可以进行“出列”和“入列”操作:

void addFirst(E e);//队首入列 会抛出异常

void addLast(E e);//队尾入列 会抛出异常

boolean offerFirst(E e);//队首入列

boolean offerLast(E e);//队尾入列

E removeFirst();//队首出列 会抛出异常

E removeLast();//队尾出列 会抛出异常

E pollFirst();//队首出列

E pollLast();//队尾出列

E getFirst();//队首读取 会抛出异常

E getLast();//队尾读取 会抛出异常

E peekFirst();//队首读取

E peekLast();//队尾读取

除此之外,Deque还可以当作“栈”来使用,我们知道“栈”是一种具有“LIFO”特点的数据结构,Deque提供了pushpoppeek这三个栈方法,一般实现这三个方法时,可以利用已有方法,即有如下映射关系:

Comparison of Stack and Deque methods
Stack Method Equivalent Deque Method
push(e) addFirst(e)
pop() removeFirst()
peek() peekFirst()

官网连接:https://docs.oracle.com/javase/8/docs/api/

三、ConcurrentLinkedDeque简介

ConcurrentLinkedDeque是JDK1.7时,J.U.C包引入的一个集合工具类。在JDK1.7之前,除了Stack类外,并没有其它适合并发环境的“栈”数据结构。ConcurrentLinkedDeque作为双端队列,可以当作“栈”来使用,并且高效地支持并发环境。

ConcurrentLinkedDequeConcurrentLinkedQueue一样,采用了无锁算法,底层基于自旋+CAS的方式实现。

Class ConcurrentLinkedDeque<E>

 

Class ConcurrentLinkedQueue<E>

四、ConcurrentLinkedDeque原理

1.队列结构:

我们先来看下ConcurrentLinkedDeque的内部结构:

public class ConcurrentLinkedDeque<E> extends AbstractCollection<E>
    implements Deque<E>, java.io.Serializable {

    /**
     * 头指针
     */
    private transient volatile Node<E> head;

    /**
     * 尾指针
     */
    private transient volatile Node<E> tail;

    private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;
    
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;

    static {
        PREV_TERMINATOR = new Node<Object>();
        PREV_TERMINATOR.next = PREV_TERMINATOR;
        NEXT_TERMINATOR = new Node<Object>();
        NEXT_TERMINATOR.prev = NEXT_TERMINATOR;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedDeque.class;
            headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    
    /**
     * 双链表结点定义
     */
    static final class Node<E> {
        volatile Node<E> prev;  // 前驱指针
        volatile E item;        // 结点值
        volatile Node<E> next;  // 后驱指针

        Node() {
        }

        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        void lazySetPrev(Node<E> val) {
            UNSAFE.putOrderedObject(this, prevOffset, val);
        }

        boolean casPrev(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        private static final long prevOffset;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                prevOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("prev"));
                itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    
    // ...
}

可以看到,ConcurrentLinkedDeque的内部和ConcurrentLinkedQueue类似,不过是一个双链表结构,每入队一个元素就是插入一个Node类型的结点。字段head指向队列头,tail指向队列尾,通过Unsafe来CAS操作字段值以及Node对象的字段值。

QQ截图20181130105240

需要特别注意的是ConcurrentLinkedDeque包含两个特殊字段:PREV_TERMINATOR、NEXT_TERMINATOR。
这两个字段初始时都指向一个值为null的空结点,这两个字段在结点删除时使用,后面会详细介绍:

QQ截图20181130105328

2.构造器定义

ConcurrentLinkedDeque包含两种构造器:

/**
 * 空构造器.
 */
public ConcurrentLinkedDeque() {
    head = tail = new Node<E>(null);

}
/**
 * 从已有集合,构造队列
 */
public ConcurrentLinkedDeque(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {  // 在队尾插入元素
            t.lazySetNext(newNode);
            newNode.lazySetPrev(t);
            t = newNode;
        }
    }
    initHeadTail(h, t);
}

我们重点看下空构造器,通过空构造器建立的ConcurrentLinkedDeque对象,其head和tail指针并非指向null,而是指向一个item值为null的Node结点——哨兵结点,如下图:

QQ截图20181130105328

3.入队操作

双端队列与普通队列的入队区别是:双端队列既可以在“队尾”插入元素,也可以在“队首”插入元素。ConcurrentLinkedDeque的入队方法有很多:addFirst(e)addLast(e)offerFirst(e)offerLast(e)

public void addFirst(E e) {
    linkFirst(e);
}

public void addLast(E e) {
    linkLast(e);
}

public boolean offerFirst(E e) {
    linkFirst(e);
    return true;
}

public boolean offerLast(E e) {
    linkLast(e);
    return true;
}

可以看到,队首“入队”其实就是调用了linkFirst(e)方法,而队尾“入队”是调用了 linkLast(e)方法。我们先来看下队首“入队”——linkFirst(e):

/**
 * 在队首插入一个元素.
 */
private void linkFirst(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e); // 创建待插入的结点

    restartFromHead:
    for (; ; )
        for (Node<E> h = head, p = h, q; ; ) {
            if ((q = p.prev) != null && (q = (p = q).prev) != null)
                // Check for head updates every other hop.
                // If p == q, we are sure to follow head instead.
                p = (h != (h = head)) ? h : q;
            else if (p.next == p) // PREV_TERMINATOR
                continue restartFromHead;
            else {
                // p is first node
                newNode.lazySetNext(p); // CAS piggyback
                if (p.casPrev(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this deque,
                    // and for newNode to become "live".
                    if (p != h) // hop two nodes at a time
                        casHead(h, newNode);  // Failure is OK.
                    return;
                }
                // Lost CAS race to another thread; re-read prev
            }
        }
}

为了便于理解,我们以示例来看:假设有两个线程ThreadA和ThreadB同时进行入队操作。

①ThreadA先单独入队一个元素9

此时,ThreadA会执行CASE3分支:

else {                              // CASE3: p是队首结点
    newNode.lazySetNext(p);         // “新结点”的next指向队首结点
    if (p.casPrev(null, newNode)) { // 队首结点的prev指针指向“新结点”
        if (p != h) // hop two nodes at a time
            casHead(h, newNode);  // Failure is OK.
        return;
    }
    // 执行到此处说明CAS操作失败,有其它线程也在队首插入元素
}

队列的结构如下:
QQ截图20181130105240


②ThreadA入队一个元素2,同时ThreadB入队一个元素10

此时,依然执行CASE3分支,我们假设ThreadA操作成功,ThreadB操作失败:

else {                              // CASE3: p是队首结点
    newNode.lazySetNext(p);         // “新结点”的next指向队首结点
    if (p.casPrev(null, newNode)) { // 队首结点的prev指针指向“新结点”
        if (p != h) // hop two nodes at a time
            casHead(h, newNode);  // Failure is OK.
        return;
    }
    // 执行到此处说明CAS操作失败,有其它线程也在队首插入元素
}

ThreadA的CAS操作成功后,会进入以下判断:

if (p != h) // hop two nodes at a time
    casHead(h, newNode);  // Failure is OK.

上述判断的作用就是重置head头指针,可以看到,ConcurrentLinkedDeque其实是以每次跳2个结点的方式移动指针,这主要考虑到并发环境以这种hop跳的方式可以提升效率。

此时队列的机构如下:
QQ截图20181130105240

注意,此时ThreadB的p.casPrev(null, newNode)操作失败了,所以会进入下一次自旋,在下一次自旋中继续进入CASE3。如果ThreadA的casHead操作没有完成,ThreadB就进入了下一次自旋,则会进入分支1,重置指针p指向队首。最终队列结构如下:

QQ截图20181130105328

在队尾插入元素和队首类似,不再赘述,读者可以自己阅读源码。

4.出队操作

ConcurrentLinkedDeque的出队一样分为队首、队尾两种情况:removeFirst()pollFirst()removeLast()pollLast()

public E removeFirst() {
    return screenNullResult(pollFirst());
}

public E removeLast() {
    return screenNullResult(pollLast());
}

public E pollFirst() {
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        if (item != null && p.casItem(item, null)) {
            unlink(p);
            return item;
        }
    }
    return null;
}

public E pollLast() {
    for (Node<E> p = last(); p != null; p = pred(p)) {
        E item = p.item;
        if (item != null && p.casItem(item, null)) {
            unlink(p);
            return item;
        }
    }
    return null;
}

可以看到,两个remove方法其实内部都调用了对应的poll方法,我们重点看下队尾的“出队”——pollLast方法:

public E pollLast() {
    for (Node<E> p = last(); p != null; p = pred(p)) {
        E item = p.item;
        if (item != null && p.casItem(item, null)) {
            unlink(p);
            return item;
        }
    }
    return null;
}

last方法用于寻找队尾结点,即满足p.next == null && p.prev != p的结点:

Node<E> last() {
    restartFromTail:
    for (; ; )
        for (Node<E> t = tail, p = t, q; ; ) {
            if ((q = p.next) != null &&
                (q = (p = q).next) != null)
                // Check for tail updates every other hop.
                // If p == q, we are sure to follow tail instead.
                p = (t != (t = tail)) ? t : q;
            else if (p == t
                // It is possible that p is NEXT_TERMINATOR,
                // but if so, the CAS is guaranteed to fail.
                || casTail(t, p))
                return p;
            else
                continue restartFromTail;
        }
}

pred方法用于寻找当前结点的前驱结点(如果前驱是自身,则返回队尾结点):

final Node<E> pred(Node<E> p) {
    Node<E> q = p.prev;
    return (p == q) ? last() : q;
}

unlink方法断开结点的链接:

/**
 * Unlinks non-null node x.
 */
void unlink(Node<E> x) {
    // assert x != null;
    // assert x.item == null;
    // assert x != PREV_TERMINATOR;
    // assert x != NEXT_TERMINATOR;

    final Node<E> prev = x.prev;
    final Node<E> next = x.next;
    if (prev == null) {
        unlinkFirst(x, next);
    } else if (next == null) {
        unlinkLast(x, prev);
    } else {
        Node<E> activePred, activeSucc;
        boolean isFirst, isLast;
        int hops = 1;

        // Find active predecessor
        for (Node<E> p = prev; ; ++hops) {
            if (p.item != null) {
                activePred = p;
                isFirst = false;
                break;
            }
            Node<E> q = p.prev;
            if (q == null) {
                if (p.next == p)
                    return;
                activePred = p;
                isFirst = true;
                break;
            } else if (p == q)
                return;
            else
                p = q;
        }

        // Find active successor
        for (Node<E> p = next; ; ++hops) {
            if (p.item != null) {
                activeSucc = p;
                isLast = false;
                break;
            }
            Node<E> q = p.next;
            if (q == null) {
                if (p.prev == p)
                    return;
                activeSucc = p;
                isLast = true;
                break;
            } else if (p == q)
                return;
            else
                p = q;
        }

        // TODO: better HOP heuristics
        if (hops < HOPS
            // always squeeze out interior deleted nodes
            && (isFirst | isLast))
            return;

        // Squeeze out deleted nodes between activePred and
        // activeSucc, including x.
        skipDeletedSuccessors(activePred);
        skipDeletedPredecessors(activeSucc);

        // Try to gc-unlink, if possible
        if ((isFirst | isLast) &&

            // Recheck expected state of predecessor and successor
            (activePred.next == activeSucc) &&
            (activeSucc.prev == activePred) &&
            (isFirst ? activePred.prev == null : activePred.item != null) &&
            (isLast ? activeSucc.next == null : activeSucc.item != null)) {

            updateHead(); // Ensure x is not reachable from head
            updateTail(); // Ensure x is not reachable from tail

            // Finally, actually gc-unlink
            x.lazySetPrev(isFirst ? prevTerminator() : x);
            x.lazySetNext(isLast ? nextTerminator() : x);
        }
    }
}

ConcurrentLinkedDeque相比ConcurrentLinkedQueue,功能更丰富,但是由于底层结构是双链表,且完全采用CAS+自旋的无锁算法保证线程安全性,所以需要考虑各种并发情况,源码比ConcurrentLinkedQueue更加难懂,留待有精力作进一步分析。

五、总结

ConcurrentLinkedDeque使用了自旋+CAS的非阻塞算法来保证线程并发访问时的数据一致性。由于队列本身是一种双链表结构,所以虽然算法看起来很简单,但其实需要考虑各种并发的情况,实现复杂度较高,并且ConcurrentLinkedDeque不具备实时的数据一致性,实际运用中,如果需要一种线程安全的栈结构,可以使用ConcurrentLinkedDeque。

另外,关于ConcurrentLinkedDeque还有以下需要注意的几点:

  1. ConcurrentLinkedDeque的迭代器是弱一致性的,这在并发容器中是比较普遍的现象,主要是指在一个线程在遍历队列结点而另一个线程尝试对某个队列结点进行修改的话不会抛出ConcurrentModificationException,这也就造成在遍历某个尚未被修改的结点时,在next方法返回时可以看到该结点的修改,但在遍历后再对该结点修改时就看不到这种变化。
  2. size方法需要遍历链表,所以在并发情况下,其结果不一定是准确的,只能供参考。

Nginx 配置文件 nginx.conf 详解

#定义Nginx运行的用户和用户组

user www www;

 

#nginx进程数,建议设置为等于CPU总核心数。

worker_processes 8;

 

#全局错误日志定义类型,[ debug | info | notice | warn | error | crit ]

error_log /var/log/nginx/error.log info;

 

#进程文件

pid /var/run/nginx.pid;

 

#一个nginx进程打开的最多文件描述符数目,理论值应该是最多打开文件数(系统的值ulimit -n)与nginx进程数相除,但是nginx分配请求并不均匀,所以建议与ulimit -n的值保持一致。

worker_rlimit_nofile 65535;

 

#工作模式与连接数上限

events

{

#参考事件模型,use [ kqueue | rtsig | epoll | /dev/poll | select | poll ]; epoll模型是Linux 2.6以上版本内核中的高性能网络I/O模型,如果跑在FreeBSD上面,就用kqueue模型。

use epoll;

#单个进程最大连接数(最大连接数=连接数*进程数)

worker_connections 65535;

}

 

#设定http服务器

http

{

include mime.types; #文件扩展名与文件类型映射表

default_type application/octetstream; #默认文件类型

#charset utf-8; #默认编码

server_names_hash_bucket_size 128; #服务器名字的hash表大小

client_header_buffer_size 32k; #上传文件大小限制

large_client_header_buffers 4 64k; #设定请求缓

client_max_body_size 8m; #设定请求缓

sendfile on; #开启高效文件传输模式,sendfile指令指定nginx是否调用sendfile函数来输出文件,对于普通应用设为 on,如果用来进行下载等应用磁盘IO重负载应用,可设置为off,以平衡磁盘与网络I/O处理速度,降低系统的负载。注意:如果图片显示不正常把这个改成off。

autoindex on; #开启目录列表访问,合适下载服务器,默认关闭。

tcp_nopush on; #防止网络阻塞

tcp_nodelay on; #防止网络阻塞

keepalive_timeout 120; #长连接超时时间,单位是秒

 

#FastCGI相关参数是为了改善网站的性能:减少资源占用,提高访问速度。下面参数看字面意思都能理解。

fastcgi_connect_timeout 300;

fastcgi_send_timeout 300;

fastcgi_read_timeout 300;

fastcgi_buffer_size 64k;

fastcgi_buffers 4 64k;

fastcgi_busy_buffers_size 128k;

fastcgi_temp_file_write_size 128k;

 

#gzip模块设置

gzip on; #开启gzip压缩输出

gzip_min_length 1k; #最小压缩文件大小

gzip_buffers 4 16k; #压缩缓冲区

gzip_http_version 1.0; #压缩版本(默认1.1,前端如果是squid2.5请使用1.0)

gzip_comp_level 2; #压缩等级

gzip_types text/plain application/xjavascript text/css application/xml;

#压缩类型,默认就已经包含text/html,所以下面就不用再写了,写上去也不会有问题,但是会有一个warn。

gzip_vary on;

#limit_zone crawler $binary_remote_addr 10m; #开启限制IP连接数的时候需要使用

 

upstream blog.ha97.com {

#upstream的负载均衡,weight是权重,可以根据机器配置定义权重。weigth参数表示权值,权值越高被分配到的几率越大。

server 192.168.80.121:80 weight=3;

server 192.168.80.122:80 weight=2;

server 192.168.80.123:80 weight=3;

}

 

#虚拟主机的配置

server

{

    #监听端口

    listen 80;

    #域名可以有多个,用空格隔开

    server_name www.ha97.com ha97.com;

    index index.html index.htm index.php;

    root /data/www/ha97;

    location ~ .*\.(php|php5)?$

    {

    fastcgi_pass 127.0.0.1:9000;

    fastcgi_index index.php;

    include fastcgi.conf;

    }

    #图片缓存时间设置

    location ~ .*\.(gif|jpg|jpeg|png|bmp|swf)$

    {

    expires 10d;

    }

    #JS和CSS缓存时间设置

    location ~ .*\.(js|css)?$

    {

    expires 1h;

    }

    #日志格式设定

    log_format access ‘$remote_addr – $remote_user [$time_local] “$request” ‘

    ‘$status $body_bytes_sent “$http_referer” ‘

    ‘”$http_user_agent” $http_x_forwarded_for’;

    #定义本虚拟主机的访问日志

    access_log /var/log/nginx/ha97access.log access;

 

    #对 “/” 启用反向代理

    location / {

    proxy_pass http://127.0.0.1:88;

    proxy_redirect off;

    proxy_set_header XRealIP $remote_addr;

    #后端的Web服务器可以通过X-Forwarded-For获取用户真实IP

    proxy_set_header XForwardedFor $proxy_add_x_forwarded_for;

    #以下是一些反向代理的配置,可选。

    proxy_set_header Host $host;

    client_max_body_size 10m; #允许客户端请求的最大单文件字节数

    client_body_buffer_size 128k; #缓冲区代理缓冲用户端请求的最大字节数,

    proxy_connect_timeout 90; #nginx跟后端服务器连接超时时间(代理连接超时)

    proxy_send_timeout 90; #后端服务器数据回传时间(代理发送超时)

    proxy_read_timeout 90; #连接成功后,后端服务器响应时间(代理接收超时)

    proxy_buffer_size 4k; #设置代理服务器(nginx)保存用户头信息的缓冲区大小

    proxy_buffers 4 32k; #proxy_buffers缓冲区,网页平均在32k以下的设置

    proxy_busy_buffers_size 64k; #高负荷下缓冲大小(proxy_buffers*2)

    proxy_temp_file_write_size 64k;

    #设定缓存文件夹大小,大于这个值,将从upstream服务器传

    }

 

    #设定查看Nginx状态的地址

    location /NginxStatus {

    stub_status on;

    access_log on;

    auth_basic “NginxStatus”;

    auth_basic_user_file conf/htpasswd;

    #htpasswd文件的内容可以用apache提供的htpasswd工具来产生。

    }

 

    #本地动静分离反向代理配置

    #所有jsp的页面均交由tomcat或resin处理

    location ~ .(jsp|jspx|do)?$ {

    proxy_set_header Host $host;

    proxy_set_header XRealIP $remote_addr;

    proxy_set_header XForwardedFor $proxy_add_x_forwarded_for;

    proxy_pass http://127.0.0.1:8080;

    }

    #所有静态文件由nginx直接读取不经过tomcat或resin

    location ~ .*.(htm|html|gif|jpg|jpeg|png|bmp|swf|ioc|rar|zip|txt|flv|mid|doc|ppt|pdf|xls|mp3|wma)$

    { expires 15d; }

    location ~ .*.(js|css)?$

    { expires 1h; }

}

}

12321
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |