社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  DATABASE

用 MySQL 实现分布式锁,你听过吗?

Java基基 • 9 月前 • 261 次点击  

👉 这是一个或许对你有用的社群

🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入芋道快速开发平台知识星球。下面是星球提供的部分资料: 

👉这是一个或许对你有用的开源项目

国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。

功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号等等功能:

  • Boot 仓库:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 仓库:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn
【国内首批】支持 JDK 21 + SpringBoot 3.2.2、JDK 8 + Spring Boot 2.7.18 双版本 

来源:blog.csdn.net/linsongbin1/
article/details/79444274


概述

以前参加过一个库存系统,由于其业务复杂性,搞了很多个应用来支撑。这样的话一份库存数据就有可能同时有多个应用来修改库存数据。

比如说,有定时任务域xx.cron,和SystemA域和SystemB域这几个JAVA应用,可能同时修改同一份库存数据。如果不做协调的话,就会有脏数据出现。

对于跨JAVA进程的线程协调,可以借助外部环境,例如DB或者Redis。下文介绍一下如何使用DB来实现分布式锁

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

设计

本文设计的分布式锁的交互方式如下:

  • 根据业务字段生成transaction_id,并线程安全的创建锁资源
  • 根据transaction_id申请锁
  • 释放锁

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

动态创建锁资源

在使用synchronized关键字的时候,必须指定一个锁对象。

synchronized(obj) {


进程内的线程可以基于obj来实现同步。obj在这里可以理解为一个锁对象。如果线程要进入synchronized代码块里,必须先持有obj对象上的锁。这种锁是JAVA里面的内置锁,创建的过程是线程安全的。那么借助DB,如何保证创建锁的过程是线程安全的呢?

可以利用DB中的UNIQUE KEY特性,一旦出现了重复的key,由于UNIQUE KEY的唯一性,会抛出异常的。在JAVA里面,是SQLIntegrityConstraintViolationException异常。

create table distributed_lock
(
 id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT '自增主键',
 transaction_id varchar(128NOT NULL DEFAULT '' COMMENT '事务id',
 last_update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP NOT NULL COMMENT '最后更新时间',
 create_time TIMESTAMP DEFAULT '0000-00-00 00:00:00' NOT NULL COMMENT '创建时间',
 UNIQUE KEY `idx_transaction_id` (`transaction_id`)
)

transaction_id是事务Id,比如说,可以用

仓库 + 条码 + 销售模式

来组装一个transaction_id,表示某仓库某销售模式下的某个条码资源。不同条码,当然就有不同的transaction_id。如果有两个应用,拿着相同的transaction_id来创建锁资源的时候,只能有一个应用创建成功。

一条distributed_lock记录插入成功了,就表示一份锁资源创建成功了。

DB连接池列表设计

在写操作频繁的业务系统中,通常会进行分库,以降低单数据库写入的压力,并提高写操作的吞吐量。如果使用了分库,那么业务数据自然也都分配到各个数据库上了。

在这种水平切分的多数据库上使用DB分布式锁,可以自定义一个DataSouce列表。并暴露一个getConnection(String transactionId)方法,按照transactionId找到对应的Connection。

实现代码如下:

package dlock;

import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

@Component
public class DataSourcePool {
    private List dlockDataSources = new ArrayList<>();

    @PostConstruct
    private void initDataSourceList() throws IOException {
        Properties properties = new Properties();
        FileInputStream fis = new FileInputStream("db.properties");
        properties.load(fis);

        Integer lockNum = Integer.valueOf(properties.getProperty("DLOCK_NUM"));
        for (int i = 0; i             String user = properties.getProperty("DLOCK_USER_" + i);
            String password = properties.getProperty("DLOCK_PASS_" + i);
            Integer initSize = Integer.valueOf(properties.getProperty("DLOCK_INIT_SIZE_" + i));
            Integer maxSize = Integer.valueOf(properties.getProperty("DLOCK_MAX_SIZE_" + i));
            String url = properties.getProperty("DLOCK_URL_" + i);

            DruidDataSource dataSource = createDataSource(user,password,initSize,maxSize,url);
            dlockDataSources.add(dataSource);
        }
    }

    private DruidDataSource createDataSource(String user, String password, Integer initSize, Integer maxSize, String url) {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUsername(user);
        dataSource.setPassword(password);
        dataSource.setUrl(url);
        dataSource.setInitialSize(initSize);
        dataSource.setMaxActive(maxSize);

        return dataSource;
    }

    public Connection getConnection(String transactionId) throws Exception {
        if (dlockDataSources.size() <= 0) {
            return null;
        }

        if (transactionId == null || "".equals(transactionId)) {
            throw new RuntimeException("transactionId是必须的");
        }

        int hascode = transactionId.hashCode();
        if (hascode 0) {
            hascode = - hascode;
        }

        return dlockDataSources.get(hascode % dlockDataSources.size()).getConnection();
    }
}

首先编写一个initDataSourceList方法,并利用Spring的PostConstruct注解初始化一个DataSource 列表。相关的DB配置从db.properties读取。




    
DLOCK_NUM=2

DLOCK_USER_0="user1"
DLOCK_PASS_0="pass1"
DLOCK_INIT_SIZE_0=2
DLOCK_MAX_SIZE_0=10
DLOCK_URL_0="jdbc:mysql://localhost:3306/test1"

DLOCK_USER_1="user1"
DLOCK_PASS_1="pass1"
DLOCK_INIT_SIZE_1=2
DLOCK_MAX_SIZE_1=10
DLOCK_URL_1="jdbc:mysql://localhost:3306/test2"

DataSource使用阿里的DruidDataSource。

接着最重要的一个实现getConnection(String transactionId)方法。实现原理很简单,获取transactionId的hashcode,并对DataSource的长度取模即可。

连接池列表设计好后,就可以实现往distributed_lock表插入数据了。

package dlock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.sql.*;

@Component
public class DistributedLock {

    @Autowired
    private DataSourcePool dataSourcePool;

    /**
     * 根据transactionId创建锁资源
     */

    public String createLock(String transactionId) throws Exception{
        if (transactionId == null) {
            throw new RuntimeException("transactionId是必须的");
        }
        Connection connection = null;
        Statement statement = null;
        try {
            connection = dataSourcePool.getConnection(transactionId);
            connection.setAutoCommit(false);
            statement = connection.createStatement();
            statement.executeUpdate("INSERT INTO distributed_lock(transaction_id) VALUES ('" + transactionId + "')");
            connection.commit();
            return transactionId;
        }
        catch (SQLIntegrityConstraintViolationException icv) {
            //说明已经生成过了。
            if (connection != null) {
                connection.rollback();
            }
            return transactionId;
        }
        catch (Exception e) {
            if (connection != null) {
                connection.rollback();
            }
            throw  e;
        }
        finally {
            if (statement != null) {
                statement.close();
            }

            if (connection != null) {
                connection.close();
            }
        }
    }
}

根据transactionId锁住线程

接下来利用DB的select for update特性来锁住线程。当多个线程根据相同的transactionId并发同时操作select for update的时候,只有一个线程能成功,其他线程都block住,直到select for update成功的线程使用commit操作后,block住的所有线程的其中一个线程才能开始干活。

我们在上面的DistributedLock类中创建一个lock方法。




    
 public boolean lock(String transactionId) throws Exception {
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            connection = dataSourcePool.getConnection(transactionId);
            preparedStatement = connection.prepareStatement("SELECT * FROM distributed_lock WHERE transaction_id = ? FOR UPDATE ");
            preparedStatement.setString(1,transactionId);
            resultSet = preparedStatement.executeQuery();
            if (!resultSet.next()) {
                connection.rollback();
                return false;
            }
            return true;
        } catch (Exception e) {
            if (connection != null) {
                connection.rollback();
            }
            throw  e;
        }
        finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }

            if (resultSet != null) {
                resultSet.close();
            }

            if (connection != null) {
                connection.close();
            }
        }
    }

