之前的文章《Java分布式锁实现》中列举了分布式锁的3种实现方式,分别是基于数据库实现,基于缓存实现和基于zookeeper实现。三种实现方式各有可取之处,本篇文章就详细讲解一下Java分布式锁之基于数据库的实现方式,也是最简单最易理解的实现方式。
首先,先来阐述下“锁”的概念,锁作为一种安全防御工具,既能上锁防止别人打开,又能让持有钥匙的人打开锁,这是锁的基本功能。那再来说一下“分布式锁”,分布式锁是在分布式系统(多个独立运行系统)内的锁,相对来说,这把锁的安全级别以及作用范围更大,所以从设计上就要考虑更多东西。
现在来说,怎么基于数据库实现这把分布式锁。其实说白了就是,把锁作为数据资源存入数据库,当持有这把锁的访问者来决定是否开锁。
以下详细讲解了在多个应用服务里,怎样用数据库去实现分布式锁。
结合案例:
1.客户app取出交易(同一个客户在某一个时间点只能对某种资产做取现操作)
2.交易重试补偿(交易过程服务宕机,扫描重试补偿)
一、数据库的设计
数据库锁表的表结构如下:
field | type | comment |
---|---|---|
ID | bigint | 主键 |
OUTER_SERIAL_NO | varchar | 流水号 |
CUST_NO | char | 客户号 |
SOURCE_CODE | varchar | 锁操作 |
THREAD_NO | varchar | 线程号 |
STATUS | char | 锁状态 |
REMARK | varchar | 备注 |
CREATED_AT | timestamp | 创建时间 |
UPDATED_AT | timestamp | 更新时间 |
作为锁的必要属性有5个:系统流水号,客户号,锁操作,线程号和锁状态,下面来解释一下每种属性
流水号:锁的具体指向,比如可以是产品,可以是交易流水号(后面会说到交易同步锁、交易补偿锁的使用方式)
客户号:客户的唯一标识
锁操作:客户的某种操作,比如客户取现操作,取现补偿重试操作
线程号:当前操作线程的线程号,比如取当前线程的uuid
锁状态:P处理中,F失败,Y成功
二、代码设计
代码的目录结构如下:
主要贴一下锁操作的核心代码实现:
锁接口定义:DbLockManager.java
/** * 锁接口 br
*
* @Author fugaoyang
*
*/
public interface DbLockManager
/**
* 加锁
*/
boolean lockString outerSerialNo, String custNo, LockSource
/**
* 解锁
*/
void unLockString outerSerialNo, String custNo, LockSource source, LockStatus targetStatus
View Code
锁接口实现类:DbLockManagerImpl.java
/** *
* 数据库锁实现br
*
* @author fugaoyang
*
*/
@Service
public class DbLockManagerImpl implements DbLockManager
private final Logger LOG LoggerFactory.getLoggerthis.getClass
@Autowired
private DbSyncLockMapper lockMapper
@Transactional
public boolean lockString outerSerialNo, String custNo, LockSource
boolean isLock
TradeSyncLock lock null
try
lock lockMapper.findouterSerialNo, custNo, source.getCode
null lock
lock new TradeSyncLock
createLocklock, outerSerialNo, custNo,
int num lockMapper.insertlock
num
isLock
LOG.infoThreadLogUtils.getLogPrefix + , custNo, source.getCode
isLock
// 根据交易类型进行加锁
isLock switchSynsLocklock,
LOG.infoThreadLogUtils.getLogPrefix + , custNo, source.getCode
catch Exception e
LOG.errorThreadLogUtils.getLogPrefix + + custNo, e
isLock
@Transactional
public void unLockString outerSerialNo, String custNo, LockSource source, LockStatus targetStatus
try
TradeSyncLock lock lockMapper.findouterSerialNo, custNo, source.getCode
null lock
lockMapper.updatelock.getId, targetStatus.getName, LockStatus.P.getName,
ThreadLogUtils.getCurrThreadUuid, ThreadLogUtils.getCurrThreadUuid
LOG.infoThreadLogUtils.getLogPrefix + , custNo, source.getCode
catch Exception e
LOG.errorThreadLogUtils.getLogPrefix + , custNo, e
/**
* 匹配加锁
*/
private boolean switchSynsLockTradeSyncLock lock, LockSource
boolean isLock
switch source
WITHDRAW:
isLock tradeSynsLocklock
WITHDRAW_RETRY:
isLock retrySynsLocklock
default:
isLock
/**
* 交易同步锁
*/
private boolean tradeSynsLockTradeSyncLock lock
// 处理中的不加锁,即不执行交易操作
LockStatus.P.getName.equalslock.getStatus
int num lockMapper.updatelock.getId, LockStatus.P.getName, LockStatus.S.getName,
ThreadLogUtils.getCurrThreadUuid, null
num
/**
* 补偿同步锁
*/
private boolean retrySynsLockTradeSyncLock lock
// 处理中或处理完成的不加锁,即不执行补偿操作
LockStatus.P.getName.equalslock.getStatus LockStatus.S.getName.equalslock.getStatus
int num lockMapper.updatelock.getId, LockStatus.P.getName, LockStatus.F.getName,
ThreadLogUtils.getCurrThreadUuid, null
num
private void createLockTradeSyncLock lock, String outerSerialNo, String custNo, LockSource
lock.setOuterSerialNoouterSerialNo
lock.setCustNocustNo
lock.setSourceCodesource.getCode
lock.setThreadNoThreadLogUtils.getCurrThreadUuid
lock.setStatusLockStatus.P.getName
lock.setRemarksource.getDesc
View Code
获取当前线程号以及打印uuid工具类ThreadLogUtils.Java
/** *
* 线程处理br
*
* @author fugaoyang
*
*/
public class ThreadLogUtils
private static ThreadLogUtils instance null
private
setInstancethis
// 初始化标志
private static final Object __noop new Object
private static ThreadLocalObject __flag new InheritableThreadLocalObject
@Override
protected Object
null
// 当前线程的UUID信息,主要用于打印日志;
private static ThreadLocalString currLogUuid new InheritableThreadLocalString
@Override
protected String
UUID.randomUUID.toString/* .toUpperCase */
private static ThreadLocalString currThreadUuid new ThreadLocalString
@Override
protected String
UUIDGenerator.getUuid
public static void clearBoolean isNew
isNew
currLogUuid.remove
__flag.remove
currThreadUuid.remove
public static String
isInitialized
throw new IllegalStateException
currLogUuid.get
public static String
currThreadUuid.get
public static void
currThreadUuid.remove
public static String
isInitialized
+ getCurrLogUuid +
private static boolean
__flag.get null
/**
* 初始化上下文,如果已经初始化则返回false,否则返回truebr/
*
* @return
*/
public static boolean
isInitialized
__flag.set__noop
private static void setInstanceThreadLogUtils instance
ThreadLogUtils.instance instance
public static ThreadLogUtils
instance
View Code
两种锁的实现的大致思路如下:
1.交易同步锁
当一个客户在app取现,第一次进入时,会插入一条当前线程,状态是P,操作是取现的锁,取现成功后根据当前线程号会更新成功;
当一个客户同时多个取现操作时,只有一个取现操作会加锁成功,其它会加锁失败;
当一个客户已经在取现中,这时数据库已经有一条状态P的锁,该客户同时又做了取现,这个取现动作会尝试加锁而退出;
2.交易重试补偿锁
1.当一个客户取现加锁成功,因调用第三方支付接口超时时,后台会对该笔交易重新发起重试打款操作,这时会新加一条当前交易流水号,当前线程号,状态是P,操作是取现重试的锁,重试的支付结果是成功的话,更新该条锁数据为Y状态,否则更新该条数据为F状态;
2.当重试支付失败后,再去重试打款时,发现锁的状态是F,这时把F更新为P,继续重试,根据重试结果更新锁状态。
上面实现的是一个最基本的数据库分布式锁,满足的并发量也是基于数据库所能扛得住的,性能基本可以满足普通的交易量。
后续可以优化的部分:
1.当一个用户同时多次获取lock时,因为目前是用的乐观锁,只会有一个加锁成功,可以优化成加入while(true)循环获取lock,当失败次数到达指定次数时退出,当前的操作结束。
2.当锁表数据量随着时间增大时,可以考虑按用户对锁表进行分表分库,以减小数据库方面的压力。
3.对锁的操作可以抽象出来,作为抽象实现,比如具体的取现操作只关心取现这个业务实现。
因为时间有限,写的比较仓促,希望大家有问题可以提出,相互探讨~~
完整示例代码后续会更新到github。
还没有评论,来说两句吧...