Commit c429db10 authored by tufengqi's avatar tufengqi

异步逻辑修改

parent e181f790
<?php
namespace fpf\thrift;
use Yii;
use fpf\response\ResponseApi;
use fpf\response\BaseConstant;
use Thrift\Protocol\TBinaryProtocolAccelerated;
use Thrift\Transport\TMemoryBuffer;
use Thrift\Type\TMessageType;
use Thrift\Exception\TApplicationException;
class ThriftAsyncServiceFactoryProxy
{
private $instance = null;
private $transport = null;
private $socket = null;
const BINARY = 0;
const JSON = 1;
/**
* @var \Thrift\Factory\TBinaryProtocolFactory|\Thrift\Factory\TJSONProtocolFactory
*/
private $protocal_factory_class;
public function __construct($class_name)
{
$service_default_protocol = Yii::$app->fpf->getConfig('service_default_protocol', 'thrift_service');
$service_default_hosts = Yii::$app->fpf->getConfig('service_default_hosts', 'thrift_service');
$service_default_failure_policy = Yii::$app->fpf->getConfig('service_default_failure_policy', 'thrift_service');
$service_module_hosts = Yii::$app->fpf->getConfig('service_module_hosts', 'thrift_service');
$check_ret = $this->checkService($class_name);
if (true === $check_ret[BaseConstant::ERROR]) {
throw new \Exception($check_ret[BaseConstant::MSG]);
}
$service_check_arr = $check_ret[BaseConstant::MSG];
$service_name_space_str = $service_check_arr[0];
$service_str = $service_check_arr[1];
if (empty($service_module_hosts[$service_name_space_str . '-' . $service_str])) {
$service_config = $service_default_hosts;
} else {
$service_config = $service_module_hosts[$service_name_space_str . '-' . $service_str];
}
$count_service = count($service_config);
if ($count_service > 1) {
$service_index = rand(0, $count_service - 1);
} else {
$service_index = 0;
}
$service_real = $service_config[$service_index];
if (isset($service_real['protocol'])) {
$content_type = $service_real['protocol'];
} else {
$content_type = $service_default_protocol;
}
$content_type_mapping = [
self::BINARY => 'application/thrift-binary',
self::JSON => 'application/thrift-json',
];
$this->socket = new TGuzzleClient($service_real['host'], $service_real['port'], '/' . $service_name_space_str . '/' . $service_str . '/');
$this->socket->addHeaders(['Content-type' => $content_type_mapping[$content_type]]);
$this->transport = new \Thrift\Transport\TBufferedTransport($this->socket, 1024, 1024);
if (self::BINARY === $content_type) {
$strict_read = true;
$strict_write = true;
$this->protocal_factory_class = new \Thrift\Factory\TBinaryProtocolFactory($this->transport, $strict_read, $strict_write);
$protocol = new \Thrift\Protocol\TBinaryProtocol($this->transport, $strict_read, $strict_write);
} elseif (self::JSON === $content_type) {
$this->protocal_factory_class = new \Thrift\Factory\TJSONProtocolFactory();
$protocol = new \Thrift\Protocol\TJSONProtocol($this->transport);
}
$this->instance = new $class_name($protocol);
}
private function checkService($class_name)
{
$tem_arr = explode('\\', trim($class_name, '\\'));
$error = '';
if (3 !== count($tem_arr)) {
$error = 'service split must be have 3 items';
goto doEnd;
}
if ('service' !== $tem_arr[0]) {
$error = 'service first spit item must be \'service\'';
goto doEnd;
}
if (!preg_match("/^([a-zA-Z0-9]+)Client$/", $tem_arr[2], $matches)) {
$error = 'service must ending with \'Client\'';
goto doEnd;
}
doEnd :
if ($error) {
return ResponseApi::arrFail($error);
}
return ResponseApi::arrSuccess([$tem_arr[1], $matches[1]]);
}
public function __call($name, $arguments)
{
$this->transport->open();
$this->socket->setFuncUri($name);
$ret = call_user_func_array([$this->instance, $name], $arguments);
$reflection = new \ReflectionMethod($this->tem_class . 'If', $name);
$params = array_map(function ($p) {
return $p->name;
}, $reflection->getParameters());
$args_class_name = $this->tem_class . '_' . $name . '_args';
$args_obj = new $args_class_name(array_combine($params, $arguments));
$protocol = $this->protocal_factory_class->getProtocol($this->transport);
$bin_accel = ($protocol instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_write_binary');
if ($bin_accel) {
thrift_protocol_write_binary($protocol, $name, TMessageType::CALL, $argsObj, 0, $protocol->isStrictWrite());
} else {
$protocol->writeMessageBegin($name, TMessageType::CALL, 0);
$args_obj->write($protocol);
$protocol->writeMessageEnd();
}
$promise = $this->transport->async();
$promise = $promise->then(function ($response) use ($name) {
$body = $response->getBody();
$rseqid = 0;
$fname = null;
$mtype = 0;
$input = $this->protocal_factory_class->getProtocol(new TMemoryBuffer($body));
$input->readMessageBegin($fname, $mtype, $rseqid);
if ($mtype == TMessageType::EXCEPTION) {
$x = new TApplicationException();
$x->read($input);
$input->readMessageEnd();
throw $x;
}
$result_class = $this->tem_class . '_' . $name . '_result';
$result = new $result_class;
$result->read($input);
$input->readMessageEnd();
if ($result->success !== null) {
return $result->success;
} else {
return false;
}
});
// $this->transport->close();
return $promise;
}
}
\ No newline at end of file
......@@ -2,10 +2,13 @@
namespace fpf\thrift;
use fpf\thrift\ThriftAsyncServiceFactoryProxy;
class ThriftServiceFactory
{
private static $sync_services = [];
private static $async_services = [];
public static function getService($class_name)
{
......@@ -15,4 +18,12 @@ class ThriftServiceFactory
self::$sync_services[$class_name] = new ThriftServiceFactoryProxy($class_name);
return self::$sync_services[$class_name];
}
public static function getAsyncService($class_name)
{
if (!empty(self::$async_services[$class_name])) {
return self::$async_services[$class_name];
}
self::$async_services[$class_name] = new ThriftAsyncServiceFactoryProxy($class_name);
return self::$async_services[$class_name];
}
}
\ No newline at end of file
......@@ -11,6 +11,7 @@ class ThriftServiceFactoryProxy
private $instance = null;
private $transport = null;
private $socket = null;
private $tem_class = '';
const BINARY = 0;
const JSON = 1;
......@@ -27,6 +28,7 @@ class ThriftServiceFactoryProxy
$service_check_arr = $check_ret[BaseConstant::MSG];
$service_name_space_str = $service_check_arr[0];
$service_str = $service_check_arr[1];
$this->tem_class = '\\service\\' . $$service_name_space_str . '\\' . $service_str;
if (empty($service_module_hosts[$service_name_space_str . '-' . $service_str])) {
$service_config = $service_default_hosts;
} else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment