当前位置:  编程技术>java/j2ee

java使用zookeeper实现的分布式锁示例

    来源: 互联网  发布时间:2014-11-06

    本文导语:  使用zookeeper实现的分布式锁分布式锁,实现了Lock接口 代码如下:package com.concurrent; import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;imp...

使用zookeeper实现的分布式锁

分布式锁,实现了Lock接口

代码如下:

package com.concurrent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
   DistributedLock lock = null;
 try {
  lock = new DistributedLock("127.0.0.1:2182","test");
  lock.lock();
  //do something...
 } catch (Exception e) {
  e.printStackTrace();
 }
 finally {
  if(lock != null)
   lock.unlock();
 }
 * @author xueliang
 *
 */
public class DistributedLock implements Lock, Watcher{
 private ZooKeeper zk;
 private String root = "/locks";//根
 private String lockName;//竞争资源的标志
 private String waitNode;//等待前一个锁
 private String myZnode;//当前锁
 private CountDownLatch latch;//计数器
 private int sessionTimeout = 30000;
 private List exception = new ArrayList();

 /**
  * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
  * @param config 127.0.0.1:2181
  * @param lockName 竞争资源标志,lockName中不能包含单词lock
  */
 public DistributedLock(String config, String lockName){
  this.lockName = lockName;
  // 创建一个与服务器的连接
   try {
   zk = new ZooKeeper(config, sessionTimeout, this);
   Stat stat = zk.exists(root, false);
   if(stat == null){
    // 创建根节点
    zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
   }
  } catch (IOException e) {
   exception.add(e);
  } catch (KeeperException e) {
   exception.add(e);
  } catch (InterruptedException e) {
   exception.add(e);
  }
 }

 /**
  * zookeeper节点的监视器
  */
 public void process(WatchedEvent event) {
  if(this.latch != null) { 
            this.latch.countDown(); 
        }
 }

 public void lock() {
  if(exception.size() > 0){
   throw new LockException(exception.get(0));
  }
  try {
   if(this.tryLock()){
    System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
    return;
   }
   else{
    waitForLock(waitNode, sessionTimeout);//等待锁
   }
  } catch (KeeperException e) {
   throw new LockException(e);
  } catch (InterruptedException e) {
   throw new LockException(e);
  }
 }

 public boolean tryLock() {
  try {
   String splitStr = "_lock_";
   if(lockName.contains(splitStr))
    throw new LockException("lockName can not contains \u000B");
   //创建临时子节点
   myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
   System.out.println(myZnode + " is created ");
   //取出所有子节点
   List subNodes = zk.getChildren(root, false);
   //取出所有lockName的锁
   List lockObjNodes = new ArrayList();
   for (String node : subNodes) {
    String _node = node.split(splitStr)[0];
    if(_node.equals(lockName)){
     lockObjNodes.add(node);
    }
   }
   Collections.sort(lockObjNodes);
   System.out.println(myZnode + "==" + lockObjNodes.get(0));
   if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
    //如果是最小的节点,则表示取得锁
             return true;
         }
   //如果不是最小的节点,找到比自己小1的节点
   String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
   waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
  } catch (KeeperException e) {
   throw new LockException(e);
  } catch (InterruptedException e) {
   throw new LockException(e);
  }
  return false;
 }

 public boolean tryLock(long time, TimeUnit unit) {
  try {
   if(this.tryLock()){
    return true;
   }
         return waitForLock(waitNode,time);
  } catch (Exception e) {
   e.printStackTrace();
  }
  return false;
 }

 private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);
        //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
        if(stat != null){
         System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
         this.latch = new CountDownLatch(1);
         this.latch.await(waitTime, TimeUnit.MILLISECONDS);
         this.latch = null;
        }
        return true;
    }

 public void unlock() {
  try {
   System.out.println("unlock " + myZnode);
   zk.delete(myZnode,-1);
   myZnode = null;
   zk.close();
  } catch (InterruptedException e) {
   e.printStackTrace();
  } catch (KeeperException e) {
   e.printStackTrace();
  }
 }

 public void lockInterruptibly() throws InterruptedException {
  this.lock();
 }

 public Condition newCondition() {
  return null;
 }

 public class LockException extends RuntimeException {
  private static final long serialVersionUID = 1L;
  public LockException(String e){
   super(e);
  }
  public LockException(Exception e){
   super(e);
  }
 }

}

并发测试工具

代码如下:

package com.concurrent;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
  ConcurrentTask[] task = new ConcurrentTask[5];
  for(int i=0;i


    
 
 

您可能感兴趣的文章:

  • 不太明白,利用RMI实现JAVA分布式应用 和 EJB实现JAVA分布式应用有什么区别。
  • Java分布式应用平台 SmartFrog
  • 请教高手:在基于Java的三层分布式结构应用中的打印的解决方案(套打)
  • 请问如何用JAVA开发分布式应用?送分决不食言
  • Java分布式哈希表 Bamboo Distributed Hash Table
  • 分布式 Java 服务平台 Baratine
  • 深入Java分布式计算的使用分析
  • andriod下java socket网络编程:java socket客户端服务端代码示例
  • 输出java进程的jstack信息示例分享 通过线程堆栈信息分析java线程
  • java Servlet实现Session创建存取以及url重写代码示例
  • java 四舍五入使java保留2位小数示例讲解
  • java进行error捕获和处理示例(java异常捕获)
  • java去除集合中重复元素示例分享 java去除重复
  • java读取csv文件示例分享(java解析csv文件)
  • java求三个数的最大值的示例分享
  • java生成字母数字组合的随机数示例 java生成随机数
  • java实现网页解析示例
  • java协变返回类型使用示例
  • 使用java执行定时任务示例
  • java自定义枚举转换器示例
  • java向文件末尾添加内容示例分享
  • java正则表达式获取url的host示例
  • java使用正则表达校验手机号码示例(手机号码正则)
  • java实现jframe透明窗体示例
  • java的split方法使用示例
  • java抓取网页数据示例
  • Oracle 使用Java Source 简单示例
  • java自定义日期转化类示例
  •  
    本站(WWW.)旨在分享和传播互联网科技相关的资讯和技术,将尽最大努力为读者提供更好的信息聚合和浏览方式。
    本站(WWW.)站内文章除注明原创外,均为转载、整理或搜集自网络。欢迎任何形式的转载,转载请注明出处。












  • 相关文章推荐
  • Zookeeper Java客户端 zkclient
  • java命名空间java.sql类types的类成员方法: java_object定义及介绍
  • 我想学JAVA ,是买THINK IN JAVA 还是JAVA2核心技术:卷1 好???
  • java命名空间java.awt.datatransfer类dataflavor的类成员方法: imageflavor定义及介绍
  • 请问Java高手,Java的优势在那里??,Java主要适合于开发哪类应用程序
  • java命名空间java.lang.management类managementfactory的类成员方法: getcompilationmxbean定义及介绍
  • 如何将java.util.Date转化为java.sql.Date?数据库中Date类型对应于java的哪个Date呢
  • java命名空间java.lang.management接口runtimemxbean的类成员方法: getlibrarypath定义及介绍
  • java开源软件 iis7站长之家
  • java命名空间java.lang.management接口runtimemxbean的类成员方法: getstarttime定义及介绍
  • 本人想学java,请问java程序员的待遇如何,和java主要有几个比较强的方向
  • java命名空间java.awt.datatransfer类dataflavor的类成员方法: stringflavor定义及介绍
  • 我对JAVA一窍不通,可惜别人却给我一个Java的project,要我做一个安装程序,请问哪里有JAVA INSTALLER下载,而且我要不要安装java的sdk才能完成此项任务?
  • java命名空间java.security类keystore的类成员方法: getdefaulttype定义及介绍
  • 新年第一天,让我们讨论一下未来一年JAVA的发展趋势! 个人认为,JAVA将主要朝ERP和JAVA手机方面发展!
  • java命名空间java.lang.management接口runtimemxbean的类成员方法: getclasspath定义及介绍
  • 我想学Java,但不知道Java的实用的开发工具有那些,Java主要用在哪些方面,EJB到底是什么东西??
  • java命名空间java.awt.datatransfer类dataflavor的类成员方法: javaserializedobjectmimetype定义及介绍
  • redhat7.3下,java程序打印中文直接用java命令执行正常,用crontab执行java命令为乱码
  • java命名空间java.awt.datatransfer类dataflavor的类成员方法: javafilelistflavor定义及介绍
  • 各位学java的朋友,学java的未来是什么,你们学java都用来开发什么项目啊!来者给分!!
  • java命名空间java.lang.management接口runtimemxbean的类成员方法: getvmname定义及介绍
  • 请问java程序中的import为什么有的用java.….*,而有的又用java.….…,有什么区别吗?




  • 特别声明:169IT网站部分信息来自互联网,如果侵犯您的权利,请及时告知,本站将立即删除!

    ©2012-2021,,E-mail:www_#163.com(请将#改为@)

    浙ICP备11055608号-3