From 179a39675de491cce55ad0954f01eb4e5fd2aa24 Mon Sep 17 00:00:00 2001 From: topnuomi <1130395124@qq.com> Date: Sat, 15 May 2021 15:18:35 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AC=AC=E4=B8=80=E6=AC=A1=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DelayTask.php | 161 ++++++++++++++++++++++++++++++++++++++++++++++++++ README.en.md | 36 ----------- README.md | 37 ------------ handle.php | 15 +++++ script.php | 20 +++++++ 5 files changed, 196 insertions(+), 73 deletions(-) create mode 100644 DelayTask.php delete mode 100644 README.en.md delete mode 100644 README.md create mode 100644 handle.php create mode 100644 script.php diff --git a/DelayTask.php b/DelayTask.php new file mode 100644 index 0000000..185761f --- /dev/null +++ b/DelayTask.php @@ -0,0 +1,161 @@ +connect($host, $port); + $redis->auth($password); + $this->redis = $redis; + } + + /** + * 添加任务到队列 + * @param $key + * @param $value + * @param int $sec + * @return bool + */ + public function addDelayTask($key, $value, $sec = 10): bool + { + // 获取当前时间戳 + $expireTime = $this->getTime($sec); + // 添加到有序集合 + return $this->redis->zAdd($key, $expireTime, $value) > 0; + } + + /** + * 删除延迟任务 + * @param $key + * @param $value + * @return bool + */ + public function remDelayTask($key, $value): bool + { + // 从有序集合删除指定值 + return $this->redis->zRem($key, $value) > 0; + } + + /** + * 处理延迟任务 + * @param $key + * @param $callback + * @param int $sleep + */ + public function handleDelayTask($key, $callback, $sleep = 500000) + { + // 死循环 + while (true) { + // 尝试取出有序集合中第一个值 + $task = $this->redis->zRange($key, 0, 0, true); + if (!empty($task)) { + $value = array_keys($task)[0]; + $expireTime = $task[$value]; + // 如果当前时间已经到达设定到执行时间 + // 则开始执行逻辑,并删除当前任务 + if ($this->getTime() >= $expireTime) { + call_user_func($callback, $value); + $this->remDelayTask($key, $value); + // 直接进入下一次循环,跳过usleep + continue; + } + } + + // 减少读取次数 + usleep($sleep); + } + } + + /** + * 添加实时任务 + * @param $key + * @param $value + * @return false|int + */ + public function addTask($key, $value) + { + // 添加到列表 + return $this->redis->lPush($key, $value); + } + + /** + * 删除实时任务 + * @param $key + * @param $value + * @return bool|int + */ + public function remTask($key, $value) + { + // 从列表删除指定值 + return $this->redis->lRem($key, $value, 0); + } + + /** + * 处理实时队列 + * @param $key + * @param $callback + */ + public function handleTask($key, $callback) + { + // 死循环 + while (true) { + // 阻塞读取列表中的值 + $task = $this->redis->brPop($key, 0); + // 执行用户逻辑 + call_user_func($callback, $task[1]); + } + } + + /** + * 获取现在时间戳(微秒) + * @param int $sec + * @return int + */ + private function getTime($sec = 0): int + { + return time() + $sec; + } + +} diff --git a/README.en.md b/README.en.md deleted file mode 100644 index 767f1a2..0000000 --- a/README.en.md +++ /dev/null @@ -1,36 +0,0 @@ -# Redis queue - -#### Description -Redis实现消息队列 - -#### Software Architecture -Software architecture description - -#### Installation - -1. xxxx -2. xxxx -3. xxxx - -#### Instructions - -1. xxxx -2. xxxx -3. xxxx - -#### Contribution - -1. Fork the repository -2. Create Feat_xxx branch -3. Commit your code -4. Create Pull Request - - -#### Gitee Feature - -1. You can use Readme\_XXX.md to support different languages, such as Readme\_en.md, Readme\_zh.md -2. Gitee blog [blog.gitee.com](https://blog.gitee.com) -3. Explore open source project [https://gitee.com/explore](https://gitee.com/explore) -4. The most valuable open source project [GVP](https://gitee.com/gvp) -5. The manual of Gitee [https://gitee.com/help](https://gitee.com/help) -6. The most popular members [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/) diff --git a/README.md b/README.md deleted file mode 100644 index f018c57..0000000 --- a/README.md +++ /dev/null @@ -1,37 +0,0 @@ -# Redis queue - -#### 介绍 -Redis实现消息队列 - -#### 软件架构 -软件架构说明 - - -#### 安装教程 - -1. xxxx -2. xxxx -3. xxxx - -#### 使用说明 - -1. xxxx -2. xxxx -3. xxxx - -#### 参与贡献 - -1. Fork 本仓库 -2. 新建 Feat_xxx 分支 -3. 提交代码 -4. 新建 Pull Request - - -#### 特技 - -1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md -2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com) -3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目 -4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目 -5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help) -6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/) diff --git a/handle.php b/handle.php new file mode 100644 index 0000000..ab234ac --- /dev/null +++ b/handle.php @@ -0,0 +1,15 @@ +handleDelayTask('order', function ($value) { + print '取出任务 ' . $value . ' [' . date('Y-m-d H:i:s') . ']' . PHP_EOL; +}); +// $queue->handleTask('order', function ($value) { +// print '执行任务 ' . $value . ' [' . date('Y-m-d H:i:s') . ']' . PHP_EOL; +// // TODO 执行具体逻辑 +// // sleep(2); +// }); diff --git a/script.php b/script.php new file mode 100644 index 0000000..e40c603 --- /dev/null +++ b/script.php @@ -0,0 +1,20 @@ +addDelayTask('order', $taskName, 10); + + // 实时任务 + // $taskName = uniqid(); + // $flag = $queue->addTask('order', json_encode([ + // 'name' => $taskName, + // ], 256)); + if ($flag) { + print '创建任务 ' . $i . ' [' . date('Y-m-d H:i:s') . ']' . PHP_EOL; + } +}