no message

This commit is contained in:
kuaifan 2021-06-08 10:39:31 +08:00
parent 7c6dd3a45b
commit 0f658c724a
7 changed files with 113 additions and 33 deletions

View File

@ -3,6 +3,8 @@
namespace App\Http\Controllers;
use App\Module\Base;
use App\Tasks\DeleteTmpTask;
use Hhxsv5\LaravelS\Swoole\Task\Task;
use Redirect;
@ -42,4 +44,21 @@ class IndexController extends InvokeController
{
return Redirect::to(Base::fillUrl('docs/index.html'), 301);
}
/**
* 系统定时任务限制内网访问1分钟/次)
* @return string
*/
public function crontab()
{
if (!Base::is_internal_ip()) {
// 限制内网访问
return "Forbidden Access";
}
// 删除过期的临时表数据
Task::deliver(new DeleteTmpTask('wg_tmp_msgs', 1));
Task::deliver(new DeleteTmpTask('tmp', 24));
return "success";
}
}

View File

@ -30,21 +30,17 @@ class WebSocket extends AbstractModel
{
/**
* 获取我自己当前以外的所有连接fd
* 获取其他fd获取其他设备
* @return array
*/
public static function getMyFd()
public static function getOtherFd($fd)
{
$fd = 0;
$userid = 0;
try {
$fd = Request::header('fd');
$userid = User::token2userid();
} catch (\Throwable $e) {
if (empty($fd)) {
return [];
}
if ($userid && $fd) {
return self::whereUserid($userid)->where('fd', '!=', $fd)->pluck('fd')->toArray();
$row = self::whereFd($fd)->first();
if ($row) {
return self::whereUserid($row->userid)->where('id', '!=', $row->id)->pluck('fd')->toArray();
}
return [];
}

View File

@ -6,7 +6,6 @@ use App\Models\Setting;
use App\Models\Tmp;
use Cache;
use Exception;
use Illuminate\Contracts\Pagination\LengthAwarePaginator;
use Illuminate\Support\Arr;
use Illuminate\Support\Facades\Config;
use Redirect;
@ -110,8 +109,11 @@ class Base
* @param string $ip
* @return bool
*/
public static function is_ipv4($ip)
public static function is_ipv4($ip = '')
{
if (empty($ip)) {
$ip = Base::getIp();
}
return filter_var($ip, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4) !== false;
}
@ -120,8 +122,11 @@ class Base
* @param string $ip
* @return bool
*/
public static function is_extranet_ip($ip)
public static function is_extranet_ip($ip = '')
{
if (empty($ip)) {
$ip = Base::getIp();
}
if (!filter_var($ip, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4)) {
return false;
}
@ -133,8 +138,11 @@ class Base
* @param string $ip
* @return bool
*/
public static function is_internal_ip($ip)
public static function is_internal_ip($ip = '')
{
if (empty($ip)) {
$ip = Base::getIp();
}
if (!filter_var($ip, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4)) {
return false;
}

View File

@ -0,0 +1,59 @@
<?php
namespace App\Tasks;
use App\Models\Tmp;
use App\Models\WebSocketTmpMsg;
use Carbon\Carbon;
/**
* 删除过期临时数据任务
* Class DeleteTmpTask
* @package App\Tasks
*/
class DeleteTmpTask extends AbstractTask
{
protected $data;
protected $hours; // 多久后删除,单位小时
public function __construct(string $data, int $hours)
{
$this->data = $data;
$this->hours = $hours;
}
public function start()
{
switch ($this->data) {
/**
* 表pre_wg_tmp_msgs
*/
case 'wg_tmp_msgs':
{
WebSocketTmpMsg::where('created_at', '<', Carbon::now()->subHours($this->hours)->toDateTimeString())
->orderBy('id')
->chunk(500, function ($msgs) {
foreach ($msgs as $msg) {
$msg->delete();
}
});
}
break;
/**
* 表pre_wg_tmp
*/
case 'tmp':
{
Tmp::where('created_at', '<', Carbon::now()->subHours($this->hours)->toDateTimeString())
->orderBy('id')
->chunk(2000, function ($tmps) {
foreach ($tmps as $tmp) {
$tmp->delete();
}
});
}
break;
}
}
}

View File

@ -99,9 +99,8 @@ class PushTask extends AbstractTask
* @param string|int $key 延迟推送key依据留空立即推送延迟推送时发给同一人同一种消息类型只发送最新的一条
* @param int $delay 延迟推送时间默认1秒$key填写时有效
* @param bool $retryOffline 如果会员不在线,等上线后继续发送
* @param bool $andMyself 同时发送给自己其他设备
*/
public static function push(array $lists, $key = '', $delay = 1, $retryOffline = true, $andMyself = false)
public static function push(array $lists, $key = '', $delay = 1, $retryOffline = true)
{
if (!is_array($lists) || empty($lists)) {
return;
@ -127,7 +126,7 @@ class PushTask extends AbstractTask
}
// 发送对象
$offline_user = [];
$array = $andMyself ? WebSocket::getMyFd() : [];
$array = [];
if ($fd) {
if (is_array($fd)) {
$array = array_merge($array, $fd);
@ -184,13 +183,4 @@ class PushTask extends AbstractTask
{
self::push($lists, $key, $delay, false);
}
/**
* 推送消息(同时发送给自己其他设备)
* @param array $lists 消息列表
*/
public static function pushM(array $lists, $key = '', $delay = 1)
{
self::push($lists, $key, $delay, false, true);
}
}

View File

@ -2,8 +2,10 @@
namespace App\Tasks;
use App\Models\WebSocket;
use App\Models\WebSocketDialogMsg;
use App\Models\WebSocketDialogMsgRead;
use Request;
@error_reporting(E_ALL & ~E_NOTICE);
@ -17,16 +19,19 @@ class WebSocketDialogMsgTask extends AbstractTask
{
protected $userid;
protected $dialogMsgArray;
protected $currentFd;
/**
* WebSocketDialogMsgTask constructor.
* @param $userid
* @param array $dialogMsgArray
* @param int|array $userid 发送对象ID ID组
* @param array $dialogMsgArray 发送的内容
* @param null $currentFd 当前发送会员的 websocket fd 用于给其他设备发送消息留空通过header获取
*/
public function __construct($userid, array $dialogMsgArray)
public function __construct($userid, array $dialogMsgArray, $currentFd = null)
{
$this->userid = $userid;
$this->dialogMsgArray = $dialogMsgArray;
$this->currentFd = $currentFd ?: Request::header('fd');
}
public function start()
@ -38,6 +43,7 @@ class WebSocketDialogMsgTask extends AbstractTask
if (empty($userids) || empty($msgId)) {
return;
}
// 推送目标
$pushIds = [];
foreach ($userids AS $userid) {
$msgRead = WebSocketDialogMsgRead::createInstance([
@ -52,6 +58,7 @@ class WebSocketDialogMsgTask extends AbstractTask
//
}
}
$fd = WebSocket::getOtherFd($this->currentFd);
// 更新已发送数量
if ($send != count($pushIds)) {
$send = WebSocketDialogMsgRead::whereMsgId($msgId)->count();
@ -59,9 +66,10 @@ class WebSocketDialogMsgTask extends AbstractTask
$this->dialogMsgArray['send'] = $send;
}
// 开始推送消息
if ($pushIds) {
PushTask::pushM([
if ($pushIds || $fd) {
PushTask::push([
'userid' => $pushIds,
'fd' => $fd,
'msg' => [
'type' => 'dialog',
'mode' => 'add',

View File

@ -1,3 +1,3 @@
#!/bin/sh
curl "http://127.0.0.1:20000/api/publish/crontab" >> /dev/null 2>&1
curl "http://127.0.0.1:20000/crontab" >> /dev/null 2>&1