架构

分布式锁之 Apache Curator InterProcessMutex

分布式锁之 Apache Curator InterProcessMutex

对分布式锁耳熟能详。不过,一直关注的是基于 Redis 实现的分布式锁。知道 ZooKeeper 也可以实现分布式锁。但是,原来的想法是把 Redis 那个思路切换到 ZooKeeper 上来实现就好。今天了解到 Apache Curator 内置了分布式锁的实现: InterProcessMutex。查看了一下源码实现,发现跟基于 Redis 实现的源码相比,在思路上还是有很大不同的。所以,特别作文记录一下。 先来看一下,整体流程: 结合流程图和源码,加锁的过程是这样的: 先判断本地是否有锁数据,如果有则对锁定次数自增一下,然后返回 true; 如果没有锁数据,则尝试获取锁: 在指定路径下创建临时顺序节点 获取指定路径下,所有节点,检查自身是否是序号最小的节点: 如果自身序号最小,则获得锁;否则 如果自身不是序号最小的节点,则通过 while 自旋 + wait(times) 不断尝试获取锁,直到成功。 获得锁后,把锁信息缓存在本地 ConcurrentMap<Thread, LockData> threadData 变量中,方便计算重入。 在 ZooKeeper 中的结构大致如下: 下面我们逐个方法进行分析说明。先来看一下 InterProcessMutex 的注释: /** * A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that * use the same lock path will achieve an inter-process critical section. Further, this mutex is * "fair" - each user will get the mutex in the order requested (from ZK's point of view) */ public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
Spring 扩展点实践:整合 Apache Dubbo(二)

Spring 扩展点实践:整合 Apache Dubbo(二)

D瓜哥
在 Spring 扩展点实践:整合 Apache Dubbo(一) 中,D瓜哥介绍了 Dubbo 如何使用 Spring 的插件机制与 Spring 整合。限于篇幅原因,上一篇文章只介绍到了服务提供者的注册。本篇文章继续上一篇文章的主题,继续介绍 Spring 与 Dubbo 的整合过程。先来讲解一下服务消费者的生成过程。 Dubbo 生成服务消费者的过程 先来看看 XML 配置文件: dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/spring/dubbo-consumer.xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <dubbo:application name="demo-consumer"/> <dubbo:registry address="zookeeper://127.0.0.1:2181"/> <dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService"/> </beans> 我们先看一下 ReferenceBean 类的声明: org.apache.dubbo.config.spring.ReferenceBean public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean { // 此处省略 N 行代码 @Override public Object getObject() { return get(); } // 此处省略 N 行代码 @Override @SuppressWarnings({"unchecked"}) public void afterPropertiesSet() throws Exception { // Initializes Dubbo's Config Beans before @Reference bean autowiring prepareDubboConfigBeans(); // lazy init by default. if (init == null) { init = false; } // eager init if necessary. if (shouldInit()) { getObject(); } } // 此处省略 N 行代码 } 这个类实现了 FactoryBean 接口,D瓜哥在 Spring 扩展点概览及实践:FactoryBean 中对 FactoryBean 介绍。所以,请在上面的 getObject() 打个断点。 另外,这个类还实现了 InitializingBean,D瓜哥在 Spring Bean 生命周期概述 中介绍了这个接口的用途。不了解的,请移步。
Spring 扩展点实践:整合 Apache Dubbo(一)

Spring 扩展点实践:整合 Apache Dubbo(一)

D瓜哥
在上一篇文章 Spring 扩展点概览及实践 中介绍了 Spring 内部存在的扩展点。 Spring 扩展点实践:整合 MyBATIS 中,D瓜哥带大家了解了一下 MyBATIS 如何利用 Spring 的扩展点实现了与 Spring 的完美整合。现在,学以致用,我们继续来分析一下 Spring 与 Apache Dubbo 的整合流程。 示例程序 Apache Dubbo 仓库中就有很完整的示例。D瓜哥直接拿来使用就不再搭建示例程序了。 首先,需要启动一个 ZooKeeper 实例。查看 Dubbo 的依赖可以看出,最新版代码依赖的 ZooKeeper 是 3.4.13 版。所以,为了最好的兼容性,就要选用 3.4.X 版的 ZooKeeper 服务器。D瓜哥直接使用 Docker 启动 ZooKeeper 了。命令如下: docker run --rm --name zookeeper -d -p 2181:2181 zookeeper:3.4.14 这次我们使用 Apache Dubbo 的 dubbo-demo/dubbo-demo-xml 示例。 第二步,启动服务提供者程序,找到 DUBBO/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/Application.java,运行该类。 第三步,运行服务消费者程序,找到 DUBBO/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/java/org/apache/dubbo/demo/consumer/Application.java,运行该类。 如果没有任何错误,则在终端可以看到 result: async result 输出。 在开始正餐之前,D瓜哥先给大家来个开胃菜。 Spring 插件机制简介 不知道大家有没有想过一个问题:Spring 框架是如何支持越来越多的功能的? 在D瓜哥了解到 Spring 的插件机制后,非常叹服 Spring 精巧的设计和灵活的扩展性。闲言少叙,好戏上演。
Redis 核心数据结构(二)

Redis 核心数据结构(二)

D瓜哥
本文内容对于 Redis 7+ 来说已经过时,最新实现请看下面两篇文章: Redis 核心数据结构(3) Redis 核心数据结构(4) 在上一篇文章: Redis 核心数据结构(1) 中,介绍了链表、ziplist、quicklist 数据结构。这篇文章,来介绍一下 skiplist、dict。 skiplist 跳跃表是一种有序数据结构,支持平均 O(logN)、最坏 O(N) 复杂度的节点查找;大部分情况效率可以和平衡树相媲美,实现却比平衡树简单。 跳跃表就是 Redis 中有序集合键的底层实现之一。 server.h typedef struct zskiplistNode { sds ele; double score; struct zskiplistNode *backward; struct zskiplistLevel { struct zskiplistNode *forward; unsigned long span; } level[]; } zskiplistNode; typedef struct zskiplist { struct zskiplistNode *header, *tail; unsigned long length; int level; } zskiplist; typedef struct zset { dict *dict; zskiplist *zsl; } zset; skiplist,顾名思义,首先它是一个list。实际上,它是在有序链表的基础上发展起来的。 当我们想查找数据的时候,可以先沿着跨度大的链进行查找。当碰到比待查数据大的节点时,再回到跨度小的链表中进行查找。 skiplist正是受这种多层链表的想法的启发而设计出来的。按照上面生成链表的方式,上面每一层链表的节点个数,是下面一层的节点个数的一半,这样查找过程就非常类似于一个二分查找,使得查找的时间复杂度可以降低到 O(logN)。但是,存在的一个问题是:如果插入新节点后就会打乱上下相邻两层节点是 2:1 的对应关系。如果要维持,则需要调整后面所有的节点。 skiplist为了避免这一问题,它不要求上下相邻两层链表之间的节点个数有严格的对应关系,而是为每个节点随机出一个层数(level)。 插入操作只需要修改插入节点前后的指针,而不需要对很多节点都进行调整。这就降低了插入操作的复杂度。实际上,这是 skiplist 的一个很重要的特性,这让它在插入性能上明显优于平衡树的方案。 skiplist,翻译成中文,可以翻译成“跳表”或“跳跃表”,指的就是除了最下面第1层链表之外,它会产生若干层稀疏的链表,这些链表里面的指针故意跳过了一些节点(而且越高层的链表跳过的节点越多)。这就使得我们在查找数据的时候能够先在高层的链表中进行查找,然后逐层降低,最终降到第1层链表来精确地确定数据位置。在这个过程中,我们跳过了一些节点,从而也就加快了查找速度。 在中间插入一个有比较高 Level 的节点,如何维护前面节点到这个节点的这些链接? 在平衡树种,如何做范围查找?先确定边界,然后其他节点怎么查找? skiplist 中 key 允许重复。 在比较时,不仅比较分数(即key),还要比较数据自身。 第一层链表是双向链表,并且反向指针只有一个。 在 skiplist 中可以很方便计算每个元素的排名。 Redis 中的有序集合(sorted set),是在 skiplist, dict 和 ziplist 基础上构建起来的: 当数据较少时,sorted set是由一个 ziplist 来实现的。其中集合元素按照分值从小到大排序。 当数据多的时候,sorted set 是由一个叫 zset 的数据结构来实现的,这个 zset 包含一个 dict + 一个 skiplist。dict 用来查询数据到分数(score)的对应关系,而 skiplist 用来根据分数查询数据(可能是范围查找)。
Redis 核心数据结构(一)

Redis 核心数据结构(一)

D瓜哥
本文内容对于 Redis 7+ 来说已经过时,最新实现请看下面两篇文章: Redis 核心数据结构(3) Redis 核心数据结构(4) Redis 目前是使用最广泛的缓存中间件。其突出特点就是支持多种常见的数据结构。对比 JDK 集合类的实现,Redis 的实现表现出很多独到之处,很多地方设计得别具匠心。下面就来简要介绍一下。 linkedlist Redis 底层也有很多地方使用到 linkedlist,并且也是双向链表。 adlist.h typedef struct listNode { struct listNode *prev; struct listNode *next; void *value; } listNode; typedef struct listIter { listNode *next; int direction; } listIter; typedef struct list { listNode *head; listNode *tail; void *(*dup)(void *ptr); void (*free)(void *ptr); int (*match)(void *ptr, void *key); unsigned long len; } list; Redis 的 linkedlist 实现特点是: 双向:节点带有前后指针; 无环:首尾没有相连,所以没有构成环状; 链表保存了首尾指针; 多态:可以保存不同类型的值,这里成为泛型也许更符合 Java 中的语义。 Redis 在 2014 年实现了 quicklist,并使用 quicklist 代替了 linkedlist。所以,现在 linkedlist 几乎已经是废弃状态。 ziplist Redis 官方在 ziplist.c 文件的注释中对 ziplist 进行了定义: The ziplist is a specially encoded dually linked list that is designed to be very memory efficient. It stores both strings and integer values, where integers are encoded as actual integers instead of a series of characters. It allows push and pop operations on either side of the list in O(1) time. However, because every operation requires a reallocation of the memory used by the ziplist, the actual complexity is related to the amount of memory used by the ziplist.
Kafka 常见面试题

Kafka 常见面试题

D瓜哥
Kafka 是由 LinkedIn 开发的一个分布式的消息系统,使用 Scala 编写,它以可水平扩展和高吞吐率而被广泛使用。Kafka 本身设计也非常精巧,有很多关键的知识点需要注意。在面试中,也常常被问到。整理篇文章,梳理一下自己的知识点。 架构设计问题 Kafka 整体架构如下: 图 1. Kafka 架构 Kafka 架构分为以下几个部分 Producer:消息生产者,就是向 Kafka Broker 发消息的客户端。 Consumer:消息消费者,向 Kafka Broker 取消息的客户端。 Topic:可以理解为一个队列,一个 Topic 又分为一个或多个分区。 Consumer Group:这是 Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer)和单播(发给任意一个 Consumer)的手段。一个 Topic 可以有多个 Consumer Group。 Broker:一台 Kafka 服务器就是一个 Broker。一个集群由多个 Broker 组成。一个 Broker 可以容纳多个 Topic。 Partition:为了实现扩展性,一个非常大的 Topic 可以分布到多个 Broker上,每个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的id(offset)。将消息发给 Consumer,Kafka 只保证按一个 Partition 中的消息的顺序,不保证一个 Topic 的整体(多个 Partition 间)的顺序。 Offset:Kafka 的存储文件都是按照 offset.Kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.Kafka 的文件即可。当然 the first offset 就是 00000000000.Kafka。
Spring AOP 处理流程概述

Spring AOP 处理流程概述

D瓜哥
AOP 是 Spring 框架的最核心的两个功能之一,在前面的 Spring 启动流程概述 和 Spring Bean 生命周期概述 两篇文章中,分别介绍了 Spring 启动过程和 Spring Bean 的生命周期,对 IoC 有了一个细致介绍。这里来细致分析一下 Spring AOP 的实现原理和处理流程。 基本概念 先来了解几个基本概念,D瓜哥私以为这些概念是 AOP 中最核心的内容,了解了基本概念,可以说基本上掌握了一半的 AOP 内容。 学习概念最权威的地方,当然就是官方文档。所以,这些概念可以在 Spring Framework Documentation: AOP Concepts 中看到最权威的介绍。 Join point(连接点): 所谓的连接点是指那些被拦截到的点。在 Spring 中,连接点指的是方法,因为 Spring 只支持方法类型的连接点。在 Spring 中,使用 Pointcut(切入点): 所谓的切入点,是指要对哪些 Join point(连接点) 进行拦截的定义。如果 Join point(连接点) 是全集,那么 Pointcut(切入点) 就是被选中的子集。写 AOP 代码的时候,一般是用 Pointcut(切入点) 表达式进行对 Join point(连接点) 进行选择。 Advice(通知/增强): 所谓的通知就是指拦截到 Join point(连接点) 之后所要做的事情。通知根据作用位置不同,又细分为: Before advice(前置通知): 在 Join point(连接点) 之前运行的通知。这种通知,不能阻止执行流程继续到 Join point(连接点)。 After returning advice(后置通知): 在 Join point(连接点) 之后运行的通知。当然,如果在 Join point(连接点) 执行过程中,抛出异常,则可能就不执行了。 After throwing advice(异常通知): 方法抛出异常后,将会执行的通知。 After (finally) advice(最终通知): 无论如何都会执行的通知,即使抛出异常。 Around advice(环绕通知): 围绕在 Join point(连接点) 的通知,方法执行前和执行后,都可以执行自定义行为。同时,也可以决定是返回 Join point(连接点) 的返回值,还是返回自定义的返回值。
Spring Bean 生命周期概述

Spring Bean 生命周期概述

D瓜哥
在 Spring 启动流程概述 中,分析了 Spring 的启动流程。本文就来说明一下 Spring Bean 整个生命周期。如果有不清楚的地方,可以参考上文的“附录:启动日志”。 直接上图:Spring Bean 生命周期流程图。内容较多,图片文字偏小,请放大看(矢量图,可以任意放大): 图 1. Spring Bean 生命周期流程图 下面是文字说明。 Bean 生命周期简述 调用 InstantiationAwareBeanPostProcessor#postProcessBeforeInstantiation,主要是判断 AnnotationAwareAspectJAutoProxyCreator 是否可以生成代理。 调用构造函数 调用 MergedBeanDefinitionPostProcessor#postProcessMergedBeanDefinition,主要是通过 CommonAnnotationBeanPostProcessor、 AutowiredAnnotationBeanPostProcessor 收集依赖信息。 InstantiationAwareBeanPostProcessor#postProcessAfterInstantiation,这步什么也没做。 调用 InstantiationAwareBeanPostProcessor#postProcessProperties,主要是完成依赖注入。 调用 AutowiredAnnotationBeanPostProcessor#setBeanFactory,注入 BeanFactory 等相关信息。 调用 BeanPostProcessor#postProcessBeforeInitialization,主要是注入 ApplicationContext 等相关信息。 调用 InitializingBean#afterPropertiesSet、 init-method 方法 调用 BeanPostProcessor#postProcessAfterInitialization,主要是生成 AOP 代理类。 Bean 生命周期详解 从 getBean() 方法获取 Bean 时,如果缓存中没有对应的 Bean,则会创建 Bean,整个流程如下: InstantiationAwareBeanPostProcessor#postProcessBeforeInstantiation — 目前有如下四个: ImportAwareBeanPostProcessor — 继承父类实现,无所事事。 AnnotationAwareAspectJAutoProxyCreator — 继承父类实现,判断是否属于基础切面类,如果有指定的 Target 则生成代理。 CommonAnnotationBeanPostProcessor — 无所事事。 AutowiredAnnotationBeanPostProcessor — 继承父类实现,无所事事。
Spring 启动流程概述

Spring 启动流程概述

D瓜哥
对于 Spring 启动流程和 Bean 的生命周期,总有一些小地方搞的不是很清楚,干脆直接通过修改代码增加日志输出,使用断点单步调试,把整个流程捋顺了一点点的。 除了加载配置文件或者基础配置类外,Spring 的启动过程几乎都被封装在 AbstractApplicationContext#refresh 方法中,可以说弄清楚了这个方法的执行过程,就摸清楚了 Spring 启动全流程,下面的流程分析也是以这个方法为骨架来展开的。 流程概要 下面完整流程有些太复杂,所以,提炼一个简要的过程,方便糊弄面试官,哈哈哈😆 创建容器,读取 applicationContext.register(Config.class) 指定的配置。 准备 BeanFactory,注册容器本身和 BeanFactory 实例,以及注册环境配置信息等。 执行 BeanDefinitionRegistryPostProcessor#postProcessBeanDefinitionRegistry 注册 BeanDefinition。有三点需要注意: 目前只有一个 ConfigurationClassPostProcessor 实现类,Spring 中大量的 Bean 都是在这一步被该类注册到容器中的。 执行顺序是 ① PriorityOrdered ② Ordered ③ 普通的顺序来执行 在执行上一步时,如果发现注册了 BeanDefinitionRegistryPostProcessor 类型的 Bean,就会在循环里继续调用 postProcessBeanDefinitionRegistry 方法。MyBATIS 和 Spring 整合的 MapperScannerConfigurer 类就是在这一步执行的。 执行 BeanFactoryPostProcessor#postProcessBeanFactory 方法。目前只有一个 ConfigurationClassPostProcessor 实现类。 注册 CommonAnnotationBeanPostProcessor 和 AutowiredAnnotationBeanPostProcessor 为 BeanPostProcessor。 注册 ApplicationEventMulticaster,用于广播事件的。 注册 ApplicationListener 预加载以及注册所有非懒加载的 Bean 启动时序图 Spring 启动流程的时序图如下:
Spring 扩展点实践:整合 MyBATIS

Spring 扩展点实践:整合 MyBATIS

D瓜哥
在上一篇文章 Spring 扩展点概览及实践 中介绍了 Spring 内部存在的扩展点。学以致用,现在来分析一下 Spring 与 MyBATIS 的整合流程。 示例程序 为了方便分析源码,先根据官方文档 mybatis-spring – MyBatis-Spring | Getting Started 搭建起一个简单实例。 数据库方面,直接使用功能了 MySQL 示例数据库: MySQL : Employees Sample Database,需要的话,自行下载。 package com.diguage.truman.mybatis; import com.mysql.cj.jdbc.Driver; import com.zaxxer.hikari.HikariDataSource; import org.apache.ibatis.session.Configuration; import org.junit.jupiter.api.Test; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import javax.sql.DataSource; /** * @author D瓜哥, https://www.diguage.com/ * @since 2020-05-29 17:11 */ public class MybatisTest { @Test public void test() { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); context.register(Config.class); context.refresh(); EmployeesMapper employeesMapper = context.getBean(EmployeesMapper.class); Employees employees = employeesMapper.getById(10001); System.out.println(employees); } @org.springframework.context.annotation.Configuration @MapperScan(basePackages = "com.diguage.truman.mybatis") public static class Config { @Bean public DataSource dataSource() { HikariDataSource dataSource = new HikariDataSource(); dataSource.setUsername("root"); dataSource.setPassword("123456"); dataSource.setDriverClassName(Driver.class.getName()); dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/employees?useUnicode=true&characterEncoding=utf-8&autoReconnectForPools=true&autoReconnect=true"); return dataSource; } @Bean public SqlSessionFactoryBean sqlSessionFactory(@Autowired DataSource dataSource) { SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean(); factoryBean.setDataSource(dataSource); Configuration configuration = new Configuration(); configuration.setMapUnderscoreToCamelCase(true); factoryBean.setConfiguration(configuration); return factoryBean; } } } EmployeesMapper package com.diguage.truman.mybatis; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; /** * @author D瓜哥, https://www.diguage.com/ * @since 2020-05-29 17:23 */ public interface EmployeesMapper { @Select("SELECT * FROM employees WHERE emp_no = #{id}") Employees getById(@Param("id") Integer id); } Employees package com.diguage.truman.mybatis; import java.util.Date; /** * @author D瓜哥, https://www.diguage.com/ * @since 2020-05-29 17:24 */ public class Employees { Integer empNo; Date birthDate; String firstName; String lastName; String gender; Date hireDate; @Override public String toString() { return "Employees{" + "empNo=" + empNo + ", birthDate=" + birthDate + ", firstName='" + firstName + '\'' + ", lastName='" + lastName + '\'' + ", gender='" + gender + '\'' + ", hireDate=" + hireDate + '}'; } } 整个实例代码中,只有 @MapperScan(basePackages = "com.diguage.truman.mybatis") 这个注解和 MyBATIS 的配置相关,我们就从这里开始吧。