本文共 9311 字,大约阅读时间需要 31 分钟。
注:本文参考http://blog.csdn.net/desilting/article/details/41280869整理。
在多线程或多进程环境中,解决互斥性问题,即资源抢占的基本方式为: 对共享资源的操作前后(进入退出临界区)加解锁,保证不同线程或进程可以互斥有序的操作资源。如ReentrantLock和synchronized。
那么在分布式环境中,为了保证不同JVM不同主机间不会出现资源抢占,那么同样只要对临界区加解锁就可以了。只是这个锁需要我们自己来实现,因为并没有如单实例中共享资源的加解锁那样好的实现。即分布式锁的实现。
分布式锁的基本条件:
- 需要有存储锁的空间,并且锁的空间是可以访问到的。
- 锁需要被唯一标识。
- 锁要有至少两种状态。
- 存储空间
锁是一个抽象的概念,锁的实现,需要依存于一个可以存储锁的空间。在多线程中是内存,在多进程中是内存或者磁盘。更重要的是,这个空间是可以被访问到的。多线程中,不同的线程都可以访问到堆中的成员变量;在多进程中,不同的进程可以访问到共享内存中的数据或者存储在磁盘中的文件。但是在分布式环境中,不同的主机很难访问对方的内存或磁盘。这就需要一个都能访问到的外部空间来作为存储空间。
最普遍的外部存储空间就是数据库了,事实上也确实有基于数据库做分布式锁(行锁、version乐观锁),如quartz集群架构中就有所使用。除此以外,还有各式缓存如Redis、Tair、Memcached、Mongodb,当然还有专门的分布式协调服务Zookeeper,甚至是另一台主机。只要可以存储数据、锁在其中可以被多主机访问到,那就可以作为分布式锁的存储空间。
- 唯一标识
不同的共享资源,必然需要用不同的锁进行保护,因此相应的锁必须有唯一的标识。在多线程环境中,锁可以是一个对象,那么对这个对象的引用便是这个唯一标识。多进程环境中,信号量在共享内存中也是由引用来作为唯一的标识。
但是如果不在内存中,失去了对锁的引用,如何唯一标识它呢?上文提到的有名信号量,便是用硬盘中的文件名作为唯一标识。
因此,在分布式环境中,只要给这个锁设定一个名称,并且保证这个名称是全局唯一的(如zk临时节点),那么就可以作为唯一标识。
- 至少两种状态
为了给临界区加锁和解锁,需要存储两种不同的状态。
如ReentrantLock中的status:0表示没有线程竞争,大于0表示有线程竞争;信号量大于0表示可以进入临界区,小于等于0则表示需要被阻塞。
因此只要在分布式环境中,锁的状态有两种或以上:如有锁、没锁;存在、不存在等等,均可以实现。
有了这三个条件,基本就可以实现一个简单的分布式锁了。
下面以数据库为例,实现一个简单的分布式锁:
数据库表,字段为锁的ID(唯一标识),锁的状态(0表示没有被锁,1表示被锁)。 伪代码为:lock = mysql.get(id);while(lock.status == 1) { sleep(100);}mysql.update(lock.status = 1);doSomething();mysql.update(lock.status = 0);
问题
以上的方式即可以实现一个粗糙的分布式锁,但是这样的实现,有没有什么问题呢?
-
问题1:锁状态判断原子性无法保证
从读取锁的状态,到判断该状态是否为被锁,需要经历两步操作。如果不能保证这两步的原子性,就可能导致不止一个请求获取到了锁,这显然是不行的。因此,我们需要保证锁状态判断的原子性。 -
问题2:网络断开或主机宕机,锁状态无法清除
假设在主机已经获取到锁的情况下,突然出现了网络断开或者主机宕机,如果不做任何处理该锁将仍然处于被锁定的状态。那么之后所有的请求都无法再成功抢占到这个锁。因此,我们需要在持有锁的主机宕机或者网络断开的时候,及时的释放掉这把锁。 -
问题3:无法保证释放的是自己上锁的那把锁
在解决了问题2的情况下再设想一下,假设持有锁的主机A在临界区遇到网络抖动导致网络断开,分布式锁及时的释放掉了这把锁。之后,另一个主机B占有了这把锁,但是此时主机A网络恢复,退出临界区时解锁。由于都是同一把锁,所以A就会将B的锁解开。此时如果有第三个主机尝试抢占这把锁,也将会成功获得。因此,我们需要在解锁时,确定自己解的这个锁正是自己锁上的。
保留以上所有问题和条件,我们接下来看一些比较典型的实现方案。
1 ZooKeeper的实现
比较简单,可以利用顺序临时节点做分布式锁。
package bjsxt.zookeeper.lock; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.util.List; import java.io.IOException; import java.util.Collections; import java.util.concurrent.CountDownLatch; /** * 利用zookeeper的EPHEMERAL_SEQUENTIAL类型节点及watcher机制,来简单实现分布式锁 * 主要思想:1、开启10个线程,在disLocks节点下各自创建名为sub的EPHEMERAL_SEQUENTIAL节点;2、获取disLocks节点下所有子节点,排序,如果自己的节点编号最小,则获取锁;3、否则watch排在自己前面的节点,监听到其删除后,进入第2步(重新检测排序是防止监听的节点发生连接失效,导致的节点删除情况);4、删除自身sub节点,释放连接; * @author jeffSheng * */public class DistributedLock implements Watcher{ private int threadId; private ZooKeeper zk = null; private String selfPath; private String waitPath; private String LOG_PREFIX_OF_THREAD; private static final int SESSION_TIMEOUT = 10000; private static final String GROUP_PATH = "/disLocks"; private static final String SUB_PATH = "/disLocks/sub"; private static final String CONNECTION_STRING = "192.168.98.98:2181,192.168.98.99:2181,192.168.98.100:2181"; private static final int THREAD_NUM = 10; //确保连接zk成功; private CountDownLatch connectedSemaphore = new CountDownLatch(1); //确保所有线程运行结束; private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM); private static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class); public DistributedLock(int id) { this.threadId = id; LOG_PREFIX_OF_THREAD = "【第"+threadId+"个线程】"; } public static void main(String[] args) { for(int i=0; i < THREAD_NUM; i++){ final int threadId = i + 1; new Thread(){ @Override public void run() { try{ DistributedLock dc = new DistributedLock(threadId); dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT); //GROUP_PATH不存在的话,由一个线程创建即可; synchronized (threadSemaphore){ dc.createPath(GROUP_PATH, "该节点由线程" + threadId + "创建", true); } dc.getLock(); } catch (Exception e){ LOG.error("【第"+threadId+"个线程】 抛出的异常:"); e.printStackTrace(); } } }.start(); } try { threadSemaphore.await(); LOG.info("所有线程运行结束!"); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 获取锁 * @return */ private void getLock() throws KeeperException, InterruptedException { selfPath = zk.create(SUB_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); LOG.info(LOG_PREFIX_OF_THREAD+"创建锁路径:" + selfPath); if(checkMinPath()){ getLockSuccess(); } } /** * 创建节点 * @param path 节点path * @param data 初始数据内容 * @return */ public boolean createPath( String path, String data, boolean needWatch) throws KeeperException, InterruptedException { if(zk.exists(path, needWatch)==null){ LOG.info( LOG_PREFIX_OF_THREAD + "节点创建成功, Path: " + this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT) + ", content: " + data); } return true; } /** * 创建ZK连接 * @param connectString ZK服务器地址列表 * @param sessionTimeout Session超时时间 */ public void createConnection( String connectString, int sessionTimeout ) throws IOException, InterruptedException { zk = new ZooKeeper( connectString, sessionTimeout, this); connectedSemaphore.await(); } /** * 获取锁成功 */ public void getLockSuccess() throws KeeperException, InterruptedException { if(zk.exists( this.selfPath, false) == null){ LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了..."); return; } LOG.info(LOG_PREFIX_OF_THREAD + "获取锁成功,赶紧干活!"); Thread.sleep(2000); LOG.info(LOG_PREFIX_OF_THREAD + "删除本节点:"+selfPath); zk.delete(this.selfPath, -1); releaseConnection(); threadSemaphore.countDown(); } /** * 关闭ZK连接 */ public void releaseConnection() { if ( this.zk !=null ) { try { this.zk.close(); } catch ( InterruptedException e ) {} } LOG.info(LOG_PREFIX_OF_THREAD + "释放连接"); } /** * 检查自己是不是最小的节点 * @return */ public boolean checkMinPath() throws KeeperException, InterruptedException { ListsubNodes = zk.getChildren(GROUP_PATH, false); Collections.sort(subNodes); int index = subNodes.indexOf(selfPath.substring(GROUP_PATH.length() + 1)); switch (index){ case -1:{ LOG.error(LOG_PREFIX_OF_THREAD + "本节点已不在了..." + selfPath); return false; } case 0:{ LOG.info(LOG_PREFIX_OF_THREAD + "子节点中,我果然是老大" + selfPath); return true; } default:{ this.waitPath = GROUP_PATH + "/" + subNodes.get(index - 1); LOG.info(LOG_PREFIX_OF_THREAD+"获取子节点中,排在我前面的"+waitPath); try{ zk.getData(waitPath, true, new Stat()); return false; }catch(KeeperException e){ if(zk.exists(waitPath, false) == null){ LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,排在我前面的"+waitPath+"已失踪,幸福来得太突然?"); return checkMinPath(); }else{ throw e; } } } } } @Override public void process(WatchedEvent event) { if(event == null){ return; } Event.KeeperState keeperState = event.getState(); Event.EventType eventType = event.getType(); if ( Event.KeeperState.SyncConnected == keeperState) { if ( Event.EventType.None == eventType ) { LOG.info( LOG_PREFIX_OF_THREAD + "成功连接上ZK服务器" ); connectedSemaphore.countDown(); }else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) { LOG.info(LOG_PREFIX_OF_THREAD + "收到情报,排我前面的家伙已挂,我是不是可以出山了?"); try { if(checkMinPath()){ getLockSuccess(); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }else if ( Event.KeeperState.Disconnected == keeperState ) { LOG.info( LOG_PREFIX_OF_THREAD + "与ZK服务器断开连接" ); } else if ( Event.KeeperState.AuthFailed == keeperState ) { LOG.info( LOG_PREFIX_OF_THREAD + "权限检查失败" ); } else if ( Event.KeeperState.Expired == keeperState ) { LOG.info( LOG_PREFIX_OF_THREAD + "会话失效" ); } } }
2 Redis实现分布式锁
参考我的另一篇文章:《》
转载地址:https://jeffsheng.blog.csdn.net/article/details/76732261 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!