Commit 9f18ca35 authored by tufengqi's avatar tufengqi

异步优化

parent d28f74f5
......@@ -32,7 +32,7 @@ use Thrift\Factory\TStringFuncFactory;
*
* @package thrift.transport
*/
class TBufferedTransport extends TTransport
class TBufferedTransport extends \Thrift\Transport\TTransport
{
/**
* The underlying transport
......
......@@ -113,7 +113,8 @@ class TGuzzleClient extends \Thrift\Transport\TTransport
$this->request_ = '';
$this->response_ = null;
$this->timeout_ = null;
$this->headers_ = array();
$this->response_body = '';
$this->headers_ = [];
}
public function setFuncUri($uri_func)
......@@ -220,7 +221,7 @@ class TGuzzleClient extends \Thrift\Transport\TTransport
$promise = $client->sendAsync($request, $options)->then(function ($response) {
$this->response_ = $response->getBody()->getContents();
});
$promise->wait();
return $promise;
}
public static function closeCurlHandle()
......
<?php
namespace fpf\thrift;
class ThriftAsyncDetailInstanse
{
private $protocol = null;
private $promise = null;
private $class_name = '';
private $func_name = '';
public function ___construct($protocol, $promise, $class_name, $func_name)
{
$this->protocol = $protocol;
$this->promise = $promise;
$this->class_name = $class_name;
$this->func_name = $func_name;
}
public function getProtocol()
{
return $this->protocol;
}
public function getPromise()
{
return $this->promise;
}
public function wait()
{
$this->promise->wait();
return $this->readResult();
}
public function readResult()
{
$result_class_name = $this->class_name . '_' . $this->func_name . '_result';
$bin_accel = ($this->protocol instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_read_binary');
if ($bin_accel) {
$result = thrift_protocol_read_binary($this->protocol, $result_class_name, $this->protocol->isStrictRead());
} else {
$rseqid = 0;
$fname = null;
$mtype = 0;
$this->protocol->readMessageBegin($fname, $mtype, $rseqid);
if ($mtype == TMessageType::EXCEPTION) {
$x = new TApplicationException();
$x->read($this->protocol);
$this->protocol->readMessageEnd();
throw $x;
}
$result = new $result_class_name();
$result->read($this->protocol);
$this->protocol->readMessageEnd();
}
if ($result->success !== null) {
return $result->success;
}
throw new \Exception("getList failed: unknown result");
}
}
\ No newline at end of file
......@@ -16,14 +16,19 @@ class ThriftAsyncServiceFactoryProxy
private $instance = null;
private $transport = null;
private $socket = null;
private $tem_class = '';
private $promise = null;
private $protocol_list = null;
private $request_base_params = null;
private $tem_class_name = '';
private $class_name = '';
private $func_name = '';
const BINARY = 0;
const JSON = 1;
/**
* @var \Thrift\Factory\TBinaryProtocolFactory|\Thrift\Factory\TJSONProtocolFactory
*/
private $protocal_factory_class;
private $protocol_factory_instanse;
public function __construct($class_name)
{
......@@ -38,7 +43,8 @@ class ThriftAsyncServiceFactoryProxy
$service_check_arr = $check_ret[BaseConstant::MSG];
$service_name_space_str = $service_check_arr[0];
$service_str = $service_check_arr[1];
$this->tem_class = '\\service\\' . $service_name_space_str . '\\' . $service_str;
$this->tem_class_name = '\\service\\' . $service_name_space_str . '\\' . $service_str;
$this->class_name = $class_name;
if (empty($service_module_hosts[$service_name_space_str . '-' . $service_str])) {
$service_config = $service_default_hosts;
} else {
......@@ -60,19 +66,13 @@ class ThriftAsyncServiceFactoryProxy
self::BINARY => 'application/thrift-binary',
self::JSON => 'application/thrift-json',
];
$this->socket = new TGuzzleClient($service_real['host'], $service_real['port'], '/' . $service_name_space_str . '/' . $service_str . '/');
$this->socket->addHeaders(['Content-type' => $content_type_mapping[$content_type]]);
$this->transport = new \fpf\thrift\TBufferedTransport($this->socket, 1024, 1024);
if (self::BINARY === $content_type) {
$strict_read = true;
$strict_write = true;
$this->protocal_factory_class = new \Thrift\Factory\TBinaryProtocolFactory($this->transport, $strict_read, $strict_write);
$protocol = new \Thrift\Protocol\TBinaryProtocol($this->transport, $strict_read, $strict_write);
} elseif (self::JSON === $content_type) {
$this->protocal_factory_class = new \Thrift\Factory\TJSONProtocolFactory();
$protocol = new \Thrift\Protocol\TJSONProtocol($this->transport);
}
$this->instance = new $class_name($protocol);
$this->request_base_params = [
'host' => $service_real['host'],
'port' => $service_real['port'],
'uri' => '/' . $service_name_space_str . '/' . $service_str . '/',
'content_type_str' => $content_type_mapping[$content_type],
'content_type' => $content_type,
];
}
private function checkService($class_name)
......@@ -98,30 +98,43 @@ class ThriftAsyncServiceFactoryProxy
return ResponseApi::arrSuccess([$tem_arr[1], $matches[1]]);
}
public function __call($name, $arguments)
public function __call($func_name, $arguments)
{
$this->transport->open();
$this->socket->setFuncUri($name);
$ret = call_user_func_array([$this->instance, $name], $arguments);
$reflection = new \ReflectionMethod($this->tem_class . 'If', $name);
$socket = new TGuzzleClient($this->request_base_params['host'], $this->request_base_params['port'], $this->request_base_params['uri']);
$socket->addHeaders(['Content-type' => $this->request_base_params['content_type_str']]);
$transport = new \fpf\thrift\TBufferedTransport($socket, 1024, 1024);
if (self::BINARY === $this->request_base_params['content_type']) {
$strict_read = true;
$strict_write = true;
$this->protocol_factory_instanse = new \Thrift\Factory\TBinaryProtocolFactory($transport, $strict_read, $strict_write);
$protocol = new \Thrift\Protocol\TBinaryProtocol($transport, $strict_read, $strict_write);
} elseif (self::JSON === $this->request_base_params['content_type']) {
$this->protocol_factory_instanse = new \Thrift\Factory\TJSONProtocolFactory();
$protocol = new \Thrift\Protocol\TJSONProtocol($transport);
}
$this->instance = new $this->class_name($protocol);
$transport->open();
$socket->setFuncUri($func_name);
// $ret = call_user_func_array([$this->instance, $func_name], $arguments);
$reflection = new \ReflectionMethod($this->tem_class_name . 'If', $func_name);
$params = array_map(function ($p) {
return $p->name;
}, $reflection->getParameters());
$args_class_name = $this->tem_class . '_' . $name . '_args';
$args_class_name = $this->tem_class_name . '_' . $func_name . '_args';
$args_obj = new $args_class_name(array_combine($params, $arguments));
$protocol = $this->protocal_factory_class->getProtocol($this->transport);
$protocol = $this->protocol_factory_instanse->getProtocol($transport);
$bin_accel = ($protocol instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_write_binary');
if ($bin_accel) {
thrift_protocol_write_binary($protocol, $name, TMessageType::CALL, $argsObj, 0, $protocol->isStrictWrite());
thrift_protocol_write_binary($protocol, $func_name, TMessageType::CALL, $argsObj, 0, $protocol->isStrictWrite());
} else {
$protocol->writeMessageBegin($name, TMessageType::CALL, 0);
$protocol->writeMessageBegin($func_name, TMessageType::CALL, 0);
$args_obj->write($protocol);
$protocol->writeMessageEnd();
}
$read_buffer = $this->transport->readWBuf(1024);
var_dump(TStringFuncFactory::create()->strlen($read_buffer));
exit;
// $this->transport->close();
return $promise;
$read_buffer = $transport->readWBuf(1024);
$socket->write($read_buffer);
$promise = $socket->flush();
$transport->close();
return new ThriftAsyncDetailInstanse($promise, $protocol, $this->tem_class_name, $func_name);
}
}
\ No newline at end of file
......@@ -48,11 +48,13 @@ class ThriftServiceFactoryProxy
self::BINARY => 'application/thrift-binary',
self::JSON => 'application/thrift-json',
];
$this->socket = new TGuzzleClient($service_real['host'], $service_real['port'], '/' . $service_name_space_str . '/' . $service_str . '/');
$this->socket = new TCurlClient($service_real['host'], $service_real['port'], '/' . $service_name_space_str . '/' . $service_str . '/');
$this->socket->addHeaders(['Content-type' => $content_type_mapping[$content_type]]);
$this->transport = new \Thrift\Transport\TBufferedTransport($this->socket, 1024, 1024);
if (self::BINARY === $content_type) {
$protocol = new \Thrift\Protocol\TBinaryProtocol($this->transport);
$strict_read = true;
$strict_write = true;
$protocol = new \Thrift\Protocol\TBinaryProtocol($this->transport, $strict_read, $strict_write);
} elseif (self::JSON === $content_type) {
$protocol = new \Thrift\Protocol\TJSONProtocol($this->transport);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment