redis-queue/DelayTask.php

162 lines
3.5 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
/**
* Redis实现延迟消息队列
* Class Queue
*/
class Queue
{
/**
* 当前实例
* @var Queue
*/
private static $instance;
/**
* Redis实例
* @var Redis
*/
private $redis;
/**
* 单例入口
* @param mixed ...$vars
* @return Queue
*/
public static function instance(...$vars): Queue
{
if (!self::$instance) {
self::$instance = new self(...$vars);
}
return self::$instance;
}
private function __clone()
{
// 单例私有__clone方法
}
/**
* Queue constructor.
* @param $host
* @param int $port
* @param string $password
*/
private function __construct($host, $port = 6379, $password = '')
{
$redis = new Redis();
$redis->connect($host, $port);
$redis->auth($password);
$this->redis = $redis;
}
/**
* 添加任务到队列
* @param $key
* @param $value
* @param int $sec
* @return bool
*/
public function addDelayTask($key, $value, $sec = 10): bool
{
// 获取当前时间戳
$expireTime = $this->getTime($sec);
// 添加到有序集合
return $this->redis->zAdd($key, $expireTime, $value) > 0;
}
/**
* 删除延迟任务
* @param $key
* @param $value
* @return bool
*/
public function remDelayTask($key, $value): bool
{
// 从有序集合删除指定值
return $this->redis->zRem($key, $value) > 0;
}
/**
* 处理延迟任务
* @param $key
* @param $callback
* @param int $sleep
*/
public function handleDelayTask($key, $callback, $sleep = 500000)
{
// 死循环
while (true) {
// 尝试取出有序集合中第一个值
$task = $this->redis->zRange($key, 0, 0, true);
if (!empty($task)) {
$value = array_keys($task)[0];
$expireTime = $task[$value];
// 如果当前时间已经到达设定到执行时间
// 则开始执行逻辑,并删除当前任务
if ($this->getTime() >= $expireTime) {
call_user_func($callback, $value);
$this->remDelayTask($key, $value);
// 直接进入下一次循环跳过usleep
continue;
}
}
// 减少读取次数
usleep($sleep);
}
}
/**
* 添加实时任务
* @param $key
* @param $value
* @return false|int
*/
public function addTask($key, $value)
{
// 添加到列表
return $this->redis->lPush($key, $value);
}
/**
* 删除实时任务
* @param $key
* @param $value
* @return bool|int
*/
public function remTask($key, $value)
{
// 从列表删除指定值
return $this->redis->lRem($key, $value, 0);
}
/**
* 处理实时队列
* @param $key
* @param $callback
*/
public function handleTask($key, $callback)
{
// 死循环
while (true) {
// 阻塞读取列表中的值
$task = $this->redis->brPop($key, 0);
// 执行用户逻辑
call_user_func($callback, $task[1]);
}
}
/**
* 获取现在时间戳(微秒)
* @param int $sec
* @return int
*/
private function getTime($sec = 0): int
{
return time() + $sec;
}
}