'send', 'd' => 'daemon', 'g' => 'gracefully', ]; }

    /**
     * Console entry point: dispatches the value of the `send` option to the
     * matching Workerman command method.
     *
     * Only `start` is wrapped in a try/catch; its exception message is printed
     * to stderr in red.
     *
     * @return int|null 1 when `send` is not a known command, nothing otherwise
     */
    public function actionIndex()
    {
        switch ((string) $this->send) {
            case 'start':
                try {
                    // start() takes no arguments; it reads $this->daemon itself.
                    $this->start();
                } catch (\Exception $e) {
                    $this->stderr($e->getMessage() . "\n", Console::FG_RED);
                }
                break;
            case 'stop':
                $this->stop();
                break;
            case 'restart':
                $this->restart();
                break;
            case 'reload':
                $this->reload();
                break;
            case 'status':
                $this->status();
                break;
            case 'connections':
                $this->connections();
                break;
            default:
                // Previously an unknown command was ignored without any feedback.
                $this->stderr(
                    sprintf(
                        "Unknown command \"%s\". Expected one of: start, stop, restart, reload, status, connections\n",
                        (string) $this->send
                    ),
                    Console::FG_RED
                );

                return 1;
        }
    }

    /**
     * Creates the websocket worker and registers its connect/message/close callbacks.
     *
     * The listen address comes from $this->config['ip'] / $this->config['port']
     * when set, otherwise from $this->ip / $this->port.
     *
     * A client that sends "binance" gets an upstream connection to the Binance
     * all-tickers stream; every upstream message is reformatted and forwarded to
     * that client. The upstream connection is tracked per client so that it is
     * opened at most once and closed when the client disconnects.
     */
    public function initWorker()
    {
        $ip = isset($this->config['ip']) ? $this->config['ip'] : $this->ip;
        $port = isset($this->config['port']) ? $this->config['port'] : $this->port;

        $wsWorker = new Worker("websocket://{$ip}:{$port}");

        // 4 processes
        $wsWorker->count = 4;

        // Upstream Binance connections keyed by client connection id. Captured by
        // reference so the message and close callbacks share the same map.
        $upstreams = [];

        // Emitted when a new connection comes in
        $wsWorker->onConnect = function ($connection) {
            echo "New connection\n";
        };

        // Emitted when data is received
        $wsWorker->onMessage = function ($connection, $data) use (&$upstreams) {
            if ('binance' === $data) {
                // Already streaming to this client: do not open a second upstream,
                // which would deliver every ticker update twice.
                if (isset($upstreams[$connection->id])) {
                    return;
                }

                $con = new \Workerman\Connection\AsyncTcpConnection("ws://stream.binance.com:9443/ws/!ticker@arr");
                $con->transport = 'ssl';

                $con->onMessage = function ($con, $data) use ($connection) {
                    $ticker = self::formatBinanceTickers(json_decode($data, true));
                    $connection->send(date("Y-m-d H:i:s") . ' : ' . json_encode($ticker));
                };

                // Forget the upstream once it is gone so the client can request it again.
                $con->onClose = function () use ($connection, &$upstreams) {
                    unset($upstreams[$connection->id]);
                };

                $upstreams[$connection->id] = $con;
                $con->connect();
            } elseif ('huobi' === $data) {
                $connection->send('huobi');
            } elseif ('okex' === $data) {
                $connection->send('okex');
            } else {
                $connection->send('other');
            }
        };

        // Emitted when a connection is closed
        $wsWorker->onClose = function ($connection) use (&$upstreams) {
            echo "Connection closed\n";

            // Close the upstream stream that was feeding this client; otherwise it
            // would stay open and keep receiving data for a client that is gone.
            if (isset($upstreams[$connection->id])) {
                $upstream = $upstreams[$connection->id];
                unset($upstreams[$connection->id]);
                $upstream->close();
            }
        };
    }

    /**
     * Converts a decoded Binance ticker payload into the map sent to clients.
     *
     * A row is kept only when its symbol ends with one of the known quote coins
     * and has a non-empty base part in front of it. Rows that are not arrays or
     * that lack any of the keys s, c, p, h, l, v are skipped.
     *
     * @param mixed $rows result of json_decode(); anything that is not an array yields an empty map
     * @return array map of raw symbol => [symbol, close, change, high, low, vol]
     */
    private static function formatBinanceTickers($rows)
    {
        // The original list contained 'BTC' twice; the duplicate only repeated work.
        // NOTE(review): the duplicate may have been meant to be another quote coin - confirm.
        $quoteCoins = ['ETH', 'BTC', 'USDT'];
        $ticker = [];

        if (!is_array($rows)) {
            return $ticker;
        }

        foreach ($rows as $row) {
            if (!is_array($row)
                || !isset($row['s'], $row['c'], $row['p'], $row['h'], $row['l'], $row['v'])
                || !is_string($row['s'])
            ) {
                continue;
            }

            $symbol = $row['s'];

            foreach ($quoteCoins as $coin) {
                // Plain suffix test. Splitting on the coin missed symbols that
                // contain the coin more than once (e.g. "WBTCBTC").
                $baseLength = strlen($symbol) - strlen($coin);
                if ($baseLength <= 0 || substr($symbol, $baseLength) !== $coin) {
                    continue;
                }

                $ticker[$symbol] = [
                    'symbol' => substr($symbol, 0, $baseLength) . '/' . $coin,
                    'close' => (float)sprintf("%0.4f", $row['c']),
                    // NOTE(review): 'p' is scaled by 100 here; confirm against the
                    // Binance payload whether the percent field was intended instead.
                    'change' => (float)sprintf("%0.4f", $row['p'] * 100),
                    'high' => (float)sprintf("%0.4f", $row['h']),
                    'low' => (float)sprintf("%0.4f", $row['l']),
                    'vol' => (float)sprintf("%0.4f", $row['v']),
                ];
                break;
            }
        }

        return $ticker;
    }

    /**
     * Initialises the worker, rewrites the global $argv so that Worker sees
     * "<script> <command> [<mode>]", and hands control to Worker::runAll().
     *
     * @param string $command value written to $argv[1]
     * @param string|null $mode value written to $argv[2]; null leaves $argv[2] untouched
     */
    private function runWorkerCommand($command, $mode = null)
    {
        $this->initWorker();

        // Reset the arguments to match what Worker expects
        global $argv;
        if (isset($argv[1])) {
            $argv[0] = $argv[1];
        }
        $argv[1] = $command;
        if (null !== $mode) {
            $argv[2] = $mode;
        }

        // Run worker
        Worker::runAll();
    }

    /**
     * workerman websocket start (passes -d when the daemon option is set)
     */
    public function start()
    {
        $this->runWorkerCommand('start', $this->daemon ? '-d' : null);
    }

    /**
     * workerman websocket restart
     *
     * Both flags share the single $argv[2] slot, so -g wins over -d when both
     * options are set.
     */
    public function restart()
    {
        $mode = null;
        if ($this->daemon) {
            $mode = '-d';
        }
        if ($this->gracefully) {
            $mode = '-g';
        }

        $this->runWorkerCommand('restart', $mode);
    }

    /**
     * workerman websocket stop (passes -g when the gracefully option is set)
     */
    public function stop()
    {
        $this->runWorkerCommand('stop', $this->gracefully ? '-g' : null);
    }

    /**
     * workerman websocket reload (passes -g when the gracefully option is set)
     */
    public function reload()
    {
        $this->runWorkerCommand('reload', $this->gracefully ? '-g' : null);
    }

    /**
     * workerman websocket status (passes -d when the daemon option is set)
     */
    public function status()
    {
        $this->runWorkerCommand('status', $this->daemon ? '-d' : null);
    }

    /**
     * workerman websocket connections
     */
    public function connections()
    {
        $this->runWorkerCommand('connections');
    }
}