给大家整理了相关的编程文章,网友殳景山根据主题投稿了本篇教程内容,涉及到Semaphore共享锁、java、锁、java Semaphore共享锁相关内容,已被543网友关注,下面的电子资料对本篇知识点有更加详尽的解释。
java Semaphore共享锁
// 默认使用非公平锁实现 public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
函数名 | 说明 | 备注 |
acquire | 获取锁 | / |
release | 释放锁 | / |
// Semaphore.java public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
// AbstractQueuedSynchronizer.java public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果线程是中断状态,抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取共享资源 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
// NonfairSync protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
// Sync final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
// AbstractQueuedSynchronizer.java protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
// AbstractQueuedSynchronizer.java // 在队尾新建Node对象并添加 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
// AbstractQueuedSynchronizer.java private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 将当前线程添加到等待队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // for循环自旋 for (;;) { // 获取node的前一个节点 final Node p = node.predecessor(); // 如果前一个节点是头节点 if (p == head) { // 尝试获取锁 int r = tryAcquireShared(arg); if (r >= 0) { // 获取锁成功,更新node信息设置为头节点,并通知其他节点 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 判断是否需要阻塞线程,设置waitStatus并阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
Semaphore.release流程相对而言,就比较简单,将release传递到AQS内部通过CAS更新许可证数量信息,更新完成后,遍历队列中Node节点,将Node waitStatus设置为0,并对对应线程执行unpark,相关代码如下:
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // 通过CAS更新许可证数量 if (compareAndSetState(current, next)) return true; } }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } // 许可证数量更新完成后,调用该方法唤醒线程 private void doReleaseShared() { // 自旋 for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒后继节点线程抢占许可证 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }