# Lua script used to release a lock.
# As invoked by ``release()``: KEYS[1] is the lock key, KEYS[2] is the signal
# list key and ARGV[1] is the owner id.
# Check if the id matches. If not, return an error code (1). Otherwise reset
# the signal list to a single element (the list that ``acquire()`` waits on
# with BLPOP), delete the lock key and return 0.
# The literal is split per Lua statement for readability only; the resulting
# bytes (and therefore the SHA1 below) are exactly those of the one-line form.
UNLOCK_SCRIPT = (
    b' if redis.call("get", KEYS[1]) ~= ARGV[1] then'
    b' return 1'
    b' else'
    b' redis.call("del", KEYS[2])'
    b' redis.call("lpush", KEYS[2], 1)'
    b' redis.call("del", KEYS[1])'
    b' return 0'
    b' end '
)
UNLOCK_SCRIPT_HASH = sha1(UNLOCK_SCRIPT).hexdigest()
class Lock(object):
    """
    A Lock context manager implemented via redis SETNX/BLPOP.
    """

    def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True):
        """
        :param redis_client:
            An instance of :class:`~StrictRedis`.
        :param name:
            The name (redis key) the lock should have.
        :param expire:
            The lock expiry time in seconds. If left at the default (None)
            the lock will not expire.
        :param id:
            The ID (redis value) the lock should have. A random value is
            generated when left at the default.

            Note that if you specify this then the lock is marked as "held".
            Acquires won't be possible.
        :param auto_renewal:
            If set to ``True``, Lock will automatically renew the lock so that
            it doesn't expire for as long as the lock is held (acquire() called
            or running in a context manager).

            Implementation note: Renewal will happen using a daemon thread with
            an interval of ``expire*2/3``. If wishing to use a different renewal
            time, subclass Lock, call ``super().__init__()`` then set
            ``self._lock_renewal_interval`` to your desired interval.
        :param strict:
            If set ``True`` then the ``redis_client`` needs to be an instance
            of ``redis.StrictRedis``.
        :raises TypeError:
            If ``id`` is given but is not ``bytes``.
        """
        # ... parameter validation omitted from this excerpt
        self._client = redis_client
        self._expire = expire if expire is None else int(expire)
        if id is None:
            # No id supplied: generate a random 16-byte owner id.
            self._id = urandom(16)
        elif isinstance(id, bytes):
            self._id = id
        else:
            raise TypeError("Incorrect type for `id`. Must be bytes not %s." % type(id))
        # Redis key holding the owner id, and the list waiters block on.
        self._name = 'lock:'+name
        self._signal = 'lock-signal:'+name
        # NOTE(review): float(expire) needs a non-None expire when
        # auto_renewal is set - presumably enforced by the omitted validation.
        self._lock_renewal_interval = (float(expire)*2/3
                                       if auto_renewal
                                       else None)
        self._lock_renewal_thread = None

    def acquire(self, blocking=True, timeout=None):
        """
        Try to take the lock.

        :param blocking:
            Boolean value specifying whether lock should be blocking or not.
        :param timeout:
            An integer value specifying the maximum number of seconds to block.
        :returns:
            ``True`` once the lock key has been set by this instance, ``False``
            if the lock is busy and the call is non-blocking or timed out.
        :raises AlreadyAcquired:
            If this instance already holds the lock.
        """
        logger.debug("Getting %r ...", self._name)

        # ``_held`` is defined elsewhere in the class (not part of this excerpt).
        if self._held:
            # The lock is not re-entrant.
            raise AlreadyAcquired("Already acquired from this Lock instance.")

        # ... parameter validation omitted from this excerpt (e.g. timeout
        # must not be greater than the lock's expire, among other checks)

        busy = True
        blpop_timeout = timeout or self._expire or 0
        timed_out = False
        while busy:
            # SET ... NX fails when the key already exists, i.e. the lock is
            # currently held by someone else.
            busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
            if busy:
                if timed_out:
                    return False
                elif blocking:
                    # Wait on the signal list. If BLPOP receives a signal
                    # within ``blpop_timeout`` the result is truthy and
                    # ``timed_out`` stays falsy; otherwise it takes the value
                    # of ``timeout``, so one last SET attempt is made before
                    # giving up.
                    timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
                else:
                    logger.debug("Failed to get %r.", self._name)
                    return False

        logger.debug("Got lock for %r.", self._name)
        if self._lock_renewal_interval is not None:
            # Auto-renewal requested: start the renewal thread.
            self._start_lock_renewer()
        return True

    def extend(self, expire=None):
        """Extends expiration time of the lock.

        :param expire:
            New expiration time. If ``None`` - `expire` provided during
            lock initialization will be taken.
        :raises NotAcquired:
            If the script reports error code 1 (key missing or owned by a
            different id).
        :raises NotExpirable:
            If the script reports error code 2 (the key has no expiration).
        :raises RuntimeError:
            On any other non-zero error code.
        """
        # ... parameter validation omitted from this excerpt
        # Run the Lua script selected by the ``EXTEND`` index, passing it the
        # new expiry time and this lock's own id.
        error = _eval_script(self._client, EXTEND, self._name, args=(expire, self._id))
        if error == 1:
            raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
        elif error == 2:
            raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
        elif error:
            raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
这个 EXTEND 操作的 Lua 脚本如下:
# Lua script used to extend a lock's TTL.
# As invoked by ``extend()``: KEYS[1] is the lock key, ARGV[1] is the new
# expiry time and ARGV[2] is the owner id.
# The first branch covers both the case where the key doesn't exist and the
# case where it doesn't equal the lock's id: ownership is checked before
# refreshing, so only the owner may extend the lock's TTL (error code 1).
# Error code 2 is returned when ``ttl`` is negative for the key, i.e. there is
# no expiration to extend. Otherwise the new expiry is applied and 0 returned.
# The literal is split per Lua statement for readability only; the resulting
# bytes (and therefore the SHA1 below) are exactly those of the one-line form.
EXTEND_SCRIPT = (
    b' if redis.call("get", KEYS[1]) ~= ARGV[2] then'
    b' return 1'
    b' elseif redis.call("ttl", KEYS[1]) < 0 then'
    b' return 2'
    b' else'
    b' redis.call("expire", KEYS[1], ARGV[1])'
    b' return 0'
    b' end '
)
EXTEND_SCRIPT_HASH = sha1(EXTEND_SCRIPT).hexdigest()
# Excerpt: static method of the ``Lock`` class, shown on its own.
@staticmethod
def _lock_renewer(lockref, interval, stop):
    """
    Renew the lock key in redis every `interval` seconds until the `stop`
    event is set or the lock object no longer exists.

    :param lockref:
        A callable returning the lock instance, or ``None`` once the lock
        has been destroyed (used here as a weak reference).
    :param interval:
        Seconds to wait between renewals.
    :param stop:
        Event-like object; ``stop.wait(timeout=...)`` returning a true value
        ends the loop.
    """
    log = getLogger("%s.lock_refresher" % __name__)
    # Wait for the stop event; ``wait`` returns False when the timeout
    # elapses without the event being set, which triggers one renewal.
    while not stop.wait(timeout=interval):
        log.debug("Refreshing lock")
        # Dereference the weak reference to get the current lock instance.
        lock = lockref()
        if lock is None:
            # The lock was destroyed elsewhere, so the renewer must stop too.
            log.debug("The lock no longer exists, "
                      "stopping lock refreshing")
            break
        lock.extend(expire=lock._expire)
        # Drop the strong reference obtained above so this loop does not
        # keep the lock object alive while it waits for the next interval.
        del lock
    log.debug("Exit requested, stopping lock refreshing")
# Excerpt: method of the ``Lock`` class, shown on its own.
def _stop_lock_renewer(self):
    """
    Stop the lock renewer.

    This signals the renewal thread and waits for its exit. If the thread
    has already exited on its own, only the stale handle is cleared.
    """
    renewer = self._lock_renewal_thread
    if renewer is None:
        return
    if renewer.is_alive():
        logger.debug("Signalling the lock refresher to stop")
        # Set the event to tell the renewal thread to exit, then wait for it.
        self._lock_renewal_stop.set()
        renewer.join()
        logger.debug("Lock refresher has stopped")
    # Clear the handle in both cases so a dead thread is not left behind
    # looking like an active renewer.
    self._lock_renewal_thread = None
# Excerpt: method of the ``Lock`` class, shown on its own.
def release(self):
    """Releases the lock, that was acquired with the same object.

    .. note::

        If you want to release a lock that you acquired in a different place
        you have two choices:

        * Use ``Lock("name", id=id_from_other_place).release()``
        * Use ``Lock("name").reset()``

    :raises NotAcquired:
        If the script reports error code 1 (the stored id does not match
        this lock's id, which includes the key being gone).
    :raises RuntimeError:
        On any other non-zero error code.
    """
    if self._lock_renewal_thread is not None:
        # A renewal thread exists: stop it before giving the lock up.
        self._stop_lock_renewer()
    logger.debug("Releasing %r.", self._name)
    # Run the Lua script selected by the ``UNLOCK`` index with the lock key,
    # the signal list key and this lock's own id.
    error = _eval_script(self._client, UNLOCK, self._name, self._signal, args=(self._id,))
    if error == 1:
        raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
    elif error:
        # The script that ran is UNLOCK; the message previously said EXTEND.
        raise RuntimeError("Unsupported error code %s from UNLOCK script." % error)
    else:
        self._delete_signal()
# Usage example 1: hold the lock for the duration of a ``with`` block.
conn = StrictRedis()
with redis_lock.Lock(conn, "name-of-the-lock"):
    print("Got the lock. Doing some work ...")
    time.sleep(5)

# Usage example 2 (Eg): non-blocking acquire.
lock = redis_lock.Lock(conn, "name-of-the-lock")
if lock.acquire(blocking=False):
    try:
        print("Got the lock.")
    finally:
        # The lock was created without ``expire``, so it must be released
        # explicitly or it would stay held.
        lock.release()
else:
    print("Someone else has the lock.")