2026/4/6 14:46:48
网站建设
项目流程
1. 实时消息推送的技术选型实时消息推送在现代Web应用中越来越重要无论是电商的订单状态更新、社交软件的即时聊天还是在线客服系统都需要稳定高效的实时通信能力。传统的HTTP协议基于请求-响应模式服务器无法主动向客户端推送数据这就引出了对实时通信技术的需求。在PHP生态中Workerman是一个高性能的PHP Socket服务器框架它支持长连接通信能够轻松处理数十万甚至上百万的并发连接。ThinkPHP作为国内流行的PHP框架以其简洁优雅的代码风格和丰富的功能受到开发者喜爱。将两者结合既能利用ThinkPHP快速开发业务逻辑又能借助Workerman实现高性能的实时通信。为什么选择Workerman而不是其他方案首先Workerman纯PHP实现无需安装额外扩展部署简单。其次它支持多种协议WebSocket、HTTP、自定义协议等灵活性高。最重要的是Workerman的性能表现出色在4核8G服务器上可以轻松支持数万并发连接。2. 环境准备与基础配置2.1 安装ThinkPHP8.0和Workerman首先创建一个新的ThinkPHP8.0项目composer create-project topthink/think tp8-workerman-demo进入项目目录安装Workermancomposer require workerman/workerman对于生产环境建议使用Supervisor来管理Workerman进程确保服务稳定运行。创建一个简单的supervisor配置[program:workerman] commandphp think worker:server directory/path/to/your/project autostarttrue autorestarttrue userwww numprocs1 redirect_stderrtrue stdout_logfile/var/log/workerman.log2.2 基础WebSocket服务搭建在ThinkPHP中创建一个自定义命令来启动Workerman服务。在app/command目录下创建WebSocketServer.phpnamespace app\command; use think\console\Command; use think\console\Input; use think\console\Output; use Workerman\Worker; class WebSocketServer extends Command { protected function configure() { $this-setName(websocket:start) -setDescription(Start WebSocket server); } protected function execute(Input $input, Output $output) { $worker new Worker(websocket://0.0.0.0:2345); $worker-count 4; // 根据CPU核心数设置 $worker-onWorkerStart function($worker) { echo Worker starting...\n; }; $worker-onMessage function($connection, $data) { $connection-send(Received: . $data); }; Worker::runAll(); } }然后在config/console.php中注册这个命令return [ commands [ websocket:start app\command\WebSocketServer, ], ];现在可以通过以下命令启动WebSocket服务php think websocket:start3. ThinkPHP与Workerman深度整合3.1 双向通信架构设计一个完整的实时消息系统需要支持双向通信客户端通过WebSocket连接到Workerman服务ThinkPHP处理业务逻辑后通过内部端口推送消息到WorkermanWorkerman将消息实时推送给指定客户端关键点在于建立内部通信机制。我们在Workerman中创建一个内部Text Worker$worker-onWorkerStart function($worker) { if ($worker-id 0) { $innerWorker new Worker(text://0.0.0.0:1234); $innerWorker-onMessage function($conn, $buffer) use ($worker) { $data json_decode($buffer, true); foreach($worker-connections as $connection) { $connection-send($buffer); } $conn-send(ok); }; $innerWorker-listen(); } };3.2 用户连接管理与消息路由实际应用中需要识别不同用户连接。我们可以通过UID映射实现$worker-uidConnections []; $worker-onMessage function($connection, $data) use ($worker) { $data json_decode($data, true); if (isset($data[type]) $data[type] login) { $connection-uid $data[uid]; $worker-uidConnections[$data[uid]] $connection; return; } // 其他消息处理... }; $worker-onClose function($connection) use ($worker) { if (isset($connection-uid)) { unset($worker-uidConnections[$connection-uid]); } };在ThinkPHP控制器中可以通过以下方式推送消息public function pushMessage($uid, $content) { $client stream_socket_client(tcp://127.0.0.1:1234, $errno, $errstr, 1); if (!$client) { return false; } $data [ uid $uid, content $content ]; fwrite($client, json_encode($data).\n); $result fgets($client, 1024); fclose($client); return trim($result) ok; }4. 性能优化与生产实践4.1 连接保活与心跳检测长连接需要维护连接状态防止因网络问题导致僵尸连接define(HEARTBEAT_TIME, 55); $worker-onWorkerStart function($worker) { Timer::add(10, function() use ($worker) { $time_now time(); foreach($worker-connections as $connection) { if (empty($connection-lastMessageTime)) { $connection-lastMessageTime $time_now; continue; } if ($time_now - $connection-lastMessageTime HEARTBEAT_TIME) { $connection-close(); } } }); };客户端需要定期发送心跳包setInterval(() { ws.send(JSON.stringify({type: ping})); }, 30000);4.2 多进程数据共享问题Workerman多进程模式下进程间内存隔离。有几种解决方案使用Redis存储连接映射使用Channel组件进行进程间通信只使用一个内部Worker处理推送推荐使用Redis方案$worker-onMessage function($connection, $data) { $redis new Redis(); $redis-connect(127.0.0.1, 6379); if (isset($data[uid])) { $redis-set(ws:uid:{$data[uid]}, $connection-id); } };4.3 监控与日志记录完善的监控对生产环境至关重要。可以记录以下指标当前连接数消息吞吐量系统资源占用$worker-onWorkerStart function($worker) { Timer::add(60, function() use ($worker) { $stats [ time date(Y-m-d H:i:s), memory round(memory_get_usage()/1024/1024, 2).MB, connections count($worker-connections) ]; file_put_contents(/tmp/workerman_stats.log, json_encode($stats).\n, FILE_APPEND); }); };5. 典型应用场景实现5.1 在线客服系统客服系统需要支持客户与客服一对一聊天客服状态管理在线/忙碌/离线消息历史记录实现核心代码// 客服分配逻辑 $worker-onMessage function($connection, $data) use ($worker) { $data json_decode($data, true); if ($data[type] customer_connect) { $customerId $data[customer_id]; $serviceId $this-assignServiceStaff(); $worker-uidConnections[$customerId][service_id] $serviceId; $worker-uidConnections[$serviceId][customers][] $customerId; $connection-send(json_encode([ type service_assigned, service_id $serviceId ])); } };5.2 即时通讯应用实现类似微信的即时通讯功能// 消息转发逻辑 if ($data[type] private_msg) { $targetUid $data[to_uid]; if (isset($worker-uidConnections[$targetUid])) { $worker-uidConnections[$targetUid]-send(json_encode([ type new_msg, from $connection-uid, content $data[content] ])); } else { // 离线消息存储 Db::name(offline_messages)-insert([ from_uid $connection-uid, to_uid $targetUid, content $data[content], create_time time() ]); } }5.3 实时数据监控大屏对于需要实时展示数据的场景// 数据广播 function broadcast($worker, $data) { foreach ($worker-connections as $connection) { $connection-send(json_encode($data)); } } // 定时从数据库获取最新数据 Timer::add(5, function() use ($worker) { $stats Db::name(real_time_stats) -order(id, desc) -limit(10) -select(); broadcast($worker, [ type stats_update, data $stats ]); });6. 安全与异常处理6.1 连接认证与加密WebSocket连接需要安全认证$worker-onWebSocketConnect function($connection, $header) { $token $_GET[token] ?? ; if (!$this-validateToken($token)) { $connection-close(); } };建议启用SSL加密$worker new Worker(websocket://0.0.0.0:443, [ ssl [ local_cert /path/to/cert.pem, local_pk /path/to/key.pem, verify_peer false ] ]);6.2 异常处理与重连机制客户端需要处理连接中断情况function connectWebSocket() { const ws new WebSocket(wss://yoursite.com); ws.onclose function() { setTimeout(connectWebSocket, 5000); // 5秒后重连 }; ws.onerror function(err) { console.error(WebSocket error:, err); }; }服务端也需要处理异常$worker-onError function($connection, $code, $msg) { echo Error [$code] $msg\n; // 记录错误日志 };7. 部署与扩展7.1 负载均衡方案高并发场景下需要多机部署使用Nginx做WebSocket负载均衡使用Redis存储全局连接信息Nginx配置示例upstream websocket { server 127.0.0.1:2345; server 127.0.0.1:2346; } server { location /ws { proxy_pass http://websocket; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection upgrade; } }7.2 水平扩展策略当单机性能不足时可以考虑按业务拆分不同Worker服务使用GatewayWorker商业版引入消息队列削峰填谷Redis发布订阅示例$worker-onWorkerStart function() { $redis new Redis(); $redis-connect(127.0.0.1, 6379); $redis-subscribe([system_notice], function($redis, $channel, $msg) { broadcast($msg); }); };在实际项目中我们曾遇到过连接数突增导致服务不稳定的情况。通过优化Worker配置和引入连接数限制机制最终实现了平稳运行。关键是要做好压力测试了解系统的瓶颈所在。