diff --git a/include/Version.h b/include/Version.h index 74916e5..85a13f5 100644 --- a/include/Version.h +++ b/include/Version.h @@ -1,3 +1,3 @@ #pragma once -#define DPS_SCRIPT_VERSION "1.3" \ No newline at end of file +#define DPS_SCRIPT_VERSION "1.4" \ No newline at end of file diff --git a/include/l_socket.h b/include/l_socket.h index 36e2218..374f9b3 100644 --- a/include/l_socket.h +++ b/include/l_socket.h @@ -1,11 +1,20 @@ #pragma once #include "Singleton.h" #include "l_squirrel.h" -#include +#include #include -#include -#include +#include +#include +#include +#include +#include +#include #include +#include +#include +#include +#include +#include extern HSQUIRRELVM v; extern std::recursive_mutex SqMtx; @@ -16,24 +25,34 @@ using asio::ip::tcp; class Client { private: - asio::io_context io_context; + struct PendingEvent + { + enum class Type + { + Connect, + Message + }; + + Type type = Type::Message; + std::string payload; + }; + +private: tcp::resolver resolver; tcp::socket socket; tcp::resolver::results_type endpoint; std::array buffer; std::array PackData; - std::size_t writeDataSize = 0; - std::size_t receivedDataSize = 0; + std::size_t bufferedDataSize = 0; + std::deque pendingEvents; + std::size_t queuedMessageBytes = 0; + std::mutex pendingEventsMutex; std::string Ip = "192.168.200.27"; // std::string Ip = "127.0.0.1"; std::string Port = "65109"; - std::mutex SizeMtx; - std::mutex ClearFlagMtx; public: bool ConnectState = false; - bool ClearFlag = false; - int LogiThreadSize = 0; std::fstream file; @@ -48,24 +67,162 @@ public: PackData.fill(0); } +private: + enum : std::size_t + { + HeaderSize = 4, + MaxQueuedMessages = 10000, + MaxQueuedMessageBytes = 8 * 1024 * 1024 + }; + + void ResetPacketBuffer() + { + bufferedDataSize = 0; + } + + void InvokeGatewayConnectCallback() + { + std::lock_guard lock(SqMtx); + SQInteger top = sq_gettop(v); // saves the stack size before the call + sq_pushroottable(v); // pushes the global table + sq_pushstring(v, _SC("OnGatewaySocketConnect"), -1); + if (SQ_SUCCEEDED(sq_get(v, -2))) + { + sq_pushroottable(v); // push the global table as this + sq_call(v, 1, SQFalse, SQTrue); // calls the function + } + sq_settop(v, top); // restores the original stack size + } + + void InvokeGatewaySocketMsg(const std::string &message) + { + std::lock_guard lock(SqMtx); + SQInteger top = sq_gettop(v); // saves the stack size before the call + sq_pushroottable(v); // pushes the global table + sq_pushstring(v, _SC("OnGatewaySocketMsg"), -1); + if (SQ_SUCCEEDED(sq_get(v, -2))) + { + sq_pushroottable(v); + sq_pushstring(v, message.c_str(), static_cast(message.size())); + sq_call(v, 2, SQFalse, SQTrue); + } + sq_settop(v, top); + } + + bool EnqueueConnectEvent() + { + std::lock_guard lock(pendingEventsMutex); + PendingEvent event; + event.type = PendingEvent::Type::Connect; + pendingEvents.push_back(std::move(event)); + return true; + } + + bool EnqueueMessageEvent(const char *message, std::size_t length) + { + std::lock_guard lock(pendingEventsMutex); + if (pendingEvents.size() >= MaxQueuedMessages) + { + spdlog::error("Gateway pending message queue is full. queued_events={}, limit={}", + static_cast(pendingEvents.size()), + static_cast(MaxQueuedMessages)); + return false; + } + + if ((queuedMessageBytes + length) > MaxQueuedMessageBytes) + { + spdlog::error("Gateway pending message bytes exceeded. queued_bytes={}, incoming={}, limit={}", + static_cast(queuedMessageBytes), + static_cast(length), + static_cast(MaxQueuedMessageBytes)); + return false; + } + + PendingEvent event; + event.type = PendingEvent::Type::Message; + event.payload.assign(message, length); + queuedMessageBytes += length; + pendingEvents.push_back(std::move(event)); + return true; + } + + void DrainPendingEvents(std::deque &events) + { + std::lock_guard lock(pendingEventsMutex); + if (pendingEvents.empty()) + return; + events.swap(pendingEvents); + queuedMessageBytes = 0; + } + + bool AppendReceivedData(std::size_t bytes_transferred) + { + if (bytes_transferred == 0) + return true; + + if ((bufferedDataSize + bytes_transferred) > PackData.size()) + { + spdlog::error("Gateway receive buffer overflow. buffered={}, incoming={}, capacity={}", + static_cast(bufferedDataSize), + static_cast(bytes_transferred), + static_cast(PackData.size())); + return false; + } + + std::memcpy(PackData.data() + bufferedDataSize, buffer.data(), bytes_transferred); + bufferedDataSize += bytes_transferred; + return true; + } + + bool ProcessBufferedPackets() + { + std::size_t offset = 0; + while ((bufferedDataSize - offset) >= HeaderSize) + { + const int packSize = ByteLittleToInt(reinterpret_cast(PackData.data() + offset)); + if (packSize <= 0) + { + spdlog::error("Gateway packet length is invalid: {}", packSize); + return false; + } + + const std::size_t payloadSize = static_cast(packSize); + if (payloadSize > (PackData.size() - HeaderSize)) + { + spdlog::error("Gateway packet length exceeds buffer. payload={}, capacity={}", + static_cast(payloadSize), + static_cast(PackData.size() - HeaderSize)); + return false; + } + + if ((bufferedDataSize - offset - HeaderSize) < payloadSize) + break; + + if (!EnqueueMessageEvent(PackData.data() + offset + HeaderSize, payloadSize)) + return false; + offset += HeaderSize + payloadSize; + } + + if (offset == 0) + return true; + + const std::size_t remaining = bufferedDataSize - offset; + if (remaining > 0) + std::memmove(PackData.data(), PackData.data() + offset, remaining); + bufferedDataSize = remaining; + return true; + } + +public: void start() { - async_connect(socket, endpoint, [this](const asio::error_code &error, const tcp::endpoint &endpoint) + async_connect(socket, endpoint, [this](const asio::error_code &error, const tcp::endpoint &connected_endpoint) { if (!error) { ConnectState = true; + ResetPacketBuffer(); + EnqueueConnectEvent(); read(); - std::lock_guard lock(SqMtx); - SQInteger top = sq_gettop(v); // saves the stack size before the call - sq_pushroottable(v); // pushes the global table - sq_pushstring(v, _SC("OnGatewaySocketConnect"), -1); - if (SQ_SUCCEEDED(sq_get(v, -2))) - { // gets the field 'foo' from the global table - sq_pushroottable(v); // push the 'this' (in this case is the global table) - sq_call(v, 1, SQFalse, SQTrue); // calls the function - } - sq_settop(v, top); // restores the original stack size - io_context.poll(); // 处理一次事件循环,避免主线程阻塞 } else { // std::cerr << "Error connecting to server: " << error.message() << std::endl; start(); @@ -77,16 +234,11 @@ public: socket.async_read_some(asio::buffer(buffer), [this](const asio::error_code &error, std::size_t bytes_transferred) { if (!error) { - if (ClearFlag) - ClearPack(); - for (std::size_t i = 0; i < bytes_transferred; ++i) - { - PackData[writeDataSize + i] = buffer[i]; - } - SizeMtx.lock(); - writeDataSize += bytes_transferred; - SizeMtx.unlock(); - read(); + if (!AppendReceivedData(bytes_transferred) || !ProcessBufferedPackets()) { + reconnect(); + return; + } + read(); } else { // std::cerr << "Error reading data: " << error.message() << std::endl; usleep(10); @@ -97,39 +249,27 @@ public: void reconnect() { ConnectState = false; - std::cout << "服务器断开连接,尝试重连." << std::endl; - // 执行重连操作 - // 关闭当前连接 - socket.close(); - // 重新解析端点 + ResetPacketBuffer(); + std::cout << "Gateway socket disconnected, reconnecting." << std::endl; + asio::error_code close_error; + socket.close(close_error); endpoint = resolver.resolve(Ip, Port); - // 重新连接 - async_connect(socket, endpoint, [this](const asio::error_code &error, const tcp::endpoint &endpoint) + async_connect(socket, endpoint, [this](const asio::error_code &error, const tcp::endpoint &connected_endpoint) { if (!error) { ConnectState = true; - std::lock_guard lock(SqMtx); - SQInteger top = sq_gettop(v); // saves the stack size before the call - sq_pushroottable(v); // pushes the global table - sq_pushstring(v, _SC("OnGatewaySocketConnect"), -1); - if (SQ_SUCCEEDED(sq_get(v, -2))) - { // gets the field 'foo' from the global table - sq_pushroottable(v); // push the 'this' (in this case is the global table) - sq_call(v, 1, SQFalse, SQTrue); // calls the function - } - sq_settop(v, top); // restores the original stack size - + ResetPacketBuffer(); + EnqueueConnectEvent(); read(); } else { - // std::cerr << "Error reconnecting to server: " << error.message() << std::endl; - // 可以在此处添加重连失败的处理逻辑 reconnect(); } }); } void send(unsigned char *message, int Length) { - async_write(socket, asio::buffer(message, Length), [this](const asio::error_code &error, std::size_t bytes_transferred) + std::shared_ptr > payload(new std::vector(message, message + Length)); + async_write(socket, asio::buffer(*payload), [payload](const asio::error_code &error, std::size_t bytes_transferred) { if (!error) { // std::cout << "Message sent" << std::endl; @@ -140,85 +280,25 @@ public: void ClearPack() { - SizeMtx.lock(); - // 复制已读大小开始到结束到 开始 作用删除前已读大小的数据 - std::copy(PackData.begin() + receivedDataSize, PackData.end(), PackData.begin()); - // 将最后元素设置为0 - std::fill(PackData.end() - receivedDataSize, PackData.end(), 0); - // 写的大小重置 - writeDataSize -= receivedDataSize; - // 读的大小重置 - receivedDataSize = 0; - SizeMtx.unlock(); - ClearFlagMtx.lock(); - ClearFlag = false; - ClearFlagMtx.unlock(); + ResetPacketBuffer(); } void PackLogic() { - int RealSize = 0; - int RealReadSize = -1; - - if (SizeMtx.try_lock()) + std::deque events; + DrainPendingEvents(events); + while (!events.empty()) { - receivedDataSize += LogiThreadSize; - LogiThreadSize = 0; - RealSize = writeDataSize; - RealReadSize = receivedDataSize; - SizeMtx.unlock(); - } - // 如果包长度不够直接返回 - if (RealSize < 4 || RealReadSize == -1) - return; - // 如果里面已经读了超过一半的大小了 - if (RealSize >= 40960) - { - if (!ClearFlag) + PendingEvent event = std::move(events.front()); + events.pop_front(); + if (event.type == PendingEvent::Type::Connect) { - ClearFlagMtx.lock(); - ClearFlag = true; - ClearFlagMtx.unlock(); + InvokeGatewayConnectCallback(); + continue; } + + InvokeGatewaySocketMsg(event.payload); } - - unsigned char *Count = new unsigned char[4]; - - std::copy(PackData.begin() + RealReadSize, PackData.begin() + 4 + RealReadSize, Count); - - int PackSize = ByteLittleToInt(Count); - delete[] Count; - - // 如果包长度不够或者缓冲区长度直接返回 - if (PackSize <= 0 || ((RealSize - RealReadSize - 4) < PackSize)) - return; - - char *StrBuffer = new char[PackSize]; - - std::copy(PackData.begin() + 4 + RealReadSize, PackData.begin() + 4 + PackSize + RealReadSize, StrBuffer); - std::string Str(StrBuffer, PackSize); - delete[] StrBuffer; - - // 包数据大小读取的偏移 这次读了多少 - LogiThreadSize += (4 + PackSize); - - // std::cout << "包大小: " << PackSize << std::endl; - // std::cout << "包内容: " << Str << std::endl; - // std::cout << "收到了第: " << TestCode << "个包" << std::endl; - // spdlog::info(Str); - - std::lock_guard lock(SqMtx); - SQInteger top = sq_gettop(v); // saves the stack size before the call - sq_pushroottable(v); // pushes the global table - sq_pushstring(v, _SC("OnGatewaySocketMsg"), -1); - if (SQ_SUCCEEDED(sq_get(v, -2))) - { // gets the field 'foo' from the global table - sq_pushroottable(v); // push the 'this' (in this case is the global table) - sq_pushstring(v, Str.c_str(), PackSize); - sq_call(v, 2, SQFalse, SQTrue); // calls the function - } - sq_settop(v, top); // restores the original stack size - // TestCode++; } static int ByteLittleToInt(unsigned char *Count) @@ -239,7 +319,7 @@ private: l_socket() {} // private constructor to prevent instantiation l_socket(const l_socket &) = delete; // disable copy constructor l_socket &operator=(const l_socket &) = delete; // disable assignment operator - Client *ClientObj; + Client *ClientObj = nullptr; public: bool InitState = false; diff --git a/src/df_main.cpp b/src/df_main.cpp index 7a74501..45c9313 100644 --- a/src/df_main.cpp +++ b/src/df_main.cpp @@ -16,6 +16,7 @@ SUBHOOK_INIT(doDispatch, 0x08594922); SUBHOOK_INIT(Inter_LoadGeolocation_dispatch_sig, 0x0816088A); SUBHOOK_INIT(History_Log, 0x854F990); SUBHOOK_INIT(Neof_startupGlobalInstances, 0x0829F7C5); +SUBHOOK_INIT(TimerQueue_GetTimerMess, 0x08630ECC); int checkGame(const char *pName) { char path[256]; @@ -308,15 +309,6 @@ int _Inter_LoadGeolocation_dispatch_sig(void *pThis, void *pUser, char *a3) { } sq_settop(v, top); // restores the original stack size - sq_pushroottable(v); // pushes the global table - sq_pushstring(v, _SC("main"), -1); - if (SQ_SUCCEEDED( - sq_get(v, -2))) { // gets the field 'foo' from the global table - sq_pushroottable(v); // push the 'this' (in this case is the global table) - sq_call(v, 1, SQFalse, SQTrue); // calls the function - } - sq_settop(v, top); // restores the original stack size - int Ret = Inter_LoadGeolocation_dispatch_sig(pThis, pUser, a3); return Ret; @@ -366,6 +358,12 @@ int _Neof_startupGlobalInstances() { return ret; } +int _TimerQueue_GetTimerMess(void *thisC, void *timerEntry) { + int ret = TimerQueue_GetTimerMess(thisC, timerEntry); + l_socket::getInstance().Logic(); + return ret; +} + void Lenheart() { // 主程序才加载插件 if (!checkGame("df_game_r")) { @@ -374,7 +372,8 @@ void Lenheart() { SUBHOOK_SETUP(Inter_LoadGeolocation_dispatch_sig); SUBHOOK_SETUP(History_Log); - SUBHOOK_SETUP(doDispatch); // 收包注册 + SUBHOOK_SETUP(doDispatch); + SUBHOOK_SETUP(TimerQueue_GetTimerMess); // 收包注册 } } diff --git a/src/import.h b/src/import.h index 95855d2..a8719b8 100644 --- a/src/import.h +++ b/src/import.h @@ -110,6 +110,8 @@ typedef void (*fnCUserWorkPerFiveMin)(void *CUser); // 线程执行 typedef int (*fnTimerDispatcher_dispatch)(void *A, void *B); +typedef int (*fnTimerQueue_GetTimerMess)(void *thisC, void *timerEntry); + // 线程执行 typedef void *(*fnSetUserMaxLevel)(void *CUser, int level); @@ -121,4 +123,4 @@ typedef int (*fncusermake_basic_info)(void *a1, void *a2, char a3); typedef int (*fnNeof_startupGlobalInstances)(); -__END_DECLS \ No newline at end of file +__END_DECLS diff --git a/src/l_socket.cpp b/src/l_socket.cpp index 86be2f8..ed88af6 100644 --- a/src/l_socket.cpp +++ b/src/l_socket.cpp @@ -64,7 +64,7 @@ void l_socket::IntToByteLittle(unsigned char *b, int Count) void l_socket::Send(const SQChar *Pck) { - if (ClientObj->ConnectState == false) + if (ClientObj == nullptr || ClientObj->ConnectState == false) return; int Length = strlen(Pck); unsigned char *PckData = new unsigned char[Length + 4]; @@ -76,7 +76,7 @@ void l_socket::Send(const SQChar *Pck) void l_socket::Logic() { - if (ClientObj->ConnectState == false) + if (ClientObj == nullptr) return; ClientObj->PackLogic(); }