Commit 84e7dbe9 authored by tufengqi's avatar tufengqi

finish

parents
This diff is collapsed.
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* ClassLoader to load Thrift library and definitions
* Inspired from UniversalClassLoader from Symfony 2
*
* @package thrift.classloader
*/
namespace Thrift\ClassLoader;
class ThriftClassLoader
{
/**
* Namespaces path
* @var array
*/
protected $namespaces = array();
/**
* Thrift definition paths
* @var type
*/
protected $definitions = array();
/**
* Do we use APC cache ?
* @var boolean
*/
protected $apc = false;
/**
* APC Cache prefix
* @var string
*/
protected $apc_prefix;
/**
* Set autoloader to use APC cache
* @param boolean $apc
* @param string $apc_prefix
*/
public function __construct($apc = false, $apc_prefix = null)
{
$this->apc = $apc;
$this->apc_prefix = $apc_prefix;
}
/**
* Registers a namespace.
*
* @param string $namespace The namespace
* @param array|string $paths The location(s) of the namespace
*/
public function registerNamespace($namespace, $paths)
{
$this->namespaces[$namespace] = (array)$paths;
}
/**
* Registers a Thrift definition namespace.
*
* @param string $namespace The definition namespace
* @param array|string $paths The location(s) of the definition namespace
*/
public function registerDefinition($namespace, $paths)
{
$this->definitions[$namespace] = (array)$paths;
}
/**
* Registers this instance as an autoloader.
*
* @param Boolean $prepend Whether to prepend the autoloader or not
*/
public function register($prepend = false)
{
echo 111;
spl_autoload_register(array($this, 'loadClass'), true, $prepend);
}
/**
* Loads the given class, definition or interface.
*
* @param string $class The name of the class
*/
public function loadClass($class)
{
echo 222;
var_dump($this->apc);
if ((true === $this->apc && ($file = $this->findFileInApc($class))) or
($file = $this->findFile($class))
) {
echo 333;
require_once $file;
}
}
/**
* Loads the given class or interface in APC.
* @param string $class The name of the class
* @return string
*/
protected function findFileInApc($class)
{
if (false === $file = apc_fetch($this->apc_prefix . $class)) {
apc_store($this->apc_prefix . $class, $file = $this->findFile($class));
}
return $file;
}
/**
* Find class in namespaces or definitions directories
* @param string $class
* @return string
*/
public function findFile($class)
{
var_dump($class);
// Remove first backslash
if ('\\' == $class[0]) {
$class = substr($class, 1);
}
if (false !== $pos = strrpos($class, '\\')) {
// Namespaced class name
$namespace = substr($class, 0, $pos);
// Iterate in normal namespaces
foreach ($this->namespaces as $ns => $dirs) {
//Don't interfere with other autoloaders
if (0 !== strpos($namespace, $ns)) {
continue;
}
foreach ($dirs as $dir) {
$className = substr($class, $pos + 1);
var_dump($namespace);
var_dump(str_replace('\\', DIRECTORY_SEPARATOR, $namespace));
$file = $dir . DIRECTORY_SEPARATOR .
str_replace('\\', DIRECTORY_SEPARATOR, $namespace) .
DIRECTORY_SEPARATOR .
$className . '.php';
var_dump($file);
if (file_exists($file)) {
return $file;
}
}
}
// Iterate in Thrift namespaces
// Remove first part of namespace
$m = explode('\\', $class);
var_dump($m);
// Ignore wrong call
if (count($m) <= 1) {
return;
}
$class = array_pop($m);
$namespace = implode('\\', $m);
var_dump($this->definitions);
foreach ($this->definitions as $ns => $dirs) {
//Don't interfere with other autoloaders
if (0 !== strpos($namespace, $ns)) {
continue;
}
foreach ($dirs as $dir) {
/**
* Available in service: Interface, Client, Processor, Rest
* And every service methods (_.+)
*/
if (0 === preg_match('#(.+)(if|client|processor|rest)$#i', $class, $n) and
0 === preg_match('#(.+)_[a-z0-9]+_(args|result)$#i', $class, $n)
) {
$className = 'Types';
} else {
$className = $n[1];
}
$file = $dir . DIRECTORY_SEPARATOR .
str_replace('\\', DIRECTORY_SEPARATOR, $namespace) .
DIRECTORY_SEPARATOR .
$className . '.php';
var_dump($file);
if (file_exists($file)) {
return $file;
}
}
}
}
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift
*/
namespace Thrift\Exception;
use Thrift\Type\TType;
class TApplicationException extends TException
{
static public $_TSPEC =
array(1 => array('var' => 'message',
'type' => TType::STRING),
2 => array('var' => 'code',
'type' => TType::I32));
const UNKNOWN = 0;
const UNKNOWN_METHOD = 1;
const INVALID_MESSAGE_TYPE = 2;
const WRONG_METHOD_NAME = 3;
const BAD_SEQUENCE_ID = 4;
const MISSING_RESULT = 5;
const INTERNAL_ERROR = 6;
const PROTOCOL_ERROR = 7;
const INVALID_TRANSFORM = 8;
const INVALID_PROTOCOL = 9;
const UNSUPPORTED_CLIENT_TYPE = 10;
public function __construct($message = null, $code = 0)
{
parent::__construct($message, $code);
}
public function read($output)
{
return $this->_read('TApplicationException', self::$_TSPEC, $output);
}
public function write($output)
{
$xfer = 0;
$xfer += $output->writeStructBegin('TApplicationException');
if ($message = $this->getMessage()) {
$xfer += $output->writeFieldBegin('message', TType::STRING, 1);
$xfer += $output->writeString($message);
$xfer += $output->writeFieldEnd();
}
if ($code = $this->getCode()) {
$xfer += $output->writeFieldBegin('type', TType::I32, 2);
$xfer += $output->writeI32($code);
$xfer += $output->writeFieldEnd();
}
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
}
}
This diff is collapsed.
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
* @author: rmarin (marin.radu@facebook.com)
*/
namespace Thrift\Exception;
/**
* Protocol module. Contains all the types and definitions needed to implement
* a protocol encoder/decoder.
*
* @package thrift.protocol
*/
/**
* Protocol exceptions
*/
class TProtocolException extends TException
{
const UNKNOWN = 0;
const INVALID_DATA = 1;
const NEGATIVE_SIZE = 2;
const SIZE_LIMIT = 3;
const BAD_VERSION = 4;
const NOT_IMPLEMENTED = 5;
const DEPTH_LIMIT = 6;
public function __construct($message = null, $code = 0)
{
parent::__construct($message, $code);
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace Thrift\Exception;
/**
* Transport exceptions
*/
class TTransportException extends TException
{
const UNKNOWN = 0;
const NOT_OPEN = 1;
const ALREADY_OPEN = 2;
const TIMED_OUT = 3;
const END_OF_FILE = 4;
public function __construct($message = null, $code = 0)
{
parent::__construct($message, $code);
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Factory;
use Thrift\Protocol\TBinaryProtocol;
/**
* Binary Protocol Factory
*/
class TBinaryProtocolFactory implements TProtocolFactory
{
private $strictRead_ = false;
private $strictWrite_ = false;
public function __construct($strictRead = false, $strictWrite = false)
{
$this->strictRead_ = $strictRead;
$this->strictWrite_ = $strictWrite;
}
public function getProtocol($trans)
{
return new TBinaryProtocol($trans, $this->strictRead_, $this->strictWrite_);
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Factory;
use Thrift\Protocol\TCompactProtocol;
/**
* Compact Protocol Factory
*/
class TCompactProtocolFactory implements TProtocolFactory
{
public function __construct()
{
}
public function getProtocol($trans)
{
return new TCompactProtocol($trans);
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Factory;
use Thrift\Protocol\TJSONProtocol;
/**
* JSON Protocol Factory
*/
class TJSONProtocolFactory implements TProtocolFactory
{
public function __construct()
{
}
public function getProtocol($trans)
{
return new TJSONProtocol($trans);
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Factory;
/**
* Protocol factory creates protocol objects from transports
*/
interface TProtocolFactory
{
/**
* Build a protocol from the base transport
*
* @return Thrift\Protocol\TProtocol protocol
*/
public function getProtocol($trans);
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
namespace Thrift\Factory;
use Thrift\StringFunc\Core;
use Thrift\StringFunc\Mbstring;
use Thrift\StringFunc\TStringFunc;
class TStringFuncFactory
{
private static $_instance;
/**
* Get the Singleton instance of TStringFunc implementation that is
* compatible with the current system's mbstring.func_overload settings.
*
* @return TStringFunc
*/
public static function create()
{
if (!self::$_instance) {
self::_setInstance();
}
return self::$_instance;
}
private static function _setInstance()
{
/**
* Cannot use str* functions for byte counting because multibyte
* characters will be read a single bytes.
*
* See: http://php.net/manual/en/mbstring.overload.php
*/
if (ini_get('mbstring.func_overload') & 2) {
self::$_instance = new Mbstring();
} else {
/**
* mbstring is not installed or does not have function overloading
* of the str* functions enabled so use PHP core str* functions for
* byte counting.
*/
self::$_instance = new Core();
}
}
}
<?php
namespace Thrift\Factory;
use Thrift\Transport\TTransport;
class TTransportFactory
{
/**
* @static
* @param TTransport $transport
* @return TTransport
*/
public static function getTransport(TTransport $transport)
{
return $transport;
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Protocol\JSON;
class BaseContext
{
public function escapeNum()
{
return false;
}
public function write()
{
}
public function read()
{
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Protocol\JSON;
use Thrift\Protocol\TJSONProtocol;
class ListContext extends BaseContext
{
private $first_ = true;
private $p_;
public function __construct($p)
{
$this->p_ = $p;
}
public function write()
{
if ($this->first_) {
$this->first_ = false;
} else {
$this->p_->getTransport()->write(TJSONProtocol::COMMA);
}
}
public function read()
{
if ($this->first_) {
$this->first_ = false;
} else {
$this->p_->readJSONSyntaxChar(TJSONProtocol::COMMA);
}
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Protocol\JSON;
class LookaheadReader
{
private $hasData_ = false;
private $data_ = array();
private $p_;
public function __construct($p)
{
$this->p_ = $p;
}
public function read()
{
if ($this->hasData_) {
$this->hasData_ = false;
} else {
$this->data_ = $this->p_->getTransport()->readAll(1);
}
return substr($this->data_, 0, 1);
}
public function peek()
{
if (!$this->hasData_) {
$this->data_ = $this->p_->getTransport()->readAll(1);
}
$this->hasData_ = true;
return substr($this->data_, 0, 1);
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Protocol\JSON;
use Thrift\Protocol\TJSONProtocol;
class PairContext extends BaseContext
{
private $first_ = true;
private $colon_ = true;
private $p_ = null;
public function __construct($p)
{
$this->p_ = $p;
}
public function write()
{
if ($this->first_) {
$this->first_ = false;
$this->colon_ = true;
} else {
$this->p_->getTransport()->write($this->colon_ ? TJSONProtocol::COLON : TJSONProtocol::COMMA);
$this->colon_ = !$this->colon_;
}
}
public function read()
{
if ($this->first_) {
$this->first_ = false;
$this->colon_ = true;
} else {
$this->p_->readJSONSyntaxChar($this->colon_ ? TJSONProtocol::COLON : TJSONProtocol::COMMA);
$this->colon_ = !$this->colon_;
}
}
public function escapeNum()
{
return $this->colon_;
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Protocol\SimpleJSON;
use Thrift\Exception\TException;
class CollectionMapKeyException extends TException
{
public function __construct($message)
{
parent::__construct($message);
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Protocol\SimpleJSON;
class Context
{
public function write()
{
}
public function isMapKey()
{
return false;
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Protocol\SimpleJSON;
use Thrift\Protocol\TSimpleJSONProtocol;
class ListContext extends Context
{
protected $first_ = true;
private $p_;
public function __construct($p)
{
$this->p_ = $p;
}
public function write()
{
if ($this->first_) {
$this->first_ = false;
} else {
$this->p_->getTransport()->write(TSimpleJSONProtocol::COMMA);
}
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Protocol\SimpleJSON;
class MapContext extends StructContext
{
protected $isKey = true;
private $p_;
public function __construct($p)
{
parent::__construct($p);
}
public function write()
{
parent::write();
$this->isKey = !$this->isKey;
}
public function isMapKey()
{
// we want to coerce map keys to json strings regardless
// of their type
return $this->isKey;
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Protocol\SimpleJSON;
use Thrift\Protocol\TSimpleJSONProtocol;
class StructContext extends Context
{
protected $first_ = true;
protected $colon_ = true;
private $p_;
public function __construct($p)
{
$this->p_ = $p;
}
public function write()
{
if ($this->first_) {
$this->first_ = false;
$this->colon_ = true;
} else {
$this->p_->getTransport()->write(
$this->colon_ ?
TSimpleJSONProtocol::COLON :
TSimpleJSONProtocol::COMMA
);
$this->colon_ = !$this->colon_;
}
}
}
This diff is collapsed.
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Protocol;
use Thrift\Transport\TBufferedTransport;
/**
* Accelerated binary protocol: used in conjunction with the thrift_protocol
* extension for faster deserialization
*/
class TBinaryProtocolAccelerated extends TBinaryProtocol
{
public function __construct($trans, $strictRead = false, $strictWrite = true)
{
// If the transport doesn't implement putBack, wrap it in a
// TBufferedTransport (which does)
// NOTE (t.heintz): This is very evil to do, because the TBufferedTransport may swallow bytes, which
// are then never written to the underlying transport. This happens precisely when a number of bytes
// less than the max buffer size (512 by default) is written to the transport and then flush() is NOT
// called. In that case the data stays in the writeBuffer of the transport, from where it can never be
// accessed again (for example through read()).
//
// Since the caller of this method does not know about the wrapping transport, this creates bugs which
// are very difficult to find. Hence the wrapping of a transport in a buffer should be left to the
// calling code. An interface could used to mandate the presence of the putBack() method in the transport.
//
// I am leaving this code in nonetheless, because there may be applications depending on this behavior.
//
// @see THRIFT-1579
if (!method_exists($trans, 'putBack')) {
$trans = new TBufferedTransport($trans);
}
parent::__construct($trans, $strictRead, $strictWrite);
}
public function isStrictRead()
{
return $this->strictRead_;
}
public function isStrictWrite()
{
return $this->strictWrite_;
}
}
This diff is collapsed.
This diff is collapsed.
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Protocol;
use Thrift\Type\TMessageType;
/**
* <code>TMultiplexedProtocol</code> is a protocol-independent concrete decorator
* that allows a Thrift client to communicate with a multiplexing Thrift server,
* by prepending the service name to the function name during function calls.
*
* @package Thrift\Protocol
*/
class TMultiplexedProtocol extends TProtocolDecorator
{
/**
* Separator between service name and function name.
* Should be the same as used at multiplexed Thrift server.
*
* @var string
*/
const SEPARATOR = ":";
/**
* The name of service.
*
* @var string
*/
private $serviceName_;
/**
* Constructor of <code>TMultiplexedProtocol</code> class.
*
* Wrap the specified protocol, allowing it to be used to communicate with a
* multiplexing server. The <code>$serviceName</code> is required as it is
* prepended to the message header so that the multiplexing server can broker
* the function call to the proper service.
*
* @param TProtocol $protocol
* @param string $serviceName The name of service.
*/
public function __construct(TProtocol $protocol, $serviceName)
{
parent::__construct($protocol);
$this->serviceName_ = $serviceName;
}
/**
* Writes the message header.
* Prepends the service name to the function name, separated by <code>TMultiplexedProtocol::SEPARATOR</code>.
*
* @param string $name Function name.
* @param int $type Message type.
* @param int $seqid The sequence id of this message.
*/
public function writeMessageBegin($name, $type, $seqid)
{
if ($type == TMessageType::CALL || $type == TMessageType::ONEWAY) {
$nameWithService = $this->serviceName_ . self::SEPARATOR . $name;
parent::writeMessageBegin($nameWithService, $type, $seqid);
} else {
parent::writeMessageBegin($name, $type, $seqid);
}
}
}
This diff is collapsed.
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Protocol;
use Thrift\Exception\TException;
/**
* <code>TProtocolDecorator</code> forwards all requests to an enclosed
* <code>TProtocol</code> instance, providing a way to author concise
* concrete decorator subclasses. While it has no abstract methods, it
* is marked abstract as a reminder that by itself, it does not modify
* the behaviour of the enclosed <code>TProtocol</code>.
*
* @package Thrift\Protocol
*/
abstract class TProtocolDecorator extends TProtocol
{
/**
* Instance of protocol, to which all operations will be forwarded.
*
* @var TProtocol
*/
private $concreteProtocol_;
/**
* Constructor of <code>TProtocolDecorator</code> class.
* Encloses the specified protocol.
*
* @param TProtocol $protocol All operations will be forward to this instance. Must be non-null.
*/
protected function __construct(TProtocol $protocol)
{
parent::__construct($protocol->getTransport());
$this->concreteProtocol_ = $protocol;
}
/**
* Writes the message header.
*
* @param string $name Function name
* @param int $type message type TMessageType::CALL or TMessageType::REPLY
* @param int $seqid The sequence id of this message
*/
public function writeMessageBegin($name, $type, $seqid)
{
return $this->concreteProtocol_->writeMessageBegin($name, $type, $seqid);
}
/**
* Closes the message.
*/
public function writeMessageEnd()
{
return $this->concreteProtocol_->writeMessageEnd();
}
/**
* Writes a struct header.
*
* @param string $name Struct name
*
* @throws TException on write error
* @return int How many bytes written
*/
public function writeStructBegin($name)
{
return $this->concreteProtocol_->writeStructBegin($name);
}
/**
* Close a struct.
*
* @throws TException on write error
* @return int How many bytes written
*/
public function writeStructEnd()
{
return $this->concreteProtocol_->writeStructEnd();
}
public function writeFieldBegin($fieldName, $fieldType, $fieldId)
{
return $this->concreteProtocol_->writeFieldBegin($fieldName, $fieldType, $fieldId);
}
public function writeFieldEnd()
{
return $this->concreteProtocol_->writeFieldEnd();
}
public function writeFieldStop()
{
return $this->concreteProtocol_->writeFieldStop();
}
public function writeMapBegin($keyType, $valType, $size)
{
return $this->concreteProtocol_->writeMapBegin($keyType, $valType, $size);
}
public function writeMapEnd()
{
return $this->concreteProtocol_->writeMapEnd();
}
public function writeListBegin($elemType, $size)
{
return $this->concreteProtocol_->writeListBegin($elemType, $size);
}
public function writeListEnd()
{
return $this->concreteProtocol_->writeListEnd();
}
public function writeSetBegin($elemType, $size)
{
return $this->concreteProtocol_->writeSetBegin($elemType, $size);
}
public function writeSetEnd()
{
return $this->concreteProtocol_->writeSetEnd();
}
public function writeBool($bool)
{
return $this->concreteProtocol_->writeBool($bool);
}
public function writeByte($byte)
{
return $this->concreteProtocol_->writeByte($byte);
}
public function writeI16($i16)
{
return $this->concreteProtocol_->writeI16($i16);
}
public function writeI32($i32)
{
return $this->concreteProtocol_->writeI32($i32);
}
public function writeI64($i64)
{
return $this->concreteProtocol_->writeI64($i64);
}
public function writeDouble($dub)
{
return $this->concreteProtocol_->writeDouble($dub);
}
public function writeString($str)
{
return $this->concreteProtocol_->writeString($str);
}
/**
* Reads the message header
*
* @param string $name Function name
* @param int $type message type TMessageType::CALL or TMessageType::REPLY
* @param int $seqid The sequence id of this message
*/
public function readMessageBegin(&$name, &$type, &$seqid)
{
return $this->concreteProtocol_->readMessageBegin($name, $type, $seqid);
}
/**
* Read the close of message
*/
public function readMessageEnd()
{
return $this->concreteProtocol_->readMessageEnd();
}
public function readStructBegin(&$name)
{
return $this->concreteProtocol_->readStructBegin($name);
}
public function readStructEnd()
{
return $this->concreteProtocol_->readStructEnd();
}
public function readFieldBegin(&$name, &$fieldType, &$fieldId)
{
return $this->concreteProtocol_->readFieldBegin($name, $fieldType, $fieldId);
}
public function readFieldEnd()
{
return $this->concreteProtocol_->readFieldEnd();
}
public function readMapBegin(&$keyType, &$valType, &$size)
{
$this->concreteProtocol_->readMapBegin($keyType, $valType, $size);
}
public function readMapEnd()
{
return $this->concreteProtocol_->readMapEnd();
}
public function readListBegin(&$elemType, &$size)
{
$this->concreteProtocol_->readListBegin($elemType, $size);
}
public function readListEnd()
{
return $this->concreteProtocol_->readListEnd();
}
public function readSetBegin(&$elemType, &$size)
{
return $this->concreteProtocol_->readSetBegin($elemType, $size);
}
public function readSetEnd()
{
return $this->concreteProtocol_->readSetEnd();
}
public function readBool(&$bool)
{
return $this->concreteProtocol_->readBool($bool);
}
public function readByte(&$byte)
{
return $this->concreteProtocol_->readByte($byte);
}
public function readI16(&$i16)
{
return $this->concreteProtocol_->readI16($i16);
}
public function readI32(&$i32)
{
return $this->concreteProtocol_->readI32($i32);
}
public function readI64(&$i64)
{
return $this->concreteProtocol_->readI64($i64);
}
public function readDouble(&$dub)
{
return $this->concreteProtocol_->readDouble($dub);
}
public function readString(&$str)
{
return $this->concreteProtocol_->readString($str);
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
*/
namespace Thrift\Protocol;
use Thrift\Exception\TException;
use Thrift\Exception\TProtocolException;
use Thrift\Protocol\SimpleJSON\Context;
use Thrift\Protocol\SimpleJSON\ListContext;
use Thrift\Protocol\SimpleJSON\StructContext;
use Thrift\Protocol\SimpleJSON\MapContext;
use Thrift\Protocol\SimpleJSON\CollectionMapKeyException;
/**
* SimpleJSON implementation of thrift protocol, ported from Java.
*/
class TSimpleJSONProtocol extends TProtocol
{
const COMMA = ',';
const COLON = ':';
const LBRACE = '{';
const RBRACE = '}';
const LBRACKET = '[';
const RBRACKET = ']';
const QUOTE = '"';
const NAME_MAP = "map";
const NAME_LIST = "lst";
const NAME_SET = "set";
protected $writeContext_ = null;
protected $writeContextStack_ = [];
/**
* Push a new write context onto the stack.
*/
protected function pushWriteContext(Context $c)
{
$this->writeContextStack_[] = $this->writeContext_;
$this->writeContext_ = $c;
}
/**
* Pop the last write context off the stack
*/
protected function popWriteContext()
{
$this->writeContext_ = array_pop($this->writeContextStack_);
}
/**
* Used to make sure that we are not encountering a map whose keys are containers
*/
protected function assertContextIsNotMapKey($invalidKeyType)
{
if ($this->writeContext_->isMapKey()) {
throw new CollectionMapKeyException(
"Cannot serialize a map with keys that are of type " .
$invalidKeyType
);
}
}
private function writeJSONString($b)
{
$this->writeContext_->write();
$this->trans_->write(json_encode((string)$b));
}
private function writeJSONInteger($num)
{
$isMapKey = $this->writeContext_->isMapKey();
$this->writeContext_->write();
if ($isMapKey) {
$this->trans_->write(self::QUOTE);
}
$this->trans_->write((int)$num);
if ($isMapKey) {
$this->trans_->write(self::QUOTE);
}
}
private function writeJSONDouble($num)
{
$isMapKey = $this->writeContext_->isMapKey();
$this->writeContext_->write();
if ($isMapKey) {
$this->trans_->write(self::QUOTE);
}
$this->trans_->write(json_encode((float)$num));
if ($isMapKey) {
$this->trans_->write(self::QUOTE);
}
}
/**
* Constructor
*/
public function __construct($trans)
{
parent::__construct($trans);
$this->writeContext_ = new Context();
}
/**
* Writes the message header
*
* @param string $name Function name
* @param int $type message type TMessageType::CALL or TMessageType::REPLY
* @param int $seqid The sequence id of this message
*/
public function writeMessageBegin($name, $type, $seqid)
{
$this->trans_->write(self::LBRACKET);
$this->pushWriteContext(new ListContext($this));
$this->writeJSONString($name);
$this->writeJSONInteger($type);
$this->writeJSONInteger($seqid);
}
/**
* Close the message
*/
public function writeMessageEnd()
{
$this->popWriteContext();
$this->trans_->write(self::RBRACKET);
}
/**
* Writes a struct header.
*
* @param string $name Struct name
*/
public function writeStructBegin($name)
{
$this->writeContext_->write();
$this->trans_->write(self::LBRACE);
$this->pushWriteContext(new StructContext($this));
}
/**
* Close a struct.
*/
public function writeStructEnd()
{
$this->popWriteContext();
$this->trans_->write(self::RBRACE);
}
public function writeFieldBegin($fieldName, $fieldType, $fieldId)
{
$this->writeJSONString($fieldName);
}
public function writeFieldEnd()
{
}
public function writeFieldStop()
{
}
public function writeMapBegin($keyType, $valType, $size)
{
$this->assertContextIsNotMapKey(self::NAME_MAP);
$this->writeContext_->write();
$this->trans_->write(self::LBRACE);
$this->pushWriteContext(new MapContext($this));
}
public function writeMapEnd()
{
$this->popWriteContext();
$this->trans_->write(self::RBRACE);
}
public function writeListBegin($elemType, $size)
{
$this->assertContextIsNotMapKey(self::NAME_LIST);
$this->writeContext_->write();
$this->trans_->write(self::LBRACKET);
$this->pushWriteContext(new ListContext($this));
// No metadata!
}
public function writeListEnd()
{
$this->popWriteContext();
$this->trans_->write(self::RBRACKET);
}
public function writeSetBegin($elemType, $size)
{
$this->assertContextIsNotMapKey(self::NAME_SET);
$this->writeContext_->write();
$this->trans_->write(self::LBRACKET);
$this->pushWriteContext(new ListContext($this));
// No metadata!
}
public function writeSetEnd()
{
$this->popWriteContext();
$this->trans_->write(self::RBRACKET);
}
public function writeBool($bool)
{
$this->writeJSONInteger($bool ? 1 : 0);
}
public function writeByte($byte)
{
$this->writeJSONInteger($byte);
}
public function writeI16($i16)
{
$this->writeJSONInteger($i16);
}
public function writeI32($i32)
{
$this->writeJSONInteger($i32);
}
public function writeI64($i64)
{
$this->writeJSONInteger($i64);
}
public function writeDouble($dub)
{
$this->writeJSONDouble($dub);
}
public function writeString($str)
{
$this->writeJSONString($str);
}
/**
* Reading methods.
*
* simplejson is not meant to be read back into thrift
* - see http://wiki.apache.org/thrift/ThriftUsageJava
* - use JSON instead
*/
public function readMessageBegin(&$name, &$type, &$seqid)
{
throw new TException("Not implemented");
}
public function readMessageEnd()
{
throw new TException("Not implemented");
}
public function readStructBegin(&$name)
{
throw new TException("Not implemented");
}
public function readStructEnd()
{
throw new TException("Not implemented");
}
public function readFieldBegin(&$name, &$fieldType, &$fieldId)
{
throw new TException("Not implemented");
}
public function readFieldEnd()
{
throw new TException("Not implemented");
}
public function readMapBegin(&$keyType, &$valType, &$size)
{
throw new TException("Not implemented");
}
public function readMapEnd()
{
throw new TException("Not implemented");
}
public function readListBegin(&$elemType, &$size)
{
throw new TException("Not implemented");
}
public function readListEnd()
{
throw new TException("Not implemented");
}
public function readSetBegin(&$elemType, &$size)
{
throw new TException("Not implemented");
}
public function readSetEnd()
{
throw new TException("Not implemented");
}
public function readBool(&$bool)
{
throw new TException("Not implemented");
}
public function readByte(&$byte)
{
throw new TException("Not implemented");
}
public function readI16(&$i16)
{
throw new TException("Not implemented");
}
public function readI32(&$i32)
{
throw new TException("Not implemented");
}
public function readI64(&$i64)
{
throw new TException("Not implemented");
}
public function readDouble(&$dub)
{
throw new TException("Not implemented");
}
public function readString(&$str)
{
throw new TException("Not implemented");
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.protocol
* @author: rmarin (marin.radu@facebook.com)
*/
namespace Thrift\Serializer;
use Thrift\Transport\TMemoryBuffer;
use Thrift\Protocol\TBinaryProtocolAccelerated;
use Thrift\Type\TMessageType;
/**
* Utility class for serializing and deserializing
* a thrift object using TBinaryProtocolAccelerated.
*/
class TBinarySerializer
{
// NOTE(rmarin): Because thrift_protocol_write_binary
// adds a begin message prefix, you cannot specify
// a transport in which to serialize an object. It has to
// be a string. Otherwise we will break the compatibility with
// normal deserialization.
public static function serialize($object)
{
$transport = new TMemoryBuffer();
$protocol = new TBinaryProtocolAccelerated($transport);
if (function_exists('thrift_protocol_write_binary')) {
thrift_protocol_write_binary(
$protocol,
$object->getName(),
TMessageType::REPLY,
$object,
0,
$protocol->isStrictWrite()
);
$protocol->readMessageBegin($unused_name, $unused_type, $unused_seqid);
} else {
$object->write($protocol);
}
$protocol->getTransport()->flush();
return $transport->getBuffer();
}
public static function deserialize($string_object, $class_name, $buffer_size = 8192)
{
$transport = new TMemoryBuffer();
$protocol = new TBinaryProtocolAccelerated($transport);
if (function_exists('thrift_protocol_read_binary')) {
// NOTE (t.heintz) TBinaryProtocolAccelerated internally wraps our TMemoryBuffer in a
// TBufferedTransport, so we have to retrieve it again or risk losing data when writing
// less than 512 bytes to the transport (see the comment there as well).
// @see THRIFT-1579
$protocol->writeMessageBegin('', TMessageType::REPLY, 0);
$protocolTransport = $protocol->getTransport();
$protocolTransport->write($string_object);
$protocolTransport->flush();
return thrift_protocol_read_binary($protocol, $class_name, $protocol->isStrictRead(), $buffer_size);
} else {
$transport->write($string_object);
$object = new $class_name();
$object->read($protocol);
return $object;
}
}
}
<?php
namespace Thrift\Server;
use Thrift\Transport\TTransport;
use Thrift\Exception\TException;
use Thrift\Exception\TTransportException;
/**
* A forking implementation of a Thrift server.
*
* @package thrift.server
*/
class TForkingServer extends TServer
{
/**
* Flag for the main serving loop
*
* @var bool
*/
private $stop_ = false;
/**
* List of children.
*
* @var array
*/
protected $children_ = array();
/**
* Listens for new client using the supplied
* transport. We fork when a new connection
* arrives.
*
* @return void
*/
public function serve()
{
$this->transport_->listen();
while (!$this->stop_) {
try {
$transport = $this->transport_->accept();
if ($transport != null) {
$pid = pcntl_fork();
if ($pid > 0) {
$this->handleParent($transport, $pid);
} elseif ($pid === 0) {
$this->handleChild($transport);
} else {
throw new TException('Failed to fork');
}
}
} catch (TTransportException $e) {
}
$this->collectChildren();
}
}
/**
* Code run by the parent
*
* @param TTransport $transport
* @param int $pid
* @return void
*/
private function handleParent(TTransport $transport, $pid)
{
$this->children_[$pid] = $transport;
}
/**
* Code run by the child.
*
* @param TTransport $transport
* @return void
*/
private function handleChild(TTransport $transport)
{
try {
$inputTransport = $this->inputTransportFactory_->getTransport($transport);
$outputTransport = $this->outputTransportFactory_->getTransport($transport);
$inputProtocol = $this->inputProtocolFactory_->getProtocol($inputTransport);
$outputProtocol = $this->outputProtocolFactory_->getProtocol($outputTransport);
while ($this->processor_->process($inputProtocol, $outputProtocol)) {
}
@$transport->close();
} catch (TTransportException $e) {
}
exit(0);
}
/**
* Collects any children we may have
*
* @return void
*/
private function collectChildren()
{
foreach ($this->children_ as $pid => $transport) {
if (pcntl_waitpid($pid, $status, WNOHANG) > 0) {
unset($this->children_[$pid]);
if ($transport) {
@$transport->close();
}
}
}
}
/**
* Stops the server running. Kills the transport
* and then stops the main serving loop
*
* @return void
*/
public function stop()
{
$this->transport_->close();
$this->stop_ = true;
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
namespace Thrift\Server;
use Thrift\Transport\TSSLSocket;
/**
* Socket implementation of a server agent.
*
* @package thrift.transport
*/
class TSSLServerSocket extends TServerSocket
{
/**
* Remote port
*
* @var resource
*/
protected $context_ = null;
/**
* ServerSocket constructor
*
* @param string $host Host to listen on
* @param int $port Port to listen on
* @param resource $context Stream context
* @return void
*/
public function __construct($host = 'localhost', $port = 9090, $context = null)
{
$ssl_host = $this->getSSLHost($host);
parent::__construct($ssl_host, $port);
$this->context_ = $context;
}
public function getSSLHost($host)
{
$transport_protocol_loc = strpos($host, "://");
if ($transport_protocol_loc === false) {
$host = 'ssl://' . $host;
}
return $host;
}
/**
* Opens a new socket server handle
*
* @return void
*/
public function listen()
{
$this->listener_ = @stream_socket_server(
$this->host_ . ':' . $this->port_,
$errno,
$errstr,
STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,
$this->context_
);
}
/**
* Implementation of accept. If not client is accepted in the given time
*
* @return TSocket
*/
protected function acceptImpl()
{
$handle = @stream_socket_accept($this->listener_, $this->acceptTimeout_ / 1000.0);
if (!$handle) {
return null;
}
$socket = new TSSLSocket();
$socket->setHandle($handle);
return $socket;
}
}
<?php
namespace Thrift\Server;
use Thrift\Factory\TTransportFactory;
use Thrift\Factory\TProtocolFactory;
/**
* Generic class for a Thrift server.
*
* @package thrift.server
*/
abstract class TServer
{
/**
* Processor to handle new clients
*
* @var TProcessor
*/
protected $processor_;
/**
* Server transport to be used for listening
* and accepting new clients
*
* @var TServerTransport
*/
protected $transport_;
/**
* Input transport factory
*
* @var TTransportFactory
*/
protected $inputTransportFactory_;
/**
* Output transport factory
*
* @var TTransportFactory
*/
protected $outputTransportFactory_;
/**
* Input protocol factory
*
* @var TProtocolFactory
*/
protected $inputProtocolFactory_;
/**
* Output protocol factory
*
* @var TProtocolFactory
*/
protected $outputProtocolFactory_;
/**
* Sets up all the factories, etc
*
* @param object $processor
* @param TServerTransport $transport
* @param TTransportFactory $inputTransportFactory
* @param TTransportFactory $outputTransportFactory
* @param TProtocolFactory $inputProtocolFactory
* @param TProtocolFactory $outputProtocolFactory
* @return void
*/
public function __construct(
$processor,
TServerTransport $transport,
TTransportFactory $inputTransportFactory,
TTransportFactory $outputTransportFactory,
TProtocolFactory $inputProtocolFactory,
TProtocolFactory $outputProtocolFactory
) {
$this->processor_ = $processor;
$this->transport_ = $transport;
$this->inputTransportFactory_ = $inputTransportFactory;
$this->outputTransportFactory_ = $outputTransportFactory;
$this->inputProtocolFactory_ = $inputProtocolFactory;
$this->outputProtocolFactory_ = $outputProtocolFactory;
}
/**
* Serves the server. This should never return
* unless a problem permits it to do so or it
* is interrupted intentionally
*
* @abstract
* @return void
*/
abstract public function serve();
/**
* Stops the server serving
*
* @abstract
* @return void
*/
abstract public function stop();
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace Thrift\Server;
use Thrift\Transport\TSocket;
/**
* Socket implementation of a server agent.
*
* @package thrift.transport
*/
class TServerSocket extends TServerTransport
{
/**
* Handle for the listener socket
*
* @var resource
*/
protected $listener_;
/**
* Port for the listener to listen on
*
* @var int
*/
protected $port_;
/**
* Timeout when listening for a new client
*
* @var int
*/
protected $acceptTimeout_ = 30000;
/**
* Host to listen on
*
* @var string
*/
protected $host_;
/**
* ServerSocket constructor
*
* @param string $host Host to listen on
* @param int $port Port to listen on
* @return void
*/
public function __construct($host = 'localhost', $port = 9090)
{
$this->host_ = $host;
$this->port_ = $port;
}
/**
* Sets the accept timeout
*
* @param int $acceptTimeout
* @return void
*/
public function setAcceptTimeout($acceptTimeout)
{
$this->acceptTimeout_ = $acceptTimeout;
}
/**
* Opens a new socket server handle
*
* @return void
*/
public function listen()
{
$this->listener_ = stream_socket_server('tcp://' . $this->host_ . ':' . $this->port_);
}
/**
* Closes the socket server handle
*
* @return void
*/
public function close()
{
@fclose($this->listener_);
$this->listener_ = null;
}
/**
* Implementation of accept. If not client is accepted in the given time
*
* @return TSocket
*/
protected function acceptImpl()
{
$handle = @stream_socket_accept($this->listener_, $this->acceptTimeout_ / 1000.0);
if (!$handle) {
return null;
}
$socket = new TSocket();
$socket->setHandle($handle);
return $socket;
}
}
<?php
namespace Thrift\Server;
use Thrift\Exception\TTransportException;
/**
* Generic class for Server agent.
*
* @package thrift.transport
*/
abstract class TServerTransport
{
/**
* List for new clients
*
* @abstract
* @return void
*/
abstract public function listen();
/**
* Close the server
*
* @abstract
* @return void
*/
abstract public function close();
/**
* Subclasses should use this to implement
* accept.
*
* @abstract
* @return TTransport
*/
abstract protected function acceptImpl();
/**
* Uses the accept implemtation. If null is returned, an
* exception is thrown.
*
* @throws TTransportException
* @return TTransport
*/
public function accept()
{
$transport = $this->acceptImpl();
if ($transport == null) {
throw new TTransportException("accept() may not return NULL");
}
return $transport;
}
}
<?php
namespace Thrift\Server;
use Thrift\Exception\TTransportException;
/**
* Simple implemtation of a Thrift server.
*
* @package thrift.server
*/
class TSimpleServer extends TServer
{
/**
* Flag for the main serving loop
*
* @var bool
*/
private $stop_ = false;
/**
* Listens for new client using the supplied
* transport. It handles TTransportExceptions
* to avoid timeouts etc killing it
*
* @return void
*/
public function serve()
{
$this->transport_->listen();
while (!$this->stop_) {
try {
$transport = $this->transport_->accept();
if ($transport != null) {
$inputTransport = $this->inputTransportFactory_->getTransport($transport);
$outputTransport = $this->outputTransportFactory_->getTransport($transport);
$inputProtocol = $this->inputProtocolFactory_->getProtocol($inputTransport);
$outputProtocol = $this->outputProtocolFactory_->getProtocol($outputTransport);
while ($this->processor_->process($inputProtocol, $outputProtocol)) {
}
}
} catch (TTransportException $e) {
}
}
}
/**
* Stops the server running. Kills the transport
* and then stops the main serving loop
*
* @return void
*/
public function stop()
{
$this->transport_->close();
$this->stop_ = true;
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.processor
*/
namespace Thrift;
use Thrift\Protocol\TProtocol;
use Thrift\Protocol\TProtocolDecorator;
/**
* Our goal was to work with any protocol. In order to do that, we needed
* to allow them to call readMessageBegin() and get the Message in exactly
* the standard format, without the service name prepended to the Message name.
*/
class StoredMessageProtocol extends TProtocolDecorator
{
private $fname_;
private $mtype_;
private $rseqid_;
public function __construct(TProtocol $protocol, $fname, $mtype, $rseqid)
{
parent::__construct($protocol);
$this->fname_ = $fname;
$this->mtype_ = $mtype;
$this->rseqid_ = $rseqid;
}
public function readMessageBegin(&$name, &$type, &$seqid)
{
$name = $this->fname_;
$type = $this->mtype_;
$seqid = $this->rseqid_;
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
namespace Thrift\StringFunc;
class Core implements TStringFunc
{
public function substr($str, $start, $length = null)
{
// specifying a null $length would return an empty string
if ($length === null) {
return substr($str, $start);
}
return substr($str, $start, $length);
}
public function strlen($str)
{
return strlen($str);
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
namespace Thrift\StringFunc;
class Mbstring implements TStringFunc
{
public function substr($str, $start, $length = null)
{
/**
* We need to set the charset parameter, which is the second
* optional parameter and the first optional parameter can't
* be null or false as a "magic" value because that would
* cause an empty string to be returned, so we need to
* actually calculate the proper length value.
*/
if ($length === null) {
$length = $this->strlen($str) - $start;
}
return mb_substr($str, $start, $length, '8bit');
}
public function strlen($str)
{
return mb_strlen($str, '8bit');
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
namespace Thrift\StringFunc;
interface TStringFunc
{
public function substr($str, $start, $length = null);
public function strlen($str);
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.processor
*/
namespace Thrift;
use Thrift\Exception\TException;
use Thrift\Protocol\TProtocol;
use Thrift\Protocol\TMultiplexedProtocol;
use Thrift\Type\TMessageType;
/**
* <code>TMultiplexedProcessor</code> is a Processor allowing
* a single <code>TServer</code> to provide multiple services.
*
* <p>To do so, you instantiate the processor and then register additional
* processors with it, as shown in the following example:</p>
*
* <blockquote><code>
* $processor = new TMultiplexedProcessor();
*
* processor->registerProcessor(
* "Calculator",
* new \tutorial\CalculatorProcessor(new CalculatorHandler()));
*
* processor->registerProcessor(
* "WeatherReport",
* new \tutorial\WeatherReportProcessor(new WeatherReportHandler()));
*
* $processor->process($protocol, $protocol);
* </code></blockquote>
*/
class TMultiplexedProcessor
{
private $serviceProcessorMap_;
/**
* 'Register' a service with this <code>TMultiplexedProcessor</code>. This
* allows us to broker requests to individual services by using the service
* name to select them at request time.
*
* @param serviceName Name of a service, has to be identical to the name
* declared in the Thrift IDL, e.g. "WeatherReport".
* @param processor Implementation of a service, usually referred to
* as "handlers", e.g. WeatherReportHandler implementing WeatherReport.Iface.
*/
public function registerProcessor($serviceName, $processor)
{
$this->serviceProcessorMap_[$serviceName] = $processor;
}
/**
* This implementation of <code>process</code> performs the following steps:
*
* <ol>
* <li>Read the beginning of the message.</li>
* <li>Extract the service name from the message.</li>
* <li>Using the service name to locate the appropriate processor.</li>
* <li>Dispatch to the processor, with a decorated instance of TProtocol
* that allows readMessageBegin() to return the original Message.</li>
* </ol>
*
* @throws TException If the message type is not CALL or ONEWAY, if
* the service name was not found in the message, or if the service
* name was not found in the service map.
*/
public function process(TProtocol $input, TProtocol $output)
{
/*
Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
message header. This pulls the message "off the wire", which we'll
deal with at the end of this method.
*/
$input->readMessageBegin($fname, $mtype, $rseqid);
if ($mtype !== TMessageType::CALL && $mtype != TMessageType::ONEWAY) {
throw new TException("This should not have happened!?");
}
// Extract the service name and the new Message name.
if (strpos($fname, TMultiplexedProtocol::SEPARATOR) === false) {
throw new TException("Service name not found in message name: {$fname}. Did you " .
"forget to use a TMultiplexProtocol in your client?");
}
list($serviceName, $messageName) = explode(':', $fname, 2);
if (!array_key_exists($serviceName, $this->serviceProcessorMap_)) {
throw new TException("Service name not found: {$serviceName}. Did you forget " .
"to call registerProcessor()?");
}
// Dispatch processing to the stored processor
$processor = $this->serviceProcessorMap_[$serviceName];
return $processor->process(
new StoredMessageProtocol($input, $messageName, $mtype, $rseqid),
$output
);
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace Thrift\Transport;
use Thrift\Exception\TTransportException;
use Thrift\Factory\TStringFuncFactory;
/**
* Buffered transport. Stores data to an internal buffer that it doesn't
* actually write out until flush is called. For reading, we do a greedy
* read and then serve data out of the internal buffer.
*
* @package thrift.transport
*/
class TBufferedTransport extends TTransport
{
/**
* The underlying transport
*
* @var TTransport
*/
protected $transport_;
/**
* The receive buffer size
*
* @var int
*/
protected $rBufSize_ = 512;
/**
* The write buffer size
*
* @var int
*/
protected $wBufSize_ = 512;
/**
* The write buffer.
*
* @var string
*/
protected $wBuf_ = '';
/**
* The read buffer.
*
* @var string
*/
protected $rBuf_ = '';
/**
* Constructor. Creates a buffered transport around an underlying transport
*/
public function __construct($transport, $rBufSize = 512, $wBufSize = 512)
{
$this->transport_ = $transport;
$this->rBufSize_ = $rBufSize;
$this->wBufSize_ = $wBufSize;
}
public function isOpen()
{
return $this->transport_->isOpen();
}
/**
* @inheritdoc
*
* @throws TTransportException
*/
public function open()
{
$this->transport_->open();
}
public function close()
{
$this->transport_->close();
}
public function putBack($data)
{
if (TStringFuncFactory::create()->strlen($this->rBuf_) === 0) {
$this->rBuf_ = $data;
} else {
$this->rBuf_ = ($data . $this->rBuf_);
}
}
/**
* The reason that we customize readAll here is that the majority of PHP
* streams are already internally buffered by PHP. The socket stream, for
* example, buffers internally and blocks if you call read with $len greater
* than the amount of data available, unlike recv() in C.
*
* Therefore, use the readAll method of the wrapped transport inside
* the buffered readAll.
*
* @throws TTransportException
*/
public function readAll($len)
{
$have = TStringFuncFactory::create()->strlen($this->rBuf_);
if ($have == 0) {
$data = $this->transport_->readAll($len);
} elseif ($have < $len) {
$data = $this->rBuf_;
$this->rBuf_ = '';
$data .= $this->transport_->readAll($len - $have);
} elseif ($have == $len) {
$data = $this->rBuf_;
$this->rBuf_ = '';
} elseif ($have > $len) {
$data = TStringFuncFactory::create()->substr($this->rBuf_, 0, $len);
$this->rBuf_ = TStringFuncFactory::create()->substr($this->rBuf_, $len);
}
return $data;
}
/**
* @inheritdoc
*
* @param int $len
* @return string
* @throws TTransportException
*/
public function read($len)
{
if (TStringFuncFactory::create()->strlen($this->rBuf_) === 0) {
$this->rBuf_ = $this->transport_->read($this->rBufSize_);
}
if (TStringFuncFactory::create()->strlen($this->rBuf_) <= $len) {
$ret = $this->rBuf_;
$this->rBuf_ = '';
return $ret;
}
$ret = TStringFuncFactory::create()->substr($this->rBuf_, 0, $len);
$this->rBuf_ = TStringFuncFactory::create()->substr($this->rBuf_, $len);
return $ret;
}
/**
* @inheritdoc
*
* @param string $buf
* @throws TTransportException
*/
public function write($buf)
{
$this->wBuf_ .= $buf;
if (TStringFuncFactory::create()->strlen($this->wBuf_) >= $this->wBufSize_) {
$out = $this->wBuf_;
// Note that we clear the internal wBuf_ prior to the underlying write
// to ensure we're in a sane state (i.e. internal buffer cleaned)
// if the underlying write throws up an exception
$this->wBuf_ = '';
$this->transport_->write($out);
}
}
/**
* @inheritdoc
*
* @throws TTransportException
*/
public function flush()
{
if (TStringFuncFactory::create()->strlen($this->wBuf_) > 0) {
$out = $this->wBuf_;
// Note that we clear the internal wBuf_ prior to the underlying write
// to ensure we're in a sane state (i.e. internal buffer cleaned)
// if the underlying write throws up an exception
$this->wBuf_ = '';
$this->transport_->write($out);
}
$this->transport_->flush();
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace Thrift\Transport;
use Thrift\Exception\TTransportException;
use Thrift\Factory\TStringFuncFactory;
/**
* HTTP client for Thrift
*
* @package thrift.transport
*/
class TCurlClient extends TTransport
{
private static $curlHandle;
/**
* The host to connect to
*
* @var string
*/
protected $host_;
/**
* The port to connect on
*
* @var int
*/
protected $port_;
/**
* The URI to request
*
* @var string
*/
protected $uri_;
/**
* The scheme to use for the request, i.e. http, https
*
* @var string
*/
protected $scheme_;
/**
* Buffer for the HTTP request data
*
* @var string
*/
protected $request_;
/**
* Buffer for the HTTP response data.
*
* @var binary string
*/
protected $response_;
/**
* Read timeout
*
* @var float
*/
protected $timeout_;
/**
* http headers
*
* @var array
*/
protected $headers_;
/**
* Make a new HTTP client.
*
* @param string $host
* @param int $port
* @param string $uri
*/
public function __construct($host, $port = 80, $uri = '', $scheme = 'http')
{
if ((TStringFuncFactory::create()->strlen($uri) > 0) && ($uri{0} != '/')) {
$uri = '/' . $uri;
}
$this->scheme_ = $scheme;
$this->host_ = $host;
$this->port_ = $port;
$this->uri_ = $uri;
$this->request_ = '';
$this->response_ = null;
$this->timeout_ = null;
$this->headers_ = array();
}
/**
* Set read timeout
*
* @param float $timeout
*/
public function setTimeoutSecs($timeout)
{
$this->timeout_ = $timeout;
}
/**
* Whether this transport is open.
*
* @return boolean true if open
*/
public function isOpen()
{
return true;
}
/**
* Open the transport for reading/writing
*
* @throws TTransportException if cannot open
*/
public function open()
{
}
/**
* Close the transport.
*/
public function close()
{
$this->request_ = '';
$this->response_ = null;
}
/**
* Read some data into the array.
*
* @param int $len How much to read
* @return string The data that has been read
* @throws TTransportException if cannot read any more data
*/
public function read($len)
{
if ($len >= strlen($this->response_)) {
return $this->response_;
} else {
$ret = substr($this->response_, 0, $len);
$this->response_ = substr($this->response_, $len);
return $ret;
}
}
/**
* Writes some data into the pending buffer
*
* @param string $buf The data to write
* @throws TTransportException if writing fails
*/
public function write($buf)
{
$this->request_ .= $buf;
}
/**
* Opens and sends the actual request over the HTTP connection
*
* @throws TTransportException if a writing error occurs
*/
public function flush()
{
if (!self::$curlHandle) {
register_shutdown_function(array('Thrift\\Transport\\TCurlClient', 'closeCurlHandle'));
self::$curlHandle = curl_init();
curl_setopt(self::$curlHandle, CURLOPT_RETURNTRANSFER, true);
curl_setopt(self::$curlHandle, CURLOPT_BINARYTRANSFER, true);
curl_setopt(self::$curlHandle, CURLOPT_USERAGENT, 'PHP/TCurlClient');
curl_setopt(self::$curlHandle, CURLOPT_CUSTOMREQUEST, 'POST');
curl_setopt(self::$curlHandle, CURLOPT_FOLLOWLOCATION, true);
curl_setopt(self::$curlHandle, CURLOPT_MAXREDIRS, 1);
}
// God, PHP really has some esoteric ways of doing simple things.
$host = $this->host_ . ($this->port_ != 80 ? ':' . $this->port_ : '');
$fullUrl = $this->scheme_ . "://" . $host . $this->uri_;
$headers = array();
$defaultHeaders = array('Accept' => 'application/x-thrift',
'Content-Type' => 'application/x-thrift',
'Content-Length' => TStringFuncFactory::create()->strlen($this->request_));
foreach (array_merge($defaultHeaders, $this->headers_) as $key => $value) {
$headers[] = "$key: $value";
}
curl_setopt(self::$curlHandle, CURLOPT_HTTPHEADER, $headers);
if ($this->timeout_ > 0) {
curl_setopt(self::$curlHandle, CURLOPT_TIMEOUT, $this->timeout_);
}
curl_setopt(self::$curlHandle, CURLOPT_POSTFIELDS, $this->request_);
$this->request_ = '';
curl_setopt(self::$curlHandle, CURLOPT_URL, $fullUrl);
$this->response_ = curl_exec(self::$curlHandle);
// Connect failed?
if (!$this->response_) {
curl_close(self::$curlHandle);
self::$curlHandle = null;
$error = 'TCurlClient: Could not connect to ' . $fullUrl;
throw new TTransportException($error, TTransportException::NOT_OPEN);
}
}
public static function closeCurlHandle()
{
try {
if (self::$curlHandle) {
curl_close(self::$curlHandle);
self::$curlHandle = null;
}
} catch (\Exception $x) {
error_log('There was an error closing the curl handle: ' . $x->getMessage());
}
}
public function addHeaders($headers)
{
$this->headers_ = array_merge($this->headers_, $headers);
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace Thrift\Transport;
use Thrift\Factory\TStringFuncFactory;
/**
* Framed transport. Writes and reads data in chunks that are stamped with
* their length.
*
* @package thrift.transport
*/
class TFramedTransport extends TTransport
{
/**
* Underlying transport object.
*
* @var TTransport
*/
private $transport_;
/**
* Buffer for read data.
*
* @var string
*/
private $rBuf_;
/**
* Buffer for queued output data
*
* @var string
*/
private $wBuf_;
/**
* Whether to frame reads
*
* @var bool
*/
private $read_;
/**
* Whether to frame writes
*
* @var bool
*/
private $write_;
/**
* Constructor.
*
* @param TTransport $transport Underlying transport
*/
public function __construct($transport = null, $read = true, $write = true)
{
$this->transport_ = $transport;
$this->read_ = $read;
$this->write_ = $write;
}
public function isOpen()
{
return $this->transport_->isOpen();
}
public function open()
{
$this->transport_->open();
}
public function close()
{
$this->transport_->close();
}
/**
* Reads from the buffer. When more data is required reads another entire
* chunk and serves future reads out of that.
*
* @param int $len How much data
*/
public function read($len)
{
if (!$this->read_) {
return $this->transport_->read($len);
}
if (TStringFuncFactory::create()->strlen($this->rBuf_) === 0) {
$this->readFrame();
}
// Just return full buff
if ($len >= TStringFuncFactory::create()->strlen($this->rBuf_)) {
$out = $this->rBuf_;
$this->rBuf_ = null;
return $out;
}
// Return TStringFuncFactory::create()->substr
$out = TStringFuncFactory::create()->substr($this->rBuf_, 0, $len);
$this->rBuf_ = TStringFuncFactory::create()->substr($this->rBuf_, $len);
return $out;
}
/**
* Put previously read data back into the buffer
*
* @param string $data data to return
*/
public function putBack($data)
{
if (TStringFuncFactory::create()->strlen($this->rBuf_) === 0) {
$this->rBuf_ = $data;
} else {
$this->rBuf_ = ($data . $this->rBuf_);
}
}
/**
* Reads a chunk of data into the internal read buffer.
*/
private function readFrame()
{
$buf = $this->transport_->readAll(4);
$val = unpack('N', $buf);
$sz = $val[1];
$this->rBuf_ = $this->transport_->readAll($sz);
}
/**
* Writes some data to the pending output buffer.
*
* @param string $buf The data
* @param int $len Limit of bytes to write
*/
public function write($buf, $len = null)
{
if (!$this->write_) {
return $this->transport_->write($buf, $len);
}
if ($len !== null && $len < TStringFuncFactory::create()->strlen($buf)) {
$buf = TStringFuncFactory::create()->substr($buf, 0, $len);
}
$this->wBuf_ .= $buf;
}
/**
* Writes the output buffer to the stream in the format of a 4-byte length
* followed by the actual data.
*/
public function flush()
{
if (!$this->write_ || TStringFuncFactory::create()->strlen($this->wBuf_) == 0) {
return $this->transport_->flush();
}
$out = pack('N', TStringFuncFactory::create()->strlen($this->wBuf_));
$out .= $this->wBuf_;
// Note that we clear the internal wBuf_ prior to the underlying write
// to ensure we're in a sane state (i.e. internal buffer cleaned)
// if the underlying write throws up an exception
$this->wBuf_ = '';
$this->transport_->write($out);
$this->transport_->flush();
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace Thrift\Transport;
use Thrift\Exception\TTransportException;
use Thrift\Factory\TStringFuncFactory;
/**
* HTTP client for Thrift
*
* @package thrift.transport
*/
class THttpClient extends TTransport
{
/**
* The host to connect to
*
* @var string
*/
protected $host_;
/**
* The port to connect on
*
* @var int
*/
protected $port_;
/**
* The URI to request
*
* @var string
*/
protected $uri_;
/**
* The scheme to use for the request, i.e. http, https
*
* @var string
*/
protected $scheme_;
/**
* Buffer for the HTTP request data
*
* @var string
*/
protected $buf_;
/**
* Input socket stream.
*
* @var resource
*/
protected $handle_;
/**
* Read timeout
*
* @var float
*/
protected $timeout_;
/**
* http headers
*
* @var array
*/
protected $headers_;
/**
* Make a new HTTP client.
*
* @param string $host
* @param int $port
* @param string $uri
*/
public function __construct($host, $port = 80, $uri = '', $scheme = 'http')
{
if ((TStringFuncFactory::create()->strlen($uri) > 0) && ($uri{0} != '/')) {
$uri = '/' . $uri;
}
$this->scheme_ = $scheme;
$this->host_ = $host;
$this->port_ = $port;
$this->uri_ = $uri;
$this->buf_ = '';
$this->handle_ = null;
$this->timeout_ = null;
$this->headers_ = array();
}
/**
* Set read timeout
*
* @param float $timeout
*/
public function setTimeoutSecs($timeout)
{
$this->timeout_ = $timeout;
}
/**
* Whether this transport is open.
*
* @return boolean true if open
*/
public function isOpen()
{
return true;
}
/**
* Open the transport for reading/writing
*
* @throws TTransportException if cannot open
*/
public function open()
{
}
/**
* Close the transport.
*/
public function close()
{
if ($this->handle_) {
@fclose($this->handle_);
$this->handle_ = null;
}
}
/**
* Read some data into the array.
*
* @param int $len How much to read
* @return string The data that has been read
* @throws TTransportException if cannot read any more data
*/
public function read($len)
{
$data = @fread($this->handle_, $len);
if ($data === false || $data === '') {
$md = stream_get_meta_data($this->handle_);
if ($md['timed_out']) {
throw new TTransportException(
'THttpClient: timed out reading ' . $len . ' bytes from ' .
$this->host_ . ':' . $this->port_ . $this->uri_,
TTransportException::TIMED_OUT
);
} else {
throw new TTransportException(
'THttpClient: Could not read ' . $len . ' bytes from ' .
$this->host_ . ':' . $this->port_ . $this->uri_,
TTransportException::UNKNOWN
);
}
}
return $data;
}
/**
* Writes some data into the pending buffer
*
* @param string $buf The data to write
* @throws TTransportException if writing fails
*/
public function write($buf)
{
$this->buf_ .= $buf;
}
/**
* Opens and sends the actual request over the HTTP connection
*
* @throws TTransportException if a writing error occurs
*/
public function flush()
{
// God, PHP really has some esoteric ways of doing simple things.
$host = $this->host_ . ($this->port_ != 80 ? ':' . $this->port_ : '');
$headers = array();
$defaultHeaders = array('Host' => $host,
'Accept' => 'application/x-thrift',
'User-Agent' => 'PHP/THttpClient',
'Content-Type' => 'application/x-thrift',
'Content-Length' => TStringFuncFactory::create()->strlen($this->buf_));
foreach (array_merge($defaultHeaders, $this->headers_) as $key => $value) {
$headers[] = "$key: $value";
}
$options = array('method' => 'POST',
'header' => implode("\r\n", $headers),
'max_redirects' => 1,
'content' => $this->buf_);
if ($this->timeout_ > 0) {
$options['timeout'] = $this->timeout_;
}
$this->buf_ = '';
$contextid = stream_context_create(array('http' => $options));
$this->handle_ = @fopen(
$this->scheme_ . '://' . $host . $this->uri_,
'r',
false,
$contextid
);
// Connect failed?
if ($this->handle_ === false) {
$this->handle_ = null;
$error = 'THttpClient: Could not connect to ' . $host . $this->uri_;
throw new TTransportException($error, TTransportException::NOT_OPEN);
}
}
public function addHeaders($headers)
{
$this->headers_ = array_merge($this->headers_, $headers);
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace Thrift\Transport;
use Thrift\Exception\TTransportException;
use Thrift\Factory\TStringFuncFactory;
/**
* A memory buffer is a tranpsort that simply reads from and writes to an
* in-memory string buffer. Anytime you call write on it, the data is simply
* placed into a buffer, and anytime you call read, data is read from that
* buffer.
*
* @package thrift.transport
*/
class TMemoryBuffer extends TTransport
{
/**
* Constructor. Optionally pass an initial value
* for the buffer.
*/
public function __construct($buf = '')
{
$this->buf_ = $buf;
}
protected $buf_ = '';
public function isOpen()
{
return true;
}
public function open()
{
}
public function close()
{
}
public function write($buf)
{
$this->buf_ .= $buf;
}
public function read($len)
{
$bufLength = TStringFuncFactory::create()->strlen($this->buf_);
if ($bufLength === 0) {
throw new TTransportException(
'TMemoryBuffer: Could not read ' .
$len . ' bytes from buffer.',
TTransportException::UNKNOWN
);
}
if ($bufLength <= $len) {
$ret = $this->buf_;
$this->buf_ = '';
return $ret;
}
$ret = TStringFuncFactory::create()->substr($this->buf_, 0, $len);
$this->buf_ = TStringFuncFactory::create()->substr($this->buf_, $len);
return $ret;
}
public function getBuffer()
{
return $this->buf_;
}
public function available()
{
return TStringFuncFactory::create()->strlen($this->buf_);
}
public function putBack($data)
{
$this->buf_ = $data . $this->buf_;
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace Thrift\Transport;
use Thrift\Exception\TTransportException;
/**
* Transport that only accepts writes and ignores them.
* This is useful for measuring the serialized size of structures.
*
* @package thrift.transport
*/
class TNullTransport extends TTransport
{
public function isOpen()
{
return true;
}
public function open()
{
}
public function close()
{
}
public function read($len)
{
throw new TTransportException("Can't read from TNullTransport.");
}
public function write($buf)
{
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace Thrift\Transport;
use Thrift\Exception\TException;
use Thrift\Factory\TStringFuncFactory;
/**
* Php stream transport. Reads to and writes from the php standard streams
* php://input and php://output
*
* @package thrift.transport
*/
class TPhpStream extends TTransport
{
const MODE_R = 1;
const MODE_W = 2;
private $inStream_ = null;
private $outStream_ = null;
private $read_ = false;
private $write_ = false;
public function __construct($mode)
{
$this->read_ = $mode & self::MODE_R;
$this->write_ = $mode & self::MODE_W;
}
public function open()
{
if ($this->read_) {
$this->inStream_ = @fopen(self::inStreamName(), 'r');
if (!is_resource($this->inStream_)) {
throw new TException('TPhpStream: Could not open php://input');
}
}
if ($this->write_) {
$this->outStream_ = @fopen('php://output', 'w');
if (!is_resource($this->outStream_)) {
throw new TException('TPhpStream: Could not open php://output');
}
}
}
public function close()
{
if ($this->read_) {
@fclose($this->inStream_);
$this->inStream_ = null;
}
if ($this->write_) {
@fclose($this->outStream_);
$this->outStream_ = null;
}
}
public function isOpen()
{
return
(!$this->read_ || is_resource($this->inStream_)) &&
(!$this->write_ || is_resource($this->outStream_));
}
public function read($len)
{
$data = @fread($this->inStream_, $len);
if ($data === false || $data === '') {
throw new TException('TPhpStream: Could not read ' . $len . ' bytes');
}
return $data;
}
public function write($buf)
{
while (TStringFuncFactory::create()->strlen($buf) > 0) {
$got = @fwrite($this->outStream_, $buf);
if ($got === 0 || $got === false) {
throw new TException(
'TPhpStream: Could not write ' . TStringFuncFactory::create()->strlen($buf) . ' bytes'
);
}
$buf = TStringFuncFactory::create()->substr($buf, $got);
}
}
public function flush()
{
@fflush($this->outStream_);
}
private static function inStreamName()
{
if (php_sapi_name() == 'cli') {
return 'php://stdin';
}
return 'php://input';
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace Thrift\Transport;
use Thrift\Exception\TException;
use Thrift\Exception\TTransportException;
use Thrift\Factory\TStringFuncFactory;
/**
* Sockets implementation of the TTransport interface.
*
* @package thrift.transport
*/
class TSSLSocket extends TSocket
{
/**
* Remote port
*
* @var resource
*/
protected $context_ = null;
/**
* Socket constructor
*
* @param string $host Remote hostname
* @param int $port Remote port
* @param resource $context Stream context
* @param bool $persist Whether to use a persistent socket
* @param string $debugHandler Function to call for error logging
*/
public function __construct(
$host = 'localhost',
$port = 9090,
$context = null,
$debugHandler = null
) {
$this->host_ = $this->getSSLHost($host);
$this->port_ = $port;
$this->context_ = $context;
$this->debugHandler_ = $debugHandler ? $debugHandler : 'error_log';
}
/**
* Creates a host name with SSL transport protocol
* if no transport protocol already specified in
* the host name.
*
* @param string $host Host to listen on
* @return string $host Host name with transport protocol
*/
private function getSSLHost($host)
{
$transport_protocol_loc = strpos($host, "://");
if ($transport_protocol_loc === false) {
$host = 'ssl://' . $host;
}
return $host;
}
/**
* Connects the socket.
*/
public function open()
{
if ($this->isOpen()) {
throw new TTransportException('Socket already connected', TTransportException::ALREADY_OPEN);
}
if (empty($this->host_)) {
throw new TTransportException('Cannot open null host', TTransportException::NOT_OPEN);
}
if ($this->port_ <= 0) {
throw new TTransportException('Cannot open without port', TTransportException::NOT_OPEN);
}
$this->handle_ = @stream_socket_client(
$this->host_ . ':' . $this->port_,
$errno,
$errstr,
$this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000),
STREAM_CLIENT_CONNECT,
$this->context_
);
// Connect failed?
if ($this->handle_ === false) {
$error = 'TSocket: Could not connect to ' .
$this->host_ . ':' . $this->port_ . ' (' . $errstr . ' [' . $errno . '])';
if ($this->debug_) {
call_user_func($this->debugHandler_, $error);
}
throw new TException($error);
}
}
}
This diff is collapsed.
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace Thrift\Transport;
use Thrift\Exception\TException;
/**
* This library makes use of APC cache to make hosts as down in a web
* environment. If you are running from the CLI or on a system without APC
* installed, then these null functions will step in and act like cache
* misses.
*/
if (!function_exists('apc_fetch')) {
function apc_fetch($key)
{
return false;
}
function apc_store($key, $var, $ttl = 0)
{
return false;
}
}
/**
* Sockets implementation of the TTransport interface that allows connection
* to a pool of servers.
*
* @package thrift.transport
*/
class TSocketPool extends TSocket
{
/**
* Remote servers. Array of associative arrays with 'host' and 'port' keys
*/
private $servers_ = array();
/**
* How many times to retry each host in connect
*
* @var int
*/
private $numRetries_ = 1;
/**
* Retry interval in seconds, how long to not try a host if it has been
* marked as down.
*
* @var int
*/
private $retryInterval_ = 60;
/**
* Max consecutive failures before marking a host down.
*
* @var int
*/
private $maxConsecutiveFailures_ = 1;
/**
* Try hosts in order? or Randomized?
*
* @var bool
*/
private $randomize_ = true;
/**
* Always try last host, even if marked down?
*
* @var bool
*/
private $alwaysTryLast_ = true;
/**
* Socket pool constructor
*
* @param array $hosts List of remote hostnames
* @param mixed $ports Array of remote ports, or a single common port
* @param bool $persist Whether to use a persistent socket
* @param mixed $debugHandler Function for error logging
*/
public function __construct(
$hosts = array('localhost'),
$ports = array(9090),
$persist = false,
$debugHandler = null
) {
parent::__construct(null, 0, $persist, $debugHandler);
if (!is_array($ports)) {
$port = $ports;
$ports = array();
foreach ($hosts as $key => $val) {
$ports[$key] = $port;
}
}
foreach ($hosts as $key => $host) {
$this->servers_ [] = array('host' => $host,
'port' => $ports[$key]);
}
}
/**
* Add a server to the pool
*
* This function does not prevent you from adding a duplicate server entry.
*
* @param string $host hostname or IP
* @param int $port port
*/
public function addServer($host, $port)
{
$this->servers_[] = array('host' => $host, 'port' => $port);
}
/**
* Sets how many time to keep retrying a host in the connect function.
*
* @param int $numRetries
*/
public function setNumRetries($numRetries)
{
$this->numRetries_ = $numRetries;
}
/**
* Sets how long to wait until retrying a host if it was marked down
*
* @param int $numRetries
*/
public function setRetryInterval($retryInterval)
{
$this->retryInterval_ = $retryInterval;
}
/**
* Sets how many time to keep retrying a host before marking it as down.
*
* @param int $numRetries
*/
public function setMaxConsecutiveFailures($maxConsecutiveFailures)
{
$this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
}
/**
* Turns randomization in connect order on or off.
*
* @param bool $randomize
*/
public function setRandomize($randomize)
{
$this->randomize_ = $randomize;
}
/**
* Whether to always try the last server.
*
* @param bool $alwaysTryLast
*/
public function setAlwaysTryLast($alwaysTryLast)
{
$this->alwaysTryLast_ = $alwaysTryLast;
}
/**
* Connects the socket by iterating through all the servers in the pool
* and trying to find one that works.
*/
public function open()
{
// Check if we want order randomization
if ($this->randomize_) {
shuffle($this->servers_);
}
// Count servers to identify the "last" one
$numServers = count($this->servers_);
for ($i = 0; $i < $numServers; ++$i) {
// This extracts the $host and $port variables
extract($this->servers_[$i]);
// Check APC cache for a record of this server being down
$failtimeKey = 'thrift_failtime:' . $host . ':' . $port . '~';
// Cache miss? Assume it's OK
$lastFailtime = apc_fetch($failtimeKey);
if ($lastFailtime === false) {
$lastFailtime = 0;
}
$retryIntervalPassed = false;
// Cache hit...make sure enough the retry interval has elapsed
if ($lastFailtime > 0) {
$elapsed = time() - $lastFailtime;
if ($elapsed > $this->retryInterval_) {
$retryIntervalPassed = true;
if ($this->debug_) {
call_user_func(
$this->debugHandler_,
'TSocketPool: retryInterval ' .
'(' . $this->retryInterval_ . ') ' .
'has passed for host ' . $host . ':' . $port
);
}
}
}
// Only connect if not in the middle of a fail interval, OR if this
// is the LAST server we are trying, just hammer away on it
$isLastServer = false;
if ($this->alwaysTryLast_) {
$isLastServer = ($i == ($numServers - 1));
}
if (($lastFailtime === 0) ||
($isLastServer) ||
($lastFailtime > 0 && $retryIntervalPassed)) {
// Set underlying TSocket params to this one
$this->host_ = $host;
$this->port_ = $port;
// Try up to numRetries_ connections per server
for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
try {
// Use the underlying TSocket open function
parent::open();
// Only clear the failure counts if required to do so
if ($lastFailtime > 0) {
apc_store($failtimeKey, 0);
}
// Successful connection, return now
return;
} catch (TException $tx) {
// Connection failed
}
}
// Mark failure of this host in the cache
$consecfailsKey = 'thrift_consecfails:' . $host . ':' . $port . '~';
// Ignore cache misses
$consecfails = apc_fetch($consecfailsKey);
if ($consecfails === false) {
$consecfails = 0;
}
// Increment by one
$consecfails++;
// Log and cache this failure
if ($consecfails >= $this->maxConsecutiveFailures_) {
if ($this->debug_) {
call_user_func(
$this->debugHandler_,
'TSocketPool: marking ' . $host . ':' . $port .
' as down for ' . $this->retryInterval_ . ' secs ' .
'after ' . $consecfails . ' failed attempts.'
);
}
// Store the failure time
apc_store($failtimeKey, time());
// Clear the count of consecutive failures
apc_store($consecfailsKey, 0);
} else {
apc_store($consecfailsKey, $consecfails);
}
}
}
// Oh no; we failed them all. The system is totally ill!
$error = 'TSocketPool: All hosts in pool are down. ';
$hosts = array();
foreach ($this->servers_ as $server) {
$hosts [] = $server['host'] . ':' . $server['port'];
}
$hostlist = implode(',', $hosts);
$error .= '(' . $hostlist . ')';
if ($this->debug_) {
call_user_func($this->debugHandler_, $error);
}
throw new TException($error);
}
}
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace Thrift\Transport;
use Thrift\Exception\TTransportException;
use Thrift\Factory\TStringFuncFactory;
/**
* Base interface for a transport agent.
*
* @package thrift.transport
*/
abstract class TTransport
{
/**
* Whether this transport is open.
*
* @return boolean true if open
*/
abstract public function isOpen();
/**
* Open the transport for reading/writing
*
* @throws TTransportException if cannot open
*/
abstract public function open();
/**
* Close the transport.
*/
abstract public function close();
/**
* Read some data into the array.
*
* @param int $len How much to read
* @return string The data that has been read
* @throws TTransportException if cannot read any more data
*/
abstract public function read($len);
/**
* Guarantees that the full amount of data is read.
*
* @return string The data, of exact length
* @throws TTransportException if cannot read data
*/
public function readAll($len)
{
// return $this->read($len);
$data = '';
$got = 0;
while (($got = TStringFuncFactory::create()->strlen($data)) < $len) {
$data .= $this->read($len - $got);
}
return $data;
}
/**
* Writes the given data out.
*
* @param string $buf The data to write
* @throws TTransportException if writing fails
*/
abstract public function write($buf);
/**
* Flushes any pending data out of a buffer
*
* @throws TTransportException if a writing error occurs
*/
public function flush()
{
}
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment