From 0f658c724ac6160954393d26f658132a818095cc Mon Sep 17 00:00:00 2001 From: kuaifan Date: Tue, 8 Jun 2021 10:39:31 +0800 Subject: [PATCH] no message --- app/Http/Controllers/IndexController.php | 19 ++++++++ app/Models/WebSocket.php | 18 +++----- app/Module/Base.php | 16 +++++-- app/Tasks/DeleteTmpTask.php | 59 ++++++++++++++++++++++++ app/Tasks/PushTask.php | 14 +----- app/Tasks/WebSocketDialogMsgTask.php | 18 ++++++-- docker/crontab/crontab.sh | 2 +- 7 files changed, 113 insertions(+), 33 deletions(-) create mode 100644 app/Tasks/DeleteTmpTask.php diff --git a/app/Http/Controllers/IndexController.php b/app/Http/Controllers/IndexController.php index 777bca95..25edaef1 100755 --- a/app/Http/Controllers/IndexController.php +++ b/app/Http/Controllers/IndexController.php @@ -3,6 +3,8 @@ namespace App\Http\Controllers; use App\Module\Base; +use App\Tasks\DeleteTmpTask; +use Hhxsv5\LaravelS\Swoole\Task\Task; use Redirect; @@ -42,4 +44,21 @@ class IndexController extends InvokeController { return Redirect::to(Base::fillUrl('docs/index.html'), 301); } + + /** + * 系统定时任务,限制内网访问(1分钟/次) + * @return string + */ + public function crontab() + { + if (!Base::is_internal_ip()) { + // 限制内网访问 + return "Forbidden Access"; + } + // 删除过期的临时表数据 + Task::deliver(new DeleteTmpTask('wg_tmp_msgs', 1)); + Task::deliver(new DeleteTmpTask('tmp', 24)); + + return "success"; + } } diff --git a/app/Models/WebSocket.php b/app/Models/WebSocket.php index 790995d4..49827d7e 100644 --- a/app/Models/WebSocket.php +++ b/app/Models/WebSocket.php @@ -30,21 +30,17 @@ class WebSocket extends AbstractModel { /** - * 获取我自己当前以外的所有连接fd + * 获取其他fd(获取其他设备) * @return array */ - public static function getMyFd() + public static function getOtherFd($fd) { - $fd = 0; - $userid = 0; - try { - $fd = Request::header('fd'); - $userid = User::token2userid(); - } catch (\Throwable $e) { - + if (empty($fd)) { + return []; } - if ($userid && $fd) { - return self::whereUserid($userid)->where('fd', '!=', $fd)->pluck('fd')->toArray(); + $row = self::whereFd($fd)->first(); + if ($row) { + return self::whereUserid($row->userid)->where('id', '!=', $row->id)->pluck('fd')->toArray(); } return []; } diff --git a/app/Module/Base.php b/app/Module/Base.php index cb5ba5b3..dc9f636c 100755 --- a/app/Module/Base.php +++ b/app/Module/Base.php @@ -6,7 +6,6 @@ use App\Models\Setting; use App\Models\Tmp; use Cache; use Exception; -use Illuminate\Contracts\Pagination\LengthAwarePaginator; use Illuminate\Support\Arr; use Illuminate\Support\Facades\Config; use Redirect; @@ -110,8 +109,11 @@ class Base * @param string $ip * @return bool */ - public static function is_ipv4($ip) + public static function is_ipv4($ip = '') { + if (empty($ip)) { + $ip = Base::getIp(); + } return filter_var($ip, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4) !== false; } @@ -120,8 +122,11 @@ class Base * @param string $ip * @return bool */ - public static function is_extranet_ip($ip) + public static function is_extranet_ip($ip = '') { + if (empty($ip)) { + $ip = Base::getIp(); + } if (!filter_var($ip, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4)) { return false; } @@ -133,8 +138,11 @@ class Base * @param string $ip * @return bool */ - public static function is_internal_ip($ip) + public static function is_internal_ip($ip = '') { + if (empty($ip)) { + $ip = Base::getIp(); + } if (!filter_var($ip, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4)) { return false; } diff --git a/app/Tasks/DeleteTmpTask.php b/app/Tasks/DeleteTmpTask.php new file mode 100644 index 00000000..503f158d --- /dev/null +++ b/app/Tasks/DeleteTmpTask.php @@ -0,0 +1,59 @@ +data = $data; + $this->hours = $hours; + } + + public function start() + { + switch ($this->data) { + /** + * 表pre_wg_tmp_msgs + */ + case 'wg_tmp_msgs': + { + WebSocketTmpMsg::where('created_at', '<', Carbon::now()->subHours($this->hours)->toDateTimeString()) + ->orderBy('id') + ->chunk(500, function ($msgs) { + foreach ($msgs as $msg) { + $msg->delete(); + } + }); + } + break; + + /** + * 表pre_wg_tmp + */ + case 'tmp': + { + Tmp::where('created_at', '<', Carbon::now()->subHours($this->hours)->toDateTimeString()) + ->orderBy('id') + ->chunk(2000, function ($tmps) { + foreach ($tmps as $tmp) { + $tmp->delete(); + } + }); + } + break; + } + } +} diff --git a/app/Tasks/PushTask.php b/app/Tasks/PushTask.php index 79634f7e..5e9cad89 100644 --- a/app/Tasks/PushTask.php +++ b/app/Tasks/PushTask.php @@ -99,9 +99,8 @@ class PushTask extends AbstractTask * @param string|int $key 延迟推送key依据,留空立即推送(延迟推送时发给同一人同一种消息类型只发送最新的一条) * @param int $delay 延迟推送时间,默认:1秒($key填写时有效) * @param bool $retryOffline 如果会员不在线,等上线后继续发送 - * @param bool $andMyself 同时发送给自己其他设备 */ - public static function push(array $lists, $key = '', $delay = 1, $retryOffline = true, $andMyself = false) + public static function push(array $lists, $key = '', $delay = 1, $retryOffline = true) { if (!is_array($lists) || empty($lists)) { return; @@ -127,7 +126,7 @@ class PushTask extends AbstractTask } // 发送对象 $offline_user = []; - $array = $andMyself ? WebSocket::getMyFd() : []; + $array = []; if ($fd) { if (is_array($fd)) { $array = array_merge($array, $fd); @@ -184,13 +183,4 @@ class PushTask extends AbstractTask { self::push($lists, $key, $delay, false); } - - /** - * 推送消息(同时发送给自己其他设备) - * @param array $lists 消息列表 - */ - public static function pushM(array $lists, $key = '', $delay = 1) - { - self::push($lists, $key, $delay, false, true); - } } diff --git a/app/Tasks/WebSocketDialogMsgTask.php b/app/Tasks/WebSocketDialogMsgTask.php index f0888107..a49aa827 100644 --- a/app/Tasks/WebSocketDialogMsgTask.php +++ b/app/Tasks/WebSocketDialogMsgTask.php @@ -2,8 +2,10 @@ namespace App\Tasks; +use App\Models\WebSocket; use App\Models\WebSocketDialogMsg; use App\Models\WebSocketDialogMsgRead; +use Request; @error_reporting(E_ALL & ~E_NOTICE); @@ -17,16 +19,19 @@ class WebSocketDialogMsgTask extends AbstractTask { protected $userid; protected $dialogMsgArray; + protected $currentFd; /** * WebSocketDialogMsgTask constructor. - * @param $userid - * @param array $dialogMsgArray + * @param int|array $userid 发送对象ID 或 ID组 + * @param array $dialogMsgArray 发送的内容 + * @param null $currentFd 当前发送会员的 websocket fd (用于给其他设备发送消息,留空通过header获取) */ - public function __construct($userid, array $dialogMsgArray) + public function __construct($userid, array $dialogMsgArray, $currentFd = null) { $this->userid = $userid; $this->dialogMsgArray = $dialogMsgArray; + $this->currentFd = $currentFd ?: Request::header('fd'); } public function start() @@ -38,6 +43,7 @@ class WebSocketDialogMsgTask extends AbstractTask if (empty($userids) || empty($msgId)) { return; } + // 推送目标 $pushIds = []; foreach ($userids AS $userid) { $msgRead = WebSocketDialogMsgRead::createInstance([ @@ -52,6 +58,7 @@ class WebSocketDialogMsgTask extends AbstractTask // } } + $fd = WebSocket::getOtherFd($this->currentFd); // 更新已发送数量 if ($send != count($pushIds)) { $send = WebSocketDialogMsgRead::whereMsgId($msgId)->count(); @@ -59,9 +66,10 @@ class WebSocketDialogMsgTask extends AbstractTask $this->dialogMsgArray['send'] = $send; } // 开始推送消息 - if ($pushIds) { - PushTask::pushM([ + if ($pushIds || $fd) { + PushTask::push([ 'userid' => $pushIds, + 'fd' => $fd, 'msg' => [ 'type' => 'dialog', 'mode' => 'add', diff --git a/docker/crontab/crontab.sh b/docker/crontab/crontab.sh index 44dba911..5003848a 100755 --- a/docker/crontab/crontab.sh +++ b/docker/crontab/crontab.sh @@ -1,3 +1,3 @@ #!/bin/sh -curl "http://127.0.0.1:20000/api/publish/crontab" >> /dev/null 2>&1 +curl "http://127.0.0.1:20000/crontab" >> /dev/null 2>&1