实现解锁操作

当线程执行完任务后,必须手动的执行解锁操作,之前被锁住的线程才能继续干活。在我们上面的实现中,其实就是获取到当时select for update成功的线程对应的Connection,并实行commit操作即可。

那么如何获取到呢?我们可以利用ThreadLocal。首先在DistributedLock类中定义

private ThreadLocal threadLocalConn = new ThreadLocal<>();

每次调用lock方法的时候,把Connection放置到ThreadLocal里面。我们修改lock方法。

 public boolean lock(String transactionId) throws Exception {
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            connection = dataSourcePool.getConnection(transactionId);
            threadLocalConn.set(connection);
            preparedStatement = connection.prepareStatement("SELECT * FROM distributed_lock WHERE transaction_id = ? FOR UPDATE ");
            preparedStatement.setString(1,transactionId);
            resultSet = preparedStatement.executeQuery();
            if (!resultSet.next()) {
                connection.rollback();
                threadLocalConn.remove();
                return false;
            }
            return true;
        } catch (Exception e) {
            if (connection != null) {
                connection.rollback();
                threadLocalConn.remove();
            }
            throw  e;
        }
        finally {
            if  (preparedStatement != null) {
                preparedStatement.close();
            }

            if (resultSet != null) {
                resultSet.close();
            }

            if (connection != null) {
                connection.close();
            }
        }
    }

这样子,当获取到Connection后,将其设置到ThreadLocal中,如果lock方法出现异常,则将其从ThreadLocal中移除掉。

有了这几步后,我们可以来实现解锁操作了。我们在DistributedLock添加一个unlock方法。

 public void unlock() throws Exception {
        Connection connection = null;
        try {
            connection = threadLocalConn.get();
            if (!connection.isClosed()) {
                connection.commit();
                connection.close();
                threadLocalConn.remove();
            }
        } catch (Exception e) {
            if (connection != null) {
                connection.rollback();
                connection.close();
            }
            threadLocalConn.remove();
            throw e;
        }
    }

缺点

毕竟是利用DB来实现分布式锁,对DB还是造成一定的压力。当时考虑使用DB做分布式的一个重要原因是,我们的应用是后端应用,平时流量不大的,反而关键的是要保证库存数据的正确性。对于像前端库存系统,比如添加购物车占用库存等操作,最好别使用DB来实现分布式锁了。

进一步思考

如果想锁住多份数据该怎么实现?比如说,某个库存操作,既要修改物理库存,又要修改虚拟库存,想锁住物理库存的同时,又锁住虚拟库存。其实也不是很难,参考lock方法,写一个multiLock方法,提供多个transactionId的入参,for循环处理就可以了。这个后续有时间再补上。


欢迎加入我的知识星球,全面提升技术能力。

👉 加入方式,长按”或“扫描”下方二维码噢

星球的内容包括:项目实战、面试招聘、源码解析、学习路线。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/169019
 
261 次点击