Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
S
system
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
JIRA
JIRA
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Registry
Registry
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
_site-res
system
Commits
d28f74f5
Commit
d28f74f5
authored
May 04, 2018
by
tufengqi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
test
parent
73c53c21
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
239 additions
and
25 deletions
+239
-25
TBufferedTransport.php
classes/fpf/thrift/TBufferedTransport.php
+234
-0
ThriftAsyncServiceFactoryProxy.php
classes/fpf/thrift/ThriftAsyncServiceFactoryProxy.php
+5
-25
No files found.
classes/fpf/thrift/TBufferedTransport.php
0 → 100644
View file @
d28f74f5
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace
fpf\thrift
;
use
Thrift\Exception\TTransportException
;
use
Thrift\Factory\TStringFuncFactory
;
/**
* Buffered transport. Stores data to an internal buffer that it doesn't
* actually write out until flush is called. For reading, we do a greedy
* read and then serve data out of the internal buffer.
*
* @package thrift.transport
*/
class
TBufferedTransport
extends
TTransport
{
/**
* The underlying transport
*
* @var TTransport
*/
protected
$transport_
;
/**
* The receive buffer size
*
* @var int
*/
protected
$rBufSize_
=
512
;
/**
* The write buffer size
*
* @var int
*/
protected
$wBufSize_
=
512
;
/**
* The write buffer.
*
* @var string
*/
protected
$wBuf_
=
''
;
/**
* The read buffer.
*
* @var string
*/
protected
$rBuf_
=
''
;
/**
* Constructor. Creates a buffered transport around an underlying transport
*/
public
function
__construct
(
$transport
,
$rBufSize
=
512
,
$wBufSize
=
512
)
{
$this
->
transport_
=
$transport
;
$this
->
rBufSize_
=
$rBufSize
;
$this
->
wBufSize_
=
$wBufSize
;
}
public
function
isOpen
()
{
return
$this
->
transport_
->
isOpen
();
}
/**
* @inheritdoc
*
* @throws TTransportException
*/
public
function
open
()
{
$this
->
transport_
->
open
();
}
public
function
close
()
{
$this
->
transport_
->
close
();
}
public
function
putBack
(
$data
)
{
if
(
TStringFuncFactory
::
create
()
->
strlen
(
$this
->
rBuf_
)
===
0
)
{
$this
->
rBuf_
=
$data
;
}
else
{
$this
->
rBuf_
=
(
$data
.
$this
->
rBuf_
);
}
}
/**
* The reason that we customize readAll here is that the majority of PHP
* streams are already internally buffered by PHP. The socket stream, for
* example, buffers internally and blocks if you call read with $len greater
* than the amount of data available, unlike recv() in C.
*
* Therefore, use the readAll method of the wrapped transport inside
* the buffered readAll.
*
* @throws TTransportException
*/
public
function
readAll
(
$len
)
{
$have
=
TStringFuncFactory
::
create
()
->
strlen
(
$this
->
rBuf_
);
if
(
$have
==
0
)
{
$data
=
$this
->
transport_
->
readAll
(
$len
);
}
elseif
(
$have
<
$len
)
{
$data
=
$this
->
rBuf_
;
$this
->
rBuf_
=
''
;
$data
.=
$this
->
transport_
->
readAll
(
$len
-
$have
);
}
elseif
(
$have
==
$len
)
{
$data
=
$this
->
rBuf_
;
$this
->
rBuf_
=
''
;
}
elseif
(
$have
>
$len
)
{
$data
=
TStringFuncFactory
::
create
()
->
substr
(
$this
->
rBuf_
,
0
,
$len
);
$this
->
rBuf_
=
TStringFuncFactory
::
create
()
->
substr
(
$this
->
rBuf_
,
$len
);
}
return
$data
;
}
/**
* @inheritdoc
*
* @param int $len
* @return string
* @throws TTransportException
*/
public
function
readWBuf
(
$len
)
{
if
(
TStringFuncFactory
::
create
()
->
strlen
(
$this
->
wBuf_
)
===
0
)
{
$this
->
wBuf_
=
$this
->
transport_
->
read
(
$this
->
wBufSize_
);
}
if
(
TStringFuncFactory
::
create
()
->
strlen
(
$this
->
rBuf_
)
<=
$len
)
{
$ret
=
$this
->
wBuf_
;
$this
->
wBuf_
=
''
;
return
$ret
;
}
$ret
=
TStringFuncFactory
::
create
()
->
substr
(
$this
->
wBuf_
,
0
,
$len
);
$this
->
wBuf_
=
TStringFuncFactory
::
create
()
->
substr
(
$this
->
wBuf_
,
$len
);
return
$ret
;
}
/**
* @inheritdoc
*
* @param int $len
* @return string
* @throws TTransportException
*/
public
function
read
(
$len
)
{
if
(
TStringFuncFactory
::
create
()
->
strlen
(
$this
->
rBuf_
)
===
0
)
{
$this
->
rBuf_
=
$this
->
transport_
->
read
(
$this
->
rBufSize_
);
}
if
(
TStringFuncFactory
::
create
()
->
strlen
(
$this
->
rBuf_
)
<=
$len
)
{
$ret
=
$this
->
rBuf_
;
$this
->
rBuf_
=
''
;
return
$ret
;
}
$ret
=
TStringFuncFactory
::
create
()
->
substr
(
$this
->
rBuf_
,
0
,
$len
);
$this
->
rBuf_
=
TStringFuncFactory
::
create
()
->
substr
(
$this
->
rBuf_
,
$len
);
return
$ret
;
}
/**
* @inheritdoc
*
* @param string $buf
* @throws TTransportException
*/
public
function
write
(
$buf
)
{
$this
->
wBuf_
.=
$buf
;
if
(
TStringFuncFactory
::
create
()
->
strlen
(
$this
->
wBuf_
)
>=
$this
->
wBufSize_
)
{
$out
=
$this
->
wBuf_
;
// Note that we clear the internal wBuf_ prior to the underlying write
// to ensure we're in a sane state (i.e. internal buffer cleaned)
// if the underlying write throws up an exception
$this
->
wBuf_
=
''
;
$this
->
transport_
->
write
(
$out
);
}
}
/**
* @inheritdoc
*
* @throws TTransportException
*/
public
function
flush
()
{
if
(
TStringFuncFactory
::
create
()
->
strlen
(
$this
->
wBuf_
)
>
0
)
{
$out
=
$this
->
wBuf_
;
// Note that we clear the internal wBuf_ prior to the underlying write
// to ensure we're in a sane state (i.e. internal buffer cleaned)
// if the underlying write throws up an exception
$this
->
wBuf_
=
''
;
$this
->
transport_
->
write
(
$out
);
}
$this
->
transport_
->
flush
();
}
}
classes/fpf/thrift/ThriftAsyncServiceFactoryProxy.php
View file @
d28f74f5
...
...
@@ -9,6 +9,7 @@ use Thrift\Protocol\TBinaryProtocolAccelerated;
use
Thrift\Transport\TMemoryBuffer
;
use
Thrift\Type\TMessageType
;
use
Thrift\Exception\TApplicationException
;
use
Thrift\Factory\TStringFuncFactory
;
class
ThriftAsyncServiceFactoryProxy
{
...
...
@@ -61,7 +62,7 @@ class ThriftAsyncServiceFactoryProxy
];
$this
->
socket
=
new
TGuzzleClient
(
$service_real
[
'host'
],
$service_real
[
'port'
],
'/'
.
$service_name_space_str
.
'/'
.
$service_str
.
'/'
);
$this
->
socket
->
addHeaders
([
'Content-type'
=>
$content_type_mapping
[
$content_type
]]);
$this
->
transport
=
new
\
Thrift\Transpor
t\TBufferedTransport
(
$this
->
socket
,
1024
,
1024
);
$this
->
transport
=
new
\
fpf\thrif
t\TBufferedTransport
(
$this
->
socket
,
1024
,
1024
);
if
(
self
::
BINARY
===
$content_type
)
{
$strict_read
=
true
;
$strict_write
=
true
;
...
...
@@ -117,30 +118,9 @@ class ThriftAsyncServiceFactoryProxy
$args_obj
->
write
(
$protocol
);
$protocol
->
writeMessageEnd
();
}
$promise
=
$this
->
transport
->
async
();
$promise
=
$promise
->
then
(
function
(
$response
)
use
(
$name
)
{
$body
=
$response
->
getBody
();
$rseqid
=
0
;
$fname
=
null
;
$mtype
=
0
;
$input
=
$this
->
protocal_factory_class
->
getProtocol
(
new
TMemoryBuffer
(
$body
));
$input
->
readMessageBegin
(
$fname
,
$mtype
,
$rseqid
);
if
(
$mtype
==
TMessageType
::
EXCEPTION
)
{
$x
=
new
TApplicationException
();
$x
->
read
(
$input
);
$input
->
readMessageEnd
();
throw
$x
;
}
$result_class
=
$this
->
tem_class
.
'_'
.
$name
.
'_result'
;
$result
=
new
$result_class
;
$result
->
read
(
$input
);
$input
->
readMessageEnd
();
if
(
$result
->
success
!==
null
)
{
return
$result
->
success
;
}
else
{
return
false
;
}
});
$read_buffer
=
$this
->
transport
->
readWBuf
(
1024
);
var_dump
(
TStringFuncFactory
::
create
()
->
strlen
(
$read_buffer
));
exit
;
// $this->transport->close();
return
$promise
;
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment