connect($host, $port); $redis->auth($password); $this->redis = $redis; } /** * 添加任务到队列 * @param $key * @param $value * @param int $sec * @return bool */ public function addDelayTask($key, $value, $sec = 10): bool { // 获取当前时间戳 $expireTime = $this->getTime($sec); // 添加到有序集合 return $this->redis->zAdd($key, $expireTime, $value) > 0; } /** * 删除延迟任务 * @param $key * @param $value * @return bool */ public function remDelayTask($key, $value): bool { // 从有序集合删除指定值 return $this->redis->zRem($key, $value) > 0; } /** * 处理延迟任务 * @param $key * @param $callback * @param int $sleep */ public function handleDelayTask($key, $callback, $sleep = 500000) { // 死循环 while (true) { // 尝试取出有序集合中第一个值 $task = $this->redis->zRange($key, 0, 0, true); if (!empty($task)) { $value = array_keys($task)[0]; $expireTime = $task[$value]; // 如果当前时间已经到达设定到执行时间 // 则开始执行逻辑,并删除当前任务 if ($this->getTime() >= $expireTime) { call_user_func($callback, $value); $this->remDelayTask($key, $value); // 直接进入下一次循环,跳过usleep continue; } } // 减少读取次数 usleep($sleep); } } /** * 添加实时任务 * @param $key * @param $value * @return false|int */ public function addTask($key, $value) { // 添加到列表 return $this->redis->lPush($key, $value); } /** * 删除实时任务 * @param $key * @param $value * @return bool|int */ public function remTask($key, $value) { // 从列表删除指定值 return $this->redis->lRem($key, $value, 0); } /** * 处理实时队列 * @param $key * @param $callback */ public function handleTask($key, $callback) { // 死循环 while (true) { // 阻塞读取列表中的值 $task = $this->redis->brPop($key, 0); // 执行用户逻辑 call_user_func($callback, $task[1]); } } /** * 获取现在时间戳(微秒) * @param int $sec * @return int */ private function getTime($sec = 0): int { return time() + $sec; } }