'send', 'd' => 'daemon', 'g' => 'gracefully', ]; }

    /**
     * Console entry point: dispatches to the worker command named by the
     * `send` option. Values other than the six handled below are ignored.
     */
    public function actionIndex()
    {
        // switch uses the same loose comparison, in the same order, as an if/else-if chain.
        switch ($this->send) {
            case 'start':
                try {
                    $this->start($this->daemon);
                } catch (\Exception $e) {
                    $this->stderr($e->getMessage() . "\n", Console::FG_RED);
                }
                break;
            case 'stop':
                $this->stop();
                break;
            case 'restart':
                $this->restart();
                break;
            case 'reload':
                $this->reload();
                break;
            case 'status':
                $this->status();
                break;
            case 'connections':
                $this->connections();
                break;
        }
    }

    /**
     * Creates the websocket worker and installs its callbacks.
     *
     * The listen address comes from $this->config['ip'] / ['port'] when set,
     * otherwise from $this->ip / $this->port.
     *
     * A client sends one of the text commands `binance`, `huobi` or `okex` to
     * have that exchange's feed relayed to it; any other message is answered
     * with `other`. Every received message also counts as a heartbeat.
     */
    public function initWorker()
    {
        $ip = isset($this->config['ip']) ? $this->config['ip'] : $this->ip;
        $port = isset($this->config['port']) ? $this->config['port'] : $this->port;

        // Guarded so that a second call does not try to redefine the constant.
        if (!defined('HEARTBEAT_TIME')) {
            define('HEARTBEAT_TIME', 5);
        }

        $wsWorker = new Worker("websocket://{$ip}:{$port}");
        // 4 processes
        $wsWorker->count = 4;
        $wsWorker->uidConnections = [];

        // Emitted when a new client connects.
        $wsWorker->onConnect = function ($connection) {
            echo "New connection\n";
            // Upstream exchange connections opened for this client, keyed by exchange name.
            $connection->exchangeUpstreams = [];
        };

        // After the process starts, run a once-per-second heartbeat sweep.
        $wsWorker->onWorkerStart = function ($worker) {
            \Workerman\Lib\Timer::add(1, function () use ($worker) {
                $now = time();
                foreach ($worker->connections as $connection) {
                    // The connection may not have received anything yet:
                    // start its clock at the current time.
                    if (empty($connection->lastMessageTime)) {
                        $connection->lastMessageTime = $now;
                        continue;
                    }
                    // Silent for longer than the heartbeat interval: treat the
                    // client as offline and close the connection.
                    if ($now - $connection->lastMessageTime > HEARTBEAT_TIME) {
                        $connection->close();
                    }
                }
            });
        };

        // Emitted when data is received from a client.
        $wsWorker->onMessage = function ($connection, $data) {
            // Refresh the heartbeat clock. Without this the sweep above closes
            // every client HEARTBEAT_TIME seconds after its first tick, no
            // matter how active the client is.
            $connection->lastMessageTime = time();

            if ('binance' === $data) {
                $this->subscribeBinance($connection);
            } elseif ('huobi' === $data) {
                $this->subscribeHuobi($connection);
            } elseif ('okex' === $data) {
                $this->subscribeOkex($connection);
            } else {
                $connection->send('other');
            }
        };

        // Emitted when a client connection is closed.
        $wsWorker->onClose = function ($connection) {
            echo "Connection closed\n";

            // Close the upstream feeds opened for this client so they stop
            // running (and stop sending to a closed connection).
            $upstreams = empty($connection->exchangeUpstreams) ? [] : $connection->exchangeUpstreams;
            $connection->exchangeUpstreams = [];
            foreach ($upstreams as $upstream) {
                $upstream->close();
            }
        };
    }

    /**
     * Opens an SSL upstream connection to an exchange on behalf of a client.
     *
     * At most one upstream per exchange is kept per client: a repeated command
     * for an exchange that is already connected is a no-op. The upstream is
     * forgotten when it closes, so the client can subscribe again afterwards.
     *
     * @param object        $connection client connection the feed is relayed to
     * @param string        $exchange   key identifying the exchange
     * @param string        $address    upstream address passed to AsyncTcpConnection
     * @param callable      $onMessage  upstream onMessage callback
     * @param callable|null $onConnect  optional upstream onConnect callback
     */
    private function openExchangeUpstream($connection, $exchange, $address, $onMessage, $onConnect = null)
    {
        if (isset($connection->exchangeUpstreams[$exchange])) {
            return;
        }

        $upstream = new \Workerman\Connection\AsyncTcpConnection($address);
        // Use SSL transport so the ws:// address is accessed as wss.
        $upstream->transport = 'ssl';
        if (null !== $onConnect) {
            $upstream->onConnect = $onConnect;
        }
        $upstream->onMessage = $onMessage;
        $upstream->onClose = function () use ($connection, $exchange) {
            unset($connection->exchangeUpstreams[$exchange]);
        };

        $connection->exchangeUpstreams[$exchange] = $upstream;
        $upstream->connect();
    }

    /**
     * Relays the Binance `!ticker@arr` stream to the client as
     * `binance : <json list of ticker rows>`.
     */
    private function subscribeBinance($connection)
    {
        $this->openExchangeUpstream(
            $connection,
            'binance',
            'ws://stream.binance.com:9443/ws/!ticker@arr',
            function ($upstream, $payload) use ($connection) {
                $rows = json_decode($payload, true);
                if (!is_array($rows)) {
                    // Not a decodable list: nothing to relay.
                    return;
                }

                $ticker = [];
                foreach ($rows as $row) {
                    $symbol = isset($row['s']) ? $this->pairSymbol($row['s']) : null;
                    if (null === $symbol) {
                        continue;
                    }
                    $ticker[] = [
                        'symbol' => $symbol,
                        'close' => $this->fourDecimals($row['c']),
                        // NOTE(review): Binance documents `p` as the absolute price
                        // change and `P` as the percentage; confirm that `p` * 100
                        // is really what is intended here.
                        'change' => $this->fourDecimals($row['p'] * 100),
                        'high' => $this->fourDecimals($row['h']),
                        'low' => $this->fourDecimals($row['l']),
                        'vol' => $this->fourDecimals($row['v']),
                    ];
                }

                $connection->send('binance : ' . json_encode($ticker));
            }
        );
    }

    /**
     * Subscribes to Huobi `market.overview` and relays it to the client as
     * `huobi : <json list of ticker rows>`. Upstream `ping` messages are
     * answered with the matching `pong`.
     */
    private function subscribeHuobi($connection)
    {
        $this->openExchangeUpstream(
            $connection,
            'huobi',
            // SSL access needs port 443.
            'ws://api.huobi.pro:443/ws',
            function ($upstream, $payload) use ($connection) {
                $decoded = gzdecode($payload);
                if (false === $decoded) {
                    return;
                }
                $message = json_decode($decoded, true);

                if (isset($message['ping'])) {
                    $upstream->send(json_encode([
                        "pong" => $message['ping']
                    ]));
                    return;
                }

                if (!isset($message['ch']) || 'market.overview' != $message['ch']) {
                    return;
                }
                if (!isset($message['data']) || !is_array($message['data'])) {
                    return;
                }

                $ticker = [];
                foreach ($message['data'] as $row) {
                    $symbol = isset($row['symbol']) ? $this->pairSymbol(strtoupper($row['symbol'])) : null;
                    if (null === $symbol) {
                        continue;
                    }
                    // Percentage change from open to close; 0 when open is 0
                    // to avoid dividing by zero.
                    $change = (0 == $row['open'])
                        ? 0
                        : $this->fourDecimals(($row['close'] - $row['open']) / $row['open'] * 100);
                    $ticker[] = [
                        'symbol' => $symbol,
                        'close' => $this->fourDecimals($row['close']),
                        'change' => $change,
                        'high' => $this->fourDecimals($row['high']),
                        'low' => $this->fourDecimals($row['low']),
                        'vol' => $this->fourDecimals($row['vol']),
                    ];
                }

                $connection->send('huobi : ' . json_encode($ticker));
            },
            function ($upstream) {
                $upstream->send(json_encode([
                    'sub' => 'market.overview'
                ]));
            }
        );
    }

    /**
     * Subscribes to four OKEx v3 channels and relays every decoded upstream
     * message to the client, prefixed with the current date and time.
     */
    private function subscribeOkex($connection)
    {
        $connection->websocketType = Websocket::BINARY_TYPE_ARRAYBUFFER;

        $this->openExchangeUpstream(
            $connection,
            'okex',
            "ws://real.okex.com:8443/ws/v3",
            function ($upstream, $payload) use ($connection) {
                // NOTE(review): OKEx v3 describes its payloads as Deflate-compressed;
                // gzdecode() expects a gzip wrapper. Confirm whether gzinflate()
                // is needed here.
                $decoded = gzdecode($payload);
                $connection->send(date("Y-m-d H:i:s") . ' : ' . json_encode($decoded));
            },
            function ($upstream) {
                $upstream->send('{"op":"subscribe","args":["spot/all_ticker_3s","futures/all_ticker_3s","index/all_ticker_3s","futures/delivery:BTC"]}');
            }
        );
    }

    /**
     * Splits an exchange symbol such as `ETHBTC` into `ETH/BTC`.
     *
     * The quote coins ETH, BTC, USDT and BTY are tried in that order; a coin
     * matches when it occurs exactly once in the symbol, at the very end.
     *
     * @param string $symbol upper-case exchange symbol
     * @return string|null `BASE/QUOTE`, or null when no quote coin matches
     */
    private function pairSymbol($symbol)
    {
        foreach (['ETH', 'BTC', 'USDT', 'BTY'] as $coin) {
            $parts = explode($coin, $symbol);
            if (2 == count($parts) && '' === $parts[1]) {
                return $parts[0] . '/' . $coin;
            }
        }

        return null;
    }

    /**
     * Formats a numeric value to four decimal places and returns it as a float.
     */
    private function fourDecimals($value)
    {
        return (float)sprintf("%0.4f", $value);
    }

    /**
     * Initialises the worker, rewrites the global $argv into the form
     * Worker::runAll() parses (script name, command, optional flag) and runs it.
     *
     * @param string      $command Workerman command (start, stop, ...)
     * @param string|null $flag    value for $argv[2]; when null, $argv[2] is left untouched
     */
    private function runWorkerCommand($command, $flag = null)
    {
        $this->initWorker();

        // Reset the arguments to match what Worker expects.
        global $argv;
        $argv[0] = $argv[1];
        $argv[1] = $command;
        if (null !== $flag) {
            $argv[2] = $flag;
        }

        // Run worker
        Worker::runAll();
    }

    /**
     * workman websocket start
     */
    public function start()
    {
        $this->runWorkerCommand('start', $this->daemon ? '-d' : null);
    }

    /**
     * workman websocket restart
     *
     * When both options are set, `-g` takes precedence over `-d`.
     */
    public function restart()
    {
        $flag = null;
        if ($this->daemon) {
            $flag = '-d';
        }
        if ($this->gracefully) {
            $flag = '-g';
        }

        $this->runWorkerCommand('restart', $flag);
    }

    /**
     * workman websocket stop
     */
    public function stop()
    {
        $this->runWorkerCommand('stop', $this->gracefully ? '-g' : null);
    }

    /**
     * workman websocket reload
     */
    public function reload()
    {
        $this->runWorkerCommand('reload', $this->gracefully ? '-g' : null);
    }

    /**
     * workman websocket status
     */
    public function status()
    {
        $this->runWorkerCommand('status', $this->daemon ? '-d' : null);
    }

    /**
     * workman websocket connections
     */
    public function connections()
    {
        $this->runWorkerCommand('connections');
    }
}