feat(网络): 重构网关socket连接与消息处理逻辑

- 重构Client类,使用事件队列管理连接和消息事件
- 增加消息队列大小限制和错误处理
- 优化数据包处理流程,减少内存拷贝
- 添加nullptr检查防止空指针访问
- 更新版本号至1.4
- 新增TimerQueue_GetTimerMess函数声明和hook
This commit is contained in:
2026-05-08 17:28:59 +08:00
parent bb062ecfb4
commit a9af16962c
5 changed files with 220 additions and 139 deletions

View File

@@ -1,3 +1,3 @@
#pragma once
#define DPS_SCRIPT_VERSION "1.3"
#define DPS_SCRIPT_VERSION "1.4"

View File

@@ -1,11 +1,20 @@
#pragma once
#include "Singleton.h"
#include "l_squirrel.h"
#include <iostream>
#include <array>
#include <asio.hpp>
#include <spdlog/spdlog.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <cstring>
#include <deque>
#include <fstream>
#include <iostream>
#include <memory>
#include <mutex>
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h>
#include <string>
#include <unistd.h>
#include <vector>
extern HSQUIRRELVM v;
extern std::recursive_mutex SqMtx;
@@ -16,24 +25,34 @@ using asio::ip::tcp;
class Client
{
private:
asio::io_context io_context;
struct PendingEvent
{
enum class Type
{
Connect,
Message
};
Type type = Type::Message;
std::string payload;
};
private:
tcp::resolver resolver;
tcp::socket socket;
tcp::resolver::results_type endpoint;
std::array<char, 8192> buffer;
std::array<char, 8192000> PackData;
std::size_t writeDataSize = 0;
std::size_t receivedDataSize = 0;
std::size_t bufferedDataSize = 0;
std::deque<PendingEvent> pendingEvents;
std::size_t queuedMessageBytes = 0;
std::mutex pendingEventsMutex;
std::string Ip = "192.168.200.27";
// std::string Ip = "127.0.0.1";
std::string Port = "65109";
std::mutex SizeMtx;
std::mutex ClearFlagMtx;
public:
bool ConnectState = false;
bool ClearFlag = false;
int LogiThreadSize = 0;
std::fstream file;
@@ -48,24 +67,162 @@ public:
PackData.fill(0);
}
private:
enum : std::size_t
{
HeaderSize = 4,
MaxQueuedMessages = 10000,
MaxQueuedMessageBytes = 8 * 1024 * 1024
};
void ResetPacketBuffer()
{
bufferedDataSize = 0;
}
void InvokeGatewayConnectCallback()
{
std::lock_guard<std::recursive_mutex> lock(SqMtx);
SQInteger top = sq_gettop(v); // saves the stack size before the call
sq_pushroottable(v); // pushes the global table
sq_pushstring(v, _SC("OnGatewaySocketConnect"), -1);
if (SQ_SUCCEEDED(sq_get(v, -2)))
{
sq_pushroottable(v); // push the global table as this
sq_call(v, 1, SQFalse, SQTrue); // calls the function
}
sq_settop(v, top); // restores the original stack size
}
void InvokeGatewaySocketMsg(const std::string &message)
{
std::lock_guard<std::recursive_mutex> lock(SqMtx);
SQInteger top = sq_gettop(v); // saves the stack size before the call
sq_pushroottable(v); // pushes the global table
sq_pushstring(v, _SC("OnGatewaySocketMsg"), -1);
if (SQ_SUCCEEDED(sq_get(v, -2)))
{
sq_pushroottable(v);
sq_pushstring(v, message.c_str(), static_cast<SQInteger>(message.size()));
sq_call(v, 2, SQFalse, SQTrue);
}
sq_settop(v, top);
}
bool EnqueueConnectEvent()
{
std::lock_guard<std::mutex> lock(pendingEventsMutex);
PendingEvent event;
event.type = PendingEvent::Type::Connect;
pendingEvents.push_back(std::move(event));
return true;
}
bool EnqueueMessageEvent(const char *message, std::size_t length)
{
std::lock_guard<std::mutex> lock(pendingEventsMutex);
if (pendingEvents.size() >= MaxQueuedMessages)
{
spdlog::error("Gateway pending message queue is full. queued_events={}, limit={}",
static_cast<unsigned long long>(pendingEvents.size()),
static_cast<unsigned long long>(MaxQueuedMessages));
return false;
}
if ((queuedMessageBytes + length) > MaxQueuedMessageBytes)
{
spdlog::error("Gateway pending message bytes exceeded. queued_bytes={}, incoming={}, limit={}",
static_cast<unsigned long long>(queuedMessageBytes),
static_cast<unsigned long long>(length),
static_cast<unsigned long long>(MaxQueuedMessageBytes));
return false;
}
PendingEvent event;
event.type = PendingEvent::Type::Message;
event.payload.assign(message, length);
queuedMessageBytes += length;
pendingEvents.push_back(std::move(event));
return true;
}
void DrainPendingEvents(std::deque<PendingEvent> &events)
{
std::lock_guard<std::mutex> lock(pendingEventsMutex);
if (pendingEvents.empty())
return;
events.swap(pendingEvents);
queuedMessageBytes = 0;
}
bool AppendReceivedData(std::size_t bytes_transferred)
{
if (bytes_transferred == 0)
return true;
if ((bufferedDataSize + bytes_transferred) > PackData.size())
{
spdlog::error("Gateway receive buffer overflow. buffered={}, incoming={}, capacity={}",
static_cast<unsigned long long>(bufferedDataSize),
static_cast<unsigned long long>(bytes_transferred),
static_cast<unsigned long long>(PackData.size()));
return false;
}
std::memcpy(PackData.data() + bufferedDataSize, buffer.data(), bytes_transferred);
bufferedDataSize += bytes_transferred;
return true;
}
bool ProcessBufferedPackets()
{
std::size_t offset = 0;
while ((bufferedDataSize - offset) >= HeaderSize)
{
const int packSize = ByteLittleToInt(reinterpret_cast<unsigned char *>(PackData.data() + offset));
if (packSize <= 0)
{
spdlog::error("Gateway packet length is invalid: {}", packSize);
return false;
}
const std::size_t payloadSize = static_cast<std::size_t>(packSize);
if (payloadSize > (PackData.size() - HeaderSize))
{
spdlog::error("Gateway packet length exceeds buffer. payload={}, capacity={}",
static_cast<unsigned long long>(payloadSize),
static_cast<unsigned long long>(PackData.size() - HeaderSize));
return false;
}
if ((bufferedDataSize - offset - HeaderSize) < payloadSize)
break;
if (!EnqueueMessageEvent(PackData.data() + offset + HeaderSize, payloadSize))
return false;
offset += HeaderSize + payloadSize;
}
if (offset == 0)
return true;
const std::size_t remaining = bufferedDataSize - offset;
if (remaining > 0)
std::memmove(PackData.data(), PackData.data() + offset, remaining);
bufferedDataSize = remaining;
return true;
}
public:
void start()
{
async_connect(socket, endpoint, [this](const asio::error_code &error, const tcp::endpoint &endpoint)
async_connect(socket, endpoint, [this](const asio::error_code &error, const tcp::endpoint &connected_endpoint)
{
if (!error) {
ConnectState = true;
ResetPacketBuffer();
EnqueueConnectEvent();
read();
std::lock_guard<std::recursive_mutex> lock(SqMtx);
SQInteger top = sq_gettop(v); // saves the stack size before the call
sq_pushroottable(v); // pushes the global table
sq_pushstring(v, _SC("OnGatewaySocketConnect"), -1);
if (SQ_SUCCEEDED(sq_get(v, -2)))
{ // gets the field 'foo' from the global table
sq_pushroottable(v); // push the 'this' (in this case is the global table)
sq_call(v, 1, SQFalse, SQTrue); // calls the function
}
sq_settop(v, top); // restores the original stack size
io_context.poll(); // 处理一次事件循环,避免主线程阻塞
} else {
// std::cerr << "Error connecting to server: " << error.message() << std::endl;
start();
@@ -77,16 +234,11 @@ public:
socket.async_read_some(asio::buffer(buffer), [this](const asio::error_code &error, std::size_t bytes_transferred)
{
if (!error) {
if (ClearFlag)
ClearPack();
for (std::size_t i = 0; i < bytes_transferred; ++i)
{
PackData[writeDataSize + i] = buffer[i];
}
SizeMtx.lock();
writeDataSize += bytes_transferred;
SizeMtx.unlock();
read();
if (!AppendReceivedData(bytes_transferred) || !ProcessBufferedPackets()) {
reconnect();
return;
}
read();
} else {
// std::cerr << "Error reading data: " << error.message() << std::endl;
usleep(10);
@@ -97,39 +249,27 @@ public:
void reconnect()
{
ConnectState = false;
std::cout << "服务器断开连接,尝试重连." << std::endl;
// 执行重连操作
// 关闭当前连接
socket.close();
// 重新解析端点
ResetPacketBuffer();
std::cout << "Gateway socket disconnected, reconnecting." << std::endl;
asio::error_code close_error;
socket.close(close_error);
endpoint = resolver.resolve(Ip, Port);
// 重新连接
async_connect(socket, endpoint, [this](const asio::error_code &error, const tcp::endpoint &endpoint)
async_connect(socket, endpoint, [this](const asio::error_code &error, const tcp::endpoint &connected_endpoint)
{
if (!error) {
ConnectState = true;
std::lock_guard<std::recursive_mutex> lock(SqMtx);
SQInteger top = sq_gettop(v); // saves the stack size before the call
sq_pushroottable(v); // pushes the global table
sq_pushstring(v, _SC("OnGatewaySocketConnect"), -1);
if (SQ_SUCCEEDED(sq_get(v, -2)))
{ // gets the field 'foo' from the global table
sq_pushroottable(v); // push the 'this' (in this case is the global table)
sq_call(v, 1, SQFalse, SQTrue); // calls the function
}
sq_settop(v, top); // restores the original stack size
ResetPacketBuffer();
EnqueueConnectEvent();
read();
} else {
// std::cerr << "Error reconnecting to server: " << error.message() << std::endl;
// 可以在此处添加重连失败的处理逻辑
reconnect();
} });
}
void send(unsigned char *message, int Length)
{
async_write(socket, asio::buffer(message, Length), [this](const asio::error_code &error, std::size_t bytes_transferred)
std::shared_ptr<std::vector<unsigned char> > payload(new std::vector<unsigned char>(message, message + Length));
async_write(socket, asio::buffer(*payload), [payload](const asio::error_code &error, std::size_t bytes_transferred)
{
if (!error) {
// std::cout << "Message sent" << std::endl;
@@ -140,85 +280,25 @@ public:
void ClearPack()
{
SizeMtx.lock();
// 复制已读大小开始到结束到 开始 作用删除前已读大小的数据
std::copy(PackData.begin() + receivedDataSize, PackData.end(), PackData.begin());
// 将最后元素设置为0
std::fill(PackData.end() - receivedDataSize, PackData.end(), 0);
// 写的大小重置
writeDataSize -= receivedDataSize;
// 读的大小重置
receivedDataSize = 0;
SizeMtx.unlock();
ClearFlagMtx.lock();
ClearFlag = false;
ClearFlagMtx.unlock();
ResetPacketBuffer();
}
void PackLogic()
{
int RealSize = 0;
int RealReadSize = -1;
if (SizeMtx.try_lock())
std::deque<PendingEvent> events;
DrainPendingEvents(events);
while (!events.empty())
{
receivedDataSize += LogiThreadSize;
LogiThreadSize = 0;
RealSize = writeDataSize;
RealReadSize = receivedDataSize;
SizeMtx.unlock();
}
// 如果包长度不够直接返回
if (RealSize < 4 || RealReadSize == -1)
return;
// 如果里面已经读了超过一半的大小了
if (RealSize >= 40960)
{
if (!ClearFlag)
PendingEvent event = std::move(events.front());
events.pop_front();
if (event.type == PendingEvent::Type::Connect)
{
ClearFlagMtx.lock();
ClearFlag = true;
ClearFlagMtx.unlock();
InvokeGatewayConnectCallback();
continue;
}
InvokeGatewaySocketMsg(event.payload);
}
unsigned char *Count = new unsigned char[4];
std::copy(PackData.begin() + RealReadSize, PackData.begin() + 4 + RealReadSize, Count);
int PackSize = ByteLittleToInt(Count);
delete[] Count;
// 如果包长度不够或者缓冲区长度直接返回
if (PackSize <= 0 || ((RealSize - RealReadSize - 4) < PackSize))
return;
char *StrBuffer = new char[PackSize];
std::copy(PackData.begin() + 4 + RealReadSize, PackData.begin() + 4 + PackSize + RealReadSize, StrBuffer);
std::string Str(StrBuffer, PackSize);
delete[] StrBuffer;
// 包数据大小读取的偏移 这次读了多少
LogiThreadSize += (4 + PackSize);
// std::cout << "包大小: " << PackSize << std::endl;
// std::cout << "包内容: " << Str << std::endl;
// std::cout << "收到了第: " << TestCode << "个包" << std::endl;
// spdlog::info(Str);
std::lock_guard<std::recursive_mutex> lock(SqMtx);
SQInteger top = sq_gettop(v); // saves the stack size before the call
sq_pushroottable(v); // pushes the global table
sq_pushstring(v, _SC("OnGatewaySocketMsg"), -1);
if (SQ_SUCCEEDED(sq_get(v, -2)))
{ // gets the field 'foo' from the global table
sq_pushroottable(v); // push the 'this' (in this case is the global table)
sq_pushstring(v, Str.c_str(), PackSize);
sq_call(v, 2, SQFalse, SQTrue); // calls the function
}
sq_settop(v, top); // restores the original stack size
// TestCode++;
}
static int ByteLittleToInt(unsigned char *Count)
@@ -239,7 +319,7 @@ private:
l_socket() {} // private constructor to prevent instantiation
l_socket(const l_socket &) = delete; // disable copy constructor
l_socket &operator=(const l_socket &) = delete; // disable assignment operator
Client *ClientObj;
Client *ClientObj = nullptr;
public:
bool InitState = false;

View File

@@ -16,6 +16,7 @@ SUBHOOK_INIT(doDispatch, 0x08594922);
SUBHOOK_INIT(Inter_LoadGeolocation_dispatch_sig, 0x0816088A);
SUBHOOK_INIT(History_Log, 0x854F990);
SUBHOOK_INIT(Neof_startupGlobalInstances, 0x0829F7C5);
SUBHOOK_INIT(TimerQueue_GetTimerMess, 0x08630ECC);
int checkGame(const char *pName) {
char path[256];
@@ -308,15 +309,6 @@ int _Inter_LoadGeolocation_dispatch_sig(void *pThis, void *pUser, char *a3) {
}
sq_settop(v, top); // restores the original stack size
sq_pushroottable(v); // pushes the global table
sq_pushstring(v, _SC("main"), -1);
if (SQ_SUCCEEDED(
sq_get(v, -2))) { // gets the field 'foo' from the global table
sq_pushroottable(v); // push the 'this' (in this case is the global table)
sq_call(v, 1, SQFalse, SQTrue); // calls the function
}
sq_settop(v, top); // restores the original stack size
int Ret = Inter_LoadGeolocation_dispatch_sig(pThis, pUser, a3);
return Ret;
@@ -366,6 +358,12 @@ int _Neof_startupGlobalInstances() {
return ret;
}
int _TimerQueue_GetTimerMess(void *thisC, void *timerEntry) {
int ret = TimerQueue_GetTimerMess(thisC, timerEntry);
l_socket::getInstance().Logic();
return ret;
}
void Lenheart() {
// 主程序才加载插件
if (!checkGame("df_game_r")) {
@@ -374,7 +372,8 @@ void Lenheart() {
SUBHOOK_SETUP(Inter_LoadGeolocation_dispatch_sig);
SUBHOOK_SETUP(History_Log);
SUBHOOK_SETUP(doDispatch); // 收包注册
SUBHOOK_SETUP(doDispatch);
SUBHOOK_SETUP(TimerQueue_GetTimerMess); // 收包注册
}
}

View File

@@ -110,6 +110,8 @@ typedef void (*fnCUserWorkPerFiveMin)(void *CUser);
// 线程执行
typedef int (*fnTimerDispatcher_dispatch)(void *A, void *B);
typedef int (*fnTimerQueue_GetTimerMess)(void *thisC, void *timerEntry);
// 线程执行
typedef void *(*fnSetUserMaxLevel)(void *CUser, int level);
@@ -121,4 +123,4 @@ typedef int (*fncusermake_basic_info)(void *a1, void *a2, char a3);
typedef int (*fnNeof_startupGlobalInstances)();
__END_DECLS
__END_DECLS

View File

@@ -64,7 +64,7 @@ void l_socket::IntToByteLittle(unsigned char *b, int Count)
void l_socket::Send(const SQChar *Pck)
{
if (ClientObj->ConnectState == false)
if (ClientObj == nullptr || ClientObj->ConnectState == false)
return;
int Length = strlen(Pck);
unsigned char *PckData = new unsigned char[Length + 4];
@@ -76,7 +76,7 @@ void l_socket::Send(const SQChar *Pck)
void l_socket::Logic()
{
if (ClientObj->ConnectState == false)
if (ClientObj == nullptr)
return;
ClientObj->PackLogic();
}